2016-04-01 4 views
1

Я хочу реализовать функцию повторной попытки (когда есть проблема у потребителя) 3 раза, а затем сообщение должно перейти в другую очередь (обмен мертвыми буквами). Я настроил очередь/обмен, как показано нижецепочка консультаций в RabbitMQ Spring не работает

регулярного имя обмена сообщений: test_exchange очереди сообщений: test_queue test_queue связывается с test_exchange с маршрутизацией ключа test_queue

мертво имя письма обмена: test_dlq_exchange мертвой очереди сообщений: test_dlq_queue test_dlq_queue связываются с test_dlq_exchange с маршрутизацией ключа test_dlq_queue

в консоли RabbitMQ UI, я настроил "X-мертвую письмо-обмен" как "test_exchange"

Ниже приведен код для SimpleMessageListenerContainer

@Bean 
    SimpleMessageListenerContainer getMessageListenerContainer(){ 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("test_queue"); 

    MessageListenerAdapter adapter = new MessageListenerAdapter(); 
    //configured my message listener class 
    //configured Jackson2JsonMessageConverter as converter 
    container.setMessageListener(adapter); 
    container.setAdviceChain(new Advice[] {retryAdvice()} 
    return container; 
    } 

    //configuration for retryAdvice 

    @Bean 
    public MethodInterceptor retryAdvice{ 

     ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy(); 
     backoffPolicy.setInitialInterval(10); 
     backoffPolicy.setMaxInterval(1000); 
     backoffPolicy.setMultiplier(2); 
     RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory()); 
     retryTemplate.setQueue("test_dl_queue"); 
     return RetryInterceptorBuilder 
        .stateful() 
        .backOffPolicy(backoffPolicy) 
        .maxAttempts(3) 
        .recoverer(
         new RepublishMessageRecoverer 
         (retryTemplate,"test_dl_exchange","test_queue")).build(); 
    } 

Мой пользовательских сообщений Слушатель Сообщение инициативе POJO, который имеет только мой MessageObject.

Поскольку я использую stateful, я включил createMessageIds (true). В моем прослушивателе сообщений я снова вызываю метод targetobject. После запуска контейнера поток идет циклически.

ie Публикация сообщения в очередь -> Прослушиватель сообщений вызывает целевой метод на основе какого-то исключения -> снова публикует сообщение для очереди -> метод вызова ... и т.п. Он не подталкивает сообщение к обмену мертвой буквой/очереди и бесконечному циклу.

В журнале я вижу, как показано ниже

 o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee) 

Может кто-нибудь помочь мне решить эту проблему?

ответ

0

Поскольку вы используете RepublishMessageRecoverer, настройка DLX/DLQ на брокер ничего не делает. Чтобы перенаправить на DLQ, вам понадобится RejectAndDontRequeueRecover - когда повторные попытки будут исчерпаны, сообщение будет отправлено DLX/DLQ.

Похоже, вы переиздание в ту же очередь

(retryTemplate,"test_dl_exchange","test_queue")) 

Следовательно, бесконечный цикл.

+0

Спасибо, Гэри. Он отлично работает после того, как я включил RejectAndDontRequeueRecoverer. Теперь, после достижения максимальных попыток повтора, он перемещает сообщение в DLQ. – Raja

Смежные вопросы