2014-09-24 6 views
0

У меня есть простой ECHO-сервер и клиент, написанный с использованием Netty. Сервер и клиент находятся на одной машине. Я ожидал среднюю задержку порядка нескольких миллисекунд, однако, независимо от того, что я пытаюсь, я никогда не могу довести латентность до субмиллисекундной продолжительности. Любая помощь будет принята с благодарностью.Настройка примера клиент/сервер с низкой задержкой в ​​Netty

Обновление: Даже при использовании System.nanoTime я вижу задержку около 25-30 мс.

EchoClient

import org.jboss.netty.bootstrap.ClientBootstrap; 
import org.jboss.netty.channel.*; 
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 
import org.jboss.netty.handler.execution.ExecutionHandler; 
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; 

import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 

public class EchoClient { 

    public static void main(String[] args) { 

     if (args.length != 1) { 
      System.err.println(String.format("usage: %s <num-msgs>", EchoClient.class.getCanonicalName())); 
      System.exit(1); 
     } 

     final long NUM_MSGS = Integer.parseInt(args[0]); 

     final EchoClientHandler echoClientHandler = new EchoClientHandler(); 

     final ExecutionHandler e = 
       new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(4, 128 * 1024L, 128 * 1024L)); 
     ChannelFactory factory = 
       new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 
                Executors.newCachedThreadPool()); 

     ClientBootstrap bootstrap = new ClientBootstrap(factory); 
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      @Override 
      public ChannelPipeline getPipeline() throws Exception { 
       return Channels.pipeline(new TestPayloadEncoder(), 
             new TestPayloadDecoder(), 
             e, 
             echoClientHandler); 
      } 
     }); 
     bootstrap.setOption("tcpNoDelay", true); 
     bootstrap.setOption("keepAlive", false); 
     bootstrap.setOption("child.keepAlive", false); 
     bootstrap.setOption("sendBufferSize", 128 * 1024L); 
     bootstrap.setOption("receiveBufferSize", 128 * 1024L); 

     for (int i = 0; i < NUM_MSGS; i++) { 
      final InetSocketAddress serverAddr = 
        new InetSocketAddress("localhost", 8080); 

      bootstrap.connect(serverAddr).addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture f) throws Exception { 
        if (f.isSuccess()) { 
         f.getChannel().write(new TestPayload()); 
        } 
       } 
      }); 
     } 

     while (echoClientHandler.numMsgs.get() < NUM_MSGS); 

     System.out.println(echoClientHandler.numMsgs); 
     System.out.println(echoClientHandler.aggTime); 
     System.out.println(String.format("mean transfer time: %.2fms", 
             ((float) echoClientHandler.aggTime.get())/
             echoClientHandler.numMsgs.get())); 
     System.out.flush(); 

     e.releaseExternalResources(); 
     factory.releaseExternalResources(); 
    } 

} 

EchoClientHandler

import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.channel.ExceptionEvent; 
import org.jboss.netty.channel.MessageEvent; 
import org.jboss.netty.channel.SimpleChannelHandler; 

import java.util.concurrent.atomic.AtomicLong; 

public class EchoClientHandler extends SimpleChannelHandler { 

    public final AtomicLong numMsgs = new AtomicLong(0); 
    public final AtomicLong aggTime = new AtomicLong(0); 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     long recvTime = System.currentTimeMillis(); 
     TestPayload m = (TestPayload) e.getMessage(); 
     aggTime.addAndGet(recvTime - m.getTime()); 
     numMsgs.incrementAndGet(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     e.getChannel().close(); 
    } 

} 

EchoServer

import org.jboss.netty.bootstrap.ServerBootstrap; 
import org.jboss.netty.channel.ChannelFactory; 
import org.jboss.netty.channel.ChannelPipeline; 
import org.jboss.netty.channel.ChannelPipelineFactory; 
import org.jboss.netty.channel.Channels; 
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
import org.jboss.netty.handler.execution.ExecutionHandler; 
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; 

import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 

public class EchoServer { 

    public static void main(String[] args) { 
     ChannelFactory factory = 
       new NioServerSocketChannelFactory(Executors.newFixedThreadPool(4), 
                Executors.newFixedThreadPool(32), 
                32); 

     ServerBootstrap bootstrap = new ServerBootstrap(factory); 
     final ExecutionHandler e = 
       new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(4, 128 * 1024L, 128 * 1024L)); 
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      @Override 
      public ChannelPipeline getPipeline() throws Exception { 
       return Channels.pipeline(e, new EchoServerHandler()); 
      } 
     }); 
     bootstrap.setOption("reuseAddr", true); 
     bootstrap.setOption("keepAlive", false); 
     bootstrap.setOption("child.reuseAddr", true); 
     bootstrap.setOption("child.soLinger", 0); 
     bootstrap.setOption("child.keepAlive", false); 
     bootstrap.setOption("child.tcpNoDelay", true); 
     bootstrap.setOption("child.sendBufferSize", 128 * 1024L); 
     bootstrap.setOption("child.receiveBufferSize", 128 * 1024L); 
     bootstrap.bind(new InetSocketAddress("localhost", 8080)); 
    } 

} 

EchoServerHandler

import org.jboss.netty.channel.*; 

public class EchoServerHandler extends SimpleChannelHandler { 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     e.getChannel().write(e.getMessage()); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     e.getChannel().close(); 
    } 

} 

TestPayload

import org.jboss.netty.buffer.ChannelBuffer; 

import java.util.Date; 
import java.util.Random; 

public class TestPayload { 

    private static final int PREAMBLE_LEN = (Long.SIZE + Integer.SIZE)/8; 

    private static final Random RNG; 
    static { 
     RNG = new Random(); 
     RNG.setSeed(new Date().getTime()); 
    } 

    private final int paddingLen; 
    private final byte[] padding; 
    private final long time; 

    public TestPayload() { 
     this(65536); 
    } 

    public TestPayload(int sizeInBytes) { 
     this.paddingLen = sizeInBytes; 
     this.padding = new byte[this.paddingLen]; 
     RNG.nextBytes(this.padding); 
     this.time = System.currentTimeMillis(); 
    } 

    private TestPayload(long time, int paddingLen, byte[] padding) { 
     this.paddingLen = paddingLen; 
     this.padding = padding; 
     this.time = time; 
    } 

    public long getTime() { 
     return this.time; 
    } 

    public void writeTo(ChannelBuffer buf) { 
     buf.writeLong(this.time); 
     buf.writeInt(this.paddingLen); 
     buf.writeBytes(this.padding); 
    } 

    public static TestPayload readFrom(ChannelBuffer buf) { 
     if (buf.readableBytes() < PREAMBLE_LEN) { 
      return null; 
     } 

     buf.markReaderIndex(); 

     long time = buf.readLong(); 
     int paddingLen = buf.readInt(); 

     if (buf.readableBytes() < paddingLen) { 
      buf.resetReaderIndex(); 
      return null; 
     } 

     byte[] padding = new byte[paddingLen]; 
     buf.readBytes(padding); 

     return new TestPayload(time, paddingLen, padding); 
    } 

    public int getLength() { 
     return PREAMBLE_LEN + this.paddingLen; 
    } 
+0

Если вы хотите измерить субмиллис, используйте System.nanoTime(), который является таймером. System.currentTimeMillis() - это «настенные часы» – ssedano

+0

Отредактировано сообщение ... Спасибо! – Bala

ответ

0

Вы используете ваш клиент и сервер в различных виртуальных машинах? Если это так, измерение времени через границы JVM не так прямо, как вы думаете. Так, например, используя System.nanoTime() не обязательно будет работать в соответствии с oracle java doc:

Значения, возвращаемые этим методом становится значимым только тогда, когда разница между двумя такими значениями, полученные в пределах того же экземпляра Java виртуальная машина, вычисляется.

Предполагая, что вы можете найти надежный способ измерения времени через JVM, и если ваша цель состоит в том, чтобы изолировать, как долго он принимает клиент Netty для отправки на сервер Нетти затем упростить случай использования, чтобы изолировать это как можно больше , Например, в приведенном выше коде вы рассчитываете время отправки/получения массива из 65536 байт. Удалите это из эксперимента по синхронизации, чтобы помочь изолировать места, где есть узкие места.

Сколько прогонов вы собираете с течением времени? Вы исключаете время инициализации самого Netty (выполняете несколько сообщений между клиентом/сервером, прежде чем принимать время)?

Также как регулировка вашей производительности влияет на конфигурацию? Существует множество регуляторов для настройки (размер пула потоков, размер отправки/получения буфера и т. Д.).

Какую версию Netty вы используете, и есть ли возможность принудительного сброса после того, как вы напишете?

Я не вижу код для EchoClient. Похоже, что вы скопировали/вставляли код для EchoClientHandler, где код EchoClient должен быть.

+0

Да, клиент и сервер находятся в разных JVM. Я пытаюсь измерить время, прошедшее с момента моего начала отправки до момента, когда Возможно, этот таймер может захватить немного больше, чем то, что я хочу точно измерить, но по большей части я считаю, что это должно быть ОК. – Bala

+0

Я не знаю, что настроить, чтобы получить субмиллисекундные задержки. На самом деле ничего ниже 4-6 мс было бы здорово, если я смогу это достичь. – Bala

+0

Вы не знаете, что все в порядке, пока вы не экспериментируете. Можете ли вы опубликовать номера с уменьшенной сложностью тестового случая, и с этим номером добавьте некоторое обоснование относительно того, как вы делаете синхронизацию времени JVM? Также вам придется поэкспериментировать с различными настройками. Например, каковы преимущества/недостатки для изменения параметров 'OrderedMemoryAwareThreadPoolExecutor'? Вам нужно столько потоков, сколько вы используете? –

Смежные вопросы