2016-04-01 2 views
4

Я программирую клиента для работы с kafka 0.9. Я хочу знать, как создать тему. Этот ответ: How to create a Topic in Kafka through Java похож на то, что я прошу. Кроме того, это решение работает только для Kafka 0.8.2, которое сильно отличается от API Kafka 0.9.Создание темы для Apache Kafka 0.9 Использование Java

+0

Другими словами, вы пробовали аналогичное решение, и оно не работает, не так ли? Пожалуйста, опишите, что вы пробовали, и проблемы, с которыми вы столкнулись. –

+0

Я работаю в компании. Он запускает Kafka 0.8.2. Я реализовал его на основе ссылки. Теперь компания хочет перейти на Kafka 0.9. Мне нужно быстрое решение для обновления моего кода до 0,9. –

ответ

8

После просмотра scala api и различных ссылок в Интернете.

Это решение, которое я нашел:

зависимостями Maven:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.0</version> 
</dependency> 
<dependency> 
    <groupId>com.101tec</groupId> 
    <artifactId>zkclient</artifactId> 
    <version>0.7</version> 
</dependency> 

Код:

import java.util.Properties; 

import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.ZkConnection; 

import kafka.admin.AdminUtils; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 

public class KafkaJavaExample { 

    public static void main(String[] args) { 
     String zookeeperConnect = "zkserver1:2181,zkserver2:2181"; 
     int sessionTimeoutMs = 10 * 1000; 
     int connectionTimeoutMs = 8 * 1000; 

     ZkClient zkClient = new ZkClient(
      zookeeperConnect, 
      sessionTimeoutMs, 
      connectionTimeoutMs, 
      ZKStringSerializer$.MODULE$); 

     // Security for Kafka was added in Kafka 0.9.0.0 
     boolean isSecureKafkaCluster = false; 
     // ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API 
     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster); 

     String topic = "my-topic"; 
     int partitions = 2; 
     int replication = 3; 

     // Add topic configuration here 
     Properties topicConfig = new Properties(); 

     AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig); 
     zkClient.close(); 
    } 
} 

Если вы задаетесь вопросом, почему следующий код не похож на Java:

ZKStringSerializer$.MODULE$ 

Это потому, что ZkStringSerializer является объектом Scala. Вы можете прочитать более подробную информацию о том, что здесь:

How create Kafka ZKStringSerializer in Java?

Примечание: Вы должны инициализировать ZkClient с ZKStringSerializer.
Если вы этого не сделаете, то createTopic() будет работать только (Другими словами: он вернется без ошибок).
Тема будет существовать только в Zookeeper и работает только при выборе тем. т.е. команда список ниже работает отлично

bin/kafka-topics.sh --list --zookeeper localhost:2181 

но сам Кафка не создает тему. Чтобы проиллюстрировать, описанная ниже команда выдает ошибку.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 

Поэтому убедитесь, что вы инициализируете его с помощью ZKStringSerializer $ .MODULE $.

Ссылки: How Can we create a topic in Kafka from the IDE using API из-за-язя, использующих API-

Вскоре Чи Loong, Университет Торонто

+1

Исходная ссылка - [http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api](http:// stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api) не http://www.askdaima.com/question/0e3b996eda49e3e4 –

+0

@ JayaAnanthram Исправлено, спасибо. Я не знал об исходной ссылке. –

8

Я попытался следующий Soon ответ Chee Loong с Кафкой 0.9.0.1, но должен был сделайте одно изменение. ZKStringSerializer теперь закрыт. Для создания ZkUtils я использовал следующий API (он создает ZkClient внутренне):

ZkUtils.apply(
    "zookeeper1:port1,zookeeper2:port2", 
    sessionTimeoutMs, 
    connectionTimeoutMs, 
    false) 
+0

Спасибо за это, он также работал с Kafka 0.10.0.1 – Prashant

Смежные вопросы