2016-07-07 4 views
1

У меня возникли проблемы с определением способа задержки уровня сообщений в SpringAMQP. Я вызываю Webservice, если служба недоступна или если она генерирует некоторое исключение, я сохраняю все запросы в очередь RabbitMQ, и я продолжаю повторять вызов службы до тех пор, пока он не будет успешно выполнен. Если служба продолжает выдавать ошибку или ее недоступно, слушатель rabbitMQ продолжает цикл. (Значение Listener получает сообщение и выполняет служебный вызов, если какая-либо ошибка переупорядочивает сообщение)SpringAMQP delay

Я ограничил цикл до X часов с помощью MessagePostProcessor однако я хотел включить задержку на уровне сообщений и каждый раз, когда он пытается получить доступ к сервису. Например, 1-я попытка задержки 3000 мс и второе время 6000 мс, пока я не попробую х количество раз.

Было бы здорово, если бы вы предоставили несколько примеров.

Не могли бы вы предоставить мне некоторое представление об этом?

ответ

1
I added CustomeMessage delay exchange 

    @Bean 
    CustomExchange delayExchange() { 
     Map<String, Object> args = new HashMap<>(); 
     args.put("x-delayed-type", "direct"); 
     return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, args); 
    } 

Added MessagePostProcessor 

    if (message.getMessageProperties().getHeaders().get("x-delay") == null) { 
      message.getMessageProperties().setHeader("x-delay", 10000); 
     } else { 

      Integer integer = (Integer) message.getMessageProperties().getHeaders().get("x-delay"); 
      if (integer < 60000) { 
       integer = integer + 10000; 
       message.getMessageProperties().setHeader("x-delay", integer); 
      } 
     } 

First time it delays 30 seconds and adds 10seconds each time till it reaches 600 seconds.This should be configurable. 

And finally send the message to 

    rabbitTemplate.convertAndSend("delayed-exchange", queueName,message, rabbitMQMessagePostProcessor); 
+0

Ok. Выглядит интересно, но я нахожу это как накладные расходы, чтобы совершить полное путешествие к брокеру только для повторных попыток ... –

+0

Да, но таким образом мы не теряем никаких сообщений, и повторение продолжается до тех пор, пока не пройдет x минут, а затем переместите сообщение на другая очередь. – sea

+1

Мы также не теряем их с помощью Spring Retry, просто потому, что мы не признаем это на брокере. И да, мы можем отправить его в другую очередь. См. Документы, которые я указал в своем ответе. –

1

Ну, это невозможно, как вы это делаете.

Повторная очередь сообщений полностью похожа на транзакционную транзакцию, где система возвращается к состоянию перед исключением. Таким образом, вы не можете изменить сообщение, чтобы вернуться в очередь.

Возможно, вам нужно взглянуть на проект Spring Retry по той же причине и опросить сообщение из очереди только один раз и повторить попытку в памяти до тех пор, пока успешный ответ или повторная политика не будут исчерпаны. В конце вы можете просто отправить сообщение из очереди или переместить его в DLQ.

Дополнительная информация в Reference Manual.

+0

Hi Artem, Спасибо за ответ. Я достиг этого с помощью MessagePostProcessor и CustomExchange. @Bean CustomExchange delayExchange() { Карта args = new HashMap <>(); args.put ("x-delayed-type", "direct"); возвращает новый CustomExchange («cos-delayed-exchange», «x-delayed-message», true, false, args); } – sea

+0

Хорошо. Если вы найдете его хорошим, вы можете поделиться им здесь как ответом на свой вопрос. Просто напишите мне здесь в комментарии, чтобы обратить внимание на это: –

+0

if (message.getMessageProperties(). GetHeaders(). Get ("x-delay") == null) { message.getMessageProperties(). SetHeader ("x- задержка ", 10000); } else { Integer integer = (Integer) message.getMessageProperties(). GetHeaders(). Get ("x-delay"); if (integer <60000) { integer = integer + 10000; message.getMessageProperties(). SetHeader ("x-delay", integer); } } – sea