2016-04-17 3 views
5

Имейте поток с пользовательскими потоками, и на определенном этапе я хочу разделить поток и иметь две альтернативные обработки данных, которые снова сольются позже.Альтернативные потоки, основанные на условии для потока akka

E.g.

    -> F3 -> F6 
Src -> F1 -> F2    > Merge -> Sink 
        -> F4 -> F5 

F2 должны иметь условие о том, если данные содержат формат A, то он должен идти течь F3, иначе перейти к F4.

Насколько я могу видеть, каждый поток может иметь только один порт в каждом направлении (или два, если bidi) - так как я могу поддерживать такой поток?

ответ

11

Вы можете использовать Broadcast для разделения потока, тогда вы сможете использовать filter или collect для каждого потока, чтобы отфильтровать требуемые данные.

val split = builder.add(Broadcast[Int](2)) 

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink 
        -> filterCondB -> F4 -> F5 -> Merge 

Кроме того, есть Partition этапа, который обрабатывает количество выходных портов и функцию карты от значения к номеру порта f: T => Int.

val portMapper(value: T): Int = value match { 
    case CondA => 0 
    case CondB => 1 
} 

val split = builder.add(Partition[T](2, portMapper)) 

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink 
      split -> F4 -> F5 -> Merge 

Возможно, есть более простой способ.

+0

Благодарим вас, вы спасите мой день. Мы можем создать val filterCondA = Flow [Int] .filter() –

Смежные вопросы