2016-12-26 2 views
0

Я новичок в Netty, и я написал на основе примера, когда нашел HTTP-сервер Netty, который поддерживает HTTP-соединения для отправки сервер-отправленных событий клиенту браузера.Netty-сервер с открытым доступом к http-соединениям

Проблема в том, что он принимает только до ~ 5 соединений и после этого блокирует новые соединения. Я googled и нашел большинство ответов сказал, чтобы установить SO_LOGBACK на более высокое значение. Пробовал разные значения, и пока я не видел разницы. Я даже установил значение MAX_INTEGER и все еще имел только 5 соединений.

код сервера (Использование Нетти версии 4.1.6.Final):

package server; 

import static io.netty.buffer.Unpooled.copiedBuffer; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.http.DefaultFullHttpResponse; 
import io.netty.handler.codec.http.FullHttpResponse; 
import io.netty.handler.codec.http.HttpHeaders; 
import io.netty.handler.codec.http.HttpObjectAggregator; 
import io.netty.handler.codec.http.HttpResponseStatus; 
import io.netty.handler.codec.http.HttpServerCodec; 
import io.netty.handler.codec.http.HttpVersion; 

public class NettyHttpServer { 
private ChannelFuture channel; 
private final EventLoopGroup masterGroup; 

public NettyHttpServer() { 
    masterGroup = new NioEventLoopGroup(100); 
} 

public void start() { 
    try { 
    final ServerBootstrap bootstrap = new ServerBootstrap().group(masterGroup) 
    .channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer <SocketChannel>() { 
    @Override 
    public void initChannel(final SocketChannel ch) throws Exception { 
     ch.pipeline().addLast("codec", new HttpServerCodec()); 
     ch.pipeline().addLast("aggregator", new HttpObjectAggregator(512 * 1024)); 
     ch.pipeline().addLast("request", new ChannelInboundHandlerAdapter() { 
     @Override 
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
     throws Exception { 
     System.out.println(msg); 
     registerToPubSub(ctx, msg); 
     } 

     @Override 
     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
     ctx.flush(); 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
     ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
     HttpResponseStatus.INTERNAL_SERVER_ERROR, 
     copiedBuffer(cause.getMessage().getBytes()))); 
     } 
     }); 
    } 
    }).option(ChannelOption.SO_BACKLOG, Integer.MAX_VALUE) 
    .childOption(ChannelOption.SO_KEEPALIVE, true); 
    channel = bootstrap.bind(8081).sync(); 
    // channels.add(bootstrap.bind(8080).sync()); 
    } catch (final InterruptedException e) {} 
} 

public void shutdown() { 
    masterGroup.shutdownGracefully(); 

    try { 
    channel.channel().closeFuture().sync(); 
    } catch (InterruptedException e) {} 
} 

private void registerToPubSub(final ChannelHandlerContext ctx, Object msg) { 
    new Thread() { 
    @Override 
    public void run() { 
    while (true) { 
    final String responseMessage = "data:abcdef\n\n"; 
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, 
     copiedBuffer(responseMessage.getBytes())); 

    response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); 
    response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream"); 
    response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); 
    response.headers().set("Cache-Control", "no-cache"); 

    ctx.writeAndFlush(response); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 
    }; 
    }.start(); 
} 

public static void main(String[] args) { 
    new NettyHttpServer().start(); 
} 
} 

клиент коды расслоения плотного (я запускаю его более чем в 5 раз от моего браузера в разных вкладках, а не все из них получает:

var source = new EventSource("http://localhost:8081"); 
source.onmessage = function(event) { 
    console.log(event.data); 
}; 
source.onerror= function(err){console.log(err); source.close()}; 
source.onopen = function(event){console.log('open'); console.log(event)} 

ответ

1

вы должны позволить браузеру знать, что вы сделали передачу ответа, и что у вас есть три варианта.

  1. Установить длину содержимого
  2. Отправить это фрагментированное
  3. Закрыть соединение, когда вы сделали

Вы не делаете какой-либо из них. Я подозреваю, что ваш браузер все еще ждет полного ответа на каждый отправленный вами запрос и использует новое соединение для каждого запроса в вашем тестировании. После 5 запросов ваш браузер должен отказаться от создания новых подключений.

Еще одна вещь, которую я заметил, это то, что вы создаете новый поток для каждого запроса на своем сервере и никогда не позволяете ему умереть. Это вызовет проблемы по линии при попытке масштабирования. Если вы действительно хотите, чтобы этот код работал в другом потоке, я предлагаю просмотреть перегруженные методы добавления обработчиков в конвейер; они должны указывать пул потоков для их запуска.