2017-02-21 4 views
0

Я пытаюсь установить максимальное количество попыток из моего приложения на кролика-брокера. у меня повторных попыток перехватчик,Весной amqp кролик Максимальное количество попыток подключения к потребителю

@Bean 
public RetryOperationsInterceptor retryOperationsInterceptor() { 
    return RetryInterceptorBuilder.stateless() 
      .maxAttempts(CommonConstants.MAX_AMQP_RETRIES) 
      .backOffOptions(500, 2.0, 3000) 
      .build(); 
} 

и это используется внутри слушателя контейнера,

container.setAdviceChain(new Advice[]{retryOperationsInterceptor()}); 

Тем не менее, после нескольких повторных попыток, потребитель пытается установить соединение сначала в виде бесконечной петли,

2017-02-21 15:03:12.229 WARN 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 
2017-02-21 15:03:12.229 INFO 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0 
2017-02-21 15:03:13.245 WARN 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 
2017-02-21 15:03:13.245 INFO 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0 
2017-02-21 15:03:13.261 ERROR 9292 --- [nsumerThread_83] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s). 

Я хочу, чтобы приложение потерпело неудачу и отказало из-за отсутствия возможности подключения к брокеру после ограничения MAX_RETRY #.

спасибо за помощь

EDIT

Как было предложено @ Artem-Биланом, я закончил с использованием Component

public class BrokerFailureEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> 

В этом классе onApplicationEvent я подсчитал количество отказов и затем предпримите соответствующие действия.

В случае производителя стороны, это немного другой сценарий. Как пояснил @ artem-bilan, приложение должно будет заботиться о любых проблемах. Я исследовал с использованием netflix-hystrix и добавил метод fallback для производственного метода и пойдет по этому маршруту. Большое спасибо.

ответ

1

Ну, вы неправильно поняли container.setAdviceChain(new Advice[]{retryOperationsInterceptor()});. Это относится к бизнес-ошибкам при обработке сообщений:

Обработка бизнес-исключений, в отличие от ошибок протокола и отброшенных соединений, может потребоваться больше размышлений и некоторой настраиваемой конфигурации, особенно если используются транзакции и/или контейнеры. До 2.8.x, у RabbitMQ не было определения поведения мертвых букв, поэтому по умолчанию сообщение, которое отклонено или откатывается из-за бизнес-исключения, может быть повторно добавлено до бесконечности. Чтобы поставить лимит в клиенте на количество повторных поставок, одним из вариантов является StatefulRetryOperationsInterceptor в цепочке консультаций слушателя. Перехватчик может иметь обратный вызов восстановления, который реализует обычное действие мертвой буквы: все, что подходит для вашей конкретной среды.

В противоположность к:

В самом деле петли бесконечно пытается перезапустить потребителя, и только тогда, когда потребитель очень плохо себя действительно это будет отказаться. Один побочный эффект заключается в том, что если брокер не работает, когда контейнер запускается, он будет продолжать попытки до тех пор, пока соединение не будет установлено.

Что вам нужно ListenerContainerConsumerFailedEvent, который выделяется как:

private void logConsumerException(Throwable t) { 
     if (logger.isDebugEnabled() 
       || !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) { 
      logger.warn(
        "Consumer raised exception, processing can restart if the connection factory supports it", 
        t); 
     } 
     else { 
      logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. " 
        + "Exception summary: " + t); 
     } 
     publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t); 
    } 

Таким образом, вы можете слушать тех событий и остановить приложение, когда достигается некоторое условие.

+0

ok great Artem. Я смог добавить слушателя и подсчитать неудачи там и выйти из приложения.Мне было интересно, есть ли способ отключить приложение, но прекратить обработку AMQP? то есть может быть остановлено все потребители или контейнер listenercontainer? Большое спасибо – user3169330

+0

Это неважно. Просто извлеките соответствующие компоненты из cxt и вызовите их 'stop()'. –

+0

спасибо Артем, имеет смысл, теперь все это на стороне потребителя, но что происходит с продюсером, использующим rabbitTemplate, какое событие выбрано или используется механизм повтора? Если бы я хотел остановить попытки или предотвратить бесконечные попытки? – user3169330

Смежные вопросы