2017-01-12 2 views
1

Предположим, что у меня есть два бесконечных источника того же типа, которые могут быть подключены к одному графику. Я хочу переключиться между ними извне уже материализованного графика, может быть так же, как можно отключить один из них с помощью KillSwitch.Как переключаться между несколькими источниками?

val source1: Source[ByteString, NotUsed] = ??? 
val source2: Source[ByteString, NotUsed] = ??? 

val (switcher: Switcher, source: Source[ByteString, NotUsed]) = 
    Source.combine(source1,source2).withSwitcher.run() 

switcher.switch() 

По умолчанию я хочу использовать source1 и после того, как переключатель, я хочу использовать данные из source2

source1 
     \ 
      switcher ~> source  

source2 

Можно ли реализовать эту логику с Akka Streams?

ответ

2

Хорошо, через некоторое время я нашел решение.

Итак, здесь я могу использовать тот же принцип, что и в VLAN. Мне просто нужно пометить мои источники, а затем передать их через MergeHub. После этого легко отфильтровать эти источники по тегам и получить правильный результат в качестве источника.

Все, что мне нужно для переключения с одного на другое Источник - это изменение состояния фильтра.

source1.map(s => (tag1, s)) 
          \ 
          MergeHub.filter(_._1 == tagX).map(_._2) -> Source 
         /
source2.map(s => (tag2, s)) 

Вот несколько примеров:

object SomeSource { 

    private var current = "tag1" 

    val source1: Source[ByteString, NotUsed] = ??? 
    val source2: Source[ByteString, NotUsed] = ??? 

    def switch = { 
    current = if (current == "tag1") "tag2" else "tag1" 
    } 

    val (sink: Sink[(String, ByteString), NotUsed], 
     source: Source[ByteString, NotUsed]) = 
    MergeHub.source[(String, ByteString)] 
     .filter(_._1 == current) 
     .via(Flow[(String, ByteString)].map(_._2)) 
     .toMat(BroadcastHub.sink[ByteString])(Keep.both).run() 

    source1.map(s => ("tag1", s)).runWith(sink) 
    source2.map(s => ("tag2", s)).runWith(sink) 

} 

SomeSource.source // do something with Source 

SomeSource.switch() // then switch 
Смежные вопросы