2015-05-20 2 views
9

Я пытаюсь использовать низкоуровневый пользовательский Java API для управления смещениями вручную, с последним kafka_2.10-0.8.2.1. Чтобы убедиться, что смещения, которые я совершаю/читаю из Kafka, верны, я использую инструмент kafka.tools.ConsumerOffsetChecker.Пояснения к офсетным операциям API Java Kafka

Ниже приведен пример вывода для моей темы/группы потребителей:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic 
Group           Topic                          Pid Offset          logSize         Lag             Owner 
elastic_search_group my_log_topic              0   5               29              24              none 

Вот моя интерпретация результата:

Смещение = 5 -> это текущее смещение мой «elastic_search_group» потребитель

logSize = 29 -> это самый последний сдвиг - смещение следующего сообщения, которое придет на эту тему/раздел

Лаг = 24 -> 29-5 - сколько сообщений еще не обработаны моим 'elastic_search_group' потребитель

Pid - идентификатор раздела

Q1: правильно ли это?

Теперь я хочу получить ту же информацию от своего потребителя Java. Здесь я обнаружил, что мне пришлось использовать два разных API:

kafka.javaapi. OffsetRequest, чтобы получить самые ранние и последние смещения, но kafka.javaapi. OffsetFetchRequest для получения текущего смещения.

Чтобы получить раннее (или Latest) смещение я:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition); 
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1)); 
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1)); 
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 
OffsetResponse response = simpleConsumer.getOffsetsBefore(request); 
long[] offsets = response.offsets(topic, partition); 
long myEarliestOffset = offsets[0]; 
// OR for Latest: long myLatestOffset = offsets[0]; 

А чтобы получить текущее смещение я должен использовать совершенно другой API:

short versionID = 0; 
int correlationId = 0; 
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();  
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition); 
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId); 
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq); 
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset(); 

Q2: это правильно? почему существуют два разных API, чтобы получить очень похожую информацию?

Q3: имеет значение, какая версияId и correId я использую здесь? Я, хотя versionId должен быть 0 для pre-0.8.2.1 kafka, и должен быть 1 для 0.8.2.1 и более поздних версий, но похоже, что он работает с 0 для 0.8.2.1, а также - см. Ниже?

Итак, для примера, состояния вышеуказанной темы, и над выходом ConsumerOffsetChecker, вот что я получаю от моего Java кода:

currentOffset = 5; earliestOffset = 29; lastOffset = 29

«currentOffset», похоже, хорошо, «lastOffset» тоже верен, но «раннее окно»? Я бы ожидал, что это будет как минимум «5»?

Q4: Как могло случиться, что самое раннееОфис выше, чем currentOffset? Мое единственное подозрение в том, что сообщения из темы были очищены из-за политики хранения .... В других случаях это могло случиться?

ответ

10

Я искал способы поиска отставания в перегородках. И это включает в себя те же шаги, которые вы предприняли. До сих пор, из того, что я узнал, я могу дать вам ответы.

  1. logSize указывает, сколько сообщений было накоплено в этом конкретном разделе. Или он определяет максимальное смещение сообщений в этом разделе. Смещение - это смещение последнего успешно используемого сообщения. Таким образом, отставание - это просто разница между размером журнала и смещением.
  2. Да, это правильно. Пока что это единственные два способа найти текущее смещение и самое раннее или последнее смещение
  3. Я не знаю, почему необходимо указывать versionId. Вы можете использовать kafka.api.OffsetRequest.CurrentVersion(), чтобы получить версию. Поэтому можно избежать жесткого кодирования. Вы можете смело предположить, что relativeId равен 0.
  4. Это странно. Когда я использую EarliestTime(), я получаю самое раннее смещение как 0, даже когда мое текущее смещение продвигалось намного дальше. Это означает, что это начало раздела. Поэтому, когда некоторые сообщения истекли в будущем, это самое раннее смещение будет тогда отличным от нуля. Теперь, если сообщения были очищены из-за задержки в политике хранения, должны были быть изменены. Я не уверен в этом поведении. Один из способов быть уверенным в том, что он будет работать с потребителем, отметив такое чтение и проверку в своих журналах. Он должен показывать такие строки.

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo: 52 :: сброс потребления смещения запросов: 2: выборки смещения = 405952: потребленное смещение = 335372 до 335372 2015-06-09 18 : 49: 15 :: ОТЛАДКИ :: PartitionTopicInfo: 52 :: сброса потребляет смещение запросов: 2: принесенное смещение = 405952: потребляло смещение = 335373 до 335373

Обратите внимание, что в вышеуказанных строках логов, принесенное смещение сохраняется то же самое и потребляемое смещение увеличивается. Наконец она закончится в

2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo: 52 :: сброса потребляет смещение запросов: 2: принесенное смещение = 405952: потребляло смещение = 405952 до 405952

Тогда это будет означать, что из-за политики хранения журналов с 335372 до 405952 истек

+1

Спасибо, @ Shades88! После некоторого теста для № 4 - я пришел к такому же выводу, что такая ситуация произойдет, когда журналы будут очищены из-за политики хранения. Таким образом, я добавил обработку этого аргумента в свою потребительскую логику - подтвердите, что текущее смещение> = самое раннее смещение и установите его в значение «Раннее окно», если нет. Благодаря! – Marina

+0

Что касается 'versionId', если вы указываете' 0', смещения сохраняются в Zookeeper, и если вы используете '1', смещение сохраняется в специальной теме Kafka. –

+0

Полезная страница http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –

Смежные вопросы