2012-03-09 2 views
6

Я написал простой UDP-сервер с Netty, который просто выводит в журналах сообщения (фреймы). Для этого я создал простой декодер декодера кадра и простой обработчик сообщений. У меня также есть клиент, который может отправлять несколько запросов последовательно и/или параллельно.Множество UDP-запросов, потерянных на UDP-сервере с Netty

Когда я настраиваю свой клиентский тестер для отправки, например, нескольких сотен запросов последовательно с небольшой задержкой между ними, мой сервер, написанный с помощью Netty, получает их все правильно. Но на данный момент я увеличиваю количество одновременных запросов в моем клиенте (например, 100) в сочетании с последовательными и несколькими повторами, мой сервер начинает терять много запросов. Например, когда я отправляю 50000 запросов, мой сервер получает только около 49000, когда только использует простой ChannelHandler, который выводит полученное сообщение.

И когда я добавляю простой декодер кадра (который печатает кадр и копирует его в другой буфер) перед этим обработчиком, сервер обрабатывает только половину запросов!

Я заметил, что независимо от количества рабочих я указываю созданному NioDatagramChannelFactory, всегда есть один и только один поток, который обрабатывает запросы (я использую рекомендуемый Executors.newCachedThreadPool() в качестве другого параметра).

Я также создал другой похожий простой UDP-сервер, основанный на DatagramSocket, поставляемом с JDK, и он отлично обрабатывает все запросы с 0 (ноль) потерянных !! Когда я отправляю 50000 запросов в моем клиенте (например, с 1000 потоками), я получил 50000 запросов на моем сервере.

Я делаю что-то неправильно при настройке моего UDP-сервера с помощью Netty? Или, может быть, Netty просто не предназначена для поддержки такой нагрузки? Почему существует только один поток, используемый данным пулом кэшированных потоков (я заметил, что только один поток и всегда один и тот же используется при поиске в JMX jconsole и путем проверки имени потока в выходных журналах)? Я думаю, что если больше потоков, где используется, как ожидалось, сервер сможет легко обрабатывать такую ​​нагрузку, потому что я могу сделать это без каких-либо проблем, если не использую Netty!

См мой код инициализации ниже:

... 

lChannelfactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool(), nbrWorkers); 
lBootstrap = new ConnectionlessBootstrap(lChannelfactory); 

lBootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
    @Override 
    public ChannelPipeline getPipeline() 
    { 
     ChannelPipeline lChannelPipeline = Channels.pipeline(); 
     lChannelPipeline.addLast("Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder(null));    
     lChannelPipeline.addLast("Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler(lOuterFrameStatsCollector));    
     return lChannelPipeline; 
    } 
}); 

bindChannel = lBootstrap.bind(socketAddress); 

... 

И содержание метода декодирования() в моем декодере:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception 
{ 
    ChannelBuffer lDuplicatedChannelBuffer = null; 
    sLogger.debug("Decode method called."); 

    if (iBuffer.readableBytes() < 8) return null; 
    if (outerFrameStatsCollector != null) outerFrameStatsCollector.incrementNbrRequests(); 

    if (iBuffer.readable()) 
    {   
     sLogger.debug(convertToAsciiHex(iBuffer.array(), iBuffer.readableBytes()));      
     lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer(iBuffer.readableBytes());    
     iBuffer.readBytes(lDuplicatedChannelBuffer); 
    } 

    return lDuplicatedChannelBuffer; 
} 

И содержание messageReceived() метод в моем обработчике:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception 
{ 
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage(); 
    if (outerFrameStatsCollector != null) outerFrameStatsCollector.incrementNbrRequests(); 

    if (lMessageBuffer.readable()) 
    {   
     sLogger.debug(convertToAsciiHex(lMessageBuffer.array(), lMessageBuffer.readableBytes()));    
     lMessageBuffer.discardReadBytes(); 
    } 
} 
+0

Вы знаете, что UDP не имеет никаких гарантий доставки правильно? –

+0

Да, я знаю, что таких гарантий поставки нет, но мои тесты нагрузки выполняются локально, и я ничего не теряю с помощью своего простого сервера, использующего DatagramSocket вместо Netty. В настоящее время я также анализирую запросы с помощью WireShark, чтобы проверить, что ничто не потеряно в одном случае (без netty) и что пакеты теряются при использовании netty. – The4Summers

+0

@ The4Summers Если вы видите, что пакеты теряются с Netty, Netty не достаточно быстро обрабатывает входящие пакеты, или нет, поэтому буфер приема сокета заполняется, поэтому входящие пакеты отбрасываются, не имея нигде больше идти. Ускорьте свое получение или замедлите отправку. – EJP

ответ

7

Неверный конфигурационный экземпляр ConnectionlessBootstrap.

  1. Вам необходимо настроить следующие параметры с оптимальными значениями.

    размер SO_SNDBUF, размер SO_RCVBUF и ReceiveBufferSizePredictorFactory

    lBootstrap.setOption("sendBufferSize", 1048576); 
    
    lBootstrap.setOption("receiveBufferSize", 1048576); 
    
    lBootstrap.setOption("receiveBufferSizePredictorFactory", 
    new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE)); 
    

    проверка DefaultNioDatagramChannelConfig класс для получения более подробной информации.

  2. Трубопровод делает все, используя рабочий поток Netty.Если рабочий поток перегружен, он задерживает выполнение цикла цикла выбора и будет узким местом при чтении/записи канала . Вы должны добавить обработчик выполнения, как показано ниже в конвейере . Он освободит рабочий поток для выполнения собственной работы.

    ChannelPipeline lChannelPipeline = Channels.pipeline(); 
    
    lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
        new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); 
    
    //add rest of the handlers here 
    
+0

Спасибо за подсказку о параметрах размера буфера! Однако, что касается обработчика выполнения, я попробовал его после этого сообщения и, да, он немного помог, но я все еще терял половину запросов при использовании декодера и обработчика. Я уверен, что увеличение буфера поможет намного больше. – The4Summers

+0

Тем не менее, я до сих пор не понимаю, почему мы должны указывать исполнятеля пула потоков и ряд рабочих при создании класса NioDatagramChannelFactory, если они никогда не используются !! ?? – The4Summers

+0

Я рад, что это сработало для вас. Исполнители пула потоков используются для создания рабочих потоков в конвейерах NioDatagramChannel. Один рабочий поток может быть назначен для DatagramChannel для неблокирующего чтения/записи. Если ваше приложение прослушивает более одного порта, многие рабочие потоки (по умолчанию - cpu size * 2) будут созданы и назначены DatagramChannel в циклическом режиме. Если вы хотите иметь больше рабочих потоков, вы можете указать в конструкторе NioDatagramChannelFactory. –

Смежные вопросы