2016-02-11 2 views
1

Я пытаюсь отслеживать потребительские смещения данной группы с Java API. Я создаю еще одного потребителя, который не подписывается ни на какую тему, а просто вызывает consumer.committed(topic), чтобы получить информацию о смещении. Этот вид работ, но:Kafka 0.9 новый потребитель api --- как просто смотреть потребительские смещения

Для тестирования я использую только одного реального потребителя (т. Е. Тот, который подписывается на эту тему). Когда я закрываю его, используя close(), а затем перезагружаю его, требуется 27 секунд между подпиской и первым потреблением сообщений, несмотря на то, что я использую poll(1000).

Я предполагаю, что это связано с перебалансировкой, возможно, смущенной не подписывающим потребителем. Это возможно? Есть ли лучший способ отслеживать смещения с помощью Java API (я знаю об инструменте командной строки, но вам нужно использовать API).

ответ

1

Существуют различные способы, чтобы проверить смещение от темы, в зависимости от цели, что вы хотите его, к тому же из «привержен», что вы описали выше, вот еще два варианта:

1), если вы хотите знать смещение идентификатора, с которого начала потребитель для извлечения данных из брокера в следующий раз тему (ы) начала (ов), то вы должны использовать «позиции», как

long offsetPosition; 
TopicPartition tPartition = new TopicPartition(topic,partitionToReview); 
    offsetPosition = kafkaConsumer.position(tPartition); 
    System.out.println("offset of the next record to fetch is : " + position); 

2), призывающую «смещение()» метод от объекта ConsumerRecord, после проведения опроса от kafkaConsumer

Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator(); 
while(it.hasNext()){ 
ConsumerRecord<byte[],byte[]> record = it.next(); 
System.out.println("offset : " + record.offset()); 
} 
+0

Эти два метода работают только для подписчиков. Дело в том, что у меня есть потребитель, который просто контролирует. Он не будет принимать участие в потреблении, поэтому он не может использовать эти методы. – Harald

1

Обнаружено, что потребитель мониторинга добавил к путанице, но не был виновником. В конце концов, это легко понять, хотя и немного неожиданно (для меня как минимум):

По умолчанию для session.timeout.ms составляет 30 секунд. Когда потребитель исчезает, он занимает до 30 секунд, прежде чем он будет объявлен мертвым, и работа будет перебалансирована. Для тестирования я остановил единственного потребителя, который у меня был, подождал три секунды и перезапустил новый. Это заняло 27 секунд до его начала, заполнив 30-секундный тайм-аут.

Я бы ожидал, что один одиночный потребительский запуск не дожидается истечения времени ожидания, но начнет «перебалансировку», то есть немедленно захватит работу. Кажется, что тайм-аут должен истечь, пока работа не будет перебалансирована, даже если есть только один потребитель.

Для тестирования, чтобы пройти быстрее, я изменил конфигурацию, используя нижний session.timeout.ms для потребителя, а также group.min.session.timeout.ms для брокера.

В заключение: использование потребителя, не подписавшегося на какую-либо тему для мониторинга смещений, отлично работает и, похоже, не мешает процессу перебалансировки.

Смежные вопросы