Я играл с spring-cloud-stream (1.0.0.BUILD-SNAPSHOT с вяжущим kafka) и заметил, что с пользователем в автономном режиме любые отправленные сообщения теряются. Когда я запускаю потребитель, он не обрабатывает отставание запросов, которые были отправлены в kafka. Это намеренно?spring-cloud-stream kafka offline Сообщения потребителя потеряны
ответ
Мы, безусловно, должны улучшить нашу документацию, но вот некоторые указатели в то же время.
Если вы хотите, чтобы потребитель обрабатывал сообщения, созданные во время их остановки, вам необходимо указать имя группы потребителей, например. spring.cloud.stream.bindings.<bindingName>.group=foo
. Когда указывается группа потребителей, приложение запускает либо а) последнее непросвещенное сообщение, если клиент с той же группой потребителей уже запущен (т. Е. Мы записали расходуемые смещения для этого потребителя), либо b) значение, указанное spring.cloud.stream.binder.kafka.start-offset
(которое может быть earliest
или latest
, представляющий начало или конец темы). Поэтому перезапуск потребителей, которые сохраняют потребительскую группу, будет потреблять с того места, где они ушли, а новые потребители начнут действовать в соответствии с параметром start. Если группа не указана, то потребитель будет считаться «анонимным», и его будут интересовать только сообщения, созданные после его запуска, поэтому он всегда будет начинаться в конце набора разделов.
Если вы хотите обойти уже сохраненное значение, вы можете использовать spring.cloud.stream.binder.kafka.reset-offets=true
, что заставит клиента сбросить сохраненные смещения и начать с значения, указанного spring.cloud.stream.binder.kafka.start-offset
.
Это отражает поведение, ожидаемое (и поддерживаемое) 0.8.2. Мы будем обновлять вещи соответственно после того, как мы обновим до 0,9.
Да; по умолчанию начинается прослушивание с конца темы.
Использование
spring:
cloud:
stream:
binder:
kafka:
start-offset: earliest
Будет ли эта настройка повторно обрабатывать старые сообщения comsumer после перезапуска? – gadams00
YMMV, в зависимости от версии кафки; Я тестировал с 0.9.0.1, а «самый ранний» начинается с первого непрочитанного сообщения. Я остановил приложение; опубликовал несколько сообщений, затем перезапустил приложение, и я получил только новые. Затем я запустил консольного пользователя (с '--from-begin') и получил их все (включая те, что были в предыдущем приложении). В настоящее время связующее использует клиент 0.8.x.x, который работает с 0.9.x.x; мы работаем над обновлением до 'spring-integration-kafka' до 0.9.x.x (и его чистого java-клиента) и вскоре после этого переработаем связующее kafka, чтобы использовать его. Это обеспечит гораздо большую гибкость. –
- 1. Недостающие записи потребителя Kafka
- 2. Потребляйте сообщения, не отправляясь с Kafka 10 потребителя
- 3. InstanceAlreadyExistsException, исходящий от потребителя kafka
- 4. Ограничить количество записей потребителя Kafka
- 5. Запуск потребителя kafka (новый потребительский API) навсегда
- 6. Невозможно скомпилировать потребителя Kafka в Scala
- 7. Производитель консоли Kafka потеряет сообщения
- 8. Kafka Spring Integration: заголовки не подходят для потребителя kafka
- 9. FirebaseMessaging Android некоторые сообщения потеряны
- 10. не может получить образец производителя/потребителя kafka
- 11. Выбрав смещение от потребителя высокого уровня kafka
- 12. Реализация потребителя kafka в spark-1.0
- 13. мультиплексирование потребителя и производителя в kafka
- 14. Эквивалентное свойство `num.consumer.fetchers` для нового потребителя kafka
- 15. Поведение потребителя Kafka для несуществующего смещения
- 16. Как отправить OffsetCommitRequest для потребителя в Kafka?
- 17. Kafka Продюсер и информация для потребителя
- 18. spring-cloud-stream-kafka не почитание одного потребителя для группы
- 19. SpringCloudStream - AggregateApplicationBuilder ожидает брокер
- 20. SpringCloudStream-HttpSource возвращается 404
- 21. может использовать kafka broker ip у потребителя или производителя?
- 22. Ошибка отправки сообщения Kafka
- 23. Производитель Kafka пропускает сообщения
- 24. Missing Offline Сообщения asmack (Android)
- 25. Как гарантировать, что сообщения не потеряны с использованием пружины KafkaMessageListenerContainer.java
- 26. Golang Kafka не топил все сообщения offsetnewest
- 27. Как управлять фиксацией потребляемого сообщения kafka с использованием kafka-node
- 28. asp.net mvc - Данные потеряны после сообщения
- 29. сообщения потеряны при использовании Java сокеты readUTF
- 30. Akka - Измерение времени потребителя
Спасибо @ marius-bogoevici - Сегодня я вижу разные результаты вчера - должно быть, был пилотный обман - мой потребитель надежно запускается при первом непросвещенном сообщении. и 'start-offset = earlyliest' вместе с' reset-offsets' дает мне все сообщения, сохраненные в брокере. –