Трубопровод потока Akka работает нормально, не принимая (2), но с помощью take (2) не выводит никакого выхода. Как функция take() имеет значение?Почему поток потока Akka не работает?
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits._
object TestStream {
implicit val system = ActorSystem("MyTest")
implicit val Matt = ActorMaterializer()
def mainflow():Unit ={
val grp = RunnableGraph.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
val src:Outlet[Int] = builder.add(Source(1 to 10)).out
val flw1:FlowShape[Int, Int] = builder.add(Flow[Int].map(x => x*10).take(2))
val flw2:FlowShape[Int, Int] = builder.add(Flow[Int].filter(x => x > 50))
val snk2:Inlet[Any] = builder.add(Sink.foreach(println)).in
src ~> flw1 ~> flw2 ~> snk2
ClosedShape
}).run
}
def main(str:Array[String]):Unit = {
mainflow()
}
}
Просто для справки добавлена результата теста без дубля(): искры \ Bin \ искровым отправить --class TestStream --master местного [2] --jars конфиг-1.2.1.jar, akka-actor_2.11-2.4.2.jar, reactive-s treams-1.0.0.jar, akka-stream_2.11-2.4.2.jar target \ scala-2.11 \ simple-project_2.1 1-1.0.jar 60 70 80 90 100
Что вы сделали для устранения этой проблемы? Кажется, указывается некоторая промежуточная регистрация значений. –