2015-06-24 6 views
10

Интересно, является ли это законно называть unsubscribe внутри onNext обработчика так:RxJava: вызов отписки изнутри onNext

List<Integer> gatheredItems = new ArrayList<>(); 

Subscriber<Integer> subscriber = new Subscriber<Integer>() { 
    public void onNext(Integer item) { 
     gatheredItems.add(item); 
     if (item == 3) { 
      unsubscribe(); 
     } 
    } 
    public void onCompleted() { 
     // noop 
    } 
    public void onError(Throwable sourceError) { 
     // noop 
    } 
}; 

Observable<Integer> source = Observable.range(0,100); 

source.subscribe(subscriber); 
sleep(1000); 
System.out.println(gatheredItems); 

Приведенный выше код правильно выводит которые были собраны только четыре элемента: [0, 1, 2, 3]. Но если кто-то изменит источник, наблюдаемый для кеширования:

Observable<Integer> source = Observable.range(0,100).cache(); 

Затем собрано все сто элементов. У меня нет контроля над наблюдаемым источником (независимо от того, кэшируется он или нет), поэтому как определенно отказаться от подписки с onNext?

ОТВЕТСТВЕННОСТЬ: Так отписывается в пределах onNext неправильной вещи?

(Мой фактическое использование дело в том, что в onNext я на самом деле пишу в выходной поток, и когда IOException происходит ничего больше не может быть записан на выход, поэтому мне нужно как-то остановить дальнейшую обработку.)

ответ

3

cache() кэширует все, как только первый абонент приходит, и он не дает никаких опций, чтобы остановить его из нисходящего потока. Вы должны остановить поток до кэша(), чтобы избежать слишком много удержания:

Observable<Integer> source = Observable.range(1, 100); 

PublishSubject<Integer> stop = PublishSubject.create(); 

source 
.doOnNext(v -> System.out.println("Generating " + v)) 
.takeUntil(stop) 
.cache() 
.doOnNext(new Action1<Integer>() { 
    int calls; 
    @Override 
    public void call(Integer t) { 
     System.out.println("Saving " + t); 
     if (++calls == 3) { 
      stop.onNext(1); 
     } 
    } 
}) 
.subscribe(); 

Edit: приведенный выше пример не работает ниже 1.0.13, так вот версия, которая должна:

SerialSubscription ssub = new SerialSubscription(); 

ConnectableObservable<Integer> co = source 
     .doOnNext(v -> System.out.println("Generating " + v)) 
     .replay(); 

co.doOnNext(v -> { 
    System.out.println("Saving " + v); 
    if (v == 3) { 
     ssub.unsubscribe(); 
    } 
}) 
.subscribe(); 

co.connect(v -> ssub.set(v)); 
+0

Этот код не работает должным образом, он печатает сто раз «Генерация», а затем сто раз «Сохранение». Я бы предпочел «Экономить» только четыре раза. –

+0

Я запускаю свой пример, прежде чем я разместил его, чтобы он печатал по 3 строки. Вы пробовали это дословно или вы пытались вставить его в свой поток? – akarnokd

+0

Я использовал ваш пример дословно, просто добавил основной и импорт. Я запускал его на javarx 1.0.12 на двух разных компьютерах, и он всегда печатает «Сохранение» сто раз. –

Смежные вопросы