2015-06-09 3 views
0

Я очень новичок в Kafka. Я создаю две темы и публикую эти две темы от двух продюсеров. У меня есть один потребитель, который потребляет сообщения из обеих тем. Это потому, что я хочу обрабатывать в соответствии с приоритетом.Kafka Consumer для чтения из нескольких тем

Я получаю поток от обеих тем, но как только я начинаю итерацию на ConsumerItreator любого потока, он блокируется там. Как написано в документации, он будет заблокирован до получения нового сообщения.

Кто-нибудь знает, как читать из двух тем и двух потоков от одного потребителя Kafka?

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
      topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1)); 
      topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1)); 
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); 
      KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0); 
      ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator(); 

      while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext()) 
      { 
       byte[] bytes = highPrioerityIterator.next().message(); 
       Object obj = null; 
       CLoudDataObject thunderDataObject = null; 
       try 
       { 

        obj = SerializationUtils.deserialize(bytes); 
        if (obj instanceof CLoudDataObject) 
        { 
         thunderDataObject = (CLoudDataObject) obj; 
         System.out.println(thunderDataObject); 
         // TODO Got the Thunder object here, now write code to send it to Thunder service. 
        } 

       } 
       catch (Exception e) 
       { 
       } 
      } 

ответ

0

Если вы не хотите обрабатывать сообщения с более низким приоритетом перед высокими приоритетными, как насчет установки consumer.timeout.ms собственности и поймать ConsumerTimeoutException обнаружить, что потоки для достижения высокого приоритета последнего сообщения доступно? По умолчанию для него установлено значение -1, пока не поступит новое сообщение. (http://kafka.apache.org/07/configuration.html)

Ниже объясняется способ обработки нескольких потоков одновременно с различными приоритетами.

Kafka требует многопоточного программирования. В вашем случае потоки двух тем должны обрабатываться потоками для потоков. Поскольку каждый поток будет работать независимо для обработки сообщений, один поток блокировки (поток) не будет влиять на другие потоки.

Реализация ThreadPool Java может помочь в создании многопоточного приложения. Вы можете найти пример реализации здесь:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Что касается приоритета выполнения, вы можете вызвать метод Thread.currentThread.setPriority иметь правильные приоритеты потоков, основанные на их обслуживающую Кафке тему.

+0

Спасибо за ваш ответ. Но просто установив приоритет потока не поможет. Мой случай использования сначала потребляет тему с высоким приоритетом, если он пуст, а затем потребляет из темы с низким приоритетом. Пожалуйста, проверьте ответ, присутствующий в этом сообщении, и кажется, что «небо» достигло той же функциональности: http://stackoverflow.com/questions/30655361/does-apache-kafka-supports-priority-for-topic-or-message – aviundefined

+1

Теперь я понимаю ваше требование. Как насчет установки свойства consumer.timeout.ms и catch ConsumerTimeoutException, чтобы обнаружить, что потребитель достигает последнего доступного сообщения? По умолчанию установлено значение -1, чтобы не вызывать тайм-аут. http://kafka.apache.org/07/configuration.html – gonbe

+0

Это также не решит проблему. Он не может справиться со следующим сценарием: 1) Допустим, у нас есть две темы «Высокий» и «Низкий», а в разделе «Низкий» у нас очень большой поток сообщений. Поэтому, как только он начнет считывать сообщение из «Низкого», он не ударит по таймауту, пока поток «Низкий» не станет пустым для времени тайм-аута (которое очень низкое как 100 мс) Пожалуйста, исправьте меня, если я ошибаюсь – aviundefined

Смежные вопросы