2015-12-24 2 views
3

Я пытаюсь реализовать UDP-сервер с Netty. Идея состоит в том, чтобы связывать только один раз (поэтому создавая только один Channel). Этот Channel инициализируется только одним обработчиком, который отправляет обработку входящих дейтаграмм между несколькими потоками через ExecutorService.Многопоточный UDP-сервер с Netty

@Configuration 
public class SpringConfig { 

    @Autowired 
    private Dispatcher dispatcher; 

    private String host; 

    private int port; 

    @Bean 
    public Bootstrap bootstrap() throws Exception { 
     Bootstrap bootstrap = new Bootstrap() 
      .group(new NioEventLoopGroup(1)) 
      .channel(NioDatagramChannel.class) 
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 
      .handler(dispatcher); 

     ChannelFuture future = bootstrap.bind(host, port).await(); 
     if(!future.isSuccess()) 
      throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause()); 

     return bootstrap; 
    } 
} 

@Component 
@Sharable 
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean { 

    private int workerThreads; 

    private ExecutorService executorService; 

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     DatagramPacket packet = (DatagramPacket) msg; 

     final Channel channel = ctx.channel(); 

     executorService.execute(new Runnable() { 
      @Override 
      public void run() { 
       //Process the packet and produce a response packet (below)    
       DatagramPacket responsePacket = ...; 

       ChannelFuture future; 
       try { 
        future = channel.writeAndFlush(responsePacket).await(); 
       } catch (InterruptedException e) { 
        return; 
       } 
       if(!future.isSuccess()) 
        log.warn("Failed to write response packet."); 
      } 
     }); 
    } 

    @Override 
    public void afterPropertiesSet() throws Exception { 
     executorService = Executors.newFixedThreadPool(workerThreads); 
    } 
} 

У меня есть следующие вопросы:

  1. Если DatagramPacket получен channelRead методом в Dispatcher класса дублироваться перед использованием в рабочем потоке? Интересно, будет ли этот пакет уничтожен после возврата метода channelRead, даже если ссылка поддерживается рабочим потоком.
  2. Можно ли разделить Channel среди всех рабочих потоков и позволить им звонить writeAndFlush одновременно?

Спасибо!

ответ

4
  1. Nope. Если вам нужен объект, чтобы жить дольше, вы либо превращаете его в что-то другое, либо используете ReferenceCountUtil.retain(datagram), а затем ReferenceCountUtil.release(datagram), как только вы закончите с ним. Вы также не должны делать await() в службе исполнителя, вы должны зарегистрировать обработчик для всех, что происходит.

  2. Да, объекты канала являются потокобезопасными и могут быть вызваны из разных потоков.