2014-09-11 3 views
1

Это мой первый пост, так что, надеюсь, я не испортил форматирование слишком плохо.Нет возврата к откату транзакции xa activemq + spring-integration + spring-jms

У меня есть пружинная интеграция маршрут, который направляет сообщение следующим образом:

Gateway -> маршрутизатор -> исходящего JMS-канал -> управляемых сообщений-канал -> Сервис-активатор

I» m, используя версию интеграции весны 4.0.3 и activemq версии 5.10.0. В этом конкретном случае я запускаюсь в контейнере weblogic (хотя это не всегда так).

Маршрут отправляет сообщение JMS в очередь activemq. Если во время выполнения прослушивателя я столкнулся с исключением во время выполнения, я ожидаю, что произойдет откат любых транзакционных ресурсов, зачисленных в прослушиватель, и чтобы сообщение было повторно добавлено. В настоящее время я вижу откат, но не перенаправляет сообщение.

Моя конфигурация весной выглядит следующим образом:

<!-- TaskExecutor is defined elsewhere and is our own object, it's effectively the equivalent of calling Executors.newCachedThreadPool(). --> 

<bean id="TransactionManager" class="org.springframework.transaction.jta.WebLogicJtaTransactionManager"/> 

<bean id="XADataSource" class="org.springframework.jndi.JndiObjectFactoryBean"> 
    <property name="jndiName" value="${xaJndiName}"/> 
</bean> 

<amq:broker brokerName="messageBroker" useJmx="true" persistent="true" start="true" schedulerSupport="false"> 
    <amq:transportConnectors> 
     <amq:transportConnector id="tcpConnector" uri="tcp://garethDell.leeds.retailexp.com:9783"/> 
    </amq:transportConnectors> 
    <amq:persistenceAdapter> 
     <amq:jdbcPersistenceAdapter dataSource="#XADataSource"/> 
    </amq:persistenceAdapter> 
</amq:broker> 

<amq:xaConnectionFactory id="amqConnectionFactory" brokerURL="tcp://garethDell.leeds.retailexp.com:9783" useAsyncSend="false"> 
    <amq:prefetchPolicy> 
     <amq:prefetchPolicy all="1"/> 
    </amq:prefetchPolicy> 
    <amq:redeliveryPolicy> 
     <amq:redeliveryPolicy id="redeliveryPolicy" initialRedeliveryDelay="5000" maximumRedeliveries="5" useExponentialBackOff="true" backOffMultiplier="2" queue="*" /> 
    </amq:redeliveryPolicy> 
</amq:xaConnectionFactory> 

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/> 
    <property name="reconnectOnException" value="true"/> 
</bean> 

<int:gateway id="messageGateway" service-interface="com.retailexp.amp.server.spring.MessageGateway" default-request-channel="requestChannel"/> 
<bean id="MessagePublisher" class="com.retailexp.amp.server.common.publishing.impl.MessagePublisher"> 
    <property name="gateway" ref="messageGateway"/> 
</bean> 

<!-- Routes the messages to the appropriate destination, each queue/topic should have an appropriate channel associated with it which must be registered here--> 
<int:channel id="requestChannel"/> 
<int:header-value-router id="destinationRouter" input-channel="requestChannel" header-name="destination"> 
    <int:mapping value="test" channel="testOutChannel"/> 
</int:header-value-router> 

<int:channel id="testOutChannel"/> 
<int-jms:outbound-channel-adapter id="testOut" channel="testOutChannel" connection-factory="connectionFactory" destination="testQueue"/> 

<int:channel id="testListener"/> 
<int-jms:message-driven-channel-adapter id="testIn" connection-factory="connectionFactory" container="testQueueContainer" channel="testListener"/> 

<!-- Define the queues --> 
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
    <constructor-arg value="test"/> 
</bean> 

<!-- Define the message listeners --> 
<bean id="testMessageHandler" class="com.retailexp.amp.server.spring.TestListener"/> 
<int:service-activator input-channel="testListener" ref="testMessageHandler" method="onMessage"/> 

<bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
    <property name="connectionFactory" ref="connectionFactory"/> 
    <property name="transactionManager" ref="TransactionManager"/> 
    <property name="taskExecutor" ref="TaskExecutor"/> 
    <property name="maxConcurrentConsumers" value="10"/> 
    <property name="sessionTransacted" value="true"/> 
    <property name="sessionAcknowledgeMode" value="0"/> 
    <property name="destination" ref="testQueue"/> 
    <property name="cacheLevelName" value="CACHE_CONSUMER"/> 
</bean> 

Мой слушатель очень прямо вперед на данном этапе, он выходит из сообщения затем сохраняет один из наших объектов предметной области и, наконец, бросает исключение во время выполнения, чтобы заставить откат. Откат на этом этапе появляется в журналах, и изменения базы данных корректно откатываются, поэтому я знаю, что в слушателе я получаю откат транзакции. Я предполагаю, что потребитель не зачисляется как часть транзакции XA, когда это должно быть, поэтому потребление сообщения фиксируется как часть отдельной транзакции (как-то не транзакционно?).

Я вижу ошибку, определяющую уровень изоляции транзакции из-за драйвера xa jdbc в TransactionContext, однако я не думаю, что это причина проблемы. Я попытался использовать различные уровни изоляции, например. 8 TRANSACTION_SERIALIZABLE, поэтому я считаю, что это просто ограничение самого драйвера.

Исключение:

[11 Sep 2014 16:53:07,464] [ActiveMQ Transport: tcp:///192.168.50.100:[email protected] ] [   ] [   ] TRACE jdbc.TransactionContext     - Cannot set transaction isolation to 1 due Due to vendor limitations, setting transaction isolation for "Oracle XA" JDBC XA driver is not supported.. This exception is ignored. 
java.sql.SQLException: Due to vendor limitations, setting transaction isolation for "Oracle XA" JDBC XA driver is not supported. 
    at weblogic.jdbc.wrapper.JTAConnection.setTransactionIsolation(JTAConnection.java:492) 
    at org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:74) 
    at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doAddMessage(DefaultJDBCAdapter.java:220) 
    at org.apache.activemq.store.jdbc.JDBCMessageStore.addMessage(JDBCMessageStore.java:126) 
    at org.apache.activemq.store.memory.MemoryTransactionStore.addMessage(MemoryTransactionStore.java:343) 
    at org.apache.activemq.store.memory.MemoryTransactionStore$1.asyncAddQueueMessage(MemoryTransactionStore.java:159) 
    at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:910) 
    at org.apache.activemq.broker.region.Queue.send(Queue.java:733) 
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:424) 
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:445) 
    at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297) 
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147) 
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96) 
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307) 
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152) 
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496) 
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756) 
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294) 
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148) 
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) 
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113) 
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270) 
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) 
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) 
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) 
    at java.lang.Thread.run(Thread.java:722) 

У меня есть журналы на уровне трассировки, которые доступны, если они необходимы, они очень многословным, хотя так толкали меня за предел для этой должности. Дайте мне знать, если они будут полезны, и я постараюсь сделать их доступными.

Может ли кто-нибудь увидеть, есть ли что-то явно неправильное в вышеуказанной конфигурации?

Edit: журнал трассировки можно посмотреть здесь: http://pastebin.com/0awmzY2D

Благодаря

+0

Было бы лучше, если бы вы указали StackTrace на откате TX вместо 'Oracle XA limit message'. В любом случае, последний говорит вам, что вы не можете этого сделать. –

+0

Спасибо за ответ, трассировка не очень полезна при откатке, ящик транзакции показывает запрос отката, а затем начальный откат без стека ошибок или что-то еще. Полный журнал трассировки можно найти здесь: http://pastebin.com/0awmzY2D –

+0

Похоже, что что-то не так с ActiveMQ: DEBUG listener.DefaultMessageListenerContainer - Откат транзакции из-за исключения слушателя throw: org.springframework.messaging.MessageHandlingException: java.lang .RuntimeException: откат пожалуйста ???!. Может быть, сообщения не стойкие? –

ответ

1

Оказывается, что не было никакого XID на TransactionContext, состоявшейся на сессии в то время сообщение ударил слушателя. Фактически сеанс не включался в транзакцию, поэтому я получал новую транзакцию, которая только что завернула код слушателя.

Я немного подумал о том, где должен состоять призыв, и похоже, что фабрика соединений xa не делает ничего, чтобы заручиться сеансом транзакции, когда она ее создает, в ретроспективе. Я не уверен, почему Я думал, что это не будет иметь никакого отношения к менеджеру транзакций.

Изменения к XAPooledConnectionFactory результатов в XaConnectionPool, которая делает делать автоматическую вербовку, конфигурация изменяется следующим образом:

<bean id="connectionFactory" class="org.apache.activemq.jms.pool.XaPooledConnectionFactory"> 
    <property name="connectionFactory" ref="xaConnectionFactory"/> 
    <property name="tmJndiName" value="weblogic.transaction.TransactionManager"/> 
    <property name="tmFromJndi" value="true"/> 
</bean> 

<bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
    <property name="connectionFactory" ref="connectionFactory"/> 
    <property name="transactionManager" ref="TransactionManager"/> 
    <property name="taskExecutor" ref="TaskExecutor"/> 
    <property name="maxConcurrentConsumers" value="10"/> 
    <property name="sessionTransacted" value="true"/> 
    <property name="destination" ref="testQueue"/> 
</bean> 

Я теперь получаю сообщение Redelivery. У меня все еще есть проблема, когда соблюдение сроков доставки и отмена политики не соблюдаются (хотя, как ни странно, максимальные релейные потоки), но я уверен, что смогу это сделать.

Спасибо за ваше время.

+0

Если это кому-то полезно; политика переадресации игнорируется, поскольку перераспределение осуществляется потребителем, который необходимо кэшировать, чтобы обеспечить учет предыдущей задержки. Поэтому для контейнера прослушивателя сообщений необходимо использовать уровень кэша CACHE_CONSUMER. К сожалению, в этом случае это не сработает, потому что сеанс также кэшируется и не будет иметь связанной транзакции, поэтому откат не приведет к повторной доставке. Я решил это, настроив переадресацию брокера, и все работает, как я ожидаю сейчас. –

Смежные вопросы