2016-11-05 3 views
6

У меня есть следующая простая иерархия случая класса:Akka Streams разделить поток по типу

sealed trait Message 
case class Foo(bar: Int) extends Message 
case class Baz(qux: String) extends Message 

И у меня есть Flow[Message, Message, NotUsed] (от WebSocket на основе протокола с кодеком уже на месте).

Я хочу демультиплексировать этот Flow[Message] в отдельные потоки для типов Foo и Baz, так как они обрабатываются совершенно разными путями.

Каков самый простой способ сделать это? Должно быть очевидно, но мне что-то не хватает ...

ответ

5

Один из способов - создать RunnableGraph, который включает потоки для каждого типа сообщений.

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 

    val in = Source(...) // Some message source 
    val out = Sink.ignore 

    val foo = builder.add(Flow[Message].map (x => x match { case [email protected](_) => f })) 
    val baz = builder.add(Flow[Message].map (x => x match { case [email protected](_) => b })) 
    val partition = builder.add(Partition[Message](2, { 
    case Foo(_) => 0 
    case Baz(_) => 1 
    })) 

    partition ~> foo ~> // other Flow[Foo] here ~> out 
    partition ~> baz ~> // other Flow[Baz] here ~> out 

    ClosedShape 
} 

g.run() 
+0

Право, раздел. Хорошо, я могу сделать именно это. Вероятно, для этого было бы полезно иметь встроенный комбинатор; возможно, я сделаю запрос на тяну. –

+0

@AlexanderTemerev Это может быть интересно: http://doc.akka.io/api/akka/2.4/?_ga=1.34091558.643806930.1478315511#akka.stream.scaladsl.Partition – Brian

Смежные вопросы