2017-02-02 2 views
0

Трубопровод потока Akka работает нормально, не принимая (2), но с помощью take (2) не выводит никакого выхода. Как функция take() имеет значение?Почему поток потока Akka не работает?

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.scaladsl._ 
import scala.util.Try 
import scala.concurrent.ExecutionContext.Implicits._ 

object TestStream { 
    implicit val system = ActorSystem("MyTest") 
    implicit val Matt = ActorMaterializer() 

    def mainflow():Unit ={ 
     val grp = RunnableGraph.fromGraph(GraphDSL.create() { 
     implicit builder => 
     import GraphDSL.Implicits._ 
     val src:Outlet[Int] = builder.add(Source(1 to 10)).out 
     val flw1:FlowShape[Int, Int] = builder.add(Flow[Int].map(x => x*10).take(2)) 
     val flw2:FlowShape[Int, Int] = builder.add(Flow[Int].filter(x => x > 50)) 
     val snk2:Inlet[Any] = builder.add(Sink.foreach(println)).in 
     src ~> flw1 ~> flw2 ~> snk2 
     ClosedShape 
     }).run 
    } 
    def main(str:Array[String]):Unit = { 
     mainflow() 
    } 
} 

Просто для справки добавлена ​​результата теста без дубля(): искры \ Bin \ искровым отправить --class TestStream --master местного [2] --jars конфиг-1.2.1.jar, akka-actor_2.11-2.4.2.jar, reactive-s treams-1.0.0.jar, akka-stream_2.11-2.4.2.jar target \ scala-2.11 \ simple-project_2.1 1-1.0.jar 60 70 80 90 100

+0

Что вы сделали для устранения этой проблемы? Кажется, указывается некоторая промежуточная регистрация значений. –

ответ

2

График работает как следует. Подумайте о том, что здесь происходит:

  1. вы создаете ленивый источник Интс: [1, 2, 3, ... 10]
  2. вы множественным их на 10, так что ваш поток [10, 20, 30 ... 100]
  3. вы take в первые два элемента и завершить поток , игнорируя остальные: [10, 20]
  4. фильтровать поток для тех, кто больше, чем 50, что исключает 10 и 20: []

Поэтому вы ничего не печатаете.

Если удалить take(2) полный набор из 10 элементов, достигнет мойку, и он будет печатать:

60 
70 
80 
90 
100 

Это действительно так же, как работает это в консоли Scala (но ленивее):

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println) 
+0

Благодарим вас за объяснение. – Advika

Смежные вопросы