6

У меня возникли серьезные проблемы с попыткой реализовать решение для моих нужд в отношении KafkaConsumer (> = 0,9).Kafka Consumer - поведение опроса

Предположим, у меня есть функция, которая должна читать только n сообщений с темы kafka.

Например: getMsgs(5) ->получает следующие 5 сообщений kafka в теме.

Итак, у меня есть цикл, который выглядит следующим образом:

for (boolean exit= false;!exit;) 
{ 
    Records = consumer.poll(200); 
    for (Record r:records) { 
     processRecord(r); //do my things 
     numMss++; 
     if (numMss==maximum) //maximum=5 
      exit=true; 
    } 
} 

Принимая это во внимание, то проблема заключается в том, что метод опроса() может получить более 5 сообщений. Например, если он получает 10 сообщений, мой код забудет навсегда эти другие 5 сообщений, так как Кафка подумает, что они уже потреблены.

Я попробовал смещение совершал, но не похожа на работу:

consumer.commitSync(Collections.singletonMap(partition, 
    new OffsetAndMetadata(record.offset() + 1))); 

Даже с смещением конфигурации, когда я снова запустить потребитель, он не будет начинаться с 6-го сообщения (помните, я просто хотел 5 сообщений), но из 11th (так как в первом опросе было 10 сообщений).

Есть ли какое-либо решение для этого, или, может быть, (самое главное), я что-то упустил?

Спасибо заранее!

ответ

3

Вы можете установить max.poll.records в независимо от того, что вам нравится, что в большинстве случаев вы получите столько записей в каждом опросе.

Для вашего варианта использования, который вы указали в этой проблеме, вам не нужно явно выполнять смещения самостоятельно. вы можете просто установить enable.auto.commit в true и установить auto.offset.reset в earliest таким образом, что он будет срабатывать, когда нет потребителя group.id (другими словами, когда вы собираетесь начать чтение из раздела в первый раз). Когда у вас есть группа.id и некоторые потребительские смещения, хранящиеся в Kafka, и в случае, если ваш потребительский процесс Kafka умирает, он будет продолжаться с последнего зафиксированного смещения, поскольку это поведение по умолчанию, потому что, когда потребитель начинает, он сначала ищет, если есть какие-либо совершенные смещения, и если да, будет продолжаться с последнего зафиксированного смещения, а auto.offset.resetне будет.

0

set auto.offset.reset собственность как «последняя». Затем попробуйте использовать, вы получите потребляемые записи из зафиксированного смещения.

Или вы используете user.seek (TopicPartition, offset) api перед опросом.

+0

auto.offset.reset должен быть самым ранним, и он срабатывает, только когда нет потребителя group.id. без идентификатора группы невозможно сохранить смещения. если уже есть идентификатор группы потребителей auto.offset.reset, ничего не сделает, и по умолчанию потребитель выбирает из последнего зафиксированного смещения. – user1870400

0

Если вы отключили автоматическую фиксацию, установив enable.auto.commit на false. Вам нужно отключить это, если вы хотите вручную зафиксировать смещение. Без этого следующего вызова poll() будет автоматически зафиксировано последнее смещение сообщений, полученных вами из предыдущего опроса().

0

Из Kafka 0.9 изменены имена параметров auto.offset.reset;

Что делать, если нет смещения в Кафки, или если текущее смещение не существует больше на сервере (например, потому, что данные были удалены) начальная:

earliest: automatically reset the offset to the earliest offset 

latest: automatically reset the offset to the latest offset 

none: throw exception to the consumer if no previous offset is found for the consumer's group 

anything else: throw exception to the consumer. 
Смежные вопросы