2015-10-15 2 views
1

Так что я недавно немного поработал с netty (5.0.0.Alpha2), и мне это очень нравится! Но, к сожалению, я не могу получать сообщения/получать сообщения. Странная вещь в том, что соединение и отключение работают как шарм. Сервер получает сообщения, которые клиент подключил/отключил, и канал добавляется/удаляется. Просто сообщения не работают должным образом.Netty: проблемы с отправкой сообщения

Я попробовал это многими другими способами (например, без кодировщика), но он никогда не работал. Может быть, у кого-то есть идея? Я действительно был бы признателен!

Заранее благодарен! Вы можете найти исходный код, используемый ниже:

CoreClient.java

package me.creepsterlgc.coreclient; 

import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.DelimiterBasedFrameDecoder; 
import io.netty.handler.codec.Delimiters; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 

public class CoreClient extends Thread { 

    private String host; 
    private int port; 

    private Channel channel; 

    public CoreClient(String host, int port) { 
     this.host = host; 
     this.port = port; 
    } 

    public void run() { 
     EventLoopGroup group = new NioEventLoopGroup(); 

     try { 
      Bootstrap bootstrap = new Bootstrap() 
      .group(group) 
      .channel(NioSocketChannel.class) 
      .handler(new ChannelInitializer<SocketChannel>() { 

        @Override 
        protected void initChannel(SocketChannel channel) throws Exception { 

         ChannelPipeline pipeline = channel.pipeline(); 
         pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
         pipeline.addLast("encoder", new StringEncoder()); 
         pipeline.addLast("decoder", new StringDecoder()); 
         pipeline.addLast("handler", new CoreClientHandler()); 

        } 

      }); 

      ChannelFuture f = bootstrap.connect(host, port); 
      channel = f.sync().channel(); 
      ChannelFuture cf = null; 
      try { 
       cf = channel.writeAndFlush("Testing..").sync(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if (!cf.isSuccess()) { 
       System.out.println("Send failed: " + cf.cause()); 
      } 

     } 
     catch (InterruptedException e) { 
      e.printStackTrace();  
     } 
     finally { 

     } 

    } 

    public void send(String message) { 
     ChannelFuture cf = null; 
     try { 
      cf = channel.writeAndFlush(message).sync(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     channel.flush(); 
     if (!cf.isSuccess()) { 
      System.out.println("Send failed: " + cf.cause()); 
     } 
    } 

    public void shutdown() { 

    } 

} 

CoreClientHandler.java

package me.creepsterlgc.coreclient; 

import io.netty.channel.Channel; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.group.ChannelGroup; 
import io.netty.channel.group.DefaultChannelGroup; 
import io.netty.util.concurrent.GlobalEventExecutor; 

public class CoreClientHandler extends ChannelHandlerAdapter { 

    ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

    @Override 
    public void channelRead(ChannelHandlerContext context, Object message) throws Exception { 
     context.write(message); 
     Channel channel = context.channel(); 
     Log.message(channel.remoteAddress().toString(), message.toString()); 
    } 

    @Override 
    public void channelReadComplete(ChannelHandlerContext context) { 
     context.flush(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 
     cause.printStackTrace(); 
     context.close(); 
    } 

} 

CoreServer.java

package me.creepsterlgc.coreserver; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.DelimiterBasedFrameDecoder; 
import io.netty.handler.codec.Delimiters; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 

public class CoreServer extends Thread { 

    private int port; 

    public CoreServer(int port) { 
     this.port = port; 
    } 

    public void run() { 

     EventLoopGroup boss = new NioEventLoopGroup(); 
     EventLoopGroup worker = new NioEventLoopGroup(); 

     try { 
      ServerBootstrap bootstrap = new ServerBootstrap() 
      .group(boss, worker) 
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 

        @Override 
        protected void initChannel(SocketChannel channel) throws Exception { 

         ChannelPipeline pipeline = channel.pipeline(); 
         pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
         pipeline.addLast("encoder", new StringEncoder()); 
         pipeline.addLast("decoder", new StringDecoder()); 
         pipeline.addLast("handler", new CoreServerHandler()); 

        } 

      }); 

      ChannelFuture f = bootstrap.bind(port).sync(); 
      f.channel().closeFuture().sync(); 

     } 
     catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     finally { 
      boss.shutdownGracefully(); 
      worker.shutdownGracefully(); 
     } 

    } 

} 

CoreServerHandler.java

package me.creepsterlgc.coreserver; 

import io.netty.buffer.ByteBuf; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.group.ChannelGroup; 
import io.netty.channel.group.DefaultChannelGroup; 
import io.netty.util.ReferenceCountUtil; 
import io.netty.util.concurrent.GlobalEventExecutor; 

public class CoreServerHandler extends ChannelHandlerAdapter { 

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

    @Override 
    public void handlerAdded(ChannelHandlerContext context) { 
     Channel channel = context.channel(); 
     channels.add(channel); 
     Log.connect(channel.remoteAddress().toString()); 
     System.out.println("There are currently " + channels.size() + " clients connected."); 
     ChannelFuture cf = null; 
     cf = channel.write("Successfully connected to: master"); 
     channel.flush(); 
     if (!cf.isSuccess()) { 
      System.out.println("Send failed: " + cf.cause()); 
     } 
    } 

    @Override 
    public void handlerRemoved(ChannelHandlerContext context) { 
     Channel channel = context.channel(); 
     channels.remove(channel); 
     Log.disconnect(channel.remoteAddress().toString()); 
    } 

    @Override 
    public void channelRead(ChannelHandlerContext context, Object message) throws Exception { 
     context.write(message); 
     System.out.println("Received: "); 
     ByteBuf in = (ByteBuf) message; 
     try { 
      while (in.isReadable()) { 
       System.out.print((char) in.readByte()); 
       System.out.flush(); 
      } 
     } finally { 
      ReferenceCountUtil.release(message); 
     } 
     Channel channel = context.channel(); 
     Log.message(channel.remoteAddress().toString(), message.toString()); 
    } 

    @Override 
    public void channelReadComplete(ChannelHandlerContext context) { 
     context.flush(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 
     cause.printStackTrace(); 
     context.close(); 
    } 

    public static void read() { 
     for(Channel channel : channels) channel.read(); 
    } 

} 
+0

что это версия Нетти вы используете? – Sudheera

+0

Hi Sudheera, Iam, используя следующую версию: 5.0.0.Alpha2 – CreepsterLGC

ответ

0

Вы должны заменить

try { 
    cf = channel.write("Testing..").sync(); 
} catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
channel.flush(); 

с:

try { 
    cf = channel.writeAndFlush("Testing..").sync(); 
} catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
+0

Привет, Norman, спасибо за ваш андерсер! Я заменил channel.flush(); с прямым .writeAndFlush(); но, похоже, до сих пор не отправлено сообщение. Я получаю следующий журнал после перенаправления клиента дважды: http://pastebin.com/yaYewqUY – CreepsterLGC

Смежные вопросы