2015-11-25 2 views
12

Я пытаюсь выяснить, что заставляет моих текущих потребителей высокого уровня отработать. Я использую Kafka 0.8.2.1, с no «offset.storage», установленный в server.properties Kafka - что, я думаю, означает, что смещения хранятся в Kafka. (Я также проверил, что никаких зачетов не сохраняются в Zookeeper, проверяя этот путь в оболочке Zk: /потребителей/consumer_group_name/Смещение/TOPIC_NAME/partition_number)Kafka 0.8.2.1 как читать с __consumer_offsets topic

Я пытался слушать __consumer_offsets тему, чтобы увидеть, потребитель сохраняет какое значение смещения, но он не работает ...

Я попытался следующие:

создал конфигурационный файл для консоли потребителя следующим образом:

=> more kafka_offset_consumer.config 

exclude.internal.topics=false 

и попробовал две версии потребительских сценариев консоли:

#1: 
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181 

#2 
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config 

Ни работал - он просто сидит там, но не печатает ничего, несмотря на то, что потребители активно потребляющих/сохранение зачетов.

Я пропустил другую конфигурацию/свойства?

спасибо!

Марина

ответ

22

Я наткнулся на этот вопрос, пытаясь также потреблять от __consumer_offsets тему. мне удалось понять это для разных версий Кафки и думал, что я бы поделиться тем, что я нашел

Для Кафки 0.8.2.x

#Create consumer config 
echo "exclude.internal.topics=false" > /tmp/consumer.config 
#Consume all offsets 
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ 
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \ 
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning 

Для Кафки 0.9.xx и 0.10.0.0

#Create consumer config 
echo "exclude.internal.topics=false" > /tmp/consumer.config 
#Consume all offsets 
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ 
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \ 
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning 

Для 0.11.0.0

#Create consumer config 
echo "exclude.internal.topics=false" > /tmp/consumer.config 
#Consume all offsets 
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \ 
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \ 
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning 
+0

awesome, спасибо за обмен! – Marina

+3

На сегодняшний день форматтер обновлен до 'kafka.coordinator.group.GroupMetadataManager \ $ OffsetsMessageFormatter'. – lafolle

+0

Для более новых версий (.10 или новее, я предполагаю) рассмотрим использование --bootstrap-server: 9092 вместо --zookeeper: 2181 – TheFiddlerWins

2

Если добавить --from-beginning это скорее всего должно дает некоторые результаты, по крайней мере, это было, когда я попробовал сам. И если вы не указали этот аргумент, но прочитали больше сообщений (и совершили смещение затухания), пока вы слушаете этого пользователя, это также должно отображать сообщения.

+0

Спасибо, Гийом. К сожалению, добавление - от начала не помогло - такое же поведение ....потребитель просто сидит там, не получая никаких данных, когда я Cntr-C это - он говорит «0 сообщений» потребляется :( – Marina

3

Хорошо, я выяснил, в чем проблема. Мой Кафка фактически использовал Zookeeper как смещение хранения, а не Кафка .... Причина я не обнаружил, что сразу же, потому что я проверял содержание ZK неверно:

Я делал

ls /consumers/consumer_group_name/offsets/topic_name/partition_number 

и ничего не видя. Вместо этого я должен был «получить» содержание - что сделало шоу правильных смещения для своих потребителей, как показано ниже:

get /consumers/consumer_group_name/offsets/topic_name/partition_number 
185530404 
cZxid = 0x70789ad05 
ctime = Mon Nov 23 17:49:46 GMT 2015 
mZxid = 0x7216cdc5c 
mtime = Thu Dec 03 20:18:57 GMT 2015 
pZxid = 0x70789ad05 
cversion = 0 
dataVersion = 3537384 
aclVersion = 0 
ephemeralOwner = 0x0 
dataLength = 9 
numChildren = 0 
Смежные вопросы