0

фонInstruct RabbitMQ для повторной передачи недоставленных сообщений периодически

Мы используем langohr взаимодействовать с RabbitMQ. Мы пробовали два разных подхода, чтобы позволить RabbitMQ повторно отправлять сообщения, которые еще не были должным образом обработаны нашим сервисом. Один из способов, который работает, - отправить basic.nack с requeue, установленным на true, но это будет отправить сообщение немедленно, пока служба не ответит basic.ack. Это немного проблематично, если служба, например, пытается сохранить сообщение в хранилище данных, которое в настоящее время недоступно (и некоторое время не работает). Было бы лучше, если бы мы просто получали недостоверные сообщения каждые 20 секунд или около того (то есть мы не делаем basic.ack или basic.nack, если хранилище данных не работает, мы просто оставляем сообщения в очереди). Мы попытались осуществить это с помощью ExecutorService которого суть реализуется следующим образом:

(let [chan (lch/open conn)] ; We create a new channel since channels in Langohr are not thread-safe 
    (log/info "Triggering \"recover\" for channel" chan) 
    (try 
     (lb/recover chan) 
     (catch Exception e (log/error "Failed to call recover" e)) 
     (finally (lch/close chan)))) 

К сожалению, это не похоже на работу (сообщения не просто повторно доставлены и остается в очереди). Если мы перезапустим службу, сообщения в очереди будут потребляться правильно. Однако у нас есть другие сервисы, которые реализованы с использованием spring-rabbitmq (на Java), и, похоже, они заботятся об этом из коробки. Я пробовал посмотреть в source code, чтобы выяснить, как они это делают, но пока мне этого еще не удалось.

Вопрос

Как вы заставляете RabbitMQ к (пере) доставлять сообщения в очереди периодически (предпочтительно с использованием Langohr)?

ответ

1

Я не уверен, что вы делаете со своими приложениями Spring AMQP, но для этого в RabbitMQ нет ничего встроенного.

Однако довольно легко настроить мертвую букву с использованием TTL для возврата к исходной очереди через некоторый период времени. См this answer примеры, ссылки и т.д.

EDIT

Однако Spring AMQP делает есть retry interceptor which can be configured to suspend the consumer thread for some period(s) during retry.

Отказоустойчивые повторные отклонения и требования; повторение попыток повторения попыток повторения попыток повторного запуска внутри страны и не взаимодействует с брокером во время повторных попыток.

+0

Спасибо, тогда мы должны ошибаться в весеннем amqp. Когда мы исследовали поведение в одной из наших производственных систем, и он обратился к нам за тем, что у него такое поведение, но мы скорее всего ошибаемся. Большое спасибо за разъяснение этого. Знаете ли вы, что Spring-rabbitmq просто делает nack с запросом или делает что-то более интересное? – Johan

+0

basicNack с требованием true/false в зависимости от того, хотите ли вы немедленно выполнить повтор или отклонить/отправить DLX/DLQ. Также см. Мое редактирование. –

1

См. this answer, в котором есть инструкции: мы вытаскиваем сообщение, nack помещает сообщение в очередь ожидания в течение N секунд, затем он выдает TTL из этой очереди и в другую очередь, которая помещает ее обратно в исходную очередь.

Для настройки потребовалось немного работы, но он отлично работает!

Смежные вопросы