2015-07-01 3 views
0

У меня есть входящая конечная точка TCP, которая ссылается на TCP-коннектор. Это конечная точка запроса-ответа. Клиент TCP является сторонним приложением, которое отправляет запросы только на один сокет. Вот как я установил конечную точку TCP. Endpoint:Входящая конечная точка TCP - Mule ESB - многопоточность

<tcp:inbound-endpoint exchange-pattern="request-response" 
      responseTimeout="10000" doc:name="TCP" address="${Endpoint}" encoding="ISO-8859-1" connector-ref="TCP"/> 

Разъем:

<tcp:connector name="TCP" doc:name="TCP connector" 
    clientSoTimeout="${Client_SO_Timeout}" receiveBacklog="0" receiveBufferSize="0" 
    sendBufferSize="0" serverSoTimeout="${Server_SO_Timeout}" socketSoLinger="0" 
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true"> 
      <receiver-threading-profile maxThreadsActive="${TCP_MaxThreadsActive}" maxThreadsIdle = "${TCP_MaxThreadsIdle}" /> 
    <reconnect-forever /> 
    <service-overrides messageReceiver="CustomMessageReceiver" /> 
    <tcp:custom-protocol ref="CustomLengthProtocol" /> 
</tcp:connector> 

поток работает нормально. Но когда необходимо обрабатывать параллельные запросы, последние несколько запросов синхронизируются. Я понимаю, что сообщения ожидаются в обрабатываемом приемнике (поскольку используется только один сеанс TCP) до тех пор, пока предыдущий запрос не будет завершен потоком мула.

Чтобы настроить это, я ищу способ изменить поток мула, как показано ниже: После получения запросов от клиента мне нужно отправить его в поток мула, который может обрабатывать его асинхронно и нажать ответ на тот же сокет. Как только запрос будет получен в конечной точке, ему не нужно ждать завершения потока предыдущего запроса до обработки следующего запроса. Нет требования сохранять последовательность запросов/ответов от потока мула. Есть ли способ достичь этого, расширив функциональность конечной точки мула TCP? Это похоже на стратегию обработки очереди асинхронного потока, за исключением того, что ответ должен быть отправлен обратно в исходный TCP-сокет.

ответ

0

Это, как я решил это: (я) Вместо того, чтобы использовать протокол TCP запроса-ответа въездной конечной точки, я использую TCP въездного (в одну сторону) и TCP исходящего (односторонний) (II) TCP-Inbound получает запрос, протокол пользовательской длины разбивает сообщение и передает его в поток. (iii) Используйте область запроса-запроса с конечными точками VM (входящие исходящие & исходящие) (iv) Конечные точки vm направляют сообщение отдельному потоку, который имеет обработкуStrategy как «queued-асинхронный». Я планирую настроить maxThreads на этот уровень потока для объединения потоков. (v) Второй поток выполняет бизнес-логику, а ответ отправляется обратно в основной поток, который должен быть отправлен в сокет. (vi) Чтобы получить доступ к сокету из входящей конечной точки, я переопределил метод preRouteMuleMessage в классе TCPMessageReceiver и добавил свойство outbound, называемое «ClientSocket», в mulemessage. Затем я распространяю это свойство до конца основного потока на исходящую конечную точку. На конечной точке TCPOutbound я создал собственный TCPMessageDispatcher и расширил метод doDispatch. Вместо использования сокета из пула потоков исходящей конечной точки я использую объект сокета, который был разделен как часть mulemessage. Образец расхода:

<tcp:connector name="TCP" doc:name="TCP connector" 
    clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0" 
    sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0" 
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true" keepSendSocketOpen="true"> 
    <receiver-threading-profile 
     maxThreadsActive="1" maxThreadsIdle="1" /> 
    <reconnect-forever /> 
    <service-overrides messageReceiver="CustomMessageReceiver" /> 

    <tcp:custom-protocol ref="CustomLengthProtocol" /> 
</tcp:connector> 

<tcp:connector name="TCP2" doc:name="TCP connector" 
    clientSoTimeout="70000" receiveBacklog="0" receiveBufferSize="0" 
    sendBufferSize="0" serverSoTimeout="70000" socketSoLinger="0" 
    validateConnections="true" keepAlive="true" sendTcpNoDelay="true" 
    keepSendSocketOpen="true"> 
    <receiver-threading-profile 
     maxThreadsActive="1" maxThreadsIdle="1" /> 
    <reconnect-forever /> 
    <service-overrides dispatcherFactory="CustomMessageDispatcherFactory"/> 
    <tcp:custom-protocol ref="CustomLengthProtocol" /> 

</tcp:connector> 


<spring:beans> 
    <spring:bean id="CustomLengthProtocol" name="CustomLengthProtocol" 
     class="CustomLengthProtocol" /> 
</spring:beans> 
<flow name="tcptestFlow" doc:name="tcptestFlow"> 
    <tcp:inbound-endpoint address="tcp://localhost:4444" 
     responseTimeout="100000" doc:name="TCP" connector-ref="TCP" /> 
    <byte-array-to-string-transformer 
     doc:name="Byte Array to String" /> 
    <logger level="INFO" category="Expression" doc:name="Logger" /> 

    <set-session-variable variableName="Socket" 
     value="#[message.outboundProperties['ClientSocket']]" 
     doc:name="Session Variable" /> 
    <logger message="#[payload] - #[Socket]" level="INFO" category="Request" 
     doc:name="Logger" /> 

    <request-reply doc:name="Request-Reply"> 
     <vm:outbound-endpoint exchange-pattern="one-way" 
      path="/qin" doc:name="VM" > 
      <message-properties-transformer scope="outbound"> 
       <delete-message-property key="MULE_REPLYTO"></delete-message-property> 
      </message-properties-transformer> 
      </vm:outbound-endpoint> 
     <vm:inbound-endpoint exchange-pattern="one-way" 
      path="/qout" doc:name="VM" /> 
    </request-reply> 
    <logger message="#[payload]" level="INFO" doc:name="Logger" 
     category="Response" /> 
    <string-to-byte-array-transformer 
     doc:name="String to Byte Array" /> 
    <tcp:outbound-endpoint address="tcp://localhost:4444" 
     responseTimeout="10000" doc:name="TCP" connector-ref="TCP2" /> 

</flow> 
<flow name="tcptestFlow1" processingStrategy="queued-asynchronous" 
    doc:name="tcptestFlow1"> 

    <vm:inbound-endpoint exchange-pattern="one-way" 
     path="/qin" doc:name="VM" /> 
    <logger message="Inside VM Flow" level="INFO" doc:name="Logger" /> 
    <set-payload value="Appended Response - #[payload]" 
     doc:name="Set Payload" /> 
    <vm:outbound-endpoint exchange-pattern="one-way" 
     path="/qout" doc:name="VM" /> 

</flow> 
Смежные вопросы