2015-12-30 2 views
3

Я пытаюсь реализовать «генератор миниатюр» в качестве микросервиса. Я думаю, что такая вещь, вероятно, лучше всего работает как сервер TCP, и поэтому, кратко рассмотрев несколько вариантов, я остановился на Netty. Чтобы сделать сервис максимально эффективным для работы с памятью, я предпочел бы не загружать полное изображение в память и, следовательно, пытаться построить конвейер, который «ThumbnailHandler» может использовать потоки с потоками, чтобы использовать чистые чтения Netty, чтобы поскольку Netty получает больше байтов, генератор миниатюр может пересекать больше потока. К сожалению, я недостаточно знаком с шаблонами Netty или NIO в целом, чтобы знать, буду ли я делать это наилучшим образом, и у меня возникли проблемы с получением даже упрощенной версии для работы, как я ожидал.Трубопровод с Netty

Вот мои настройки сервера:

public class ThumbnailerServer { 

    private int port; 

    public ThumbnailerServer(int port) { 
     this.port = port; 
    } 

    public void run() throws Exception { 
     final ThreadFactory acceptFactory = new DefaultThreadFactory("accept"); 
     final ThreadFactory connectFactory = new DefaultThreadFactory("connect"); 
     final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER); 
     final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER); 

     try { 
      ServerBootstrap b = new ServerBootstrap(); 

      b.group(acceptGroup, connectGroup) 
      .channelFactory(NioUdtProvider.BYTE_ACCEPTOR) 
      .option(ChannelOption.SO_BACKLOG, 128) 
      .handler(new LoggingHandler(LogLevel.INFO)) 
      .childHandler(new ChannelInitializer<UdtChannel>() { 
       @Override 
       public void initChannel(UdtChannel ch) throws Exception { 
        ChannelPipeline p = ch.pipeline(); 
        p.addLast("handler", new ThumbnailerServerHandler()); 
       } 
      }); 

      // bind and start to accept incoming connections. 
      b.bind(port).sync().channel().closeFuture().sync(); 
     } finally { 
      connectGroup.shutdownGracefully(); 
      acceptGroup.shutdownGracefully(); 
     } 
    } 

} 

И обработчик эскизов:

public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> { 

    private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class); 
    private PipedInputStream toThumbnailer = new PipedInputStream(); 
    private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer); 

    private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
      Executors.newFixedThreadPool(5)); 

    private ListenableFuture<OutputStream> future; 

    public ThumbnailerServerHandler() throws IOException { 
     super(ByteBuf.class, true); 
    } 

    @Override 
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 
     future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer)); 
     future.addListener(() -> { 
      try { 
       ctx.writeAndFlush(future.get()); 
      } catch (InterruptedException | ExecutionException e) { 
       e.printStackTrace(); 
      } 
     }, executor); 
    } 

    @Override 
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 
     this.fromClient.close(); 
     this.toThumbnailer.close(); 
    } 

    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
     int readableBytes = msg.readableBytes(); 
     msg.readBytes(this.fromClient, readableBytes); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
     logger.error("Encountered error during communication", cause); 
     ctx.close(); 
    } 


} 

Вот мой упрощена "Thumbnailer", пока я не весь поток работы:

public class ThumbnailGenerator { 

    public static OutputStream generate(InputStream toThumbnailer) { 
     OutputStream stream = new ByteArrayOutputStream(); 
     try { 
      IOUtils.copy(toThumbnailer, stream); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
     return stream; 
    } 

} 
  1. Уместно ли вы отбрасывать асинхронную задачу в метод handlerAdded, как это? Есть ли более «чистый» способ сделать это?
  2. IOUtils.copy предполагается и блокирует (из-за чтения на потоке с потоком), пока не будут доступны данные для чтения, поэтому я выгрузил его в пул исполнителей, потому что я не могу заблокировать в обработчике если я хочу продолжать получать байты. Однако Я обнаружил, что этот никогда не завершает свою работу, но делает ход. Это потому, что я никогда не сталкивался с байтом EOF (-1)? Как я могу заставить этот поток работать?
  3. Я пропустил конструкцию в нетти, которая упростит этот процесс? Я думал о том, чтобы реализовать его как декодер, который не декодирует, пока не будет весь поток, но тогда я буду загружать все в память.

ответ

2

Хорошо, так что получается, что у меня было несколько заблуждений, которые объясняют, почему у меня возникли проблемы с получением работы.

1) Многие типы файлов не имеют так называемых терминальных байтов. Фактически байты EOF (чаще всего -1, так как это значение переполнения) обычно представляют собой реализацию, предоставляемую читателями, чтобы сообщать своим потребителям, что они достигли конца содержимого. Обычно это не то, что существует в самом файле.

2) channelReadComplete не так ясно, как кажется. channelReadComplete вызывается после того, как выполняется максимальное количество считываний, настроенных в netty (по умолчанию 10), или когда у него есть основания полагать, что сообщение было полностью отправлено, как показано, прочитав пустой буфер или получив буфер, который меньше, чем сконфигурированный размер блока.

Что касается того, почему было похоже, что копия входного потока была висит, хорошо, потому что поток входного потока в канале никогда не вызывал терминального значения (это пример реализации-байта EOF-считывателя). PipedInputStreams указывают только EOF после того, как поток данных, управляющий ими, был закрыт.

Чтобы сделать эту реализацию, я должен был увеличить количество сообщений до достаточно большого числа и доверенного каналаReadComplete, который будет вызываться после последнего чтения, который возвращает меньше размера блока. В этот момент было бы безопасно закрыть и сбросить выходной поток для следующего сообщения. Закрытие выходного потока приведет к тому, что входной поток, наконец, вернет этот байт EOF, и все остальное может продолжить работу.

Смежные вопросы