Я пытаюсь реализовать «генератор миниатюр» в качестве микросервиса. Я думаю, что такая вещь, вероятно, лучше всего работает как сервер TCP, и поэтому, кратко рассмотрев несколько вариантов, я остановился на Netty. Чтобы сделать сервис максимально эффективным для работы с памятью, я предпочел бы не загружать полное изображение в память и, следовательно, пытаться построить конвейер, который «ThumbnailHandler» может использовать потоки с потоками, чтобы использовать чистые чтения Netty, чтобы поскольку Netty получает больше байтов, генератор миниатюр может пересекать больше потока. К сожалению, я недостаточно знаком с шаблонами Netty или NIO в целом, чтобы знать, буду ли я делать это наилучшим образом, и у меня возникли проблемы с получением даже упрощенной версии для работы, как я ожидал.Трубопровод с Netty
Вот мои настройки сервера:
public class ThumbnailerServer {
private int port;
public ThumbnailerServer(int port) {
this.port = port;
}
public void run() throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(UdtChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("handler", new ThumbnailerServerHandler());
}
});
// bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
connectGroup.shutdownGracefully();
acceptGroup.shutdownGracefully();
}
}
}
И обработчик эскизов:
public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
private PipedInputStream toThumbnailer = new PipedInputStream();
private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);
private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5));
private ListenableFuture<OutputStream> future;
public ThumbnailerServerHandler() throws IOException {
super(ByteBuf.class, true);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
future.addListener(() -> {
try {
ctx.writeAndFlush(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, executor);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
this.fromClient.close();
this.toThumbnailer.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
int readableBytes = msg.readableBytes();
msg.readBytes(this.fromClient, readableBytes);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Encountered error during communication", cause);
ctx.close();
}
}
Вот мой упрощена "Thumbnailer", пока я не весь поток работы:
public class ThumbnailGenerator {
public static OutputStream generate(InputStream toThumbnailer) {
OutputStream stream = new ByteArrayOutputStream();
try {
IOUtils.copy(toThumbnailer, stream);
} catch (IOException e) {
e.printStackTrace();
}
return stream;
}
}
- Уместно ли вы отбрасывать асинхронную задачу в метод handlerAdded, как это? Есть ли более «чистый» способ сделать это?
- IOUtils.copy предполагается и блокирует (из-за чтения на потоке с потоком), пока не будут доступны данные для чтения, поэтому я выгрузил его в пул исполнителей, потому что я не могу заблокировать в обработчике если я хочу продолжать получать байты. Однако Я обнаружил, что этот никогда не завершает свою работу, но делает ход. Это потому, что я никогда не сталкивался с байтом EOF (-1)? Как я могу заставить этот поток работать?
- Я пропустил конструкцию в нетти, которая упростит этот процесс? Я думал о том, чтобы реализовать его как декодер, который не декодирует, пока не будет весь поток, но тогда я буду загружать все в память.