2016-01-17 3 views
13

Я делаю Kafka Quickstart для Kafka 0.9.0.0.Потребитель не получает сообщения, kafka console, новый потребитель api, Kafka 0.9

Я имею Zookeeper слушать в localhost:2181, потому что я побежал

bin/zookeeper-server-start.sh config/zookeeper.properties 

У меня есть один брокер прослушивания в localhost:9092, потому что я побежал

bin/kafka-server-start.sh config/server.properties 

У меня есть продюсер объявление на тему «тест», потому что я пробег

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
yello 
is this thing on? 
let's try another 
gimme more 

Когда я запускаю старый API consu Мер, он работает, запустив

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

Однако, когда я запускаю новый API потребитель, я не получаю ничего, когда я бегу

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning \ 
    --bootstrap-server localhost:9092 

Можно ли подписаться на тему из пользователь консоли, используя новый api? Как я могу это исправить?

+0

Какая версия scala вы используете? Вы компилировали кафку? У меня была пара незначительных проблем с kafka_2.10-0.9.0.0.tgz, но с kafka_2.101-0.9.0.0.tgz он работает как шарм, включая ваш пример. – vlain

+0

Хорошо спасибо, это было с 2.10. Если я попробую снова, это будет с 2.11. – EthanP

+0

Вы создали тему 'test'? –

ответ

1

Можете ли вы попробовать так:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic 
+1

Эта команда предназначена для запуска Old Consumer API. Ваш ответ уже упоминается в проблеме. –

0

Используйте это: бен/kafka-console-consumer.sh --bootstrap-сервер локальный: 9092 --topic тест --from-начало

Примечания: Удалить «--Новогодний-потребитель» из вашей команды

Для справки смотрите здесь: https://kafka.apache.org/quickstart

+0

Тогда он не будет использовать нового потребителя, и вопрос в том, как получать сообщения с использованием нового потребителя. –

+0

0.9.0.0 в этой версии Kafka, их новый потребитель консоли не работал, они дали java-потребителю, но не консольному потребителю. И теперь они полностью удалили «-new-consumer» из более поздних версий. –

0

у меня такая же проблема, теперь я понял,.

Когда вы используете --zookeeper, он должен быть снабжен адресом zookeeper в качестве параметра.

Когда вы используете --bootstrap-сервер, в качестве параметра должен быть указан адрес брокера.

+0

В вопросе говорится, что они уже предоставляют адрес брокера в качестве параметра; Порт 9092 является портом Kafka по умолчанию. –

+0

ну, не всегда, если вы загружаете песочницу HDP для докеров, она по умолчанию задает ее до 6667 – Loebre

1

Я только что столкнулся с этой проблемой, и решение было удалить /brokers в zookeeper и перезапустить узлы kafka.

bin/zookeeper-shell <zk-host>:2181

, а затем

rmr /brokers

Не знаю, почему это решает ее.

Когда я активировал ведение журнала отладки, я видел это сообщение об ошибке снова и снова в потребителе:

2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata

-2

В kafka_2.11-0.11.0.0 сервере Zookeeper устарела и и использует bootstrap-server, и он будет принимать адрес и порт брокерского IP-адреса. Если вы дадите правильные параметры брокера, вы сможете использовать сообщения.

например. $ bin/kafka-console-consumer.sh --bootstrap-server: 9093 --topic test - от начала

Я использую порт 9093, для вас он может отличаться.

Отношения.

+0

В вопросе говорится, что это то, что они уже делают. –

0

Ваш местный хост здесь. , если вы замените слово localhost на фактическое имя хоста, оно должно работать.

так:

производитель

./bin/kafka-console-producer.sh --broker-list \ 
sandbox-hdp.hortonworks.com:9092 --topic test 

потребитель:

./bin/kafka-console-consumer.sh --topic test --from-beginning \  
--bootstrap-server bin/kafka-console-consumer.sh --new-consumer \ 
--topic test --from-beginning \ 
--bootstrap-server localhost:9092 
1

Эта проблема также влияет глотания данные из Кафки, используя желоб и тонуть данные HDFS.

Чтобы устранить вышеуказанную проблему:

  1. Стоп Кафки брокер
  2. Подключения к Zookeeper кластера и удалить/брокер г узла
  3. Restart Кафка брокеры

Там нет вопроса относительно для версии клиента kafka и версии scala, которую мы используем в кластере. Zookeeper может иметь неправильную информацию о хозяевах брокера.

Чтобы проверить действие:

Создать тему в Кафке.

Кафка-консоль потребитель --bootstrap-сервер slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-начало

Открыть канал продюсера и кормить некоторые сообщения к нему.

Кафка-консоль продюсер --broker-лист slavenode03.cdh.com:9092 --topic rkkrishnaa3210

Открытого потребитель канал потребить сообщение от конкретной темы.

Кафка-консоль потребителя --bootstrap-сервер slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-начало

Чтобы проверить это с лотком:

Flume агент конфигурации:

rk.sources = source1 
rk.channels = channel1 
rk.sinks = sink1 

rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181 
rk.sources.source1.topic = rkkrishnaa321 
rk.sources.source1.groupId = flume1 
rk.sources.source1.channels = channel1 
rk.sources.source1.interceptors = i1 
rk.sources.source1.interceptors.i1.type = timestamp 
rk.sources.source1.kafka.consumer.timeout.ms = 100 
rk.channels.channel1.type = memory 
rk.channels.channel1.capacity = 10000 
rk.channels.channel1.transactionCapacity = 1000 
rk.sinks.sink1.type = hdfs 
rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d 
rk.sinks.sink1.hdfs.rollInterval = 5 
rk.sinks.sink1.hdfs.rollSize = 0 
rk.sinks.sink1.hdfs.rollCount = 0 
rk.sinks.sink1.hdfs.fileType = DataStream 
rk.sinks.sink1.channel = channel1 

Run желобе агент:

flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk 

Наблюдайте журналы от потребителя, что меня ssage из темы написано в HDFS.

18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1 
18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1 
18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned. 
18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started. 
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean. 
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started 
18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 
18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp 
18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp 
18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920 
18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.