2015-12-08 5 views
2

Я пытался играть немного с примером, так как здесь https://github.com/spring-projects/spring-integration-samples/commit/3c855f82047c2e5f639bbec47ad44b4782b366da, так вместо строки:Spring Integration - Барьер и маршрутизатор

<int:splitter input-channel="processChannel" output-channel="process" order="1" /> 

я добавил ниже:

<int:splitter input-channel="processChannel" output-channel="someCheckGate" order="1"/> 

<int:channel id="someCheckGate"/> 

<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="aggregatorChannel" expression="1 eq 1"> 
    <int:mapping value="true" channel="aggregatorChannel"/> 
    <int:mapping value="false" channel="aggregatorChannel"/> 
</int:router> 

(истина/ложные точки на один и тот же канал для описания моего вопроса). То, что я пытался сделать, - обходить канал процесса на основе какого-либо условия (либо на основе заголовков, либо в полезной нагрузке ..., в настоящий момент всегда верно), и он обходит отлично, агрегаты отлично, но он зависает в течение определенного таймаута после BarrierMessageHandler. триггер называется:

if (!syncQueue.offer(message, timeout, TimeUnit.MILLISECONDS)) 

и возвращается обратно в следующей строке:

this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message); 

размер syncQueue равно 0.

После того, как поток назад он пытается освободить, но затем он снова вешает на :

Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS); 

Получение нулевого значения и, наконец, бросание ReplyRequiredException на время ожидания барьера. Кажется, здесь довольно хорошая логика ... :) Можете ли вы посоветовать, что мне здесь не хватает? Почему этот маршрутизатор здесь не ноги?

ответ

1

Проблема заключается в том, что в этом случае вы пытаетесь освободить барьер в том же потоке, что и тот, который вы хотите приостановить.

В этом случае (начиная с process пропускается для всех расщеплений), выпуск происходит до приостановки.

Способ, которым работает барьер, заключается в том, что если выпуск происходит первым, этот поток ожидает, пока не появится суспендирующая нить. Поскольку это тот же поток, этого никогда не произойдет.

Обратите внимание, что process является каналом очереди - эти сообщения передаются в другой поток. Следовательно, вам нужен еще один канал очереди для «пропущенных» расщеплений.

<int:router input-channel="someCheckGate" apply-sequence="false" default-output-channel="toAgg" expression="1 eq 1"> 
    <int:mapping value="true" channel="toAgg"/> 
    <int:mapping value="false" channel="toAgg"/> 
</int:router> 

<int:channel id="toAgg"> 
    <int:queue /> 
</int:channel> 

<int:bridge input-channel="toAgg" output-channel="aggregatorChannel"> 
    <int:poller fixed-delay="1000" /> 
</int:bridge> 

Также этот пример написан для ожидания исключений; поскольку они все успешны, структура хочет отправить ответ.

См. this commit for a full working version with the router in place.

Другим решением было бы сделать aggregatorChannelExecutorChannel:

<int:channel id="aggregatorChannel"> 
    <int:dispatcher task-executor="exec" /> 
</int:channel> 
+0

Да, именно - «Предлагать» был поднят, но не приостановлено нить будет выпущен ... Теперь работает отлично - магия !;) Я полагаю, : 'output-channel =" transform "' также может быть заменен на 'output-channel =" nullChannel "' в конфигурации барьера (просто для исключения DestinationResolutionException в конце). Спасибо +1 – m52509791

+0

Да - маршрутизация в 'nullChannel' будет работать. –