2014-09-12 6 views
4

У меня есть простой автономный тест клиент-сервер, в котором клиент отправляет 500 байтов на сервер, а сервер возвращает 2000 байт обратно в ответ. Он работает в цикле, и я печатаю время на каждые 50000 таких запросов на запрос/ответ. Я сравниваю производительность трех реализаций на основе API-интерфейса блокирующего сокета, Netty и NIO2. Тест показывает, что блокирующий сокет выполняет значительно быстрее, чем Netty или NIO2. Я понимаю, что в этом тесте нет параллелизма, для которого был разработан NIO. Тем не менее, имеет ли это различие в производительности объяснение или я делаю что-то очень неэффективно? Есть ли способ улучшить код на основе Netty для достижения производительности, близкой к блокировке сокета? Я попытался использовать прямой буфер для чтения - никакой существенной разницы.Производительность Netty против блокирующего сокета

Тесты выполнялись с помощью java 1.7.0_55 на двух серверах Linux в гигабитной сети. Результаты первых четырех чтений из этих испытаний, в миллисекундах, были:

  • Blocking: 9754, 9307, 9305
  • Нетти: 14879, 11872, 11781
  • NiO2: 14474, 12117, 12149

Еще одна загадка заключается в том, что реализации Netty и NIO2 выполняются медленно в начале, а затем стабилизируются. В случае Netty стабилизация происходит примерно через 10000 циклов.

Ниже приведен исходный код.

Config.java - используется всеми тремя реализациями

public class Config { 
    static final String HOST = "192.168.1.121"; 
    static final int PORT = 10000; 

    static int requestLength = 500; 
    static int responseLength = 2000; 
    static int numOfCalls = 50000; 

    static byte[] request = new byte[requestLength]; 
    static byte[] response = new byte[responseLength]; 
} 

BlockingClient.java

public class BlockingClient { 

    public static void main(String[] args) { 
     Socket socket = null; 
     try { 
      socket = new Socket(Config.HOST, Config.PORT); 

      InputStream is = socket.getInputStream(); 
      OutputStream os = socket.getOutputStream(); 
      int callCount = 0; 

      long startTime = System.currentTimeMillis(); 

      while (true) { 
       os.write(Config.request); 
       read(is, Config.response); 
       callCount++; 
       if (callCount == Config.numOfCalls) { 
        System.out.println("numOfcalls=" + Config.numOfCalls + " time: " + (System.currentTimeMillis() - startTime)); 
        callCount = 0; 
        startTime = System.currentTimeMillis(); 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      try { 
       socket.close(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public static void read(InputStream is, byte[] bytes) throws IOException { 
     int num = 0; 
     while(num < bytes.length) { 
      num += is.read(bytes, num, bytes.length - num); 
     } 
    } 

} 

BlockingServer.java

public class BlockingServer { 

    public static void main(String[] args) { 
     try { 
      ServerSocket srvSocket = new ServerSocket(Config.PORT); 

      while (true) { 
       final Socket socket = srvSocket.accept(); 

       new Thread() { 
        @Override 
        public void run() { 
         try { 
          InputStream is = socket.getInputStream(); 
          OutputStream os = socket.getOutputStream(); 
          while (true) { 
           BlockingClient.read(is, Config.request); 
           os.write(Config.response); 
          } 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } finally { 
          try { 
           socket.close(); 
          } catch (IOException e) { 
           e.printStackTrace(); 
          } 
         } 
        } 
       }.start(); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 

} 

NettyClient.java

public final class NettyClient { 

     public static void main(String[] args) throws Exception { 
      EventLoopGroup group = new NioEventLoopGroup(); 
      try { 
       Bootstrap b = new Bootstrap(); 
       b.group(group) 
       .channel(NioSocketChannel.class) 
       .handler(new ChannelInitializer<SocketChannel>() { 
        @Override 
        public void initChannel(SocketChannel ch) throws Exception { 
         ChannelPipeline p = ch.pipeline(); 
         p.addLast(
           new NettyClientHandler()); 
        } 
       }); 

       b.connect(Config.HOST, Config.PORT).sync().channel().closeFuture().sync(); 

      } finally { 
       group.shutdownGracefully(); 
      } 
     } 
    } 

NettyClientHandler.java

public class NettyClientHandler extends ChannelInboundHandlerAdapter { 

    private static ByteBuf responseBuf = Unpooled.wrappedBuffer(Config.response).clear(); 
    //private static ByteBuf responseBuf = Unpooled.directBuffer(Config.responseLength).clear(); 

    private int readLen = 0; 
    private int callCount = 0; 
    private long startTime; 
    private long chunks = 0; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     // Send the first message 
     initLog(); 
     writeRequest(ctx); 
    } 

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     ByteBuf buf = (ByteBuf)msg; 

     int received = buf.readableBytes(); 
     responseBuf.writeBytes(buf); 
     readLen += received; 
     chunks++; 

     if (readLen == Config.responseLength) { 
      if (responseBuf.isWritable()) { 
       System.out.println("Error. responseBuf.isWritable()==true"); 
      } 
      readLen = 0; 
      responseBuf.clear(); 

      if (callCount++ == Config.numOfCalls - 1) { 
       doLog(); 
       initLog(); 
      } 
      writeRequest(ctx); 

     } else if (readLen > Config.responseLength) { 
      System.out.println("Error. readLen is too big: " + readLen); 
     } 

     buf.release(); 

    } 

    private void initLog() { 
     callCount = 0; 
     chunks = 0; 
     startTime = System.currentTimeMillis(); 
    } 

    private void doLog() { 
     System.out.println(Config.numOfCalls + " performed in " + chunks + " chunks, time: "+ (System.currentTimeMillis() - startTime)); 
    } 

    private void writeRequest(ChannelHandlerContext ctx) { 
     ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.request)); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
     ctx.fireChannelInactive(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
     cause.printStackTrace(); 
     ctx.close(); 
    } 

} 

NettyServer.java

public final class NettyServer { 

    public static void main(String[] args) throws Exception { 
     EventLoopGroup group = new NioEventLoopGroup(1); 
     try { 
      ServerBootstrap b = new ServerBootstrap(); 
      b.group(group, group) 
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
        ChannelPipeline p = ch.pipeline(); 
        p.addLast(
          new NettyServerHandler() 
        ); 
       } 
      }); 

      b.bind(Config.PORT).sync().channel().closeFuture().sync(); 
     } finally { 
      group.shutdownGracefully(); 
     } 
    } 
} 

NettyServerHandler.java

public class NettyServerHandler extends ChannelInboundHandlerAdapter { 

    private static ByteBuf requestBuf = Unpooled.wrappedBuffer(Config.request).clear(); 
    //private static ByteBuf requestBuf = Unpooled.directBuffer(Config.requestLength).clear();; 

    private int readLen = 0; 

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     ByteBuf buf = (ByteBuf)msg; 

     int received = buf.readableBytes(); 
     requestBuf.writeBytes(buf); 
     readLen += received; 

     if (readLen == Config.requestLength) { 
      if (requestBuf.isWritable()) { 
       System.out.println("requestBuf.isWritable"); 
      } 
      readLen = 0; 
      requestBuf.clear(); 
      writeResponse(ctx); 
     } else if (readLen > Config.responseLength) { 
      System.out.println("readLen is too big: " + readLen); 
     } 

     buf.release(); 

    } 

    private void writeResponse(ChannelHandlerContext ctx) { 
     ctx.writeAndFlush(Unpooled.wrappedBuffer(Config.response)); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
     ctx.fireChannelInactive(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
     cause.printStackTrace(); 
     ctx.close(); 
    } 

} 

Nio2Base.java

public abstract class Nio2Base { 

    public static int numOfCalls = 50000; 

    abstract ByteBuffer getWriteBuffer(); 
    abstract ByteBuffer getReadBuffer(); 
    abstract void messageReceived(ByteBuffer buffer); 

    protected class ReadHandler implements CompletionHandler<Integer, Void> { 
     private AsynchronousSocketChannel channel; 
     private ByteBuffer buffer; 

     ReadHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) { 
      this.channel = channel; 
      this.buffer = buffer; 
     } 

     @Override 
     public void completed(Integer result, Void a) { 
      if (buffer.hasRemaining()) { 
       channel.read(buffer, null, this); 
      } else { 
       messageReceived(buffer); 
       buffer.clear(); 
       ByteBuffer writeBuffer = getWriteBuffer(); 
       channel.write(writeBuffer, null, new WriteHandler(channel, writeBuffer)); 
      } 

     } 

     @Override 
     public void failed(Throwable exc, Void a) { 
      exc.printStackTrace(); 
     } 

    } 

    protected class WriteHandler implements CompletionHandler<Integer, Void> { 
     private AsynchronousSocketChannel channel; 
     private ByteBuffer buffer; 

     WriteHandler(AsynchronousSocketChannel channel, ByteBuffer buffer) { 
      this.channel = channel; 
      this.buffer = buffer; 
     } 

     @Override 
     public void completed(Integer result, Void attachment) { 
      if (buffer.hasRemaining()) { 
       channel.write(buffer, null, this); 
      } else { 
       buffer.clear(); 
       ByteBuffer readBuffer = getReadBuffer(); 
       channel.read(readBuffer, null, new ReadHandler(channel, readBuffer)); 
      } 
     } 

     @Override 
     public void failed(Throwable exc, Void attachment) { 
      exc.printStackTrace(); 
     } 
    } 

} 

Nio2Client.java

public class Nio2Client extends Nio2Base { 

    private static ByteBuffer requestBuffer = ByteBuffer.wrap(Config.request); 
    private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.response); 

    private int count; 
    private long startTime; 
    private AsynchronousSocketChannel channel; 

    public static void main(String[] args) throws Exception { 
     new Nio2Client().init(); 

     // Wait 
     System.in.read(); 
    } 

    public void init() { 
     // create an asynchronous socket channel bound to the default group 
     try { 
      channel = AsynchronousSocketChannel.open(); 
      if (channel.isOpen()) { 
       // connect this channel's socket 
       channel.connect(new InetSocketAddress(Config.HOST, Config.PORT), null, new ConnectHandler(channel)); 
      } else { 
       System.out.println("The asynchronous socket channel cannot be opened!"); 
      } 
     } catch (IOException ex) { 
      System.err.println(ex); 
     } 
    } 

    private class ConnectHandler implements CompletionHandler<Void, Void> { 
     private AsynchronousSocketChannel channel; 

     public ConnectHandler(AsynchronousSocketChannel channel) { 
      this.channel = channel; 
     } 

     @Override 
     public void completed(Void result, Void attachment) { 
      try { 
       System.out.println("Successfully connected at: " + channel.getRemoteAddress()); 
       ByteBuffer buffer = getWriteBuffer(); 
       startTime = System.currentTimeMillis(); 
       count = 0; 
       channel.write(buffer, null, new WriteHandler(channel, buffer)); 

      } catch (Exception e) { 
       System.err.println(e); 
      } 
     } 

     @Override 
     public void failed(Throwable exc, Void attachment) { 
      exc.printStackTrace(); 
      throw new UnsupportedOperationException("Connection cannot be established!"); 
     } 

    } 

    @Override 
    ByteBuffer getWriteBuffer() { 
     ByteBuffer ret = requestBuffer.duplicate(); 
     ret.position(ret.capacity()); 
     ret.flip(); 
     return ret; 
    } 

    @Override 
    ByteBuffer getReadBuffer() { 
     return (ByteBuffer)readBuffer.clear(); 
    } 

    @Override 
    void messageReceived(ByteBuffer buffer) { 

     count++; 

     if (count == numOfCalls) { 

      System.out.println("Calls: " + count + " time: " + (System.currentTimeMillis() - startTime)); 

      count = 0; 
      startTime = System.currentTimeMillis(); 
     } 
    } 

} 

Nio2Server.java

public class Nio2Server extends Nio2Base { 

    private static byte[] response = new byte[Config.responseLength]; 
    private static ByteBuffer responseBuffer = ByteBuffer.wrap(response); 
    private static ByteBuffer readBuffer = ByteBuffer.wrap(Config.request); 


    public static void main(String[] args) { 
     new Nio2Server().init(); 
    } 

    public void init() { 
     // create an asynchronous server socket channel bound to the default group 
     try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()) { 
      if (serverChannel.isOpen()) { 

       // bind the server socket channel to local address 
       serverChannel.bind(new InetSocketAddress(Config.HOST, Config.PORT)); 

       // display a waiting message while ... waiting clients 
       System.out.println("Waiting for connections ..."); 

       AcceptHandler acceptHandler = new AcceptHandler(serverChannel); 

       serverChannel.accept(null, acceptHandler); 

       // Wait 
       System.in.read(); 

      } else { 
       System.out.println("The asynchronous server-socket channel cannot be opened!"); 
      } 
     } catch (IOException ex) { 
      System.err.println(ex); 
     } 
    } 

    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> { 
     private AsynchronousServerSocketChannel serverChannel; 

     public AcceptHandler(AsynchronousServerSocketChannel serverChannel) { 
      this.serverChannel = serverChannel; 
     } 

     @Override 
     public void completed(AsynchronousSocketChannel channel, Void attachment) { 

      serverChannel.accept(null, this); 

      ByteBuffer buffer = getReadBuffer(); 

      try { 
       System.out.println("Incoming connection from: " + channel.getRemoteAddress()); 

       channel.read(buffer, null, new ReadHandler(channel, buffer)); 

      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 

     @Override 
     public void failed(Throwable exc, Void attachment) { 
      exc.printStackTrace(); 
      serverChannel.accept(null, this); 
      throw new UnsupportedOperationException("Cannot accept connections!"); 
     } 
    } 

    @Override 
    ByteBuffer getWriteBuffer() { 
     return responseBuffer.duplicate(); 
    } 

    @Override 
    ByteBuffer getReadBuffer() { 
     return (ByteBuffer)readBuffer.clear(); 
    } 

    @Override 
    void messageReceived(ByteBuffer buffer) { 
    } 


} 
+0

Недопустимый ваш 'BlockingServer'. Он неправильно определяет конец потока. – EJP

ответ

0

Я бы предположить, медленные разогреть из-за JIT нуждающегося время, чтобы согреться, и NIO будучи гораздо более сложным, чем простой блокировкой ввод-вывод (и нуждаются в большей оптимизации). Я думаю, что блокирование ввода-вывода будет более высокой, когда есть только несколько клиентов, так как netty и NIO накладные расходы на их сложность.Тем не менее, NIO намного больше масштабируемое, чем блокирование ввода-вывода (особенно с back-back-сетью netty) и легко справляется с тысячами клиентов.

Кроме того, прежде всего, преждевременная оптимизация - это корень всего зла. Если вы пишете простую прикладную программу командной строки, netty и NIO являются излишними, и вы должны придерживаться блокировки ввода-вывода. Однако, если вы намереваетесь написать надежное, поддерживаемое и высококачественное сетевое приложение, вы должны использовать netty для. Если позже вы решите переключиться с NIO на блокировку ввода-вывода, вы можете сделать это без проблем с netty, так как netty также имеет blocking io backend, который они рекомендуют для небольшого количества подключений.

Смежные вопросы