2017-01-31 3 views
0

Предположим, у меня есть контроллер, который обрабатывает команды, которые я получаю от websocket.Подход Akka-http для обработки команд websocket

class WebSocketController(implicit M: Materializer) 
    extends Controller 
    with CirceSupport { 

    override def route: Route = path("ws") { 
    handleWebSocketMessages(wsFlow) 
    } 

    def wsFlow: Flow[Message, Message, Any] = 
    Flow[Message].mapConcat { 
     case tm: TextMessage => 
     decode[Command](tm.getStrictText) match { 
      // todo pass this command to some actor or service 
      // and get response and reply back. 
      case Right(AuthorizeUser(login, password)) => 
      TextMessage(s"Authorized: $login, $password") :: Nil 
      case _ => 
      Nil 
     } 

     case bm: BinaryMessage => 
     bm.dataStream.runWith(Sink.ignore) 
     Nil 
    } 

} 

Таким образом, я получаю команду, десериализациям это и следующий шаг, который я хочу сделать, это передать его в какую-то службу или актер, который вернет мне Future[SomeReply].

Вопрос: Каков основной подход к обработке такого потока потоками akka?

ответ

1

При обработке Future с внутри Flow, mapAsync обычно является тем, что вы ищете. Для того, чтобы добавить в свой конкретный пример:

def asyncOp(msg: TextMessage): Future[SomeReply] = ??? 
def tailorResponse(r: SomeReply): TextMessage = ??? 

def wsFlow: Flow[Message, Message, Any] = 
    Flow[Message] 
    .mapConcat {...} 
    .mapAsync(parallelism = 4)(asyncOp) 
    .via(tailorResponse) 

mapAsyncUnordered также может быть использован в случае, порядок в Future сек результат не имеет значения. Параллелизм показывает, сколько Future s может быть запущено в одно и то же время перед прохождением ступени.

Смотрите также

  • этап docs
  • , как использовать в сочетании с ask - here
Смежные вопросы