2016-04-15 2 views
0

Я пытаюсь выполнить следующий код, основанный на Akka потока краткое руководство:Akka поток - Source.fromPublisher

implicit val system = ActorSystem("QuickStart") 
implicit val materializer = ActorMaterializer() 

val songs = Source.fromPublisher(SongsService.stream) 

val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1) 

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) 

val counterGraph: RunnableGraph[Future[Int]] = 
    songs 
    .via(count) 
    .toMat(sumSink)(Keep.right) 

val sum: Future[Int] = counterGraph.run() 

sum.foreach(c => println(s"Total songs processed: $c")) 

Проблема здесь состоит в том, что будущее никогда не возвращают результат. Самое большое отличие от примера документации - мой источник.

У меня есть игры нумератор, который я преобразовывая его в Акко Publisher, в результате чего в этом SongsService.stream

При использовании определенного списка в качестве источника как:

val songs = Source(list) 

Он работает , но использовать Source.fromPublisher нет.

Но проблема здесь не является издателем на самом деле, я могу сделать простую операцию, и она работает:

val songs = Source.fromPublisher(SongsService.stream) 
songs.runForeach(println) 

Он проходит через базу данных, создавать плей нумератор, преобразовать его в издательство, и я могу перебирать.

Любые идеи?

+1

Действительно ли 'songs.runForeach (println)' действительно возвращает 'Done', если вы его ждете? – Mullefa

+0

@Mullefa просто поняла, что это не так. –

ответ

4

Ваш издатель, вероятно, никогда не закончит.

Смежные вопросы