Я программирую клиента для работы с kafka 0.9. Я хочу знать, как создать тему. Этот ответ: How to create a Topic in Kafka through Java похож на то, что я прошу. Кроме того, это решение работает только для Kafka 0.8.2, которое сильно отличается от API Kafka 0.9.Создание темы для Apache Kafka 0.9 Использование Java
ответ
После просмотра scala api и различных ссылок в Интернете.
Это решение, которое я нашел:
зависимостями Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>
Код:
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
public class KafkaJavaExample {
public static void main(String[] args) {
String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
ZkClient zkClient = new ZkClient(
zookeeperConnect,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
// Security for Kafka was added in Kafka 0.9.0.0
boolean isSecureKafkaCluster = false;
// ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
String topic = "my-topic";
int partitions = 2;
int replication = 3;
// Add topic configuration here
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
zkClient.close();
}
}
Если вы задаетесь вопросом, почему следующий код не похож на Java:
ZKStringSerializer$.MODULE$
Это потому, что ZkStringSerializer является объектом Scala. Вы можете прочитать более подробную информацию о том, что здесь:
How create Kafka ZKStringSerializer in Java?
Примечание: Вы должны инициализировать ZkClient с ZKStringSerializer.
Если вы этого не сделаете, то createTopic() будет работать только (Другими словами: он вернется без ошибок).
Тема будет существовать только в Zookeeper и работает только при выборе тем. т.е. команда список ниже работает отлично
bin/kafka-topics.sh --list --zookeeper localhost:2181
но сам Кафка не создает тему. Чтобы проиллюстрировать, описанная ниже команда выдает ошибку.
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Поэтому убедитесь, что вы инициализируете его с помощью ZKStringSerializer $ .MODULE $.
Ссылки: How Can we create a topic in Kafka from the IDE using API из-за-язя, использующих API-
Вскоре Чи Loong, Университет Торонто
Исходная ссылка - [http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api](http:// stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api) не http://www.askdaima.com/question/0e3b996eda49e3e4 –
@ JayaAnanthram Исправлено, спасибо. Я не знал об исходной ссылке. –
Я попытался следующий Soon ответ Chee Loong с Кафкой 0.9.0.1, но должен был сделайте одно изменение. ZKStringSerializer теперь закрыт. Для создания ZkUtils я использовал следующий API (он создает ZkClient внутренне):
ZkUtils.apply(
"zookeeper1:port1,zookeeper2:port2",
sessionTimeoutMs,
connectionTimeoutMs,
false)
Спасибо за это, он также работал с Kafka 0.10.0.1 – Prashant
- 1. Kafka Consumer - Java (0.9 API)
- 2. Apache Kafka 0.9 Java API Использование всех сообщений с начала темы
- 3. Kafka 0.9 Потребитель
- 4. Apache Apex - Кафка 0.9 безопасные темы Кафка
- 5. Kafka 0.9 - Как создать тему через java api
- 6. Как получить все темы в apache kafka?
- 7. Весна Кафка Интеграция для Kafka 0.9
- 8. Apache Kafka Scaling Темы с использованием разделов
- 9. Как скомпилировать тестовый оператор Kafka 0.9 с Apache Apex?
- 10. Что такое ограничения имен темы Apache Kafka?
- 11. Проверка закрытия KafkaProducer. (Java, 0.9)
- 12. Коннектор Kafka - JMSSourceConnector для темы Kafka
- 13. Kafka 0.9 Весна Интеграция Конфигурация DSL
- 14. Почему производитель kafka 0.10 не может отправлять сообщения kafka 0.9?
- 15. Использование Apache Drill для запроса kafka
- 16. Инструменты для чтения потребительского смещения от kafka 0.9
- 17. Производительность Kafka Producer 0.9 с небольшими сообщениями
- 18. Как издатель публиковать сообщение для темы в Apache Kafka?
- 19. Свойства Apache Kafka-Zookeeper
- 20. Конфигурирование ACL для темы kafka
- 21. Is kafka потребитель 0.9 задняя совместимость?
- 22. Apache Kafka - потребитель iOS
- 23. Apache Kafka Заказ поставки
- 24. Java - создание новой темы
- 25. Разделы темы Kafka для Spark streaming
- 26. Использование темы kafka с помощью logstash для elasticSearch
- 27. Создание индекса в улье 0.9
- 28. Kafka 0.9: Долговечность и устойчивость векторов смещения потребителя
- 29. Spark 1.5 как клиент для KAFKA 0.9 или 0.10
- 30. Apache Kafka для сохранения данных временных рядов
Другими словами, вы пробовали аналогичное решение, и оно не работает, не так ли? Пожалуйста, опишите, что вы пробовали, и проблемы, с которыми вы столкнулись. –
Я работаю в компании. Он запускает Kafka 0.8.2. Я реализовал его на основе ссылки. Теперь компания хочет перейти на Kafka 0.9. Мне нужно быстрое решение для обновления моего кода до 0,9. –