Я пытаюсь использовать низкоуровневый пользовательский Java API для управления смещениями вручную, с последним kafka_2.10-0.8.2.1. Чтобы убедиться, что смещения, которые я совершаю/читаю из Kafka, верны, я использую инструмент kafka.tools.ConsumerOffsetChecker.Пояснения к офсетным операциям API Java Kafka
Ниже приведен пример вывода для моей темы/группы потребителей:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Вот моя интерпретация результата:
Смещение = 5 -> это текущее смещение мой «elastic_search_group» потребитель
logSize = 29 -> это самый последний сдвиг - смещение следующего сообщения, которое придет на эту тему/раздел
Лаг = 24 -> 29-5 - сколько сообщений еще не обработаны моим 'elastic_search_group' потребитель
Pid - идентификатор раздела
Q1: правильно ли это?
Теперь я хочу получить ту же информацию от своего потребителя Java. Здесь я обнаружил, что мне пришлось использовать два разных API:
kafka.javaapi. OffsetRequest, чтобы получить самые ранние и последние смещения, но kafka.javaapi. OffsetFetchRequest для получения текущего смещения.
Чтобы получить раннее (или Latest) смещение я:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
А чтобы получить текущее смещение я должен использовать совершенно другой API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: это правильно? почему существуют два разных API, чтобы получить очень похожую информацию?
Q3: имеет значение, какая версияId и correId я использую здесь? Я, хотя versionId должен быть 0 для pre-0.8.2.1 kafka, и должен быть 1 для 0.8.2.1 и более поздних версий, но похоже, что он работает с 0 для 0.8.2.1, а также - см. Ниже?
Итак, для примера, состояния вышеуказанной темы, и над выходом ConsumerOffsetChecker, вот что я получаю от моего Java кода:
currentOffset = 5; earliestOffset = 29; lastOffset = 29
«currentOffset», похоже, хорошо, «lastOffset» тоже верен, но «раннее окно»? Я бы ожидал, что это будет как минимум «5»?
Q4: Как могло случиться, что самое раннееОфис выше, чем currentOffset? Мое единственное подозрение в том, что сообщения из темы были очищены из-за политики хранения .... В других случаях это могло случиться?
Спасибо, @ Shades88! После некоторого теста для № 4 - я пришел к такому же выводу, что такая ситуация произойдет, когда журналы будут очищены из-за политики хранения. Таким образом, я добавил обработку этого аргумента в свою потребительскую логику - подтвердите, что текущее смещение> = самое раннее смещение и установите его в значение «Раннее окно», если нет. Благодаря! – Marina
Что касается 'versionId', если вы указываете' 0', смещения сохраняются в Zookeeper, и если вы используете '1', смещение сохраняется в специальной теме Kafka. –
Полезная страница http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –