Я очень новичок в Kafka. Я создаю две темы и публикую эти две темы от двух продюсеров. У меня есть один потребитель, который потребляет сообщения из обеих тем. Это потому, что я хочу обрабатывать в соответствии с приоритетом.Kafka Consumer для чтения из нескольких тем
Я получаю поток от обеих тем, но как только я начинаю итерацию на ConsumerItreator
любого потока, он блокируется там. Как написано в документации, он будет заблокирован до получения нового сообщения.
Кто-нибудь знает, как читать из двух тем и двух потоков от одного потребителя Kafka?
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();
while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
{
byte[] bytes = highPrioerityIterator.next().message();
Object obj = null;
CLoudDataObject thunderDataObject = null;
try
{
obj = SerializationUtils.deserialize(bytes);
if (obj instanceof CLoudDataObject)
{
thunderDataObject = (CLoudDataObject) obj;
System.out.println(thunderDataObject);
// TODO Got the Thunder object here, now write code to send it to Thunder service.
}
}
catch (Exception e)
{
}
}
Спасибо за ваш ответ. Но просто установив приоритет потока не поможет. Мой случай использования сначала потребляет тему с высоким приоритетом, если он пуст, а затем потребляет из темы с низким приоритетом. Пожалуйста, проверьте ответ, присутствующий в этом сообщении, и кажется, что «небо» достигло той же функциональности: http://stackoverflow.com/questions/30655361/does-apache-kafka-supports-priority-for-topic-or-message – aviundefined
Теперь я понимаю ваше требование. Как насчет установки свойства consumer.timeout.ms и catch ConsumerTimeoutException, чтобы обнаружить, что потребитель достигает последнего доступного сообщения? По умолчанию установлено значение -1, чтобы не вызывать тайм-аут. http://kafka.apache.org/07/configuration.html – gonbe
Это также не решит проблему. Он не может справиться со следующим сценарием: 1) Допустим, у нас есть две темы «Высокий» и «Низкий», а в разделе «Низкий» у нас очень большой поток сообщений. Поэтому, как только он начнет считывать сообщение из «Низкого», он не ударит по таймауту, пока поток «Низкий» не станет пустым для времени тайм-аута (которое очень низкое как 100 мс) Пожалуйста, исправьте меня, если я ошибаюсь – aviundefined