2016-07-15 5 views
0

Я пытаюсь создать TCP-сервер, который периодически считывает данные из базы данных (Redis) и отправляет их соответствующему клиенту.Служба Netty и Scheduled Executor

Однако, поскольку я довольно новичок в Netty, я не знаю, как я мог запланировать это. Я знаю, что мне нужно использовать Назначенный палач службу, как это:

ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
e.scheduleAtFixedRate(() -> { 
    System.out.println("Calling..."); 
    // Do something 
}, 1, 1, TimeUnit.SECONDS); 

Однако, когда я попытался положить, что в коде сервера, это только вызов методы сразу. Я пытался поместить это в другое место, но все еще не могу понять. Что мне делать?

Вот код сервера:

package com.example.test.app; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Server { 

    public static void main(String[] args) throws Exception 
    { 
     EventLoopGroup bossGroup = new NioEventLoopGroup(); 
     EventLoopGroup workerGroup = new NioEventLoopGroup(); 

     final ServerHandler handler = new ServerHandler(); 

     try { 

      ServerBootstrap b = new ServerBootstrap(); 
      b.group(bossGroup, workerGroup); 
      b.channel(NioServerSocketChannel.class); 
      b.childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       protected void initChannel(SocketChannel ch) throws Exception 
       { 
        ch.pipeline().addLast(handler); 
       } 

      }); 
      b.option(ChannelOption.SO_BACKLOG, 128); 
      b.childOption(ChannelOption.SO_KEEPALIVE, true); 

      ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
      e.scheduleAtFixedRate(() -> { 
       System.out.println("Calling..."); 
       handler.saySomething(); 
      }, 1, 1, TimeUnit.SECONDS); 

      ChannelFuture f = b.bind(1337).sync(); 
      f.channel().closeFuture().sync(); 

     } finally { 
      workerGroup.shutdownGracefully(); 
      bossGroup.shutdownGracefully(); 
     } 
    } 

} 

А вот обработчик сервера:

package com.example.test.app; 

import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

    private ChannelHandlerContext ctx; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) 
    { 
     this.ctx = ctx; 
     System.out.println("Someone's connedted!"); 
    } 

    public void saySomething() 
    { 
     final ChannelFuture f = ctx.writeAndFlush("Sup!"); 
     f.addListener((ChannelFutureListener) (ChannelFuture future) -> { 
      System.out.println("Something has been said!"); 
     }); 
    } 

} 

ответ

1

Метод saySomething() генерирует NullPointerException для вызова final ChannelFuture f = ctx.writeAndFlush("Sup!"); пока ctx является недействительным. EventExecutorGroup.scheduleAtFixedRate Описание javadoc гласит, что «если какое-либо выполнение задачи встречает исключение, последующие казни подавляются». Вот почему вы получаете только один раз ...

Также кажется, что Netty позволяет повторно использовать экземпляр обработчика для разных экземпляров конвейера, только если вы аннотируете класс этого обработчика как @Sharable. В противном случае это исключение. Если ваш обработчик не имеет статуса (это не ваш случай, так как у вас есть член ctx), вы должны аннотировать его как @Sharable и повторно использовать его для всех созданных конвейеров. Если это состояние, создайте новый экземпляр для каждого нового конвейера (новое клиентское соединение).

Наконец, чтобы запланировать задачу для каждого подключенного клиента, вы можете использовать исполнитель, на который может ссылаться ctx канала подключенного клиента (по умолчанию, как и в вашем случае, EventLoop канала) в вашей реализации channelActive(). Этот исполнитель реализует ScheduledExecutorService, поэтому у вас также есть scheduleAtFixedRate. Посмотрите мою версию своего кода и посмотрите, подходит ли она вам.

Сервер:

package com.example.test.app; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Server { 

    public static void main(String[] args) throws Exception 
    { 
     EventLoopGroup bossGroup = new NioEventLoopGroup(); 
     EventLoopGroup workerGroup = new NioEventLoopGroup(); 

     try { 

      ServerBootstrap b = new ServerBootstrap(); 
      b.group(bossGroup, workerGroup); 
      b.channel(NioServerSocketChannel.class); 
      b.childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       protected void initChannel(SocketChannel ch) throws Exception 
       { 
        ch.pipeline().addLast(new ServerHandler()); 
       } 

      }); 
      b.option(ChannelOption.SO_BACKLOG, 128); 
      b.childOption(ChannelOption.SO_KEEPALIVE, true); 

//   ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
//   e.scheduleAtFixedRate(() -> { 
//    System.out.println("Calling..."); 
//    handler.saySomething(); 
//   }, 1, 1, TimeUnit.SECONDS); 

      ChannelFuture f = b.bind(1337).sync(); 
      f.channel().closeFuture().sync(); 

     } finally { 
      workerGroup.shutdownGracefully(); 
      bossGroup.shutdownGracefully(); 
     } 
    } 

} 

ServerHandler:

package com.example.test.app; 

import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.util.concurrent.ScheduledFuture; 

import java.util.concurrent.TimeUnit; 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

    private ScheduledFuture sf; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) 
    { 
     System.out.println("Someone's connedted! "+ctx.channel()); 
     sf = ctx.executor().scheduleAtFixedRate(() -> { 
      System.out.println("Calling..."); 
      saySomething(ctx); 
     }, 1, 1, TimeUnit.SECONDS); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     System.out.println("Someone's disconnected! "+ctx.channel()); 
     sf.cancel(false); 
    } 

    private void saySomething(ChannelHandlerContext ctx) 
    { 
      final ChannelFuture f = ctx.writeAndFlush("Sup!"); 
      f.addListener((ChannelFutureListener) (ChannelFuture future) -> { 
       System.out.println("Something has been said!"); 
      }); 
    } 

} 
+0

Спасибо, не знал, что ChannelHandlerContext имеет свой собственный исполнитель. – Furunomoe