У меня есть случай, когда я читаю файл, конвертирую содержимое в String. Затем разделите строку на несколько полезных нагрузок и отправьте эти полезные нагрузки в очередь. Я хочу использовать JmsTransactionManager, чтобы все сообщения были отправлены или вообще не были отправлены.TransactionSynchronizationFactory в сочетании с JmsTransactionManager не работает
Когда TX успешно, я хочу переместить файл в папку Archive, иначе переместите его в папку Failed. Я прочитал, что я могу использовать transactionSynchronizationFactory для этого. Но в сочетании с JmsTransactionManager файл не перемещается. Если я использую PseudoTransactionManager, тогда файл перемещается, но я теряю свое JmsTransaction.
Я сделал упрощенную версию, чтобы воспроизвести проблему. (Содержание файла в этом случае представляет собой простой список разделенных запятыми значений.)
@Bean
public IntegrationFlow fileInboundAdaptor() {
return IntegrationFlows
.from(s -> s.file(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(5000)
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.transactional(new JmsTransactionManager(connectionFactory))
)
)
.transform(Transformers.fileToString())
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.handle((GenericHandler<String>) (payload, headers) -> {
jmsTemplate.send("SOME_QUEUE", (Session session) -> session.createTextMessage(payload));
return payload;
})
.channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
}
transactionSynchronizationFactory выглядит следующим образом:
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionParser parser = new SpelExpressionParser();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor
= new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
syncProcessor.setAfterCommitExpression(parser.parseExpression(
"payload.renameTo(new java.io.File('test/archive' " +
" + T(java.io.File).separator + 'ARCHIVE-' + payload.name))"));
syncProcessor.setAfterRollbackExpression(parser.parseExpression(
"payload.renameTo(new java.io.File('test/fail' " +
" + T(java.io.File).separator + 'FAILED-' + payload.name))"));
return new DefaultTransactionSynchronizationFactory(syncProcessor);
}
Так что мой вопрос: действительно TransactionSynchronizationFactory работает только с PseudoTransactionManager или должен работать с JmsTransactionManager?
Решение
мне нужно установить transactionSynchronization на JmsTransaction. Что-то вроде этого:
public JmsTransactionManager transactionManager() {
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
jmsTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return jmsTransactionManager;
}
попытаться сделать что 'JmsTransactionManager' как '@ Bean'. Он не зарегистрирован в ApplicationContext из этой опции .transactional() ' –
Это не проблема. Я добавил решение в свой первоначальный пост. –