2013-09-01 5 views
0

Буду признателен за вашу помощь по этому вопросу.Apache Kafka - Потребитель не получает сообщения от производителя

Я строю потребитель Apache Kafka, чтобы подписаться на другой уже запущенный Kafka. Теперь моя проблема заключается в том, что, когда мой продюсер толкает сообщение на сервер ... мой потребитель не получает их .. и я получаю ниже информацию в моих журналах печататься ::

13/08/30 18:00:58 INFO producer.SyncProducer: Connected to xx.xx.xx.xx:6667:false for producing 
13/08/30 18:00:58 INFO producer.SyncProducer: Disconnecting from xx.xx.xx.xx:6667:false 
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-  1377910855898] Stopping leader finder thread 
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager- 1377910855898] Stopping all fetchers 
13/08/30 18:00:58 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager- 1377910855898] All connections stopped 

Я не уверен, если я отсутствует какая-либо важная конфигурация здесь ... Тем не менее, я могу видеть некоторые сообщения, поступающие с моего сервера с помощью WireShark, но они не потребляются моим потребителем ....

Мой код является точной копией примерного примера потребителя: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

UPDATE:

[2013-09-03 00:57:30,146] INFO Starting ZkClient event thread.  
(org.I0Itec.zkclient.ZkEventThread) 
[2013-09-03 00:57:30,146] INFO Opening socket connection to server /xx.xx.xx.xx:2181 (org.apache.zookeeper.ClientCnxn) 
[2013-09-03 00:57:30,235] INFO Connected to xx.xx.xx:6667 for producing (kafka.producer.SyncProducer) 
[2013-09-03 00:57:30,299] INFO Socket connection established to 10.224.62.212/10.224.62.212:2181, initiating session (org.apache.zookeeper.ClientCnxn) 
[2013-09-03 00:57:30,399] INFO Disconnecting from xx.xx.xx.net:6667 (kafka.producer.SyncProducer) 
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager) 
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager) 
[2013-09-03 00:57:30,400] INFO [ConsumerFetcherManager-1378195030845] All connections stopped (kafka.consumer.ConsumerFetcherManager) 
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx-1378195030443-cce6fc51], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector) 
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx.-1378195030443-cce6fc51], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector) 
[2013-09-03 00:57:30,400] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], Committing all offsets after clearing the fetcher queues (kafka.consumer.ZookeeperConsumerConnector) 
[2013-09-03 00:57:30,401] ERROR [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], zk client is null. Cannot commit offsets (kafka.consumer.ZookeeperConsumerConnector) 
[2013-09-03 00:57:30,401] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], Releasing partition ownership (kafka.consumer.ZookeeperConsumerConnector) 
[2013-09-03 00:57:30,401] INFO [console-consumer-49997_xx.xx.xx.xx-1378195030443-cce6fc51], exception during rebalance (kafka.consumer.ZookeeperConsumerConnector) 
java.util.NoSuchElementException: None.get 
at scala.None$.get(Option.scala:185) 
at scala.None$.get(Option.scala:183) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:434) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:429) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) 
at scala.collection.Iterator$class.foreach(Iterator.scala:631) 
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) 
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) 
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:429) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374) 
at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282) 
at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369) 
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:681) 
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:715) 
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140) 
at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196) 
at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala) 

ответ

0

Можете ли вы предоставить образец кода производителя?

У вас есть последняя версия версии 0.8? Похоже, что существует некоторая известная проблема с consumerFetched deadlock, которая исправлена ​​и исправлена ​​в текущей версии

вы можете попробовать использовать сценарий консоли администратора, чтобы потреблять сообщения, чтобы убедиться, что ваш производитель работает нормально.

Если возможно разместить несколько больше журналов и фрагмент кода, должны помочь отладке дальнейшего (это, кажется, мне нужно больше репутации, чтобы сделать комментарий, так должен был ответить, а)

+0

Спасибо за реагирование ... Что журналы вам нужно в частности. Кроме того, где я могу найти сценарий консоли администратора для проверки моего продюсера ... Я думаю, что мой продюсер работает, потому что я могу видеть некоторые сообщения, поступающие с моего сервера, используя WireShark на потребительском терминале. – ASingh

+0

Скрипт консоли можно найти внутри вашего пути 'KAFKA_DIR/bin' .. и [здесь] (http://kafka.apache.org/08/quickstart.html) пример команды, чтобы вы начали, как ваш код производителя выглядит как ? – user2720864

+0

Спасибо, что снова ответили ... Я попробовал использовать скрипт потребительской консоли ... Однако, когда я пытаюсь подключиться к моему серверу, я получаю трассировку ниже исключения, напечатанную на консоли ... Я обновил свой вопрос с исключением след .. какой-то перебалансировка .. я dnt понимаю .. – ASingh

Смежные вопросы