2016-03-06 9 views
3

Я играл с spring-cloud-stream (1.0.0.BUILD-SNAPSHOT с вяжущим kafka) и заметил, что с пользователем в автономном режиме любые отправленные сообщения теряются. Когда я запускаю потребитель, он не обрабатывает отставание запросов, которые были отправлены в kafka. Это намеренно?spring-cloud-stream kafka offline Сообщения потребителя потеряны

ответ

3

Мы, безусловно, должны улучшить нашу документацию, но вот некоторые указатели в то же время.

Если вы хотите, чтобы потребитель обрабатывал сообщения, созданные во время их остановки, вам необходимо указать имя группы потребителей, например. spring.cloud.stream.bindings.<bindingName>.group=foo. Когда указывается группа потребителей, приложение запускает либо а) последнее непросвещенное сообщение, если клиент с той же группой потребителей уже запущен (т. Е. Мы записали расходуемые смещения для этого потребителя), либо b) значение, указанное spring.cloud.stream.binder.kafka.start-offset (которое может быть earliest или latest, представляющий начало или конец темы). Поэтому перезапуск потребителей, которые сохраняют потребительскую группу, будет потреблять с того места, где они ушли, а новые потребители начнут действовать в соответствии с параметром start. Если группа не указана, то потребитель будет считаться «анонимным», и его будут интересовать только сообщения, созданные после его запуска, поэтому он всегда будет начинаться в конце набора разделов.

Если вы хотите обойти уже сохраненное значение, вы можете использовать spring.cloud.stream.binder.kafka.reset-offets=true, что заставит клиента сбросить сохраненные смещения и начать с значения, указанного spring.cloud.stream.binder.kafka.start-offset.

Это отражает поведение, ожидаемое (и поддерживаемое) 0.8.2. Мы будем обновлять вещи соответственно после того, как мы обновим до 0,9.

+0

Спасибо @ marius-bogoevici - Сегодня я вижу разные результаты вчера - должно быть, был пилотный обман - мой потребитель надежно запускается при первом непросвещенном сообщении. и 'start-offset = earlyliest' вместе с' reset-offsets' дает мне все сообщения, сохраненные в брокере. –

0

Да; по умолчанию начинается прослушивание с конца темы.

Использование

spring: 
    cloud: 
    stream: 
     binder: 
     kafka: 
      start-offset: earliest 
+0

Будет ли эта настройка повторно обрабатывать старые сообщения comsumer после перезапуска? – gadams00

+0

YMMV, в зависимости от версии кафки; Я тестировал с 0.9.0.1, а «самый ранний» начинается с первого непрочитанного сообщения. Я остановил приложение; опубликовал несколько сообщений, затем перезапустил приложение, и я получил только новые. Затем я запустил консольного пользователя (с '--from-begin') и получил их все (включая те, что были в предыдущем приложении). В настоящее время связующее использует клиент 0.8.x.x, который работает с 0.9.x.x; мы работаем над обновлением до 'spring-integration-kafka' до 0.9.x.x (и его чистого java-клиента) и вскоре после этого переработаем связующее kafka, чтобы использовать его. Это обеспечит гораздо большую гибкость. –

Смежные вопросы