2015-05-05 5 views
6

Я пытаюсь периодически генерировать события (каждые 150 мс), даже если наблюдаемый выше поток будет быстрее отправлять события.MissingBackpressureException даже после вызова onBackpressureBlock()

Но я получаю MissingBackpressureException, хотя я назвал onBackpressureBlock()

код:

SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create()); 

    return subject 
      .subscribeOn(Schedulers.computation()) 
      .doOnSubscribe(() -> { 
       NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> { 
          Log.d(TAG, "new quotation " + quotation.hashCode()); 
          NetworkRequestsManager.instance().getSeller(quotation.sellerId) 
            .subscribe(seller -> { 
               for (Outlet outlet : seller.outlets) { 
                if (outlet.latitude != null && outlet.longitude != null) 
                 subject.onNext(new QuotationMarker(outlet, quotation.price)); 
               } 
              }, 
              error -> Log.fatalError(new RuntimeException(error))); 
         }, 
         error -> Log.fatalError(new RuntimeException(error))); 

      }) 
      .doOnError(throwable -> Log.fatalError(new RuntimeException(
        "error response in subscribe after doOnSubscribe", 
        throwable))) 
        // combine with another observable that emits items regularly (every 100ms) 
        // so that a new event is received every 100ms : 
        // also, first event itself is delayed. 
      .zipWith(Observable.interval(150, TimeUnit.MILLISECONDS), 
        (seller, aLong) -> seller) 
      .onBackpressureBlock() // prevent zipWith Observer.interval from throwing MissingBackpressureException s 
      .doOnError(throwable -> Log.fatalError(new RuntimeException(
        "error response after onBackpressureBlock()", 
        throwable))); // <-- error is thrown here 

след:

05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock() 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390) 
    05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234) 
    05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153) 
    05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841) 
    05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException 
    05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349) 
    05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330) 
    05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ ... 10 more 

PS: Log.fatalError(err) только моя оберткой Android.util.Log.e(...)

EDIT

После большого количества проб и ошибок, это становится wont fix для меня. zipWith(Observable.interval...) кажется виновником и возможной ошибкой каркаса. Удаляя эти строки (и, следовательно, мою функцию периодического выделения), мой код работает. Я использую тему, которая, вероятно, вызывает onNext из разных потоков, а затем я выполняю на нем Obeservable операторы.

ответ

2

Я думаю (но я не уверен), что проблема в том, что ваша конфигурация противодавления выполняется после оператора zip.

Оператор zip необходимо буферизовать элементы одного Observable, чтобы почтить его другим Observable. Это этот буфер, который должен генерировать ваше исключение. (См here)

Чтобы решить проблему, я думаю, вы должны попробовать, чтобы добавить конфигурацию противодавления на одном (или на каждом) Observable используется в операторе zip.

пример:

obs.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS).onBackPressureDrop()); 

obs.onBackPressureBlock().zipWith(Observable.interval(150, TimeUnit.MILLISECONDS)); 
+0

Я пробовал это.Еще одна вещь – vedant1811

+0

Вы оба пытались? Из вашего описания трудно понять, какой из них слишком быстр. – zsxwing

2

Ответ выше от @dwursteisen и @zsxwing правильно.

Оператор интервала является тем, который испускает по времени и, таким образом, является «горячим» и не поддерживает противодавление. Таким образом, он будет продолжать излучать и заполнять внутренний ограниченный буфер zip, который вызывает исключение MissingBackpressureException.

При работе с «горячим» источником (например, в зависимости от времени или пользовательских событий) вы должны выбрать стратегию, как бороться с переполнением.

В этом случае вам нужно будет поместить эту стратегию на оператор interval.

Вот код, показывающий, что происходит, и варианты решения этого вопроса:

import java.util.concurrent.TimeUnit; 

import rx.Observable; 


public class ZipInterval { 

    public static void main(String... args) { 
     Observable<Long> slowHotSource = Observable.interval(1, TimeUnit.SECONDS); 

     /** This one is fast and hot so will cause a MissingBackpressureException. 
     * 
     * This is because a "hot" source based on time does not obey backpressure 
     * and keeps emitting regardless of what the downstream asks for. 
     * 
     * Examples of "hot" and "cold" and approaches to both can be found at: 
     * https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=90 and 
     * https://github.com/ReactiveX/RxJava/wiki/Backpressure 
     * */ 
     // Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS); 

     /** 
     * The following version of 'fastHotSource' composes a simple flow control strategy. 
     */ 
     Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS).onBackpressureDrop(); 

     Observable<String> zipped = Observable.zip(slowHotSource, fastHotSource, (s, f) -> { 
      return s + " " + f; 
     }); 

     // subscribe to the output 
     System.out.println("---- zip output"); 
     zipped.take(10).toBlocking().forEach(System.out::println); 

     /** 
     * The outcome of the above is probably not what is expected though. 
     * 
     * This is because zip will buffer the output and then `fastHotSource` will drop until 
     * the zip buffer asks for more. 
     * 
     * For temporal or "hot" sources like this, using `withLatestFrom` or `combineLatest` 
     * is often more appropriate than `zip`. 
     */ 

     Observable<String> latest = slowHotSource.withLatestFrom(fastHotSource, (s, f) -> { 
      return s + " " + f; 
     }); 

     // subscribe to the output 
     System.out.println("---- latest output"); 
     latest.take(10).toBlocking().forEach(System.out::println); 
    } 
} 

Выход этого:

---- zip output 
0 0 
1 1 
2 2 
3 3 
4 4 
5 5 
6 6 
7 7 
8 8 
9 9 
---- latest output 
0 1002 
1 2002 
2 3000 
3 4001 
4 5003 
5 6001 
6 7000 
7 8002 
8 9005 
9 10000 
0

Попробуйте использовать combineLatest потому, что комбинат Lastest не ждет новый значения для вызова onNext, он использует последние значения, когда новое значение поступает к функции