У меня есть следующая цепочка наблюдаемых, A-> B, где A создает запись в БД и B завершает работу в rabbitMQ. То, что я хочу сделать, это иметь третий, C, который работает тогда и только тогда, когда B выдает исключение (он должен удалить/вернуть данные, введенные в БД). Затем исключение должно быть передано вызывающему. Я использую onErrorResumeNext, который, похоже, работает, только если у меня есть явный (пустой) подписчик на нем. Примечание: делегат в коде ниже возвращает A и queueSender возвращает B. Правильно ли это, или есть лучший способ сделать это?Как использовать Observable.onErrorResumeNext для выполнения ветвления и распространять исключение
public Observable<Long> create(final Message m) {
return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(final Long aLong) {
Observable<Void> observableB = queueSender.observe(aLong)
observableB.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
@Override
public Observable<Void> call(Throwable throwable) {
delegate.delete(aLong).toBlockingObservable().single();
return null;
}
}).subscribe(new VoidSubscriber());
return timelineObservable.map(new Func1<Void, Long>() {
@Override
public Long call(Void aVoid) {
return aLong;
}
});
}
});
}