2016-12-30 1 views
0

Я пытаюсь отключить поток, преобразующий некоторые числа и создающий другой поток того же плана графика. Однако второй экземпляр потока не запускается или, по крайней мере, он ничего не выводит на консоль.Akka Streams: создание другого RunnableGraph после завершения работы с KillSwitch

Что мне не хватает?

object KillSwitchSample extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val killSwitch = KillSwitches.shared("switch") 

    val stream1 = createStream("stream 1") 
    stream1.run() 
    Thread.sleep(200) 
    killSwitch.shutdown() 

    val stream2 = createStream("stream 2") 
    stream2.run() 
    Thread.sleep(200) 
    killSwitch.shutdown() 

    def createStream(streamName: String): RunnableGraph[NotUsed] = { 
    Source.fromGraph(new NumbersSource) 
     .via(killSwitch.flow) 
     .map(el => s"$streamName: $el") 
     .to(Sink.foreach(println)) 
    } 
} 

class NumbersSource extends GraphStage[SourceShape[Int]] { 
    val out: Outlet[Int] = Outlet("NumbersSource") 
    override val shape: SourceShape[Int] = SourceShape(out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private var counter = 1 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, counter) 
      counter += 1 
     } 
     }) 
    } 
} 

ответ

4

Вы используете общий KillSwitch. Общедоступный KillSwitch может быть переключен только один раз, после чего он помнит, что он уже был переключен, и поэтому немедленно убивает и завершение последующих потоков.

Это то, что происходит с вашим кодом. Вы запускали переключатель kill перед запуском графика во второй раз.

Вы можете использовать KillSwitches.single вместо того, чтобы получить новый KillSwitch каждый раз:

def createStream(streamName: String): RunnableGraph[UniqueKillSwitch] = 
    Source.fromGraph(new NumbersSource) 
    .map(el => s"$streamName: $el") 
    .viaMat(KillSwitches.single)(Keep.right) 
    .to(Sink.foreach(println)) 

val switch1 = createStream("a").run() 
// ... 
switch1.shutdown() 

val switch2 = createStream("b").run() 
// ... 
switch2.shutdown() 
+0

Спасибо, я не был в курсе, что и не уделяют достаточного внимания [документации] (HTTP: //doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Controlling_graph_completion_with_KillSwitch), говорящий, что любой тип 'KillSwitch' (а не только' SharedKillSwitch') игнорирует последующие вызовы 'shutdown()' или 'abort()'. – Toaditoad

Смежные вопросы