Я пытаюсь периодически генерировать события (каждые 150 мс), даже если наблюдаемый выше поток будет быстрее отправлять события.MissingBackpressureException даже после вызова onBackpressureBlock()
Но я получаю MissingBackpressureException
, хотя я назвал onBackpressureBlock()
код:
SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create());
return subject
.subscribeOn(Schedulers.computation())
.doOnSubscribe(() -> {
NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> {
Log.d(TAG, "new quotation " + quotation.hashCode());
NetworkRequestsManager.instance().getSeller(quotation.sellerId)
.subscribe(seller -> {
for (Outlet outlet : seller.outlets) {
if (outlet.latitude != null && outlet.longitude != null)
subject.onNext(new QuotationMarker(outlet, quotation.price));
}
},
error -> Log.fatalError(new RuntimeException(error)));
},
error -> Log.fatalError(new RuntimeException(error)));
})
.doOnError(throwable -> Log.fatalError(new RuntimeException(
"error response in subscribe after doOnSubscribe",
throwable)))
// combine with another observable that emits items regularly (every 100ms)
// so that a new event is received every 100ms :
// also, first event itself is delayed.
.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS),
(seller, aLong) -> seller)
.onBackpressureBlock() // prevent zipWith Observer.interval from throwing MissingBackpressureException s
.doOnError(throwable -> Log.fatalError(new RuntimeException(
"error response after onBackpressureBlock()",
throwable))); // <-- error is thrown here
след:
05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock()
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234)
05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException
05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ ... 10 more
PS: Log.fatalError(err)
только моя оберткой Android.util.Log.e(...)
EDIT
После большого количества проб и ошибок, это становится wont fix
для меня. zipWith(Observable.interval...)
кажется виновником и возможной ошибкой каркаса. Удаляя эти строки (и, следовательно, мою функцию периодического выделения), мой код работает. Я использую тему, которая, вероятно, вызывает onNext
из разных потоков, а затем я выполняю на нем Obeservable операторы.
Я пробовал это.Еще одна вещь – vedant1811
Вы оба пытались? Из вашего описания трудно понять, какой из них слишком быстр. – zsxwing