Редакция: Поток ввода работает только для 1 ввода, отправляемой в агрегатор для выходного канала 'out'. Последующие сообщения поступают только в журнал «logLateArrivers». Какое условие используется для отправки на канал сброса?Пример интеграции с интеграцией весны с Websphere
Описание: Попытка сопоставить пример интеграции Spring для базовых jms с использованием aggegator с использованием WebSphere.
ОБНОВЛЕНИЕ: - Включение отладки показывает, что работает poller. Сообщения вытягиваются и помещаются в MQ, и ответ подбирается. Однако для сценария MQ после первого набора сообщений не используется AggregatingMessageHandler. Сообщения отправляются в адаптер «logLateArrivers» на канале discard vs channel 'out' для вывода. Я повторяю формулировку проблемы более конкретно.
Спринг Интеграция Пример: Spring Integration Github Example
Выход использованием Спринг интеграции Пример:
test1
test2
[TEST1, TEST1]
[TEST2, TEST2]
Выход использованием Spring Интеграция с Websphere
test1
test2
[TEST1, TEST2]
[TEST1, TEST2]
адаптированное Изменения с помощью WebSphere MQ
common.xml
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory"> <bean class="com.ibm.mq.jms.MQConnectionFactory"> <property name="channel" value="channelName" /> <property name="hostName" value="host1234" /> <property name="port" value="1111" /> <property name="queueManager" value="testqmgr" /> <property name="transportType" value="1" /> </bean> </property> <property name="sessionCacheSize" value="10"/> </bean> <bean id="requestQueue" class="com.ibm.mq.jms.MQQueue"> <constructor-arg value="requestQueue"/> </bean> <bean id="requestTopic" class="com.ibm.mq.jms.MQTopic"> <constructor-arg value="topic.demo"/> </bean> <bean id="replyQueue" class="com.ibm.mq.jms.MQQueue"> <constructor-arg value="replyQueue"/> </bean> <!-- Poller that is the stream in channel for console input --> <integration:poller id="poller" default="true" fixed-delay="1000"/>
Aggregation.xml
<int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/> <int:channel id="stdinToJmsoutChannel"/> <int:chain input-channel="stdinToJmsoutChannel"> <int:header-enricher> <int:header name="jms_replyTo" ref="replyQueue" /> </int:header-enricher> <int-jms:outbound-channel-adapter destination="requestTopic" /> </int:chain> <int-jms:message-driven-channel-adapter channel="jmsReplyChannel" destination="replyQueue"/> <int:channel id="jmsReplyChannel" /> <int:aggregator input-channel="jmsReplyChannel" output-channel="out" group-timeout="5000" expire-groups-upon-timeout="false" send-partial-result-on-expiry="true" discard-channel="logLateArrivers" correlation-strategy-expression="headers['jms_correlationId']" release-strategy-expression="size() == 2"/> <int:logging-channel-adapter id="logLateArrivers" /> <!-- Subscribers --> <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" /> <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" /> <int:transformer input-channel="upcase" expression="payload.toUpperCase()" /> <!-- Profiles --> <beans profile="default"> <int-stream:stdout-channel-adapter id="out" append-newline="true"/> </beans> <beans profile="testCase"> <int:bridge input-channel="out" output-channel="queueChannel"/> <int:channel id="queueChannel"> <int:queue /> </int:channel> </beans>
Извините за задержку, потребовалось некоторое время для отладки и отслеживания. Я обновил заявление о проблеме. С точки зрения корреляции id, его разные делают это через activeMQ vs websphere mq. Пример интеграции Spring является локальным с файловой системы, где MQ находится через диспетчера очереди, поэтому, возможно, поэтому у него другое поведение. Какое условие приводит сообщения к каналу сброса? – haju
Сообщения, поступающие с одинаковым идентификатором корреляции (в этом случае 'heads ['jms_correlationId']') будут отброшены (поздние приращения), если только '\t \t expire-groups-on-completion =" true "' - что позволяет новый для начала. Вам нужно выяснить, почему вторая группа имеет одинаковый идентификатор корреляции. –
Не должен ли stdinToJmsoutChannel создать новый идентификатор корреляции, когда poller захватывает новые сообщения? Это похоже на пример по умолчанию. – haju