2016-07-18 2 views
8

Я пишу потребителю kafka с использованием Java. Я хочу сохранить реальное время сообщения, поэтому, если слишком много сообщений, ожидающих потребления, например, 1000 или более, я должен отказаться от непонятых сообщений и начать использовать их с самого последнего смещения.Как я могу получить ПОСЛЕДНЕЕ смещение темы кафки?

Для этой проблемы я пытаюсь сравнить последнее зафиксированное смещение и последнее смещение темы (только 1 раздел), если разница между этими двумя смещениями больше определенной суммы, я установлю последнее смещение в качестве следующего смещения, чтобы я мог отказаться от этих избыточных сообщений.

Теперь моя проблема заключается в том, как получить последнее смещение темы, некоторые люди говорят, что я могу использовать старого потребителя, но это слишком сложно, новый пользователь имеет эту функцию?

ответ

12

Новый потребитель также сложный.

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

+0

Это не работает, если вы просто хотите рассчитать разницу между текущим смещением вашего клиента и последним известным смещением темы кафки! – hiaclibe

5

Для версии Кафки: 0.10.1.1

// Get the diff of current position and latest offset 
Set<TopicPartition> partitions = new HashSet<TopicPartition>(); 
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); 
partitions.add(actualTopicPartition); 
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); 
long actualPosition = consumer.position(actualTopicPartition);   
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 
0
KafkaConsumer<String, String> consumer = ... 
consumer.subscribe(Collections.singletonList(topic)); 
TopicPartition topicPartition = new TopicPartition(topic, partition); 
consumer.poll(0); 
consumer.seekToEnd(Collections.singletonList(topicPartition)); 
long currentOffset = consumer.position(topicPartition) -1; 

Над сниппета возвращает текущее покончила сообщение смещения для данной темы и раздела номер.

Смежные вопросы