2016-07-26 3 views
2

Я выполнил следующие инструкции, чтобы настроить кластер kafka с несколькими узлами. Теперь, как подключиться к zookeeper? Можно ли подключиться только к одному zookeeper со стороны Producer/consumer в JAVA или есть способ подключить все узлы zookeeper?Подключение к Zookeeper в кластере Apache Kafka Multi Node

Установка нескольких узлов Apache Zookeeper кластера

На каждом узле кластера добавьте следующие строки в файл конфигурации Кафка// zookeeper.properties

server.1=zNode01:2888:3888 
    server.2=zNode02:2888:3888 
    server.3=zNode03:2888:3888 
    #add here more servers if you want 
    initLimit=5 
    syncLimit=2 

На каждом узле кластера создать файл myid в папке, представленной свойством dataDir (по умолчанию в папке находится/tmp/zookeeper). Файл MyID должен только содержит идентификатор znode («1» для zNode01, «2» для ZNode02, и т.д ...)

Установка несколько брокера Apache Кафка кластера

На каждом узле кластера модифицирует изменить свойство zookeeper.connect из файла Кафка/Config/server.properties:

zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181 

на каждом узле кластера изменить свойство host.name из файла Кафка/Config/server.properties: host.name = zNode0x

На каждом узле кластера измените свойство broker.id из файла kafka/config/server.properties (каждый брокер в кластере должен иметь уникальный идентификатор)

ответ

3

Вы можете передать все узлы производителя или потребителя , Кафка достаточно умен, что он будет подключаться к узлу, который имеет данные, требуется на основании коэффициента репликации или раздела

Здесь потребитель код:

Properties props = new Properties(); 
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("foo", "bar")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
    } 

Вы можете найти более подробную информацию here

Примечание: Проблема с этим утверждением заключается в том, что он откроет несколько соединений, чтобы узнать, какой узел хранит данные. Для более надежных и масштабируемых систем вы можете сохранить карту номер раздела и имя узла, это также поможет в балансировке нагрузки.

Вот пример продюсер

Properties props = new Properties(); 
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); 
props.put("acks", "all"); 
props.put("retries", 0); 
props.put("batch.size", 16384); 
props.put("linger.ms", 1); 
props.put("buffer.memory", 33554432); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Producer<String, String> producer = new KafkaProducer<>(props); 
for(int i = 0; i < 100; i++) 
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); 

producer.close(); 

подробнее here

+0

Как создать несколько разделов для темы? Как это может быть сделано? Разве нам не нужно делегировать это через ZkClient? Обсуждается здесь: http://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java – amateur

+0

AdminUtils.createTopic (zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); – amateur

+0

Вы можете использовать AdminUtils для создания тем ..Но лучшей практикой является создание его в самом узле с командой coz, это очередная задача. Формат команды /bin/kafka-topics.sh --zookeeper c6401.ambari.apache.org:2181 --create --topic test_topic --partitions 2 --replication-factor 2 Создал тему "test_topic". – Shettyh

0

Нет необходимости передавать свойства соединения Zookeeper в клиентах Кафки (производитель & Consumer).

От Kafka-v9 и выше, Kafka Producer и Consumer не общаются с Zookeeper.

+0

Я использую V9, im gettting следующее исключение, информация о zookeeper tht требуется в свойствах. – amateur

+0

Вызвано: java.lang.IllegalArgumentException: требование не выполнено: отсутствует требуемое свойство 'zookeeper.connect' \t at scala.Predef $ .require (Predef.scala: 233) \t at kafka.utils.VerifiableProperties.getString (VerifiableProperties.scala: 177) \t at kafka.utils.ZKConfig. (ZkUtils.scala: 740) – amateur

+0

Используйте KafkaProducer и KafkaConsumer из библиотеки 'kafka-clients'. –

Смежные вопросы