2016-11-22 2 views
0

Возможно ли запустить код под транзакцией в транзакции, поэтому, если в бизнес-обработке выбрано исключение, мы можем отбросить сообщение, отправленное в очередь?RabbitMQ отправка сообщения в транзакции

rabbitTemplate.convertAndSend("queue1", data); 

//do some processing 

rabbitTemplate.convertAndSend("queue2", data); 

Потребность это то, что если что-то пошло не так после отправки сообщения queueq, но мы не можем отправить сообщение queue2. Или что делать, если проблема в сети или какая-то другая проблема при отправке сообщения в очередь.

ответ

1

Если этот код работает в потоке контейнера слушателя (onMessage() или @RabbitListener), а в контейнере и шаблоне есть setChannelTransacted(true), то публикация (и доставка) будет выполняться в той же транзакции; выброс исключений заставит все откатываться.

Если это в какой-то произвольный класс Java (не работает на контейнерном потоке), то вам нужно начать транзакцию перед запуском метод ...

@Transactional 
    public void send(String in) { 
     this.template.convertAndSend("foo", in); 
     if (in.equals("foo")) { 
      throw new RuntimeException("test"); 
     } 
     this.template.convertAndSend("bar", in); 
    } 

Вот Загрузочное приложение полный Spring, который демонстрирует особенность ...

@SpringBootApplication 
@EnableTransactionManagement 
public class So40749877Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args); 
     Foo foo = context.getBean(Foo.class); 
     try { 
      foo.send("foo"); 
     } 
     catch (Exception e) {} 
     foo.send("bar"); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     // should not get any foos... 
     System.out.println(template.receiveAndConvert("foo", 10_000)); 
     System.out.println(template.receiveAndConvert("bar", 10_000)); 
     // should be null 
     System.out.println(template.receiveAndConvert("foo", 0)); 
     RabbitAdmin admin = context.getBean(RabbitAdmin.class); 
     admin.deleteQueue("foo"); 
     admin.deleteQueue("bar"); 
     context.close(); 
    } 

    @Bean 
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setChannelTransacted(true); 
     return rabbitTemplate; 
    } 

    @Bean 
    public Queue foo() { 
     return new Queue("foo"); 
    } 

    @Bean 
    public Queue bar() { 
     return new Queue("bar"); 
    } 

    @Bean 
    public Foo fooBean() { 
     return new Foo(); 
    } 

    @Bean 
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) { 
     return new RabbitTransactionManager(connectionFactory); 
    } 

    public static class Foo { 

     @Autowired 
     private RabbitTemplate template; 

     @Transactional 
     public void send(String in) { 
      this.template.convertAndSend("foo", in); 
      if (in.equals("foo")) { 
       throw new RuntimeException("test"); 
      } 
      this.template.convertAndSend("bar", in); 
     } 

    } 

} 

EDIT

Операции на стороне потребителя; это обычно не применяется при использовании Spring, поскольку он управляет сделки, но при использовании клиента непосредственно ...

Connection connection = cf.createConnection(); 
Channel channel = connection.createChannel(true); 
channel.basicQos(1); 
channel.txSelect(); 
CountDownLatch latch = new CountDownLatch(1); 
channel.basicConsume("foo", new DefaultConsumer(channel) { 

    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, 
      byte[] body) throws IOException { 
     System.out.println(new String(body)); 

     getChannel().txRollback(); // delivery won't be requeued; remains unacked 

     if (envelope.isRedeliver()) { 
      getChannel().basicAck(envelope.getDeliveryTag(), false); 
      getChannel().txCommit(); // commit the ack so the message is removed 
      getChannel().basicCancel(consumerTag); 
      latch.countDown(); 
     } 
     else { // first time, let's requeue 
      getChannel().basicReject(envelope.getDeliveryTag(), true); 
      getChannel().txCommit(); // commit the reject so the message will be requeued 
     } 
    } 

}); 
latch.await(); 
channel.close(); 
connection.close(); 

Обратите внимание на txRollback ничего не делает в этом случае; только ack (или отклонение) являются транзакционными.

+0

Thansk Gary, я сделал то же самое, но пропустил @EnableTransactionManagement, теперь он работает. Хотя в этом методе я использую amqpAdmin для объявления очереди (я должен сделать это, я знаю, что это не хорошо), сейчас его не будет отката, хотя, хотя его не biggie, есть способ откат тоже! – user3444718

+0

Нет; изменения инфраструктуры не участвуют в транзакциях - см. [здесь] (https://www.rabbitmq.com/semantics.html) для семантики транзакций rabbitmq. –

+0

еще раз спасибо. это помогает. – user3444718

Смежные вопросы