2016-11-09 3 views
2

Я использую адаптер интеграции tcp-outbound-adapter и tcp-inbound-адаптер Spring для связи с внешней сторонней системой через TCP.Spring Integration tcp client multiple connections

Завод связи, который я использую, имеет тип «клиент» и имеет одноразовое значение = «ложь», поскольку характер связи с внешней системой представляет собой сеанс из нескольких десятков запросов и ответов. Внешняя система ожидает, что я открою новое TCP-соединение для каждого сеанса.

Есть ли способ сделать это с помощью интеграции с весной?

Мой код успешно использует SI для одного такого сеанса. Но я хочу, чтобы моя система открывала несколько таких соединений, поэтому я могу обрабатывать несколько параллельных сеансов. В настоящее время, если я отправляю сообщение нового сеанса входящему адаптеру, он использует то же TCP-соединение.

Пожалуйста, помогите.

UPDATE:

При использовании ThreadAffinity решение, данное Гэри здесь, мы получаем это исключение, когда мы делаем больше, чем 4 одновременных запросов. Любая идея, почему?

11:08:02.083 [pool-1-thread-2] 193.xxx.yyy.zz:443:55729:46c71372-5933-4707-a27b-93cc4bf78c59 Message sent GenericMessage [payload=byte[326], headers={replyChannel=org.springframewor[email protected]2fb866, errorChannel=org.springframewor[email protected]2fb866, ip_tcp_remotePort=55718, ip_connectionId=127.0.0.1:55718:4444:7f71ce96-eaac-4b21-8b2c-bf736102f818, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2dc3e330-d703-8a61-c46c-012233cadf6f, ip_hostname=127.0.0.1, timestamp=1481706480700}] 
11:08:12.093 [pool-1-thread-2] Remote Timeout on 193.xxx.yyy.zz:443:55729:46c71372-5933-4707-a27b-93cc4bf78c59 
11:08:12.093 [pool-1-thread-2] Tcp Gateway exception 
org.springframework.integration.MessageTimeoutException: Timed out waiting for response 
      at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:146) 
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) 
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
      at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
      at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
      at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) 
      at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) 
      at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) 
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
      at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
      at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) 
      at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) 
      at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) 
      at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) 
      at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:441) 
      at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:409) 
      at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:120) 
      at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:98) 
      at org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport.onMessage(TcpConnectionInterceptorSupport.java:159) 
      at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182) 
      at org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport.run(TcpConnectionInterceptorSupport.java:111) 
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
      at java.lang.Thread.run(Thread.java:745) 

ответ

1

Это зависит от того, что представляет собой «сеанс» - если все запросы сеанса на стороне клиента бегут все в одном потоке, вы могли бы написать простую оболочку для фабрики соединений, которые сохраняет соединение в ThreadLocal. Вам понадобится какой-то механизм для вызова фабричной оболочки после последнего запроса, чтобы закрыть соединение и удалить его из ThreadLocal.

Если запросы на сеанс могут возникать в нескольких потоках, это будет немного сложнее, но вы все равно можете сделать это с помощью ThreadLocal, который сопоставляется с экземпляром соединения.

EDIT

Вот пример ...

@SpringBootApplication 
public class So40507731Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So40507731Application.class, args); 
     MessageChannel channel = context.getBean("clientFlow.input", MessageChannel.class); 
     MessagingTemplate template = new MessagingTemplate(channel); 
     ThreadAffinityClientConnectionFactory affinityCF = context.getBean(ThreadAffinityClientConnectionFactory.class); 
     ExecutorService exec = Executors.newCachedThreadPool(); 
     CountDownLatch latch = new CountDownLatch(2); 
     exec.execute(() -> { 
      String result = new String(template.convertSendAndReceive("foo", byte[].class)); 
      System.out.println(Thread.currentThread().getName() + " " + result); 
      result = new String(template.convertSendAndReceive("foo", byte[].class)); 
      System.out.println(Thread.currentThread().getName() + " " + result); 
      affinityCF.release(); 
      latch.countDown(); 
     }); 
     exec.execute(() -> { 
      String result = new String(template.convertSendAndReceive("foo", byte[].class)); 
      System.out.println(Thread.currentThread().getName() + " " + result); 
      result = new String(template.convertSendAndReceive("foo", byte[].class)); 
      System.out.println(Thread.currentThread().getName() + " " + result); 
      affinityCF.release(); 
      latch.countDown(); 
     }); 
     latch.await(10, TimeUnit.SECONDS); 
     context.close(); 
     exec.shutdownNow(); 
    } 

    @Bean 
    public TcpNetClientConnectionFactory delegateCF() { 
     TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234); 
     clientCF.setSingleUse(true); // so each thread gets his own connection 
     return clientCF; 
    } 

    @Bean 
    public ThreadAffinityClientConnectionFactory affinityCF() { 
     return new ThreadAffinityClientConnectionFactory(delegateCF()); 
    } 

    @Bean 
    public TcpOutboundGateway outGate() { 
     TcpOutboundGateway outGate = new TcpOutboundGateway(); 
     outGate.setConnectionFactory(affinityCF()); 
     return outGate; 
    } 

    @Bean 
    public IntegrationFlow clientFlow() { 
     return f -> f.handle(outGate()); 
    } 

    @Bean 
    public TcpNetServerConnectionFactory serverCF() { 
     return new TcpNetServerConnectionFactory(1234); 
    } 

    @Bean 
    public TcpInboundGateway inGate() { 
     TcpInboundGateway inGate = new TcpInboundGateway(); 
     inGate.setConnectionFactory(serverCF()); 
     return inGate; 
    } 

    @Bean 
    public IntegrationFlow serverFlow() { 
     return IntegrationFlows.from(inGate()) 
       .transform(Transformers.objectToString()) 
       .transform("headers['ip_connectionId'] + ' ' + payload") 
       .get(); 
    } 

    public static class ThreadAffinityClientConnectionFactory extends AbstractClientConnectionFactory 
      implements TcpListener { 

     private final AbstractClientConnectionFactory delegate; 

     private final ThreadLocal<TcpConnectionSupport> connection = new ThreadLocal<>(); 

     public ThreadAffinityClientConnectionFactory(AbstractClientConnectionFactory delegate) { 
      super("", 0); 
      delegate.registerListener(this); 
      this.delegate = delegate; 
     } 

     @Override 
     protected TcpConnectionSupport obtainConnection() throws Exception { 
      TcpConnectionSupport tcpConnection = this.connection.get(); 
      if (tcpConnection == null || !tcpConnection.isOpen()) { 
       tcpConnection = this.delegate.getConnection(); 
       this.connection.set(tcpConnection); 
      } 
      return tcpConnection; 
     } 

     public void release() { 
      TcpConnectionSupport connection = this.connection.get(); 
      if (connection != null) { 
       connection.close(); 
       this.connection.remove(); 
      } 
     } 

     @Override 
     public void start() { 
      this.delegate.start(); 
      setActive(true); 
      super.start(); 
     } 

     @Override 
     public void stop() { 
      this.delegate.stop(); 
      setActive(false); 
      super.stop(); 
     } 

     @Override 
     public boolean onMessage(Message<?> message) { 
      return getListener().onMessage(message); 
     } 

    } 

} 

Результат:

pool-2-thread-2 localhost:64559:1234:3d898822-ea91-421d-97f2-5f9620b9d369 foo 
pool-2-thread-1 localhost:64560:1234:227f8a9f-1461-41bf-943c-68a56f708b0c foo 
pool-2-thread-2 localhost:64559:1234:3d898822-ea91-421d-97f2-5f9620b9d369 foo 
pool-2-thread-1 localhost:64560:1234:227f8a9f-1461-41bf-943c-68a56f708b0c foo 
+0

Спасибо за быстрый ответ. Разве это не обычный случай использования, для которого уже существует подготовленное решение в SI? –

+0

Я предполагаю, что оболочка будет похожа на CachingClientConnectionFactory? Но как мне настроить фабрику int-ip: tcp-connection-factory, чтобы вернуть класс оболочки? –

+0

Не могли бы вы посоветовать более подробно, как это сделать? Я попробовал несколько возможностей, и никто из них не работает. Кажется, что логика наличия одной общей связи (в случае singleUse = "false") встречается во многих классах. TIA за любую помощь. –