2016-06-09 3 views
4

Я использую RxJava, и мне нужно сделать 2 вещи:Rx: Как получить последний элемент, даже если onError был вызван?

  • Получить последний элемент, испускаемый из Observable
  • Определить, если onError был вызван, по сравнению с onCompleted

Я ve посмотрел на использование last и lastOrDefault (что на самом деле является поведением, которое мне нужно), но я не смог обойти onError, скрывая последний элемент. Я бы ОК с наблюдаемым существом используется дважды, один раз, чтобы получить значение last и один раз, чтобы получить статус завершения, но до сих пор я только был в состоянии сделать это, создав свой собственный Observer:

public class CacheLastObserver<T> implements Observer<T> { 

    private final AtomicReference<T> lastMessageReceived = new AtomicReference<>(); 
    private final AtomicReference<Throwable> error = new AtomicReference<>(); 

    @Override 
    public void onCompleted() { 
     // Do nothing 
    } 

    @Override 
    public void onError(Throwable e) { 
     error.set(e); 
    } 

    @Override 
    public void onNext(T message) { 
     lastMessageReceived.set(message); 
    } 

    public Optional<T> getLastMessageReceived() { 
     return Optional.ofNullable(lastMessageReceived.get()); 
    } 

    public Optional<Throwable> getError() { 
     return Optional.ofNullable(error.get()); 
    } 
} 

У меня нет проблем с созданием моего собственного Observer, но похоже, что Rx должен быть лучше в состоянии удовлетворить этот случай использования «получить последний элемент, выпущенный до завершения». Любые идеи о том, как это сделать?

ответ

3

Попробуйте это:

source.materialize().buffer(2).last() 

В случае ошибки последней эмиссии будет список из двух элементов являются последним значением, излучаемое завернутым как Notification и уведомление об ошибках. Без ошибки второй элемент будет уведомлением о завершении.

Обратите также внимание, что если значение не выбрано, то результатом будет список одного элемента, являющегося уведомлением о терминале.

+0

Есть ли способ, чтобы получить последнюю излучаемую деталь, не последние получила один, это хороший подход, хотя для получения последнего элемента на вызове onNext, но не из эмиссии исходного Observable. (т. е. испускание, 1,2,3, сбой перед 4, тогда у меня должен быть сбой и пункт 4, в настоящее время вы получите 3 с помощью этого метода). – desgraci

+0

для источника, испускающего 1,2,3, ошибка, 4 против контракта Observable и, следовательно, не поддерживается RxJava. Вы должны были бы подавить выброс ошибки и испустить ее как onNext излучения, чтобы это произошло. –

+0

Это не работает с наблюдаемыми с нечетным количеством элементов (полное или оповещение об ошибке указано в одном списке). Даже 'source.materialize(). Buffer (2, 1) .last()' не решает, потому что последний буфер будет иметь длину = 1. – maurocchi

0

я использовал этот подход, чтобы решить вашу проблему.

public class ExampleUnitTest { 
    @Test 
    public void testSample() throws Exception { 
     Observable.just(1, 2, 3, 4, 5) 
       .map(number -> { 
        if (number == 4) 
         throw new NullPointerException(); 
        else 
         return number; 
       }) 
       .onErrorResumeNext(t -> Observable.empty()) 
       .lastOrDefault(15) 
       .subscribe(lastEmittedNumber -> System.out.println("onNext: " + lastEmittedNumber)); 
    } 
} 

Он будет излучать onNext: 3

Надеются, что это помогает.

0

Я решил с:

source.materialize().withPrevious().last() 

где withPrevious есть (Котлин):

fun <T> Observable<T>.withPrevious(): Observable<Pair<T?, T>> = 
    this.scan(Pair<T?, T?>(null, null)) { previous, current -> Pair(previous.second, current) } 
     .skip(1) 
     .map { it as Pair<T?, T> } 
Смежные вопросы