2016-05-17 4 views
1

Я просмотрел документацию для spring-cloud-stream 1.0.0.RELEASE и не могу найти документацию об обработке ошибок.Ошибка обработки кафки spring-cloud-stream

Основываясь на наблюдении с помощью kafka 0.9, если мой потребитель выбрасывает исключение RuntimeException, я вижу 3 попытки. После 3 попыток, я вижу это в журналах:

2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler  : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]] 

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message 

в этой точке, потребитель смещения лаг 1, и если я перезагружать потребитель, сообщение повторяется снова 3 раза. Однако, если я затем отправлю другое сообщение в тот же раздел, чтобы потребитель не выдавал исключение, потребительское смещение обновляется, и исходное сообщение, которое мы выбрали для исключения, больше не будет повторено после перезапуска.

Является ли это документированным где-то, что я не нашел? Является ли обработка ошибок специфичной для связывания или абстрагированием s-c-s, которая должна быть согласованной между связующими? Я подозреваю, что это незапланированное последствие того, как корректировки потребительских изменений обновляются с помощью связки kafka. Я вижу, что добавлена ​​потребительская собственность enableDlq kafka, и я собираюсь протестировать ее, но я не уверен, как мы можем иметь дело с мертвыми буквами в кафке. Я знаком с очередями с мертвой буквой в rabbitmq, но с помощью rabbitmq мы можем использовать плагин для кролика для повторной публикации и повторения сообщений dlq, чтобы охватить случаи, когда отказ произошел из-за временного отключения службы. Я не знаю о какой-либо подобной функциональности, доступной для kafka, не написав подобную полезность самостоятельно.

ОБНОВЛЕНИЕ. Тестирование с включенным потребительским имуществом enableDlq kafka показывает ту же проблему смещения пользователя с обработкой ошибок. Когда потребитель бросает RuntimeException, я вижу 3 повтора, после чего сообщение об ошибке не регистрируется, и я вижу сообщение, опубликованное в error.<destination>.<group>, как задокументировано, но потребительское смещение не обновляется и отстает на 1. Если я перезапущу пользователя, он пытается снова обработать одно и то же сообщение об ошибке из исходного раздела темы, повторяет три раза и снова помещает то же сообщение в error.<destination>.<group> (дублирующее сообщение dlq). Если я опубликую другое сообщение в том же разделе темы, для которого потребитель не выбрасывает исключение RuntimeException, смещение обновляется, а исходное сообщение об ошибке больше не повторяется при перезагрузке.

Я думаю, что потребитель должен обновлять потребительское смещение в kafka, когда потребитель выдает ошибку, независимо от того, является ли enableDlq истинным или нет. Это, по крайней мере, сделало бы его последовательным, чтобы сообщение, которое не удалило попытки повторения, либо отбрасывается (когда enableDlq ложно), либо публикуется в dlq и никогда не повторится (когда enableDlq истинно).

ответ

1

Похож на ошибку для меня - контейнер-слушатель имеет свойство autoCommitOnError (false по умолчанию), которое не подвергается (или не устанавливается) связующим. После вызова обработчика ошибок (который публикует ошибку), если логическое значение true, выполняется смещение.

Пожалуйста, сообщите об этом как о проблеме в github.

+0

благодарит за подтверждение. https://github.com/spring-cloud/spring-cloud-stream/issues/542 – gadams00

Смежные вопросы