2015-02-11 2 views
9

У меня есть простой Кафка Потребитель в Java с помощью следующего кодаКафка Потребитель висит на .hasNext в Java

public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()&& !done){ 
      try { 
       System.out.println("Parsing data"); 
       byte[] data = it.next().message(); 
       System.out.println("Found data: "+data); 
       values.add(data); // array list 
      } catch (InvalidProtocolBufferException e) { 
       e.printStackTrace(); 
      } 
     } 
     done = true; 
    } 

Когда сообщение публикуемого, данные успешно считан, однако, когда он возвращается к проверке его .hasNext(), он остается в ожидании и никогда не возвращается.

Что может быть затормозить это?

m_stream является KafkaStream получается следующим образом:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
topicCountMap.put(topic, new Integer(a_numThreads)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
executor = Executors.newFixedThreadPool(a_numThreads); 
for (final KafkaStream stream : streams) { 
    // m_stream is one of these streams 
} 

ответ

11

Решение было добавить свойство

"consumer.timeout.ms"

Теперь, когда таймаут достигнут ConsumerTimeoutException является брошенный

4

Метод hasNext() блокирует.

вы можете изменить тайм-аут блокировки в свойстве consumer.timeout.ms

Заметим, что он будет бросать TimeoutException, когда тайм-аут истекает.

читали бы эти документы о потребителей: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Смежные вопросы