2017-02-15 8 views
0

Я изучаю Netty и прототипирую простое приложение, которое отправляет объект через TCP. Моя проблема в том, что когда я вызываю Channel.write со стороны сервера с моим сообщением, он, похоже, не достигает обработчиков в конвейере. Когда я отправляю сообщение от клиента на сервер, он работает так, как ожидалось.Netty channel write not reach handlers

Вот код.

Сервер:

public class Main {  
    private int serverPort; 

    private EventLoopGroup bossGroup; 
    private EventLoopGroup workerGroup; 

    private ServerBootstrap boot; 
    private ChannelFuture future; 

    private SomeDataChannelDuplexHandler duplex; 

    private Channel ch; 

    public Main(int serverPort) { 
     this.serverPort = serverPort; 
    } 

    public void initialise() {  
     boot = new ServerBootstrap();  
     bossGroup = new NioEventLoopGroup(); 
     workerGroup = new NioEventLoopGroup(); 

     boot.group(bossGroup, workerGroup) 
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 2)); 

        // Inbound 
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0)); 
        ch.pipeline().addLast(new SomeDataDecoder()); 

        // Outbound 
        ch.pipeline().addLast(new LengthFieldPrepender(2)); 
        ch.pipeline().addLast(new SomeDataEncoder()); 

        // In-Out 
        ch.pipeline().addLast(new SomeDataChannelDuplexHandler()); 
       } 
      })  
      .option(ChannelOption.SO_BACKLOG, 128) 
      .childOption(ChannelOption.SO_KEEPALIVE, true); 
    } 

    public void sendMessage() { 
     SomeData fd = new SomeData("hello", "localhost", 1234);  
     ChannelFuture future = ch.writeAndFlush(fd);   
     future.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       if (!future.isSuccess()) { 
        System.out.println("send error: " + future.cause().toString()); 
       } else { 
        System.out.println("send message ok"); 
       } 
      } 
     }); 
    } 

    public void startServer(){ 
     try { 
      future = boot.bind(serverPort) 
        .sync() 
        .addListener(new ChannelFutureListener() { 
         @Override 
         public void operationComplete(ChannelFuture future) throws Exception { 
          ch = future.channel(); 
         } 
      }); 
     } catch (InterruptedException e) { 
      // log failure 
     } 
    } 

    public void stopServer() { 
     workerGroup.shutdownGracefully() 
      .addListener(e -> System.out.println("workerGroup shutdown")); 

     bossGroup.shutdownGracefully() 
      .addListener(e -> System.out.println("bossGroup shutdown")); 
    } 

    public static void main(String[] args) throws InterruptedException { 

     Main m = new Main(5000); 

     m.initialise(); 
     m.startServer(); 

     final Scanner scanner = new Scanner(System.in); 

     System.out.println("running."); 

     while (true) { 

      final String input = scanner.nextLine(); 

      if ("q".equals(input.trim())) { 
       break; 
      } else { 
       m.sendMessage(); 
      } 
     } 

     scanner.close(); 
     m.stopServer(); 
    } 
} 

Обработчик дуплексный канал:

public class SomeDataChannelDuplexHandler extends ChannelDuplexHandler { 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     System.out.println("duplex channel active"); 
     ctx.fireChannelActive(); 
    } 

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     System.out.println("duplex channelRead"); 
     if (msg instanceof SomeData) { 
      SomeData sd = (SomeData) msg; 
      System.out.println("received: " + sd); 
     } else { 
      System.out.println("some other object"); 
     } 
     ctx.fireChannelRead(msg); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     cause.printStackTrace(); 
     ctx.close(); 
    } 

    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
     if (evt instanceof IdleStateEvent) { 
      IdleStateEvent event = (IdleStateEvent) evt; 
      if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write 
       System.out.println("idle: " + event.state()); 
      } 
     } 
    } 
} 

И, наконец, кодер (декодер аналогичен):

public class SomeDataEncoder extends MessageToByteEncoder<SomeData> { 

    @Override 
    protected void encode(ChannelHandlerContext ctx, SomeData msg, ByteBuf out) throws Exception { 

     System.out.println("in encoder, msg = " + msg); 
     ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
     ObjectOutputStream oos = new ObjectOutputStream(bos); 

     oos.writeObject(msg.getName()); 
     oos.writeObject(msg.getIp()); 
     oos.writeInt(msg.getPort()); 
     oos.close(); 

     byte[] serialized = bos.toByteArray(); 
     int size = serialized.length; 

     ByteBuf encoded = ctx.alloc().buffer(size); 
     encoded.writeBytes(bos.toByteArray()); 

     out.writeBytes(encoded); 
    } 
} 

клиентской стороны:

public class Client { 

    String host = "10.188.36.66"; 
    int port = 5000; 

    EventLoopGroup workerGroup = new NioEventLoopGroup(); 
    ChannelFuture f; 
    private Channel ch; 

    public Client() { 
    } 

    public void startClient() throws InterruptedException { 
     Bootstrap boot = new Bootstrap(); 
     boot.group(workerGroup); 
     boot.channel(NioSocketChannel.class); 
     boot.option(ChannelOption.SO_KEEPALIVE, true); 
     boot.handler(new ChannelInitializer<SocketChannel>() { 
      @Override 
      public void initChannel(SocketChannel ch) throws Exception {    
       // Inbound 
       ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0)); 
       ch.pipeline().addLast(new SomeDataDecoder()); 

       // Outbound 
       ch.pipeline().addLast(new LengthFieldPrepender(2)); 
       ch.pipeline().addLast(new SomeDataEncoder()); 

       // Handler 
       ch.pipeline().addLast(new SomeDataHandler()); 
      } 
     }); 

     // Start the client 
     f = boot.connect(host, port).sync(); 
     f.addListener(new ChannelFutureListener() { 
      public void operationComplete(ChannelFuture future) throws Exception { 
       System.out.println("connected to server"); 
       ch = f.channel(); 
      } 
     }); 
    } 

    public void stopClient() {  
     workerGroup.shutdownGracefully(); 
    } 

    private void writeMessage(String input) { 
     SomeData data = new SomeData("client", "localhost", 3333); 
     ChannelFuture fut = ch.writeAndFlush(data); 
     fut.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       System.out.println("send message"); 
      } 
     }); 
    } 

    public static void main(String[] args) throws InterruptedException { 
     Client client = new Client(); 
     client.startClient();   

     System.out.println("running.\n\n"); 
     final Scanner scanner = new Scanner(System.in); 

     while (true) { 

      final String input = scanner.nextLine(); 

      if ("q".equals(input.trim())) { 
       break; 
      } else { 
       client.writeMessage(input); 
      } 
     } 

     scanner.close(); 
     client.stopClient(); //call this at some point to shutdown the client 
    } 

} 

и обработчик:

public class SomeDataHandler extends SimpleChannelInboundHandler<SomeData> { 

    private ChannelHandlerContext ctx; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     System.out.println("connected"); 
     this.ctx = ctx; 
    } 

    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, SomeData msg) throws Exception { 
     System.out.println("got message: " + msg); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     System.out.println("caught exception: " + cause.getMessage()); 
     ctx.close(); 
    } 
} 

Когда я отправить сообщение через консоль на стороне сервера, я получаю выход:

running. 
duplex channel active 
duplex read 
idle: ALL_IDLE 
idle: ALL_IDLE 

send message ok 

Так это выглядит, как будто отправляется сообщение, но ничего получен на стороне клиента.

Когда я делаю это на стороне клиента я получаю (на консоли сервера):

in decoder, numBytes in message = 31 
duplex channelRead 
received: SomeData [name=client, ip=localhost, port=3333] 

чего я ожидал.

Итак, где проблема? Это как-то связано с использованием ChannelDuplexHandler на стороне сервера и SimpleChannelInboundHandler на стороне клиента? Есть ли что-то, что мне нужно для вызова сообщения по трубопроводу?

UPDATE Я добавил чек на future.isSuccess() в серверном методе SendMessage и я получаю send error: java.lang.UnsupportedOperationException на консоли.

ответ

1

(Отправлено от имени ОП).

Для всех, кому это интересно, проблема заключалась в том, что я пытался отправить сообщение на серверный канал, а не на обычный канал. This post указал мне в правильном направлении.

Смежные вопросы