Интересно, является ли это законно называть unsubscribe
внутри onNext
обработчика так:RxJava: вызов отписки изнутри onNext
List<Integer> gatheredItems = new ArrayList<>();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
public void onNext(Integer item) {
gatheredItems.add(item);
if (item == 3) {
unsubscribe();
}
}
public void onCompleted() {
// noop
}
public void onError(Throwable sourceError) {
// noop
}
};
Observable<Integer> source = Observable.range(0,100);
source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);
Приведенный выше код правильно выводит которые были собраны только четыре элемента: [0, 1, 2, 3]
. Но если кто-то изменит источник, наблюдаемый для кеширования:
Observable<Integer> source = Observable.range(0,100).cache();
Затем собрано все сто элементов. У меня нет контроля над наблюдаемым источником (независимо от того, кэшируется он или нет), поэтому как определенно отказаться от подписки с onNext
?
ОТВЕТСТВЕННОСТЬ: Так отписывается в пределах onNext
неправильной вещи?
(Мой фактическое использование дело в том, что в onNext
я на самом деле пишу в выходной поток, и когда IOException
происходит ничего больше не может быть записан на выход, поэтому мне нужно как-то остановить дальнейшую обработку.)
Этот код не работает должным образом, он печатает сто раз «Генерация», а затем сто раз «Сохранение». Я бы предпочел «Экономить» только четыре раза. –
Я запускаю свой пример, прежде чем я разместил его, чтобы он печатал по 3 строки. Вы пробовали это дословно или вы пытались вставить его в свой поток? – akarnokd
Я использовал ваш пример дословно, просто добавил основной и импорт. Я запускал его на javarx 1.0.12 на двух разных компьютерах, и он всегда печатает «Сохранение» сто раз. –