Я пытаюсь отключить поток, преобразующий некоторые числа и создающий другой поток того же плана графика. Однако второй экземпляр потока не запускается или, по крайней мере, он ничего не выводит на консоль.Akka Streams: создание другого RunnableGraph после завершения работы с KillSwitch
Что мне не хватает?
object KillSwitchSample extends App {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
val killSwitch = KillSwitches.shared("switch")
val stream1 = createStream("stream 1")
stream1.run()
Thread.sleep(200)
killSwitch.shutdown()
val stream2 = createStream("stream 2")
stream2.run()
Thread.sleep(200)
killSwitch.shutdown()
def createStream(streamName: String): RunnableGraph[NotUsed] = {
Source.fromGraph(new NumbersSource)
.via(killSwitch.flow)
.map(el => s"$streamName: $el")
.to(Sink.foreach(println))
}
}
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private var counter = 1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, counter)
counter += 1
}
})
}
}
Спасибо, я не был в курсе, что и не уделяют достаточного внимания [документации] (HTTP: //doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Controlling_graph_completion_with_KillSwitch), говорящий, что любой тип 'KillSwitch' (а не только' SharedKillSwitch') игнорирует последующие вызовы 'shutdown()' или 'abort()'. – Toaditoad