2015-09-24 3 views
4

Я запускаю кластер Kafka из 6 брокеров, используя библиотеку mesos/kafka. Я могу добавлять и запускать брокеров по 6 различным машинам и отправлять сообщения в кластер с помощью Python SimpleProducer и скрипта kafka-console-producer.sh.Пользователь Kafka «Не удалось добавить лидера для перегородок» ошибка на Mesos

Однако я не могу заставить потребителей работать исправно. Я бег следующей команды потребительской:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.199:2181 --topic test --from-beginning --consumer.config config/consumer.properties --delete-consumer-offsets 

В consumer.properties файл я поставил group.id в my.group и установить zookeeeper.connect к числу узлов в ансамбле Zookeeper. Я получаю следующие warninng сообщения от запуска этого потребителя:

  [2015-09-24 16:01:06,609] WARN [my.group_my_host-1443106865779-b5a3a1e1-leader-finder-thread], Failed to add l 
    eader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherM 
    anager$LeaderFinderThread) 
    java.nio.channels.ClosedChannelException 
      at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
      at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) 
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) 
      at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) 
      at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166) 
      at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) 
      at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177) 
      at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172) 
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
      at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172) 
      at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87) 
      at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77) 
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
      at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) 
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) 
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
      at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77) 
      at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) 
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 
    {'some':2} 
    [2015-09-24 16:20:02,362] WARN [my.group_my_host-1443108001180-fa0c93e4-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) 
    java.nio.channels.ClosedChannelException 
      at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
      at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) 
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) 
      at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) 
      at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166) 
      at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) 
      at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177) 
      at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172) 
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
      at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172) 
      at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87) 
      at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77) 
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
      at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) 
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) 
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
      at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77) 
      at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) 
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 
    ... 
    // Lots more of this 
    ... 
    Consumed 1 messages 

Я не знаю, почему он не может добавить лидер, лидеры, кажется, в Zookeeper уже. Как и все эти сообщения об ошибках, я могу только получить сообщение одному потребителю. Строка {'some':2} - это сообщение, отправленное с консоли.

Я нашел ошибку в server.log одного из рабов Mesos, не уверен, если это имеет отношение:

[2015-09-24 17:09:41,926] ERROR Closing socket for /192.168.1.199 because of error (kafka.network.Processor) 
java.io.IOException: Broken pipe 
      at sun.nio.ch.FileDispatcherImpl.write0(Native Method) 
      at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) 
      at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) 
      at sun.nio.ch.IOUtil.write(IOUtil.java:65) 
      at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
      at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123) 
      at kafka.network.MultiSend.writeTo(Transmission.scala:101) 
      at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231) 
      at kafka.network.Processor.write(SocketServer.scala:472) 
      at kafka.network.Processor.run(SocketServer.scala:342) 
      at java.lang.Thread.run(Thread.java:745) 

Любые предложения относительно того, что может происходить с потребителем или где я могу смотреть устранить проблему?

Zookeeper брокера раздел состояния для одного из журналов разделов:

[zk: localhost:2181(CONNECTED) 166] get /brokers/topics/test/partitions/0/state 
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]} 

ОС: Ubuntu 14.0.4 Mesos: 0,23 Кафка: 2.10-0.8.2.1

Update: делает некоторые дополнительные испытания с использованием kafka-console-consumer.sh сообщений, похоже, проходят. Сообщения об ошибках являются постоянными, поэтому вы не видите все сообщения в stdout. Python KafkaConsumer с ошибкой с FailedPayloadsError.

ответ

3

Я думаю, вам нужно изучить стоимость недвижимости «advertised.host.name». Я также недавно столкнулся с этой проблемой и исправил использование вышеуказанного имущества.
Пожалуйста, убедитесь, что вы указали правильный IP адрес для каждого BROKER.
Сообщите мне, если не работает.

+0

Спасибо за ответ @Bector. Планировщик mesos/kafka, похоже, не позволяет мне установить это значение. Я поместил файл server.properties, добавив брокер, но он, похоже, фактически не установлен. Даже установка порта с этой библиотекой не работает для меня. Я буду поднимать вопрос на их странице Github. – dsimmie

+0

Возможно, вы не можете изменить его, но свойство, которое вы устанавливаете на server.properties, оно становится переопределенным от некоторых, поэтому вы не можете видеть измененные изменения. Хорошо, что вы подняли эту проблему, но вы должны иметь возможность изменить это свойство. – Bector2

1

Попробуйте выполнить следующую команду:

bin/kafka-topics.sh --zookeeper your.zookeeper:2181 --describe --topic your_topic 

Это покажет, какой брокер является лидером каждого из разделов вашей темы (см этой ссылки для получения более подробной информации: http://kafka.apache.org/documentation.html#quickstart_multibroker)

В моем случае, один брокеры, которые были поставлены лидером, потерпели неудачу и больше не существовали. Новый лидер должен был быть назначен, но по какой-то причине это не так.

Я исправил проблему путем:

  1. Остановка всех производителей и потребителей
  2. Перезапуск каждого оставшегося брокера

Затем я вновь выполнил команду describe (сверху) и было видно, что неудавшийся брокер больше не фигурировал в списке лидеров.

Я тогда основал новый брокер с тем же идентификатором, что и неудавшийся брокер. Кафка взял его оттуда и доставил все данные от моих других брокеров (это требует, чтобы ваша тема имела адекватный коэффициент репликации). Как только данные были закончены, Кафка сделал брокера лидером раздела.

Наконец, я снова начал производство и потребители.

+0

сталкиваюсь этот вопрос в настоящее время, и когда я пытаюсь вашу команду я получаю Тема: MY_TOPIC \t PartitionCount: 1 \t ReplicationFactor: 1 \t Configs: \t Тема: MY_TOPIC \t Раздел: 0 \t Лидер: -1 \t Реплики: 1012 \t Isr: – Ratha

+0

Лидер говорит = -1 – Ratha

+0

@ Ratha Вы нашли исправление к нему? У меня такая же проблема. – user2441441

0

Мой вопроса был:

  1. Run Zookeeper
  2. Создано тема
  3. Run Кафка

Тогда я получаю "Ни один лидера не найден исключение"

Но когда я создал тема, когда Zookeeper и Kafka работали нормально, работала нормально.

Смежные вопросы