2015-06-18 5 views
1

я просто следовал поток AKKA ActorPublisher примера и когда я получил это сообщение:Akka поток OnNext не допускается

java.lang.IllegalStateException: onNext не допускается, если поток не запрашивал элементы, totalDemand был 0

глядя на документы, они объясняют:

Вы отправляете элементы потока, вызвав onNext. Вам разрешено отправлять столько элементов, сколько было запрошено абонентом потока. Эта сумма может быть запрошена с помощью totalDemand. Разрешено использовать только use onNext, когда isActive и totalDemand> 0, иначе onNext будет throw IllegalStateException.

Когда абонент потока запрашивает больше элементов, этому актеру достается сообщение ActorPublisherMessage.Request, и вы можете принять участие в этом мероприятии. TotalDemand обновляется автоматически.

Как я могу предотвратить, что totalDemand равен нулю? Когда я получил эту ошибку, я потерял сообщение, которое пытался отправить.

Вот пример я следующее:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

и это мой тестовый класс

object Test extends App { 

    implicit val actorSystem = ActorSystem("ReactiveKafka") 
    implicit val materializer = ActorFlowMaterializer() 

    val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181") 
    val publisher = kafka.consume("test", "groupName", new StringDecoder()) 

    val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor") 

    Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props)) 

} 

Ну, я получил сообщение от Кафки и я передаю в WorkerActor, однако при отправке в Kafka, как 10 сообщений/сек, некоторые из них теряются из-за этой ошибки.

UPDATE

Я столкнулся ошибку, описанную здесь (с помощью той же библиотеки):

https://github.com/softwaremill/reactive-kafka/issues/11

Я решил мину с использованием буфера, но выглядит как этот PR решит проблему ,

https://github.com/softwaremill/reactive-kafka/pull/13

ответ

2

Если вниз по течению раковина не имеет спроса, то только ваши варианты

  1. говорят источник данных кормления Worker, что нет спроса, так что источник может остановить производство сообщения пока не появится больше спроса (реактивное решение).
  2. буферизуйте сообщения, пока не получите какой-либо запрос от раковины, который потенциально может заполнить ваш буфер, и вы все равно отправляете сообщения.
  3. отбрасывать сообщения, когда спрос равен 0 (что кажется вашей текущей реализацией).

Но все, что касается «противодавления», заключается в том, чтобы предотвратить, чтобы onNext вызывался, когда спроса нет.

Для реализации buffering выше опции вы можешь либо буфер внутри актера или за его пределами:

  • Внутреннего буфер: смотреть на примере «ActorPublisher» в documentation для примера буферизации в Актере, который питает ActorPublisher.
  • Внешний буфер: используйте внешний буфер, используя либо буферный материализатор, либо Flow.buffer в вашем потоке.
+0

неявное вал Materializer = ActorFlowMaterializer ( ActorFlowMaterializerSettings (actorSystem) .withInputBuffer (initialSize = 1024, MaxSize = 1024)) –

+0

реализации выше код, работал для меня. Спасибо. –

+0

Приятно слышать. Я помню, что есть некоторые предостережения для использования внешнего буфера, а не для хранения буфера внутри самого Актера. Следите за этим ... –

Смежные вопросы