2016-04-21 2 views
3

У меня есть следующий поток, который работает довольно хорошо:Akka поток + Akka Http - Получить запрос на Ошибке

source 
    .map(x => HttpRequest(uri = x.rawRequest)) 
    .via(Http().outgoingConnection(host, port)) 
    .to(Sink.actorRef(myActor, IsDone)) 
    .run() 

и простой актер обрабатывать статус ответа и окончательное сообщение, когда поток завершается:

/** 
    * A simple actor to count how many rows have been processed 
    * in the complete process given a http status 
    * 
    * It also finish the main thread upon a message of type [[IsDone]] is received 
    */ 
class MyActor extends Actor with ActorLogging { 

    var totalProcessed = 0 

    def receive = LoggingReceive { 

    case response: HttpResponse => 

     if(response.status.isSuccess()) { 
     totalProcessed = totalProcessed + 1 
     } else if(response.status.isFailure()) { 
     log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}") 
     } else { 
     log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}") 
     } 

    case IsDone => 
     println(s"total processed: $totalProcessed") 
     sys.exit() 
    } 
} 

case object IsDone 

Я не знаю, является ли это наилучшим подходом к подсчету вещей, а также для обработки статуса ответа, но он работает до сих пор.

Вопрос в том, как передать исходный запрос актеру таким образом, чтобы я мог знать, какой запрос вызвал конкретную ошибку.

Мой актер мог ожидать следующий вместо этого:

case (request: String, response: HttpResponse) => 

Но как передать эту информацию, что у меня в начале моего трубопровода?

Я думал, чтобы map как это:

source 
    .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest)) 

Но я не имею ни малейшего представления о том, как стрелять поток HTTP.

Любое предложение?

+2

Попробуйте использовать пул соединений хоста вместо явного открытия исходящего соединения для каждого запроса. Эта модель требует произвольного идентификатора для запроса, который затем возвращается в ответ, чтобы вы могли правильно соотнести запрос и ответ – cmbaxter

+0

Привет @cmbaxter, вы хотите использовать этот пример? http://doc.akka.io/docs/akka/2.4.4/scala/http/client-side/host-level.html# Пример, но с помощью String? –

ответ

1

С @cmbaxter помощью я мог бы решить мою проблему, используя следующий фрагмент кода:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port) 

source 
    .map(url => HttpRequest(uri = url) -> url) 
    .via(poolClientFlow) 
    .to(Sink.actorRef(myActor, IsDone)) 
    .run() 

Теперь мой актер может получить это:

case (respTry: Try[HttpResponse], request: String) =>