2015-12-13 4 views
1

Я пытаюсь создать конечную точку на моем сервере Akka Http, который сообщает пользователям об этом IP-адрес с помощью внешней службы (я знаю, что это можно сделать проще, но я делаю это как вызов).Akka Streams с сервером и клиентом Akka HTTP

Код, который не использует потоков на верхнем слое наиболее заключается в следующем:

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

val requestHandler: HttpRequest => Future[HttpResponse] = { 
    case HttpRequest(GET, Uri.Path("/"), _, _, _) => 
    Http().singleRequest(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))).flatMap { response => 
     response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map { string => 
     HttpResponse(entity = HttpEntity(MediaTypes.`text/html`, 
      "<html><body><h1>" + string.utf8String + "</h1></body></html>")) 
     } 
    } 

    case _: HttpRequest => 
    Future(HttpResponse(404, entity = "Unknown resource!")) 
} 

Http().bindAndHandleAsync(requestHandler, "localhost", 8080) 

и работает нормально. Однако, как вызов, я хотел ограничиться только использованием потоков (нет Future).

Это макет, который я думал использовать для такого подхода: Source[Request] -> Flow[Request, Request] -> Flow[Request, Response] ->Flow[Response, Response] и для размещения маршрута 404, также Source[Request] -> Flow[Request, Response]. Теперь, если мои знания Akka Stream хорошо помогают мне, мне нужно использовать Flow.fromGraph для такой вещи, однако, это то место, где я застрял.

В Future Я могу сделать легкую карту и flatMap для различных конечных точек, но в потоках, что означало бы разделение потока на несколько потоков, и я не совсем уверен, как бы я это сделал. Я думал об использовании UnzipWith и опций или общей трансляции.

Любая помощь по этому вопросу была бы высоко оценена.


Не знаю, если это потребуется? - http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html

+2

Где проблема в создании надуманной задачи, а затем ожидать, что кто-то другой ее разрешит? :) – pvg

+1

@pvg Touché haha. Я удалю свой вопрос и спрошу новый, в котором я заинтересован в разделении потока на несколько потоков, поскольку это не документировано в документах (я сомневаюсь, что эта функция будет идти против потоков вообще, но я неуверен). – Martijn

ответ

4

Вам не нужно использовать Flow.fromGraph. Вместо этого единственного числа потока, который использует flatMapConcat будет работать:

//an outgoing connection flow 
val checkIPFlow = Http().outgoingConnection("checkip.amazonaws.com") 

//converts the final html String to an HttpResponse 
def byteStrToResponse(byteStr : ByteString) = 
    HttpResponse(entity = new Default(MediaTypes.`text/html`, 
            byteStr.length, 
            Source.single(byteStr))) 

val reqResponseFlow = Flow[HttpRequest].flatMapConcat[HttpResponse](_ match { 
    case HttpRequest(GET, Uri.Path("/"), _, _, _) => 
    Source.single(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))) 
      .via(checkIPFlow) 
      .mapAsync(1)(_.entity.dataBytes.runFold(ByteString(""))(_ ++ _)) 
      .map("<html><body><h1>" + _.utf8String + "</h1></body></html>") 
      .map(ByteString.apply) 
      .map(byteStrToResponse) 

    case _ => 
    Source.single(HttpResponse(404, entity = "Unknown resource!"))  
}) 

Этот поток может быть использован для связывания входящих запросов:

Http().bindAndHandle(reqResponseFlow, "localhost", 8080) 

И все без Futures ...

+0

Не думал о flatMap как решение с источником. Принято как ответ! – Martijn

+0

@MartijnR Я рад, что мог бы по-другому взглянуть на проблему. Счастливый взлом! –

+0

@RamonJRomeroyVigil вы можете взглянуть на этот вопрос, пожалуйста? http://stackoverflow.com/questions/36776762/akka-stream-akka-http-get-request-on-error –

Смежные вопросы