2016-09-16 3 views
5

Я новичок в Spring AMQP. У меня есть приложение, которое является производителем, отправляющим сообщения другому приложению, являющемуся потребителем.Как использовать Ack или Nack весной AMQP

Как только потребитель получит сообщение, мы проверим достоверность данных.

Если данные правильные, у нас есть ACK, и сообщение должно быть удалено из очереди. Если данные являются неправильными, мы должны NACK (отрицательное подтверждение) данных, чтобы они были переупознаны в RabbitMQ.

я наткнулся

**factory.setDefaultRequeueRejected(false);** (Он не будет снова поставить сообщение на всех)

**factory.setDefaultRequeueRejected(true);** (Это будет снова поставить в сообщение, когда происходит исключение)

Но мой случай я признаю сообщение на основе при проверке. Затем он должен удалить сообщение. Если NACK затем запросит сообщение.

Я прочитал на веб-сайте RabbitMQ

AMQP спецификация определяет метод basic.reject, что позволяет клиентам отказаться от индивидуума, доставленные сообщения, инструктаж брокера либо отказаться от них или их снова поставить

Как для достижения вышеуказанного сценария? Пожалуйста, предоставьте мне несколько примеров.

Я попробовал небольшую программу

 logger.info("Job Queue Handler::::::::::" + new Date()); 
     try { 

     }catch(Exception e){ 

      logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::"); 

     } 

     factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{ 
      return cause instanceof XMLException; 
     })); 

Сообщение не повторно очереди за исключением различного factory.setDefaultRequeueRejected (истинный)

09: 46: 38854 ERROR [STDERR] (SimpleAsyncTaskExecutor -1) org.activiti.engine.ActivitiObjectNotFoundException: никаких процессов, развернутых с ключом 'WF89012'

09: 46: 39102 ИНФО [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1) Получено из очереди ошибок: {ОШИБКА = Может не совершать JPA сделки; вложенное исключение javax.persistence.RollbackException: Сделка помечается как rollbackOnly}

ответ

4

См the documentation.

По умолчанию (с defaultRequeueRejected=true) контейнер будет вызывать сообщение (заставляя его удалять), если слушатель нормально выходит или отклоняет (и запрашивает) его, если слушатель выдает исключение.

Если слушатель (или обработчик ошибок) выдает AmqpRejectAndDontRequeueException поведение по умолчанию отменяется, и сообщение отбрасывается (или направляется в DLX/DLQ, если так настроено) - контейнер вызывает basicReject(false) вместо basicReject(true).

Итак, если ваша проверка не удалась, введите AmqpRejectAndDontRequeueException. Или настройте своего слушателя с помощью специального обработчика ошибок, чтобы преобразовать ваше исключение в AmqpRejectAndDontRequeueException.

Это описано в this answer.

Если вы действительно хотите, чтобы взять на себя ответственность за ACKing себя, установить режим квитирования для MANUAL и использовать ChannelAwareMessageListener или this technique, если вы используете @RabbitListener.

Но большинство людей просто позволяют контейнеру заботиться о вещах (как только они понимают, что происходит). Как правило, использование ручных ключей для особых случаев использования, например, отсрочки или раннего подтверждения.

EDIT

Был ошибка в ответе я указал вам (исправлено); вы должны посмотреть на причину ListenerExecutionFailedException. Я просто проверил это и работает, как ожидалось ...

@SpringBootApplication 
public class So39530787Application { 

    private static final String QUEUE = "So39530787"; 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend(QUEUE, "foo"); 
     template.convertAndSend(QUEUE, "bar"); 
     template.convertAndSend(QUEUE, "baz"); 
     So39530787Application bean = context.getBean(So39530787Application.class); 
     bean.latch.await(10, TimeUnit.SECONDS); 
     System.out.println("Expect 1 foo:" + bean.fooCount); 
     System.out.println("Expect 3 bar:" + bean.barCount); 
     System.out.println("Expect 1 baz:" + bean.bazCount); 
     context.close(); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setErrorHandler(new ConditionalRejectingErrorHandler(
       t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException)); 
     return factory; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue(QUEUE, false, false, true); 
    } 
    private int fooCount; 

    private int barCount; 

    private int bazCount; 

    private final CountDownLatch latch = new CountDownLatch(5); 

    @RabbitListener(queues = QUEUE) 
    public void handle(String in) throws Exception { 
     System.out.println(in); 
     latch.countDown(); 
     if ("foo".equals(in) && ++this.fooCount < 3) { 
      throw new FooException(); 
     } 
     else if ("bar".equals(in) && ++this.barCount < 3) { 
      throw new BarException(); 
     } 
     else if ("baz".equals(in)) { 
      this.bazCount++; 
     } 
    } 

    @SuppressWarnings("serial") 
    public static class FooException extends Exception { } 

    @SuppressWarnings("serial") 
    public static class BarException extends Exception { } 

} 

Результат:

Expect 1 foo:1 
Expect 3 bar:3 
Expect 1 baz:1 
+0

Спасибо за объяснение. Я попробовал какую-то программу. Не могли бы вы исправить ошибку? – Chandan

+0

Не имеет смысла всегда бросать 'AmqpRejectAndDontRequeueException' из обработчика ошибок; для этого просто установите 'default RequeueRejected = false'. Используйте технику в [этом ответе] (http://stackoverflow.com/questions/39509745/negative-acknowledgement-to-rabbitmq-queue-to-re-queue-the-message-using-spring/39511357#39511357), чтобы просто выбросьте его за «XMLException» - ваш слушатель должен повторно бросить его. –

+0

См. Изменение в моем ответе. –

Смежные вопросы