2016-02-16 2 views
1

EDIT: Я также наблюдаю то же поведение с Kafka 9 Consumer API.Kafka 8.2.2 Динамическая тема отменяет первое событие

У меня есть простой производитель Kafaka 8.2.2 с включенным свойством создания темы true. Он создаст новую тему, когда будет создано событие с несуществующим тегом, но событие, которое создает эту тему, не заканчивается в Kafka, а возвращенная запись RecordMetadata не имеет ошибок.

public void receiveEvent(@RequestBody EventWrapper events) throws InterruptedException, ExecutionException, TimeoutException { 
    log.info("Sending " + events.getEvents().size() + " Events "); 
    for (Event event : events.getEvents()) { 
     log.info("Sending Event - " + event); 
     ProducerRecord<String, String> record = new ProducerRecord<>(event.getTopic(), event.getData()); 

     Future<RecordMetadata> ack = eventProducer.send(record); 
     log.info("ACK - " + ack.get()); 
    } 

    log.info("SENT!"); 
} 

У меня есть программа, опросы на новые темы (я не был счастлив с динамическим/регулярное выражение темы кода в Кафки 8) и он находит новую очередь и подписывается, и он действительно видит последующие события, но никогда это первое событие.

Я также попробовал скрипт kafka-console-consumer, и он видит то же самое. Первое событие никогда не было видно, после чего события начинают течь.

Идеи?

ответ

0

Оказывается, есть свойство, которое вы можете установить props.put ("auto.offset.reset", "самое раннее");

И после этого потребитель получает первое событие, поставленное на эту тему.

Смежные вопросы