2014-09-03 2 views
0

У меня есть следующая цепочка наблюдаемых, A-> B, где A создает запись в БД и B завершает работу в rabbitMQ. То, что я хочу сделать, это иметь третий, C, который работает тогда и только тогда, когда B выдает исключение (он должен удалить/вернуть данные, введенные в БД). Затем исключение должно быть передано вызывающему. Я использую onErrorResumeNext, который, похоже, работает, только если у меня есть явный (пустой) подписчик на нем. Примечание: делегат в коде ниже возвращает A и queueSender возвращает B. Правильно ли это, или есть лучший способ сделать это?Как использовать Observable.onErrorResumeNext для выполнения ветвления и распространять исключение

public Observable<Long> create(final Message m) { 
    return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(final Long aLong) { 
      Observable<Void> observableB = queueSender.observe(aLong) 

      observableB.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() { 
       @Override 
       public Observable<Void> call(Throwable throwable) { 
        delegate.delete(aLong).toBlockingObservable().single(); 
        return null; 
       } 
      }).subscribe(new VoidSubscriber()); 

      return timelineObservable.map(new Func1<Void, Long>() { 
       @Override 
       public Long call(Void aVoid) { 
        return aLong; 
       } 
      }); 
     } 
    }); 
} 

ответ

0

Простым способом избавиться от VoidSubscriber было бы zip observableB с вашей временной шкалой. Так как onErrorResumeNext() проглотит ошибку, вам также необходимо будет изменить onErrorResumeNext() только на doOnError() для распространения ошибки. Вот модифицированный код:

public Observable<Long> createOld(final Message m) { 
    return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(final Long aLong) { 
      final Observable<Void> observableB = 
        queueSender 
          .observe(aLong) 
          .doOnError(
            new Action1<Throwable>() { 
             @Override 
             public void call(Throwable throwable) { 
              delegate.delete(aLong).toBlockingObservable().single(); 
             } 
            } 
          ); 

      final Observable<Long> observableTimeline = 
        timelineObservable 
          .map(new Func1<Void, Long>() { 
           @Override 
           public Long call(Void aVoid) { 
            return aLong; 
           } 
          }); 

      return Observable 
        .zip(
          observableB, 
          observableTimeline, 
          new Func2<Void, Long, Long>() { 
           @Override 
           public Long call(Void aVoid, Long aLong) { 
            return aLong; 
           } 
          } 
        ); 
     } 
    }); 
} 
Смежные вопросы