2016-12-07 8 views
2

Таким образом, установка следующее:Весна интеграции и JDBC в одной транзакции

<tx:advice id="txAdvice2" transaction-manager="dataSourceTransactionManager"> 
    <tx:attributes> 
     <tx:method name="*" rollback-for="Throwable" no-rollback-for="ListenerExecutionFailedException"/> 
    </tx:attributes> 
</tx:advice> 

<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter" 
            channel-transacted="true" 
            advice-chain="txAdvice2" /> 

<int:chain input-channel="input-channel" output-channel="output-channel"> 
    <int:service-activator ref="h1Handler" method="handle" /> 
    <int:service-activator ref="h2Handler" method="handle" /> 
    <int:service-activator ref="h3Handler" method="handle" /> 
    <int:splitter /> 
</int:chain> 

<int-amqp:outbound-channel-adapter channel="output-channel" exchange-name="outputit" amqp-template="rabbitTemplate" /> 

Если во время выполнения этой нити (так как все это цепь amqpIN-процесс-amqpOUT shold выполнить в одном потоке) кидаю ListenerExecutionFailedException, dataSourceTransactionManager выполнит фиксацию, но amqp также запросит сообщение, потому что исключение распространяется.

Как я могу сказать кролику, чтобы сообщение ACK было успешным в этом случае?

Кроме того, я увидел, что мне нужно было установить класс исключительных исключений атрибута no-rollback-for, поскольку мое внутреннее исключение хранится только в атрибуте «причина», который не проверяется атрибутом RuleBasedTransactionAttribute. более

Одна вещь, если я конфиг так:

<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter" 
            channel-transacted="true" 
            transaction-manager="dataSourceTransactionManager" 
            transaction-attribute="transactionAttribute" /> 

transactionAttribute который RuleBasedTransactionAttribute не считается вообще и dataSourceTransactionManager всегда rollbacked, даже если у меня нет-откат-за не установлены правильно.

Спасибо!

ответ

2

Вы можете добавить пользовательские ErrorHandler к контейнеру слушателя ( вы должны настроить контейнер снаружи и содержат ссылки на container атрибут ).

Обработчик ошибок по умолчанию является ConditionalRejectingErrorHandler с DefaultExceptionStrategy, которая рассматривает некоторые исключения LEFE вызывают как со смертельным исходом:

private boolean isCauseFatal(Throwable cause) { 
     return cause instanceof MessageConversionException 
       || cause instanceof org.springframework.messaging.converter.MessageConversionException 
       || cause instanceof MethodArgumentNotValidException 
       || cause instanceof MethodArgumentTypeMismatchException 
       || cause instanceof NoSuchMethodException 
       || cause instanceof ClassCastException 
       || isUserCauseFatal(cause); 
    } 

Начиная с версии 1.6.4 вы можете унаследовать по умолчанию DefaultExceptionStrategy и добавить причину (ы) для isUserCauseFatal().

Перед тем, как 1.6.4 вам пришлось предоставить свой собственный FatalExceptionStrategy (или реализация обработчика ошибок).

Для фатальных причин обработчик выбрасывает AmqpRejectAndDontRequeueException, который сообщает контейнеру о том, чтобы отправить (и не требовать) сообщение.

EDIT

Кстати, нет необходимости для вас, чтобы обернуть исключение, контейнер будет делать это для вас ...

protected Exception wrapToListenerExecutionFailedExceptionIfNeeded(Exception e, Message message) { 
    if (!(e instanceof ListenerExecutionFailedException)) { 
     // Wrap exception to ListenerExecutionFailedException. 
     return new ListenerExecutionFailedException("Listener threw exception", e, message); 
    } 
    return e; 
} 

EDIT2

Моя ошибка, ErrorHandler может быть указана с использованием атрибута error-handler.

EDIT3

В качестве альтернативы, просто бросаться AmqpRejectAndDontRequeueException (который будет обернут в LEFE).

+0

Вам не нужно обертывать исключение; см. мое редактирование. Кроме того, вы можете использовать атрибут 'error-handler'. –

+0

Спасибо за быстрый ответ. Попробуем это завтра.Если я отклонюAndDontRequeue - это закончится в DeadLetterQueue? Я бы хотел, чтобы это рассматривалось как обычный ACK, так как я хочу сделать это, если сообщение будет повторно отправлено (доставлено хотя бы один раз +, возможно, больше). Вы знаете, кстати, почему атрибут транзакции не работает с транзакцией при поставке внутри входящего канала-adater, но работает, если применяется в качестве совета? (Вторая часть вопроса). –

+0

То есть, если сообщение повторно отправлено, но jdbc уже завершен, я хочу выставить исключение, которое будет откатывать jdbc, но вы получите сообщение о кролике (так как я делаю commit и ack в конце конвейера обработки). –

Смежные вопросы