Если этот код работает в потоке контейнера слушателя (onMessage()
или @RabbitListener
), а в контейнере и шаблоне есть setChannelTransacted(true)
, то публикация (и доставка) будет выполняться в той же транзакции; выброс исключений заставит все откатываться.
Если это в какой-то произвольный класс Java (не работает на контейнерном потоке), то вам нужно начать транзакцию перед запуском метод ...
@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}
Вот Загрузочное приложение полный Spring, который демонстрирует особенность ...
@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
Foo foo = context.getBean(Foo.class);
try {
foo.send("foo");
}
catch (Exception e) {}
foo.send("bar");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
// should not get any foos...
System.out.println(template.receiveAndConvert("foo", 10_000));
System.out.println(template.receiveAndConvert("bar", 10_000));
// should be null
System.out.println(template.receiveAndConvert("foo", 0));
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
admin.deleteQueue("foo");
admin.deleteQueue("bar");
context.close();
}
@Bean
public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public Queue foo() {
return new Queue("foo");
}
@Bean
public Queue bar() {
return new Queue("bar");
}
@Bean
public Foo fooBean() {
return new Foo();
}
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
public static class Foo {
@Autowired
private RabbitTemplate template;
@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}
}
}
EDIT
Операции на стороне потребителя; это обычно не применяется при использовании Spring, поскольку он управляет сделки, но при использовании клиента непосредственно ...
Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println(new String(body));
getChannel().txRollback(); // delivery won't be requeued; remains unacked
if (envelope.isRedeliver()) {
getChannel().basicAck(envelope.getDeliveryTag(), false);
getChannel().txCommit(); // commit the ack so the message is removed
getChannel().basicCancel(consumerTag);
latch.countDown();
}
else { // first time, let's requeue
getChannel().basicReject(envelope.getDeliveryTag(), true);
getChannel().txCommit(); // commit the reject so the message will be requeued
}
}
});
latch.await();
channel.close();
connection.close();
Обратите внимание на txRollback
ничего не делает в этом случае; только ack (или отклонение) являются транзакционными.
Thansk Gary, я сделал то же самое, но пропустил @EnableTransactionManagement, теперь он работает. Хотя в этом методе я использую amqpAdmin для объявления очереди (я должен сделать это, я знаю, что это не хорошо), сейчас его не будет отката, хотя, хотя его не biggie, есть способ откат тоже! – user3444718
Нет; изменения инфраструктуры не участвуют в транзакциях - см. [здесь] (https://www.rabbitmq.com/semantics.html) для семантики транзакций rabbitmq. –
еще раз спасибо. это помогает. – user3444718