2015-10-23 3 views
0

У меня есть реализация шлюза, как показано ниже. Я определил канал ошибок в шлюзе, чтобы уловить любое исключение во всем потоке SI. OutputChannel отправляет ответ обратно из шлюза. Как видите, в системе, созданной с помощью разделителя и маршрутизатора, имеется 3 параллельных потока.Обработка исключений в шлюзе Async

В ErrorHandlingService я устанавливаю соответствующие коды ошибок и отправляю сообщение обратно в aggregateDSLResponseChannel, чтобы весь поток прошел гладко. Мне нужна реализация, когда весь поток заканчивается в DSLFlowEndService, даже когда есть исключение, а затем возвращает ответ обратно вызывающему абоненту по настройкам соответствующего объекта ответа. Причина в том, что есть запрос на подключение, чтобы сказать WLDP и IRIS, и сказать, что поток WLDP не прошел но IRIS преуспевает, я не хочу отправлять полный ответ как отказ. Вместо этого я отправил соответствующее сообщение обратно вызывающему абоненту с указанием частичного успеха (с подробными сведениями о том, что не удалось и удалось)

Чтобы проверить, я ввел исключение в WLDPFlowEndService. Исключение приходит к каналу ошибок, а затем сообщение также возвращается обратно в aggregateDSLResponseChannel, а затем в DSLFlowEndService. Однако ответ не отправляется обратно вызывающему внешнему шлюзу. По существу, интеграция с интеграцией Spring никогда не завершается и время истекает через 30 секунд. Ниже приводится предупреждение, что приходит в журналах:

15:13:23.271 [dslParallelExecutor-2] WARN org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel {} - Reply message being sent, but the receiving thread has already received a reply:[Payload=DSLResponseVO 
15:13:23.271 [dslParallelExecutor-2] DEBUG org.springframework.integration.channel.DirectChannel {} - postSend (sent=true) on channel 'outputChannel', message: [Payload=DSLResponseVO 

** Обновлено Конфигурация- один, который работает (супер спасибо Gary) **:

<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:int="http://www.springframework.org/schema/integration" 
xmlns:task="http://www.springframework.org/schema/task" 
xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/integration 
     http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 

<int:channel id="inputChannel"></int:channel> 

<int:channel id="outputChannel"></int:channel> 

<int:channel id="dslFlowInitiatorOutputChannel"/> 

<int:channel id="routingChannel"/> 

<int:channel id="wldpInputChannel"> 
    <int:dispatcher task-executor="wldpParallelExecutor" /> 
</int:channel> 

<int:channel id="lcInputChannel"> 
    <int:dispatcher task-executor="lcParallelExecutor" /> 
</int:channel> 

<int:channel id="irisInputChannel"> 
    <int:dispatcher task-executor="irisParallelExecutor" /> 
</int:channel> 

<int:publish-subscribe-channel id="aggregateDSLResponseChannel"></int:publish-subscribe-channel> 

<int:channel id="aggregateDSLOutputChannel"> 
    <int:dispatcher task-executor="dslParallelExecutor" /> 
</int:channel> 

<!-- all thread pools to execute tasks in parallel --> 
<task:executor id="dslParallelExecutor" pool-size="50" /> 
<task:executor id="wldpParallelExecutor" pool-size="25" /> 
<task:executor id="lcParallelExecutor" pool-size="25" /> 
<task:executor id="irisParallelExecutor" pool-size="25" /> 

<int:gateway id="dslServiceFacade" service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLServiceFacade" 
    default-request-channel="inputChannel" default-reply-channel="outputChannel" error-channel="gatewayErrorChannel" async-executor="dslParallelExecutor"> 
    <int:method name="invoke" request-channel="inputChannel" request-timeout="5000" reply-timeout="3000000"/> 
</int:gateway> 

<int:service-activator input-channel="inputChannel" 
    output-channel="dslFlowInitiatorOutputChannel" ref="dslFlowInitiatorService" 
    method="invoke" id="dslFlowInitiator" /> 

<bean id="dslFlowInitiatorService" class="com.mycompany.tss.ls.dsl.service.DSLFlowInitiatorService" /> 

<int:splitter id="systemSplitter" input-channel="dslFlowInitiatorOutputChannel" method="split" 
       output-channel="routingChannel" ref="systemMessageSplitter"> 
</int:splitter> 

<bean id="systemMessageSplitter" class="com.mycompany.tss.ls.dsl.splitter.SystemMessageSplitter"/> 

<int:router id="systemRouter" input-channel="routingChannel" default-output-channel="nullChannel" 
    expression="headers.get('systemId')"> 
    <int:mapping value="WLDP" channel="wldpInputChannel" /> 
    <int:mapping value="LC" channel="lcInputChannel" /> 
    <int:mapping value="IRIS" channel="irisInputChannel" /> 
</int:router> 

<int:aggregator id="dslResponseAggregator" input-channel="aggregateDSLResponseChannel" output-channel="aggregateDSLOutputChannel" 
    message-store="dslResponseMessageStore" correlation-strategy-expression="headers['requestId']" 
    send-partial-result-on-expiry="true"> 
</int:aggregator> 

<int:service-activator input-channel="aggregateDSLOutputChannel" 
    output-channel="outputChannel" ref="dslFlowEndService" 
    method="invoke" id="dslFlowEndActivator" /> 

<bean id="dslFlowEndService" class="com.mycompany.tss.ls.dsl.service.DSLFlowEndService" /> 

<bean id="dslResponseMessageStore" class="org.springframework.integration.store.SimpleMessageStore" /> 

<bean id="dslResponseMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper"> 
    <property name="messageGroupStore" ref="dslResponseMessageStore" /> 
    <property name="timeout" value="4000" /> 
</bean> 

<task:scheduled-tasks> 
    <task:scheduled ref="dslResponseMessageStoreReaper" method="run" fixed-rate="10000" /> 
</task:scheduled-tasks> 

<int:wire-tap pattern="*Channel" order="3" channel="wiretap"/> 
<int:message-history/> 

<int:logging-channel-adapter id="wiretap" level="DEBUG"/> 


<!-- Error handling Service Activator to log exceptions and send the response --> 
<int:service-activator id="errorHandlingServiceActivator" input-channel="gatewayErrorChannel" 
    method="invoke" output-channel="aggregateDSLResponseChannel" ref="errorHandlingService"> 
</int:service-activator> 

<!-- Error Handling Service class --> 
<bean id="errorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" /> 
<bean id="wldpErrorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" /> 
<bean id="irisErrorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" /> 

<int:channel id="wldpStartChannel"></int:channel> 
<int:channel id="irisStartChannel"></int:channel> 
<int:channel id="wldpErrorChannel"></int:channel> 
<int:channel id="irisErrorChannel"></int:channel> 
<int:channel id="wldpOutputChannel"></int:channel> 
<int:channel id="irisOutputChannel"></int:channel> 

<int:service-activator id="midFlowWLDPActivator" input-channel="wldpInputChannel" 
    ref="midFlowWLDPGateway" output-channel="aggregateDSLResponseChannel"/> 

<int:service-activator id="midFlowIRISActivator" input-channel="irisInputChannel" 
    ref="midFlowIRISGateway" output-channel="aggregateDSLResponseChannel"/> 

<int:gateway id="midFlowWLDPGateway" default-request-channel="wldpStartChannel" 
    service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLWLDPFacade" error-channel="wldpErrorChannel" 
/> 


<int:gateway id="midFlowIRISGateway" default-request-channel="irisStartChannel" 
    service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLIRISFacade" error-channel="irisErrorChannel" 
/> 


<int:service-activator id="errorHandlingWLDPServiceActivator" input-channel="wldpErrorChannel" 
    method="invoke" ref="wldpErrorHandlingService"> 
</int:service-activator> 

<int:service-activator id="errorHandlingIRISServiceActivator" input-channel="irisErrorChannel" 
    method="invoke" ref="irisErrorHandlingService"> 
</int:service-activator> 

<!-- Responsible for closing all the activities related to WLDP before sending response --> 
<int:service-activator id="wldpFlowEndServiceActivator" input-channel="wldpStartChannel" 
    method="invoke" ref="wldpFlowEndService"> 
</int:service-activator> 

<bean id="wldpFlowEndService" class="com.mycompany.tss.ls.dsl.service.wldp.WLDPFlowEndService" /> 

<!-- Responsible for closing all the activities related to IRIS before sending response --> 
<int:service-activator id="irisFlowEndServiceActivator" input-channel="irisStartChannel" 
    method="invoke"> 
</int:service-activator> 


<bean id="irisFlowEndService" class="com.mycompany.tss.ls.dsl.service.iris.IrisFlowEndService" /> 

ErrorHandlingService:

общественный класс ErrorHandlingService {

Logger logger = LoggerFactory.getLogger(ErrorHandlingService.class); 

public Message<DSLResponseVO> invoke(Message<?> requestMessage) { 

    Object exception = requestMessage.getPayload(); 
    if(exception instanceof MessagingException){ 
     DSLResponseVO responseVO = new DSLResponseVO(); 
     logger.error("Exception has occurred: {}", ((MessagingException)exception).getMessage()); 
     logger.error("Exception Stacktrace is: ", exception); 
     responseVO.setRequestId((String)((MessagingException)exception).getFailedMessage().getHeaders().get("requestId")); 
     responseVO.setSystemId((String)((MessagingException)exception).getFailedMessage().getHeaders().get("systemId")); 
     responseVO.setErrorCode(DSLErrorConstants.FAILURE_CODE); 
     responseVO.setErrorMessage(DSLErrorConstants.FAILURE_DESC); 

     Message<DSLResponseVO> failedMessage = MessageBuilder.withPayload(responseVO) 
       //.copyHeadersIfAbsent(((MessagingException)exception).getFailedMessage().getHeaders()) 
       .setHeaderIfAbsent("requestId", (String)((MessagingException)exception).getFailedMessage().getHeaders().get("requestId")) 
       .setHeaderIfAbsent("systemId", (String)((MessagingException)exception).getFailedMessage().getHeaders().get("systemId")) 
       .setHeaderIfAbsent(MessageHeaders.SEQUENCE_NUMBER, (Integer)((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.SEQUENCE_NUMBER)) 
       .setHeaderIfAbsent(MessageHeaders.SEQUENCE_SIZE, (Integer)((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.SEQUENCE_SIZE)) 
       .setHeaderIfAbsent(MessageHeaders.REPLY_CHANNEL, ((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.REPLY_CHANNEL)) 
       .build(); 

     return failedMessage; 
    } 

    return null; 
} 

} 

ответ

0

Вы не можете отправить несколько ответов на шлюз; вам необходимо агрегировать результаты (или сбои) в одно ответное сообщение.

Каждый подпоток должен отправить результат, или ошибку, и агрегатор.

Для каждого потока требуется поток ошибок. Если вы используете канал очереди вместо канала-исполнителя, вы можете поместить канал ошибки на poller.

При использовании канала-исполнителя вам нужен шлюз с промежуточным потоком, чтобы добавить поток ошибок (активатор услуги, который ссылается на <gateway/>).

+0

Спасибо Гэри за ваш ответ. Я фактически создаю ответ от потока обработки ошибок, а затем отправляю его обратно в агрегатор. Вы видите какую-то проблему? Поскольку сообщение становится агрегированным в одно сообщение, но есть предупреждение, и ответ никогда не будет выписан. Если использование poller на субпотоке с каналом ошибки и исполнителем задачи является правильным способом, я могу попробовать это ... Я надеюсь, что когда исключение выбрасывается в любом компоненте под субпотоком, оно будет иметь канал ошибки опроса субпотока, а не канал ошибки шлюза. Правильно ли я понимаю? –

+0

Я думаю, что ваш предложенный подход к очередному каналу с poller для каждого подтока является лучшим подходом. –

+0

Вы должны быть осторожны при ошибках маршрутизации в агрегаторе; здесь было несколько вопросов. Если вы не можете найти тот, на который я ответил, дайте мне знать, и я попытаюсь его найти. Суть в том, что заголовки корреляции не находятся в 'ErrorMessage', они находятся в свойстве' failMessage' полезной нагрузки сообщения об ошибке. В вашем потоке ошибок вам необходимо преобразовать в сообщение об ошибке (или иным образом рекламировать заголовки) перед отправкой ошибки в агрегатор. –

Смежные вопросы