У меня возникли серьезные проблемы с попыткой реализовать решение для моих нужд в отношении KafkaConsumer (> = 0,9).Kafka Consumer - поведение опроса
Предположим, у меня есть функция, которая должна читать только n сообщений с темы kafka.
Например: getMsgs(5)
->получает следующие 5 сообщений kafka в теме.
Итак, у меня есть цикл, который выглядит следующим образом:
for (boolean exit= false;!exit;)
{
Records = consumer.poll(200);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
Принимая это во внимание, то проблема заключается в том, что метод опроса() может получить более 5 сообщений. Например, если он получает 10 сообщений, мой код забудет навсегда эти другие 5 сообщений, так как Кафка подумает, что они уже потреблены.
Я попробовал смещение совершал, но не похожа на работу:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
Даже с смещением конфигурации, когда я снова запустить потребитель, он не будет начинаться с 6-го сообщения (помните, я просто хотел 5 сообщений), но из 11th (так как в первом опросе было 10 сообщений).
Есть ли какое-либо решение для этого, или, может быть, (самое главное), я что-то упустил?
Спасибо заранее!
auto.offset.reset должен быть самым ранним, и он срабатывает, только когда нет потребителя group.id. без идентификатора группы невозможно сохранить смещения. если уже есть идентификатор группы потребителей auto.offset.reset, ничего не сделает, и по умолчанию потребитель выбирает из последнего зафиксированного смещения. – user1870400