Я пытаюсь отправить асинхронно большое количество запросов HTTP-сообщений на один сервер. Мои цели - сравнить каждый ответ с его оригинальным запросом.Как отправить несколько HTTP-запросов асинхронно с Netty?
Для этого я следую за Netty Snoop example.
Однако этот пример (и другие примеры http) не охватывает как отправлять множественные запросы асинхронно, так и как связать их впоследствии с соответствующими запросами.
Все похож вопросы (например, this one, this one или this one, реализовать класс SimpleChannelUpstreamHandler, который от Нетти 3 и не существует в 4.0 больше (documentation netty 4.0)
Каждый имеет представление о том, как решить эту проблему в Нетти 4,0
Edit:
Моя проблема, хотя я пишу много сообщений на канал, я только получить очень медленно ответы (1 ответ/сек, Whe надежда на получение нескольких тысяч/сек). Чтобы прояснить это, позвольте мне опубликовать то, что я получил до сих пор. Я уверен, что сервер, на который я отправляю запросы, также может обрабатывать множество трафика.
То, что я получил до сих пор:
import java.net.URI
import java.nio.charset.StandardCharsets
import java.io.File
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer}
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http._
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.{ReferenceCountUtil, CharsetUtil}
import io.netty.channel.nio.NioEventLoopGroup
import scala.io.Source
object ClientTest {
val URL = System.getProperty("url", MY_URL)
val configuration = new Configuration
def main(args: Array[String]) {
println("Starting client")
start()
}
def start(): Unit = {
val group = new NioEventLoopGroup()
try {
val uri: URI = new URI(URL)
val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"}
val port: Int = {val p = uri.getPort; if (p != -1) p else 80}
val b = new Bootstrap()
b.group(group)
.channel(classOf[NioSocketChannel])
.handler(new HttpClientInitializer())
val ch = b.connect(host, port).sync().channel()
val logFolder: File = new File(configuration.LOG_FOLDER)
val fileToProcess: Array[File] = logFolder.listFiles()
for (file <- fileToProcess){
val name: String = file.getName()
val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name)
val lineIterator: Iterator[String] = source.getLines()
while (lineIterator.hasNext) {
val line = lineIterator.next()
val jsonString = parseLine(line)
val request = createRequest(jsonString, uri, host)
ch.writeAndFlush(request)
}
println("closing")
ch.closeFuture().sync()
}
} finally {
group.shutdownGracefully()
}
}
private def parseLine(line: String) = {
//do some parsing to get the json string I want
}
def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = {
val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8)
val request: FullHttpRequest = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath())
request.headers().set(HttpHeaders.Names.HOST, host)
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json")
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes())
request.content().clear().writeBytes(bytebuf)
request
}
}
class HttpClientInitializer() extends ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel) = {
val pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
//aggregates all http messages into one if content is chunked
pipeline.addLast(new HttpObjectAggregator(1048576))
pipeline.addLast(new IdleStateHandler(0, 0, 600))
pipeline.addLast(new HttpClientHandler())
}
}
class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] {
override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
try {
msg match {
case res: FullHttpResponse =>
println("response is: " + res.content().toString(CharsetUtil.US_ASCII))
ReferenceCountUtil.retain(msg)
}
} finally {
ReferenceCountUtil.release(msg)
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = {
println("HttpHandler caught exception", e)
ctx.close()
}
}
Не пишите канала асинхронный? В результате написания вы получите будущее, которое зависит от вас, как с этим бороться – user1582639
Я также изучаю Netty 4.0. Вот мое понимание дизайна. Первое, о чем я помню, это то, что в Netty 4 вы уверены, что все зарегистрированные обработчики выполняются в одном потоке, поэтому нет необходимости в синхронизации, если вы не используете общие обработчики. Поэтому все ваши отправленные запросы будут отправляться последовательно по каналу, и ответы будут приниматься в той же последовательности. Поэтому, управляя структурой данных, такой как очередь в вашем дуплексном обработчике для всех запросов, вы всегда можете опросить соответствующий запрос на последний полученный ответ. – user1582639
Спасибо за ответы! Моя проблема заключается в том, что, хотя я пишу много сообщений на канал, я получаю очень медленно ответы (1 ответ/сек, тогда как надежда получить несколько тысяч/сек). Чтобы прояснить это, позвольте мне опубликовать то, что я получил до сих пор. – Mart