я просто следовал поток AKKA ActorPublisher примера и когда я получил это сообщение:Akka поток OnNext не допускается
java.lang.IllegalStateException: onNext не допускается, если поток не запрашивал элементы, totalDemand был 0
глядя на документы, они объясняют:
Вы отправляете элементы потока, вызвав onNext. Вам разрешено отправлять столько элементов, сколько было запрошено абонентом потока. Эта сумма может быть запрошена с помощью totalDemand. Разрешено использовать только use onNext, когда isActive и totalDemand> 0, иначе onNext будет throw IllegalStateException.
Когда абонент потока запрашивает больше элементов, этому актеру достается сообщение ActorPublisherMessage.Request, и вы можете принять участие в этом мероприятии. TotalDemand обновляется автоматически.
Как я могу предотвратить, что totalDemand равен нулю? Когда я получил эту ошибку, я потерял сообщение, которое пытался отправить.
Вот пример я следующее:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
и это мой тестовый класс
object Test extends App {
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorFlowMaterializer()
val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val publisher = kafka.consume("test", "groupName", new StringDecoder())
val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")
Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))
}
Ну, я получил сообщение от Кафки и я передаю в WorkerActor, однако при отправке в Kafka, как 10 сообщений/сек, некоторые из них теряются из-за этой ошибки.
UPDATE
Я столкнулся ошибку, описанную здесь (с помощью той же библиотеки):
https://github.com/softwaremill/reactive-kafka/issues/11
Я решил мину с использованием буфера, но выглядит как этот PR решит проблему ,
https://github.com/softwaremill/reactive-kafka/pull/13
неявное вал Materializer = ActorFlowMaterializer ( ActorFlowMaterializerSettings (actorSystem) .withInputBuffer (initialSize = 1024, MaxSize = 1024)) –
реализации выше код, работал для меня. Спасибо. –
Приятно слышать. Я помню, что есть некоторые предостережения для использования внешнего буфера, а не для хранения буфера внутри самого Актера. Следите за этим ... –