2016-05-09 2 views
0

Я пытаюсь отправить асинхронно большое количество запросов HTTP-сообщений на один сервер. Мои цели - сравнить каждый ответ с его оригинальным запросом.Как отправить несколько HTTP-запросов асинхронно с Netty?

Для этого я следую за Netty Snoop example.

Однако этот пример (и другие примеры http) не охватывает как отправлять множественные запросы асинхронно, так и как связать их впоследствии с соответствующими запросами.

Все похож вопросы (например, this one, this one или this one, реализовать класс SimpleChannelUpstreamHandler, который от Нетти 3 и не существует в 4.0 больше (documentation netty 4.0)

Каждый имеет представление о том, как решить эту проблему в Нетти 4,0

Edit:

Моя проблема, хотя я пишу много сообщений на канал, я только получить очень медленно ответы (1 ответ/сек, Whe надежда на получение нескольких тысяч/сек). Чтобы прояснить это, позвольте мне опубликовать то, что я получил до сих пор. Я уверен, что сервер, на который я отправляю запросы, также может обрабатывать множество трафика.

То, что я получил до сих пор:

import java.net.URI 
import java.nio.charset.StandardCharsets 
import java.io.File 

import io.netty.bootstrap.Bootstrap 
import io.netty.buffer.{Unpooled, ByteBuf} 
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer} 
import io.netty.channel.socket.SocketChannel 
import io.netty.channel.socket.nio.NioSocketChannel 
import io.netty.handler.codec.http._ 
import io.netty.handler.timeout.IdleStateHandler 
import io.netty.util.{ReferenceCountUtil, CharsetUtil} 
import io.netty.channel.nio.NioEventLoopGroup 

import scala.io.Source 

object ClientTest { 

    val URL = System.getProperty("url", MY_URL)  
    val configuration = new Configuration 

    def main(args: Array[String]) { 
    println("Starting client") 
    start() 
    } 

    def start(): Unit = { 

    val group = new NioEventLoopGroup() 

    try { 

     val uri: URI = new URI(URL) 
     val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"} 
     val port: Int = {val p = uri.getPort; if (p != -1) p else 80} 

     val b = new Bootstrap() 

     b.group(group) 
     .channel(classOf[NioSocketChannel]) 
     .handler(new HttpClientInitializer()) 

     val ch = b.connect(host, port).sync().channel() 

     val logFolder: File = new File(configuration.LOG_FOLDER) 
     val fileToProcess: Array[File] = logFolder.listFiles() 

     for (file <- fileToProcess){ 
     val name: String = file.getName() 
     val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name) 

     val lineIterator: Iterator[String] = source.getLines() 

     while (lineIterator.hasNext) { 
      val line = lineIterator.next() 
      val jsonString = parseLine(line) 
      val request = createRequest(jsonString, uri, host) 
      ch.writeAndFlush(request) 
     } 
     println("closing") 
     ch.closeFuture().sync() 
     } 
    } finally { 
     group.shutdownGracefully() 
    } 
    } 

    private def parseLine(line: String) = { 
    //do some parsing to get the json string I want 
    } 

    def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = { 
    val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8) 

    val request: FullHttpRequest = new DefaultFullHttpRequest(
     HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath()) 
    request.headers().set(HttpHeaders.Names.HOST, host) 
    request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) 
    request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP) 
    request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json") 

    request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes()) 
    request.content().clear().writeBytes(bytebuf) 

    request 
    } 
} 

class HttpClientInitializer() extends ChannelInitializer[SocketChannel] { 

    override def initChannel(ch: SocketChannel) = { 
    val pipeline = ch.pipeline() 

    pipeline.addLast(new HttpClientCodec()) 

    //aggregates all http messages into one if content is chunked 
    pipeline.addLast(new HttpObjectAggregator(1048576)) 

    pipeline.addLast(new IdleStateHandler(0, 0, 600)) 

    pipeline.addLast(new HttpClientHandler()) 
    } 
} 

class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] { 

    override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { 
    try { 
     msg match { 
     case res: FullHttpResponse => 
      println("response is: " + res.content().toString(CharsetUtil.US_ASCII)) 
      ReferenceCountUtil.retain(msg) 
     } 
    } finally { 
     ReferenceCountUtil.release(msg) 
    } 
    } 

    override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = { 
    println("HttpHandler caught exception", e) 
    ctx.close() 
    } 
} 
+0

Не пишите канала асинхронный? В результате написания вы получите будущее, которое зависит от вас, как с этим бороться – user1582639

+0

Я также изучаю Netty 4.0. Вот мое понимание дизайна. Первое, о чем я помню, это то, что в Netty 4 вы уверены, что все зарегистрированные обработчики выполняются в одном потоке, поэтому нет необходимости в синхронизации, если вы не используете общие обработчики. Поэтому все ваши отправленные запросы будут отправляться последовательно по каналу, и ответы будут приниматься в той же последовательности. Поэтому, управляя структурой данных, такой как очередь в вашем дуплексном обработчике для всех запросов, вы всегда можете опросить соответствующий запрос на последний полученный ответ. – user1582639

+0

Спасибо за ответы! Моя проблема заключается в том, что, хотя я пишу много сообщений на канал, я получаю очень медленно ответы (1 ответ/сек, тогда как надежда получить несколько тысяч/сек). Чтобы прояснить это, позвольте мне опубликовать то, что я получил до сих пор. – Mart

ответ

0

ChannelFuture ср = channel.writeAndFlush (createRequest());

и как связать их впоследствии с соответствующими запросами.

Can netty assign multiple IO threads to the same Channel?

Рабочий поток раз назначается для канала не изменяется в течение всего срока службы канала. Поэтому мы не используем нити. Это связано с тем, что вы поддерживаете связь, и канал остается в живых.

Для решения этой проблемы вы можете рассмотреть пул каналов (скажем, 30). Затем используйте пул каналов для размещения ваших запросов.

 int concurrent = 30; 

    // Start the client. 
    ChannelFuture[] channels = new ChannelFuture[concurrent]; 
    for (int i = 0; i < channels.length; i++) { 
    channels[i] = b.connect(host, port).sync(); 
    } 

    for (int i = 0; i < 1000; i++) { 
     ChannelFuture requestHandle = process(channels[(i+1)%concurrent]); 
     // do something with the request handle  
    } 

    for (int i = 0; i < channels.length; i++) { 
    channels[i].channel().closeFuture().sync(); 
    } 

НТН

+0

Я думаю, что даже с одним каналом он должен получать больше 1 сообщения в секунду со стороны сервера.Мое предположение заключается в том, что автор нагромождает канал с запросами, не дающими возможности обрабатывать ответы. – user1582639

+0

параллелизм - время (миллисекунды), чтобы обработать 1000 запросов (будет варьироваться в зависимости от конечной точки) 1 - 55219 10 - 48749 30 - 13364 100 - 29106 до увеличения времени числа нитей улучшает производительность, но позже контекстный переключатель влияет на производительность. –

+0

user1582639 то, что вы упомянули, действительно было одной из проблем. Ограничение количества запросов улучшило производительность. @AmodPandey: использование пула каналов также работает, а также улучшает производительность еще больше. Тем не менее, я до сих пор не понимаю, как связать запросы intiatial с соответствующим ответом. writeAndFlush действительно возвращает ChannelFurture, но если я добавлю слушателя к нему, он будет завершен после отправки запроса, что не позволяет мне каким-либо образом подключить его к ответу. – Mart

Смежные вопросы