Я хочу реализовать функцию повторной попытки (когда есть проблема у потребителя) 3 раза, а затем сообщение должно перейти в другую очередь (обмен мертвыми буквами). Я настроил очередь/обмен, как показано нижецепочка консультаций в RabbitMQ Spring не работает
регулярного имя обмена сообщений: test_exchange очереди сообщений: test_queue test_queue связывается с test_exchange с маршрутизацией ключа test_queue
мертво имя письма обмена: test_dlq_exchange мертвой очереди сообщений: test_dlq_queue test_dlq_queue связываются с test_dlq_exchange с маршрутизацией ключа test_dlq_queue
в консоли RabbitMQ UI, я настроил "X-мертвую письмо-обмен" как "test_exchange"
Ниже приведен код для SimpleMessageListenerContainer
@Bean
SimpleMessageListenerContainer getMessageListenerContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("test_queue");
MessageListenerAdapter adapter = new MessageListenerAdapter();
//configured my message listener class
//configured Jackson2JsonMessageConverter as converter
container.setMessageListener(adapter);
container.setAdviceChain(new Advice[] {retryAdvice()}
return container;
}
//configuration for retryAdvice
@Bean
public MethodInterceptor retryAdvice{
ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
backoffPolicy.setInitialInterval(10);
backoffPolicy.setMaxInterval(1000);
backoffPolicy.setMultiplier(2);
RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory());
retryTemplate.setQueue("test_dl_queue");
return RetryInterceptorBuilder
.stateful()
.backOffPolicy(backoffPolicy)
.maxAttempts(3)
.recoverer(
new RepublishMessageRecoverer
(retryTemplate,"test_dl_exchange","test_queue")).build();
}
Мой пользовательских сообщений Слушатель Сообщение инициативе POJO, который имеет только мой MessageObject.
Поскольку я использую stateful, я включил createMessageIds (true). В моем прослушивателе сообщений я снова вызываю метод targetobject. После запуска контейнера поток идет циклически.
ie Публикация сообщения в очередь -> Прослушиватель сообщений вызывает целевой метод на основе какого-то исключения -> снова публикует сообщение для очереди -> метод вызова ... и т.п. Он не подталкивает сообщение к обмену мертвой буквой/очереди и бесконечному циклу.
В журнале я вижу, как показано ниже
o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee)
Может кто-нибудь помочь мне решить эту проблему?
Спасибо, Гэри. Он отлично работает после того, как я включил RejectAndDontRequeueRecoverer. Теперь, после достижения максимальных попыток повтора, он перемещает сообщение в DLQ. – Raja