2014-09-03 5 views
7

У меня есть два наблюдаемых (названных A и B для простоты) и один подписчик. Таким образом, Абонент подписывается на A, и если есть ошибка в A, тогда B (который является резервным) вступает. Теперь, когда A нажимает на ошибку, B получает штраф, однако A вызывает onComplete() для абонента, поэтому B-ответ никогда не достигает абонента, даже если выполнение B выполнено успешно.RxJava onErrorResumeNext()

Это нормальное поведение? Я думал, что onErrorResumeNext() должен продолжить поток и уведомить абонента после его завершения, как указано в документации (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext).

Это общая структура того, что я делаю (опущено несколько «скучный» код):

public Observable<ModelA> observeGetAPI(){ 
    return retrofitAPI.getObservableAPI1() 
      .flatMap(observableApi1Response -> { 
       ModelA model = new ModelA(); 

       model.setApi1Response(observableApi1Response); 

       return retrofitAPI.getObservableAPI2() 
         .map(observableApi2Response -> { 
          // Blah blah blah... 
          return model; 
         }) 
         .onErrorResumeNext(observeGetAPIFallback(model)) 
         .subscribeOn(Schedulers.newThread()) 
      }) 
      .onErrorReturn(throwable -> { 
       // Blah blah blah... 
       return model; 
      }) 
      .subscribeOn(Schedulers.newThread()); 
} 

private Observable<ModelA> observeGetAPIFallback(ModelA model){ 
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> { 
     // Blah blah blah... 
     return model; 
    }).onErrorReturn(throwable -> { 
     // Blah blah blah... 
     return model; 
    }) 
    .subscribeOn(Schedulers.immediate()); 
} 

Subscription subscription; 
subscription = observeGetAPI.subscribe(ModelA -> { 
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE... 
}, throwable ->{ 
    // WE NEVER GET HERE... onErrorResumeNext() 
}, 
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED } 
); 

Любые идеи, что я делаю не так?

Спасибо!

EDIT: Вот приблизительный график того, что происходит:

---> HTTP GET REQUEST B 
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS) 

---> HTTP GET REQUEST A 
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!) 

---> HTTP GET FALLBACK A 
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time. 
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS) 

А вот ссылка на простой схеме я сделал, которые представляют собой то, что я хочу, чтобы это произошло: Diagram

+0

В вашей временной шкале отображается HTTP 200 для ответа об отказе. Есть ли другой способ, который вы сигнализируете об ошибке из getObservableAPI2()? Кроме того, можете ли вы указать, какие запросы API соответствуют выходу временной шкалы? Это похоже на getObservableAPI1-> REQUEST B, getObservableAPI2-> REQUEST A, getObservableAPI3-> FALLBACK A, но я просто хочу убедиться. – kjones

+0

Да, на самом деле, хотя ответ - 200, некоторые данные могут иметь значение null, поэтому я бросаю и ошибаюсь в этих сценариях. И да, это отношение времени к запросам, я отредактирую вопрос ASAP, чтобы соответствовать запросу временной шкалы как ваш. – mradzinski

+0

Ваша логика выглядит здорово. Вы должны получить ответную реакцию перед onComplete. Вы можете удалить все вызовы subscribeOn() и посмотреть, что произойдет. Они не должны быть необходимы, так как Retrofit выполняет запросы на свой собственный пул потоков в любом случае. – kjones

ответ

5

ПРМ звонки, используемые в следующем, должны имитировать то, что вы делаете с помощью Retrofit.

fallbackObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting A Fallback"); 
         subscriber.onNext("A Fallback"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Fallback Error"); 
         return "Fallback Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.immediate()); 

stringObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting B"); 
         subscriber.onNext("B"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .flatMap(new Func1<String, Observable<String>>() { 
        @Override 
        public Observable<String> call(String s) { 
         logger.v("flatMapping B"); 
         return Observable 
           .create(new Observable.OnSubscribe<String>() { 
            @Override 
            public void call(Subscriber<? super String> subscriber) { 
             logger.v("emitting A"); 
             subscriber.onNext("A"); 
             subscriber.onCompleted(); 
            } 
           }) 
           .delay(1, TimeUnit.SECONDS) 
           .map(new Func1<String, String>() { 
            @Override 
            public String call(String s) { 
             logger.v("A completes but contains invalid data - throwing error"); 
             throw new NotImplementedException("YUCK!"); 
            } 
           }) 
           .onErrorResumeNext(fallbackObservable) 
           .subscribeOn(Schedulers.newThread()); 
        } 
       }) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Return Error"); 
         return "Return Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.newThread()); 

subscription = stringObservable.subscribe(
     new Action1<String>() { 
      @Override 
      public void call(String s) { 
       logger.v("onNext " + s); 
      } 
     }, 
     new Action1<Throwable>() { 
      @Override 
      public void call(Throwable throwable) { 
       logger.v("onError"); 
      } 
     }, 
     new Action0() { 
      @Override 
      public void call() { 
       logger.v("onCompleted"); 
      } 
     }); 

Выход из заявления журнала является:

 
RxNewThreadScheduler-1 emitting B 
RxComputationThreadPool-1 flatMapping B 
RxNewThreadScheduler-2 emitting A 
RxComputationThreadPool-2 A completes but contains invalid data - throwing error 
RxComputationThreadPool-2 emitting A Fallback 
RxComputationThreadPool-1 onNext A Fallback 
RxComputationThreadPool-1 onCompleted 

Это кажется, что вы ищете, но может быть, я что-то не хватает.

Смежные вопросы