2015-09-10 4 views
2

Редакция: Поток ввода работает только для 1 ввода, отправляемой в агрегатор для выходного канала 'out'. Последующие сообщения поступают только в журнал «logLateArrivers». Какое условие используется для отправки на канал сброса?Пример интеграции с интеграцией весны с Websphere

Описание: Попытка сопоставить пример интеграции Spring для базовых jms с использованием aggegator с использованием WebSphere.

ОБНОВЛЕНИЕ: - Включение отладки показывает, что работает poller. Сообщения вытягиваются и помещаются в MQ, и ответ подбирается. Однако для сценария MQ после первого набора сообщений не используется AggregatingMessageHandler. Сообщения отправляются в адаптер «logLateArrivers» на канале discard vs channel 'out' для вывода. Я повторяю формулировку проблемы более конкретно.

Спринг Интеграция Пример: Spring Integration Github Example

Выход использованием Спринг интеграции Пример:

test1 
test2 
[TEST1, TEST1] 
[TEST2, TEST2] 

Выход использованием Spring Интеграция с Websphere

test1 
test2 
[TEST1, TEST2] 
[TEST1, TEST2] 

адаптированное Изменения с помощью WebSphere MQ

  1. common.xml

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
        <property name="targetConnectionFactory"> 
         <bean class="com.ibm.mq.jms.MQConnectionFactory"> 
          <property name="channel" value="channelName" /> 
          <property name="hostName" value="host1234" /> 
          <property name="port" value="1111" /> 
          <property name="queueManager" value="testqmgr" /> 
          <property name="transportType" value="1" /> 
         </bean> 
        </property> 
        <property name="sessionCacheSize" value="10"/> 
    </bean> 
    
    <bean id="requestQueue" class="com.ibm.mq.jms.MQQueue"> 
        <constructor-arg value="requestQueue"/> 
    </bean> 
    
    <bean id="requestTopic" class="com.ibm.mq.jms.MQTopic"> 
        <constructor-arg value="topic.demo"/> 
    </bean> 
    
    <bean id="replyQueue" class="com.ibm.mq.jms.MQQueue"> 
        <constructor-arg value="replyQueue"/> 
    </bean> 
    
    <!-- Poller that is the stream in channel for console input --> 
    <integration:poller id="poller" default="true" fixed-delay="1000"/> 
    

  2. Aggregation.xml

    <int-stream:stdin-channel-adapter id="stdin" channel="stdinToJmsoutChannel"/> 
    
    <int:channel id="stdinToJmsoutChannel"/> 
    
    <int:chain input-channel="stdinToJmsoutChannel"> 
        <int:header-enricher> 
         <int:header name="jms_replyTo" ref="replyQueue" /> 
        </int:header-enricher> 
        <int-jms:outbound-channel-adapter destination="requestTopic" /> 
    </int:chain> 
    
    <int-jms:message-driven-channel-adapter channel="jmsReplyChannel" 
        destination="replyQueue"/> 
    
    <int:channel id="jmsReplyChannel" /> 
    
    <int:aggregator input-channel="jmsReplyChannel" output-channel="out" 
        group-timeout="5000" 
        expire-groups-upon-timeout="false" 
        send-partial-result-on-expiry="true" 
        discard-channel="logLateArrivers" 
        correlation-strategy-expression="headers['jms_correlationId']" 
        release-strategy-expression="size() == 2"/> 
    
    <int:logging-channel-adapter id="logLateArrivers" /> 
    
    <!-- Subscribers --> 
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" /> 
    
    <int-jms:inbound-gateway request-channel="upcase" request-destination="requestTopic" /> 
    
    <int:transformer input-channel="upcase" expression="payload.toUpperCase()" /> 
    
    <!-- Profiles --> 
    
    <beans profile="default"> 
    
        <int-stream:stdout-channel-adapter id="out" append-newline="true"/> 
    
    </beans> 
    
    <beans profile="testCase"> 
    
        <int:bridge input-channel="out" output-channel="queueChannel"/> 
    
        <int:channel id="queueChannel"> 
         <int:queue /> 
        </int:channel> 
    
    </beans> 
    

ответ

1

Сообщения должны быть скоррелированы по адресу jms_correlationId. Включите ведение журнала DEBUG и сравните поток сообщений между образцом и вашей версией. Возможно, идентификатор корреляции не настроен правильно.

Входящие шлюзы используют эту логику ...

replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); 

Таким образом, сообщения, связанные с каждым запросом должны получить те же jms_correlationId при отправке агрегатора.

Ваш тест показывает, что оба сообщения имеют один и тот же идентификатор сообщения.

РЕДАКТИРОВАТЬ

сообщения, поступающие с тем же идентификатором корреляции (в данном случае headers['jms_correlationId']) не будет отброшен (конец arrivers), если expire-groups-upon-completion="true" - что позволяет новую группу, чтобы начать вместо отбрасывания. Вам нужно выяснить, почему вторая группа имеет тот же самый идентификатор корреляции, что и первый.

+0

Извините за задержку, потребовалось некоторое время для отладки и отслеживания. Я обновил заявление о проблеме. С точки зрения корреляции id, его разные делают это через activeMQ vs websphere mq. Пример интеграции Spring является локальным с файловой системы, где MQ находится через диспетчера очереди, поэтому, возможно, поэтому у него другое поведение. Какое условие приводит сообщения к каналу сброса? – haju

+0

Сообщения, поступающие с одинаковым идентификатором корреляции (в этом случае 'heads ['jms_correlationId']') будут отброшены (поздние приращения), если только '\t \t expire-groups-on-completion =" true "' - что позволяет новый для начала. Вам нужно выяснить, почему вторая группа имеет одинаковый идентификатор корреляции. –

+0

Не должен ли stdinToJmsoutChannel создать новый идентификатор корреляции, когда poller захватывает новые сообщения? Это похоже на пример по умолчанию. – haju