Немного контекста: В этой ссылке: https://github.com/ReactiveX/RxJava/issues/448 @ ben-lesh предложил выполнить ручную рекурсию для опроса с использованием Observables. Однако в последней версии RxJava нет OnSubscribeFunc
.Какова обновленная версия Manual Recursion для опроса с использованием Observables?
Вот это моя текущая реализация:
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.io().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchObservable()
.doOnNext(new Action1<Item>() {
@Override
public void call(Item item) {
innerSubscriber.onNext(item);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
innerSubscriber.onError(throwable);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
innerSubscriber.onCompleted();
}
}).subscribe(); // Set subscriber?
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
.subscribe(subscriber); // The subscriber
searchObservable
выполняет запрос на обслуживание. Это нормально работает в первом запуске, то есть данные передаются на subscriber
. Однако после ожидания pollingInterval
данные возвращаются и выполняется doOnNext
, но данные не передаются в пользовательский интерфейс. Нужно ли устанавливать какой-либо абонент в Action
, который принимает schedulePeriodically
?