Я пытаюсь выполнить следующий код, основанный на Akka потока краткое руководство:Akka поток - Source.fromPublisher
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val songs = Source.fromPublisher(SongsService.stream)
val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
songs
.via(count)
.toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total songs processed: $c"))
Проблема здесь состоит в том, что будущее никогда не возвращают результат. Самое большое отличие от примера документации - мой источник.
У меня есть игры нумератор, который я преобразовывая его в Акко Publisher, в результате чего в этом SongsService.stream
При использовании определенного списка в качестве источника как:
val songs = Source(list)
Он работает , но использовать Source.fromPublisher нет.
Но проблема здесь не является издателем на самом деле, я могу сделать простую операцию, и она работает:
val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)
Он проходит через базу данных, создавать плей нумератор, преобразовать его в издательство, и я могу перебирать.
Любые идеи?
Действительно ли 'songs.runForeach (println)' действительно возвращает 'Done', если вы его ждете? – Mullefa
@Mullefa просто поняла, что это не так. –