5

Я новичок в Kafka 0.9 и тестирую некоторые функции, которые я понял странным поведением в реализованной Java-потребителе (KafkaConsumer).Метод poll опроса Kafka Consumer заблокирован

Брокер Kafka находится в Ambari внешняя машина.

Даже я могу реализовать Продюсера и начать отправлять сообщения внешнему брокеру, я не знаю, почему, когда потребитель пытается прочитать события (опрос), он застревает.

Я знаю, что производитель работает хорошо, так как я могу потреблять сообщения через консольного потребителя (который работает локально на ambari). Но когда я выполняю Java-потребитель, ничего не происходит, просто застревает. Отладка кода я мог видеть, что он будет заблокирован на poll() линии:

ConsumerRecords<String, String> records = consumer.poll(100); 

Таймаут ничего не делает, кстати. Неважно, если вы ставите 0, 100 или 1000 мс, потребитель блокируется в этой строке и не выполняет тайм-аут и не исключает исключения.

Я попробовал все виды альтернативных свойств, таких как advertised.host.name, advertised.listener ... и так далее, с нулевой удачи.

Любая помощь будет высоко оценена. Заранее спасибо!

+0

Можете ли вы использовать сообщения по-другому, например, используя 'kafka-console-consumer.sh'? –

+0

Да, я. Из машины, на которой размещается амбари, я могу потреблять сообщения через консольного пользователя –

+0

А как насчет машины, на которой вы запускаете своего потребителя? Вы попробовали консольный потребитель? –

ответ

1

Причиной может быть машина, на которой работает ваш потребительский код, не может подключиться к zookeeper. Попробуйте запустить тот же код пользователя на компьютере, где установлен ваш Kafka (я попробовал это и работал для меня). Я также решил проблему, указав ниже свойства в файле server.properties: advertised.host.name="ip address which you want to expose" // В моем случае это общедоступный ip машины ec2, у меня есть kafka и zookeeper, установленные на том же ec2. advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); Вышеприведенное заявление не означает, что потребитель будет тайм-аут после 100 мс, это период опроса. Независимо от того, какие данные он захватывает за 100 мс, считывается в коллекцию записей.

Смежные вопросы