2015-09-09 4 views
3

У меня проблема с akka-http. Я пытаюсь несколько раз запрашивать поток, но он останавливается с настройкой по умолчанию в 4 раза. Вот код, который я использую. Может кто-нибудь помочь мне понять, почему он ждет?Почему akka http не продолжает запрашивать?

спасибо

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.Http.{HostConnectionPool, OutgoingConnection} 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} 
import akka.stream._ 
import akka.stream.scaladsl._ 
import com.typesafe.scalalogging.LazyLogging 

import scala.concurrent.Future 
import scala.util.{Try, Failure, Success} 

object TestHttp extends LazyLogging { 

    def main(args: Array[String]) { 
    implicit val system = ActorSystem() 
    import system.dispatcher 
    val decider: Supervision.Decider = { 
     case e => { 
     logger.error(e.getMessage, e) 
     Supervision.Resume 
     } 
    } 
    implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider).withDebugLogging(true)) 
    val resultSink: Sink[(Try[HttpResponse], Int), Future[Unit]] = Sink.foreach { case (hr, i) => { 
     hr match { 
     case Success(r) => logger.info(s"Success ${r} for ${i}") 
     case Failure(e) => logger.error(s"Error ${e} for ${i}") 
     } 
    } 
    } 
    val source: Source[Int, Unit] = Source(0 to 1500).map(i => { 
     logger.info(s"${i} iteration") 
     i 
    }) 

    val buildHr: Flow[Int, (HttpRequest, Int), Unit] = Flow[Int].map { case s => { 
     (HttpRequest(uri = "/").withDefaultHeaders(), s) 
    } 
    } 

    val connection: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), HostConnectionPool] = Http().cachedHostConnectionPool("www.adajam.uk.com") 
    import FlowGraph.Implicits._ 
    source.via(buildHr).via(connection).to(resultSink).run() 
    } 
} 
+1

Вы должны прочитать тело ответа, то есть сделать что-то с 'r.entity.dataBytes'. В противном случае соединение нельзя использовать повторно. – jrudolph

+0

Спасибо, jrudolph, он работает. После прочтения объекта поток продолжается и освобождает соединение. –

+0

Прохладный, рад, что он работает! :) – jrudolph

ответ

3

Для того, чтобы освободить соединение, тело должно быть прочитано. Это может быть сделано с

r.entity.dataBytes.to(Sink.ignore) 
Смежные вопросы