2016-04-27 2 views
0

Нужно предложить, мне нужно запустить параллельные множественные диаграммы источников, например, я создал этот пример кода, где я создаю 10 графиков и выполняю их параллельно.Akka Stream - параллельные управляемые графики

Является ли это правильным подходом или я должен создать несколько источников внутри графика и запустить их параллельно на одном графике?

def createGraph(start: Int, end: Int, name: String) = { 
    RunnableGraph.fromGraph(GraphDSL.create() { 
    implicit builder => 
     import GraphDSL.Implicits._ 
     val s = Source(start to end) 
     val f = Flow[Int].map[String](x => x.toString) 
     val sink = Sink.foreach[String](x => println(name + ":" + x)) 

     val t = builder.add(s) 

     val flow1 = builder.add(f) 

     t ~> flow1 ~> sink 

     ClosedShape 
    }) 
} 


(1 to 10).map(x => createGraph(x, x + 10, "g" + x)).map(_.run()) 

Благодаря Arun

+0

Почему все, что код, чтобы сделать эквивалент: Source (начала до конца) .map (_ ToString.) .runForeach (х => Println "$ имя (s: $ x ")) –

+0

Я достал сложную обработку потока от основного кода для поддержания интеллектуальной собственности. Заявление о проблеме заключается в том, что если у меня есть несколько исходных данных, и вам нужно запустить график для каждого источника, что было бы лучше всего. Например, мы можем думать о том, что несколько источников могут думать о чтении нескольких тем кафки, преобразовании, обработке и потоке с базой данных. – ASe

+0

Подход выглядит несколько странным для меня, хотя я не могу точно указать, почему. Я бы отделил время, чтобы вещи были их собственным этапом, затем используйте Merge и/или Balance, чтобы связать их вместе, в один граф. Затем запустите этот один граф только один раз. Теперь вы создаете n «островов». – akauppi

ответ

0

Я попытался параллелизм с помощью http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-parallelism.html, это выглядит хорошо, где мой источник различны, но поток и раковина источник same.Each является моделирование в приведенном ниже примере, рассмотрим их, как вы чтение из некоторого внешнего источника в виде потока:

object TestParallelGraph extends App { 

    implicit val system = ActorSystem("test") 
    implicit val dispacher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val listOfDifferentSource=List(1,2,3) //consider we have to read data from various sources 


def createGraph() = { 
    RunnableGraph.fromGraph(GraphDSL.create() { 
     implicit builder => 
     import GraphDSL.Implicits._ 

     val merge=builder.add(Merge[Int](listOfDifferentSource.length)) 

     val flow=builder.add(Flow[Int].map(_ + 10)) //just random flow to add 10 

     //again as mentioned above creating source with different information to simulate 
     Source(listOfDifferentSource.head*100 to 100* listOfDifferentSource.head+10) ~> merge ~> flow ~> Sink.foreach(println) 

     for{ 
      i <- listOfDifferentSource.tail //for each other source 
     }yield (Source(100*i to 100*i+10) ~> merge) 

     ClosedShape 
    }) 
    } 

    createGraph().run() 
} 
+0

Обратите внимание, что это выполняется не параллельно, а последовательно, вам нужно отметить асинхронные границы, используя '.async', как описано в документах, к которым вы привязались. – johanandren

+0

Спасибо, Джон. Я использую kaka-stream-and-http-experimental 2.0.3, поэтому, думаю, mapAsync должен позаботиться об этом. – ASe

+0

Мой код станет val flow = builder.add (Flow [Int] .mapAsync (parallelism = 10) (_ + 10)) – ASe

Смежные вопросы