2016-04-20 2 views
2

У меня есть поток интеграции, где некоторые из шагов - асинхронные и некоторые из них. Я хочу использовать barrier, чтобы заблокировать поток Main, пока не завершится выполнение всех задач async. Основываясь на документации, существует два способа использования барьера.Использование барьера для ожидания завершения интеграционного потока

  1. Отправьте второе сообщение триггера на входной канал барьера.
  2. Вызвать метод запуска вручную барьер

В моем случае использования приходит сообщение в потоке, а затем проходит через несколько компонентов до тех пор, пока не достигнет completed канала. Я хочу, чтобы основной поток был заблокирован, пока исходные сообщения не достигнут завершенного канала. Поэтому представляется целесообразным использовать опцию №2 и вызвать метод триггера барьера после достижения состояния completed. Кажется, это не работает. Вот упрощенная версия моего потока.

<int:gateway 
    service-interface="...BarrierGateway" 
    id="barrierGateway" default-request-channel="input"> 
</int:gateway> 

<int:channel id="input"> 
    <int:dispatcher task-executor="executor" /> 
</int:channel> 

<int:service-activator input-channel="input" output-channel="completed"> 
    <bean class="...BarrierSA" /> 
</int:service-activator> 

<int:channel id="completed" /> 
<int:service-activator input-channel="completed" 
    ref="barrier1.handler" method="trigger" /> 

<int:barrier id="barrier1" input-channel="input" timeout="10000" /> 

Я посылаю сообщение gateway, который передает его на input канал, который с помощью dispatcher так новый поток запускается передать сообщение вперед. На этом этапе я хочу заблокировать поток main, пока поток Executor-1 проходит через поток. Остальная часть потока проста. Мой service-activator спит в течение 3 секунд, прежде чем возвращать сообщение, чтобы имитировать задержку. Как только сообщение получено в канале completed, активатор службы должен вызывать метод barrier trigger, и только в этот момент основной поток должен быть освобожден. Вместо этого основной поток освобождается сразу после запуска диспетчера нового потока. Я попытался указать постоянный идентификатор корреляции ('abc'), но это не помогло.

ответ

2

Я вижу, вы попались в ловушку.

<int:barrier> приостанавливает поток только в сообщении сообщения, но только тот поток, который приносит ему это сообщение. Глядя на вашу конфигурацию, это тот же самый input канал с Executor. Цель ExecutorChannel перевести сообщение в другой поток, но не приостанавливать поток вызывающего.

С другой стороны у вас есть еще одна ошибка вокруг этого input. Вы объявляете для него двух подписчиков, где только одна из них будет вызвана стратегией балансирования round-robin.

Чтобы исправить вашу задачу, мы должны иметь еще один канал верхнего уровня, как <publish-subscribe-channel>. И правильно, уже сейчас у вас могут быть два подписчика.

Один из них должен быть <bridge> на ваш inputExecutorChannel. И еще один желаемый <barrier>. И только теперь он может приостановить (блокировать ваши термины) основной поток от <gateway>.

С другой стороны, более простым решением было бы не использовать <barrier>. У <gateway> есть возможность блокировать поток вызывающего и ждать ответа. Конечно, это работает, когда методы шлюза не являются void.

И еще один пункт к вашей конфигурации: если вы не ждать ответа на шлюзе, то <barrier> проваливается с

throw new DestinationResolutionException("no output-channel or replyChannel header available"); 

Итак, рассмотрим использовать что-то как output-channel там колодцем.

+0

Барьер предназначен для более сложных сред; для простого примера использования, как это описано в @Artem, основной поток будет заблокирован в шлюзе (если есть тип возврата из метода) до тех пор, пока поток не завершится (независимо от каких-либо асинхронных передач обслуживания) или до тех пор, пока «ответ- timeout' превышен - по умолчанию это бесконечность. Если у вас есть пустота, то барьер является правильным решением, но каждая сторона барьера должна работать на отдельной нити, как говорит Артем. –

Смежные вопросы