У меня есть реализация шлюза, как показано ниже. Я определил канал ошибок в шлюзе, чтобы уловить любое исключение во всем потоке SI. OutputChannel отправляет ответ обратно из шлюза. Как видите, в системе, созданной с помощью разделителя и маршрутизатора, имеется 3 параллельных потока.Обработка исключений в шлюзе Async
В ErrorHandlingService я устанавливаю соответствующие коды ошибок и отправляю сообщение обратно в aggregateDSLResponseChannel, чтобы весь поток прошел гладко. Мне нужна реализация, когда весь поток заканчивается в DSLFlowEndService, даже когда есть исключение, а затем возвращает ответ обратно вызывающему абоненту по настройкам соответствующего объекта ответа. Причина в том, что есть запрос на подключение, чтобы сказать WLDP и IRIS, и сказать, что поток WLDP не прошел но IRIS преуспевает, я не хочу отправлять полный ответ как отказ. Вместо этого я отправил соответствующее сообщение обратно вызывающему абоненту с указанием частичного успеха (с подробными сведениями о том, что не удалось и удалось)
Чтобы проверить, я ввел исключение в WLDPFlowEndService. Исключение приходит к каналу ошибок, а затем сообщение также возвращается обратно в aggregateDSLResponseChannel, а затем в DSLFlowEndService. Однако ответ не отправляется обратно вызывающему внешнему шлюзу. По существу, интеграция с интеграцией Spring никогда не завершается и время истекает через 30 секунд. Ниже приводится предупреждение, что приходит в журналах:
15:13:23.271 [dslParallelExecutor-2] WARN org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel {} - Reply message being sent, but the receiving thread has already received a reply:[Payload=DSLResponseVO
15:13:23.271 [dslParallelExecutor-2] DEBUG org.springframework.integration.channel.DirectChannel {} - postSend (sent=true) on channel 'outputChannel', message: [Payload=DSLResponseVO
** Обновлено Конфигурация- один, который работает (супер спасибо Gary) **:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputChannel"></int:channel>
<int:channel id="outputChannel"></int:channel>
<int:channel id="dslFlowInitiatorOutputChannel"/>
<int:channel id="routingChannel"/>
<int:channel id="wldpInputChannel">
<int:dispatcher task-executor="wldpParallelExecutor" />
</int:channel>
<int:channel id="lcInputChannel">
<int:dispatcher task-executor="lcParallelExecutor" />
</int:channel>
<int:channel id="irisInputChannel">
<int:dispatcher task-executor="irisParallelExecutor" />
</int:channel>
<int:publish-subscribe-channel id="aggregateDSLResponseChannel"></int:publish-subscribe-channel>
<int:channel id="aggregateDSLOutputChannel">
<int:dispatcher task-executor="dslParallelExecutor" />
</int:channel>
<!-- all thread pools to execute tasks in parallel -->
<task:executor id="dslParallelExecutor" pool-size="50" />
<task:executor id="wldpParallelExecutor" pool-size="25" />
<task:executor id="lcParallelExecutor" pool-size="25" />
<task:executor id="irisParallelExecutor" pool-size="25" />
<int:gateway id="dslServiceFacade" service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLServiceFacade"
default-request-channel="inputChannel" default-reply-channel="outputChannel" error-channel="gatewayErrorChannel" async-executor="dslParallelExecutor">
<int:method name="invoke" request-channel="inputChannel" request-timeout="5000" reply-timeout="3000000"/>
</int:gateway>
<int:service-activator input-channel="inputChannel"
output-channel="dslFlowInitiatorOutputChannel" ref="dslFlowInitiatorService"
method="invoke" id="dslFlowInitiator" />
<bean id="dslFlowInitiatorService" class="com.mycompany.tss.ls.dsl.service.DSLFlowInitiatorService" />
<int:splitter id="systemSplitter" input-channel="dslFlowInitiatorOutputChannel" method="split"
output-channel="routingChannel" ref="systemMessageSplitter">
</int:splitter>
<bean id="systemMessageSplitter" class="com.mycompany.tss.ls.dsl.splitter.SystemMessageSplitter"/>
<int:router id="systemRouter" input-channel="routingChannel" default-output-channel="nullChannel"
expression="headers.get('systemId')">
<int:mapping value="WLDP" channel="wldpInputChannel" />
<int:mapping value="LC" channel="lcInputChannel" />
<int:mapping value="IRIS" channel="irisInputChannel" />
</int:router>
<int:aggregator id="dslResponseAggregator" input-channel="aggregateDSLResponseChannel" output-channel="aggregateDSLOutputChannel"
message-store="dslResponseMessageStore" correlation-strategy-expression="headers['requestId']"
send-partial-result-on-expiry="true">
</int:aggregator>
<int:service-activator input-channel="aggregateDSLOutputChannel"
output-channel="outputChannel" ref="dslFlowEndService"
method="invoke" id="dslFlowEndActivator" />
<bean id="dslFlowEndService" class="com.mycompany.tss.ls.dsl.service.DSLFlowEndService" />
<bean id="dslResponseMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="dslResponseMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="dslResponseMessageStore" />
<property name="timeout" value="4000" />
</bean>
<task:scheduled-tasks>
<task:scheduled ref="dslResponseMessageStoreReaper" method="run" fixed-rate="10000" />
</task:scheduled-tasks>
<int:wire-tap pattern="*Channel" order="3" channel="wiretap"/>
<int:message-history/>
<int:logging-channel-adapter id="wiretap" level="DEBUG"/>
<!-- Error handling Service Activator to log exceptions and send the response -->
<int:service-activator id="errorHandlingServiceActivator" input-channel="gatewayErrorChannel"
method="invoke" output-channel="aggregateDSLResponseChannel" ref="errorHandlingService">
</int:service-activator>
<!-- Error Handling Service class -->
<bean id="errorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" />
<bean id="wldpErrorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" />
<bean id="irisErrorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" />
<int:channel id="wldpStartChannel"></int:channel>
<int:channel id="irisStartChannel"></int:channel>
<int:channel id="wldpErrorChannel"></int:channel>
<int:channel id="irisErrorChannel"></int:channel>
<int:channel id="wldpOutputChannel"></int:channel>
<int:channel id="irisOutputChannel"></int:channel>
<int:service-activator id="midFlowWLDPActivator" input-channel="wldpInputChannel"
ref="midFlowWLDPGateway" output-channel="aggregateDSLResponseChannel"/>
<int:service-activator id="midFlowIRISActivator" input-channel="irisInputChannel"
ref="midFlowIRISGateway" output-channel="aggregateDSLResponseChannel"/>
<int:gateway id="midFlowWLDPGateway" default-request-channel="wldpStartChannel"
service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLWLDPFacade" error-channel="wldpErrorChannel"
/>
<int:gateway id="midFlowIRISGateway" default-request-channel="irisStartChannel"
service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLIRISFacade" error-channel="irisErrorChannel"
/>
<int:service-activator id="errorHandlingWLDPServiceActivator" input-channel="wldpErrorChannel"
method="invoke" ref="wldpErrorHandlingService">
</int:service-activator>
<int:service-activator id="errorHandlingIRISServiceActivator" input-channel="irisErrorChannel"
method="invoke" ref="irisErrorHandlingService">
</int:service-activator>
<!-- Responsible for closing all the activities related to WLDP before sending response -->
<int:service-activator id="wldpFlowEndServiceActivator" input-channel="wldpStartChannel"
method="invoke" ref="wldpFlowEndService">
</int:service-activator>
<bean id="wldpFlowEndService" class="com.mycompany.tss.ls.dsl.service.wldp.WLDPFlowEndService" />
<!-- Responsible for closing all the activities related to IRIS before sending response -->
<int:service-activator id="irisFlowEndServiceActivator" input-channel="irisStartChannel"
method="invoke">
</int:service-activator>
<bean id="irisFlowEndService" class="com.mycompany.tss.ls.dsl.service.iris.IrisFlowEndService" />
ErrorHandlingService:
общественный класс ErrorHandlingService {
Logger logger = LoggerFactory.getLogger(ErrorHandlingService.class);
public Message<DSLResponseVO> invoke(Message<?> requestMessage) {
Object exception = requestMessage.getPayload();
if(exception instanceof MessagingException){
DSLResponseVO responseVO = new DSLResponseVO();
logger.error("Exception has occurred: {}", ((MessagingException)exception).getMessage());
logger.error("Exception Stacktrace is: ", exception);
responseVO.setRequestId((String)((MessagingException)exception).getFailedMessage().getHeaders().get("requestId"));
responseVO.setSystemId((String)((MessagingException)exception).getFailedMessage().getHeaders().get("systemId"));
responseVO.setErrorCode(DSLErrorConstants.FAILURE_CODE);
responseVO.setErrorMessage(DSLErrorConstants.FAILURE_DESC);
Message<DSLResponseVO> failedMessage = MessageBuilder.withPayload(responseVO)
//.copyHeadersIfAbsent(((MessagingException)exception).getFailedMessage().getHeaders())
.setHeaderIfAbsent("requestId", (String)((MessagingException)exception).getFailedMessage().getHeaders().get("requestId"))
.setHeaderIfAbsent("systemId", (String)((MessagingException)exception).getFailedMessage().getHeaders().get("systemId"))
.setHeaderIfAbsent(MessageHeaders.SEQUENCE_NUMBER, (Integer)((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.SEQUENCE_NUMBER))
.setHeaderIfAbsent(MessageHeaders.SEQUENCE_SIZE, (Integer)((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.SEQUENCE_SIZE))
.setHeaderIfAbsent(MessageHeaders.REPLY_CHANNEL, ((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.REPLY_CHANNEL))
.build();
return failedMessage;
}
return null;
}
}
Спасибо Гэри за ваш ответ. Я фактически создаю ответ от потока обработки ошибок, а затем отправляю его обратно в агрегатор. Вы видите какую-то проблему? Поскольку сообщение становится агрегированным в одно сообщение, но есть предупреждение, и ответ никогда не будет выписан. Если использование poller на субпотоке с каналом ошибки и исполнителем задачи является правильным способом, я могу попробовать это ... Я надеюсь, что когда исключение выбрасывается в любом компоненте под субпотоком, оно будет иметь канал ошибки опроса субпотока, а не канал ошибки шлюза. Правильно ли я понимаю? –
Я думаю, что ваш предложенный подход к очередному каналу с poller для каждого подтока является лучшим подходом. –
Вы должны быть осторожны при ошибках маршрутизации в агрегаторе; здесь было несколько вопросов. Если вы не можете найти тот, на который я ответил, дайте мне знать, и я попытаюсь его найти. Суть в том, что заголовки корреляции не находятся в 'ErrorMessage', они находятся в свойстве' failMessage' полезной нагрузки сообщения об ошибке. В вашем потоке ошибок вам необходимо преобразовать в сообщение об ошибке (или иным образом рекламировать заголовки) перед отправкой ошибки в агрегатор. –