2016-08-19 7 views
1

установки Моего потока выглядит следующим образом:завершения транзакции не детерминированная

<int:channel id="channel1"> 
    <int:queue/> 
</int:channel> 
<int:channel id="channel2"> 
    <int:queue/> 
</int:channel> 

<int:chain id="chain1" input-channel="channel1" output-channel="channel2"> 
    <int:poller> 
     <int:transactional propagation="REQUIRES_NEW"/> 
    </int:poller> 
    <Authorzier/> 
    <JMS_put1/> 
    <DB_update_state1/> 
</int:chain> 

<int:chain id="chain2" input-channel="channel2" output-channel="nullChannel"> 
    <int:poller> 
     <int:transactional propagation="REQUIRES_NEW"/> 
    </int:poller> 
    <Transformer/> 
    <JMS_put2/> 
    <DB_update_state2/> 
</int:chain> 

В настоящее время в некоторых случаях сделки по chain2 завершается до сделки chain1 и я DB_state_1 в базе данных.

Как я могу заставить транзакцию chain1 завершить, прежде чем сообщение будет отправлено на output-channel цепи1?

Я знаю, что могу использовать TransactionSynchronization, который отправляет сообщение channel2 в afterCommit(), но я предполагаю, что должно быть более элегантное решение.

EDIT CURRENT WORKAROUND

@ServiceActivator 
public void sendToDestinationFlow(Message<?> message) { 
    TxSenderSyncer s = new TxSenderSyncer(message, this.channel, this.errorChannel); 
    TransactionSynchronizationManager.registerSynchronization(s); 
} 

private static class TxSenderSyncer implements TransactionSynchronization { 

    private Message<?> message; 
    private MessageChannel channel; 
    private MessageChannel errorChannel; 

    public TxSenderSyncer(Message<?> message, MessageChannel channel, MessageChannel errorChannel) { 
     this.message = message; 
     this.channel = channel; 
     this.errorChannel = errorChannel; 
    } 

    @Override 
    public void afterCompletion(int paramInt) { 
     if (paramInt == STATUS_ROLLED_BACK) { 
      errorChannel.send(MessageBuilder.withPayload(new MessagingException(message, "Transaction rolled back")).build()); 
     } else { 
      channel.send(message); 
     } 
    } 
} 

ответ

1

Ваша проблема, что у вас есть две очереди, которые обрабатываются на своих собственных Poller нитей.

Поэтому сообщение может быть отправлено на второе и обработано там еще до конца работы в потоке первой цепи.

Рассмотрите возможность использования <request-handler-advice-chain> с <tx:advice> для этого <DB_update_state1/>. В этом случае только обновление БД будет перенесено в TX. Посыл к output-channel вызывается после финиша работы на handleRequestMessage: http://docs.spring.io/spring-integration/docs/4.3.1.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain

EDIT

Мы сделали некоторое исследование, и это выглядит не так много выбора, если ваш QueueChannel ISN» т на основе транзакции MessageStore. В этом случае вы должны просто быть уверены, что часть «опроса» этого TX QueueChannel находится в режиме READ_COMMITED, где часть «send» также имеет границы TX. И даже мы отправляем сообщение в эту очередь до передачи TX, это будет недоступно для опроса до реального фиксации. Но, да, он работает только для транзакционных MessageStore, как JdbcChannelMessageStore.

Если вы не используете такие устройства, вы не должны расширять границы транзакций шире, чем ресурсы TX.

Чтобы обернуть эти ваши JMS и услуги DB в той же транзакции мы предлагаем конфиг так:

<service-activator ref="txGateway" input-channel="channel1" output-channel="channel2"> 
    <poller/> 
    <request-handler-advice-chain> 
     <tx:advice/> 
    </request-handler-advice-chain> 
</service-activator> 

<gateway id="txGateway" default-request-channel="txChain"/> 

<chain input-channel="txChain"> 
    <Authorzier/> 
    <JMS_put1/> 
    <DB_update_state1/> 
</chain> 
+0

Я редактировал мой пример. Обычно цепочка с poller, которая представляет транзакцию, имеет другие обработчики, некоторые из которых также являются транзакционными ресурсами. Поэтому мне нужна общая транзакция для цепочки, которая завершается до того, как сообщение будет отправлено в «выходной канал» цепочки. –

+0

Пожалуйста, изучите «EDIT» в моем ответе. –

+0

Это решение работает, но у нас есть другая проблема [см. Это сообщение] (http://stackoverflow.com/questions/39097422/dataaccessexception-not-wrapped-in-messagingexception). В настоящее время мы используем обходное решение, вставленное в вопрос в разделе «EDIT CURRENT WORKAROUND», и оценим ваше мнение по нему. –

Смежные вопросы