2017-02-08 3 views
0

Я пытаюсь подключить абонента реактивированного потока к источнику akka.Невозможно использовать recivestream Абонент с источниками потока akka

Мой источник, кажется, работает нормально с простой раковиной (например, foreach) - но если я положил реальную раковину, сделанную от подписчика, я ничего не получаю.

Мой контекст:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import org.reactivestreams.{Subscriber, Subscription} 

implicit val system = ActorSystem.create("test") 
implicit val materializer = ActorMaterializer.create(system) 

class PrintSubscriber extends Subscriber[String] { 
    override def onError(t: Throwable): Unit = {} 
    override def onSubscribe(s: Subscription): Unit = {} 
    override def onComplete(): Unit = {} 
    override def onNext(t: String): Unit = { 
    println(t) 
    } 
} 

и мой тестовый пример:

val subscriber = new PrintSubscriber() 
val sink = Sink.fromSubscriber(subscriber) 

val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc")) 
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz")) 
source1.to(sink).run()(materializer) 
source2.runForeach(println) 

Я получаю выход:

aaa 
bbb 
ccc 

Почему я не могу получить ххх, ууу, и ZZZ?

ответ

2

Приводя спецификации Реактивных Streams для ниже Subscriber:

Получит вызов onSubscribe (Subscription) сразу после прохождения экземпляра Абонента Publisher.subscribe (Subscriber). Никаких дополнительных уведомлений не будет получено до тех пор, пока не будет вызван абонемент Subscription (long) .

Наименьшее изменение, которое вы можете сделать, чтобы увидеть некоторые элементы, протекающие на ваш абонент является

override def onSubscribe(s: Subscription): Unit = { 
    s.request(3) 
} 

Однако, имейте в виду, что это не сделает его полностью совместимым с реактивными Streams specs. Это не так-то просто реализовать, это главная причина, лежащая в основе инструментальных средств высшего уровня, таких как Akka-Streams.

Смежные вопросы