2015-03-12 5 views
1

Я пытаюсь создать поток мула с входящей конечной точкой TCP, которая является TCP-сервером, который слушает порт. Когда удаленное соединение с клиентом идентифицируется, перед получением какого-либо запроса от клиента мне нужно написать сообщение в сокет (который позволяет клиенту знать, что я слушаю), только после чего клиент отправляет мне дальнейшие запросы. Вот как я это делаю с программой образец Java:Конфигурация TCP-сервера в Mule - запись в клиентский сокет

import java.net.*; 
import java.io.*; 

public class TCPServer 
{ 
public static void main(String[] args) throws IOException 
    { 
    ServerSocket serverSocket = null; 

    try { 
     serverSocket = new ServerSocket(4445); 
     } 
    catch (IOException e) 
     { 
     System.err.println("Could not listen on port: 4445."); 
     System.exit(1); 
     } 

    Socket clientSocket = null; 
    System.out.println ("Waiting for connection....."); 

    try { 
     clientSocket = serverSocket.accept(); 
     } 
    catch (IOException e) 
     { 
     System.err.println("Accept failed."); 
     System.exit(1); 
     } 

    System.out.println ("Connection successful"); 
    System.out.println ("Sending output message - ....."); 

    //Sending a message to the client to indicate that the server is active 
    PrintStream pingStream = new PrintStream(clientSocket.getOutputStream()); 
    pingStream.print("Server listening"); 
    pingStream.flush(); 

    //Now start listening for messages 
    System.out.println ("Waiting for incoming message - ....."); 
    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true); 
    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 

    String inputLine; 

    while ((inputLine = in.readLine()) != null) 
     { 
     System.out.println ("Server: " + inputLine); 
     out.println(inputLine); 

     if (inputLine.equals("Bye.")) 
      break; 
     } 

    out.close(); 
    in.close(); 
    clientSocket.close(); 
    serverSocket.close(); 
    } 
} 

Я попытался использовать TCP въездного конечную точку мула в качестве сервера, но я не могу понять, как я могу определить успешное соединение от клиента, чтобы инициировать исходящее сообщение. Поток запускается только тогда, когда сообщение отправляется от клиента. Есть ли способ расширить функциональность соединителя Mule TCP и получить прослушиватель, который мог бы выполнить вышеуказанное требование?

На основании ответа при условии, что это, как я реализовал это -

public class TCPMuleOut extends TcpMessageReceiver { 

    boolean InitConnection = false; 
    Socket clientSocket = null; 

    public TCPMuleOut(Connector connector, FlowConstruct flowConstruct, 
      InboundEndpoint endpoint) throws CreateException { 
     super(connector, flowConstruct, endpoint); 
    } 

    protected Work createWork(Socket socket) throws IOException { 
     return new MyTcpWorker(socket, this); 
    } 


    protected class MyTcpWorker extends TcpMessageReceiver.TcpWorker { 

     public MyTcpWorker(Socket socket, AbstractMessageReceiver receiver) 
       throws IOException { 
      super(socket, receiver); 
      // TODO Auto-generated constructor stub 
     } 

     @Override 
     protected Object getNextMessage(Object resource) throws Exception { 
      if (InitConnection == false) { 

       clientSocket = this.socket; 
       logger.debug("Sending logon message"); 
       PrintStream pingStream = new PrintStream(
         clientSocket.getOutputStream()); 
       pingStream.print("Log on message"); 
       pingStream.flush(); 
       InitConnection = true; 
      } 

      long keepAliveTimeout = ((TcpConnector) connector) 
        .getKeepAliveTimeout(); 

      Object readMsg = null; 
      try { 
       // Create a monitor if expiry was set 
       if (keepAliveTimeout > 0) { 
        ((TcpConnector) connector).getKeepAliveMonitor() 
          .addExpirable(keepAliveTimeout, 
            TimeUnit.MILLISECONDS, this); 
       } 

       readMsg = protocol.read(dataIn); 

       // There was some action so we can clear the monitor 
       ((TcpConnector) connector).getKeepAliveMonitor() 
         .removeExpirable(this); 

       if (dataIn.isStreaming()) { 
       } 

       return readMsg; 
      } catch (SocketTimeoutException e) { 
       ((TcpConnector) connector).getKeepAliveMonitor() 
         .removeExpirable(this); 
       System.out.println("Socket timeout"); 
      } finally { 
       if (readMsg == null) { 
        // Protocols can return a null object, which means we're 
        // done 
        // reading messages for now and can mark the stream for 
        // closing later. 
        // Also, exceptions can be thrown, in which case we're done 
        // reading. 
        dataIn.close(); 
        InitConnection = false; 
        logger.debug("Client closed"); 
       } 
      } 
      return null; 
     } 
    } 
} 

И разъем TCP, как показано ниже:

<tcp:connector name="TCP" doc:name="TCP connector" 
    clientSoTimeout="100000" receiveBacklog="0" receiveBufferSize="0" 
    sendBufferSize="0" serverSoTimeout="100000" socketSoLinger="0" 
    validateConnections="true" keepAlive="true"> 
    <receiver-threading-profile 
     maxThreadsActive="5" maxThreadsIdle="5" /> 
    <reconnect-forever /> 
    <service-overrides messageReceiver="TCPMuleOut" /> 
    <tcp:direct-protocol payloadOnly="true" /> 
</tcp:connector> 

ответ

1

То, что вы пытаетесь сделать, это немного трудно для достижения, но не невозможного. Сообщения принимаются классом org.mule.transport.tcp.TcpMessageReceiver, и этот класс всегда потребляет данные во входном потоке, чтобы создать сообщение, которое вводит в поток. Однако вы можете расширить этот приемник и проинструктировать модуль TCP, чтобы использовать ваш, добавив тег service-overrides в разъем tcp вашего потока (задокументированный here) и заменив элемент messageReceiver. В расширенном приемнике вы должны изменить метод TcpWorker.getNextMessage, чтобы отправить сообщение ack перед чтением из входного потока. HTH, Маркос.

+0

Это хороший совет. Я бы сравнил его с другим вариантом: вместо расширения классов Mule (который работает, но предоставляет вам внутренние изменения API), jvas может создать пользовательский модуль с DevKit. Последний предоставит ему поддержку Studio в качестве бонуса :) –

+0

Спасибо MarcosNC & @David Dossot. Я смог сделать это, расширив класс TCpMessageReceiver. Я верну свое решение в качестве ответа. – jvas

Смежные вопросы