3

Немного контекста: В этой ссылке: https://github.com/ReactiveX/RxJava/issues/448 @ ben-lesh предложил выполнить ручную рекурсию для опроса с использованием Observables. Однако в последней версии RxJava нет OnSubscribeFunc.Какова обновленная версия Manual Recursion для опроса с использованием Observables?

Вот это моя текущая реализация:

Observable.create(new Observable.OnSubscribe<Item>() { 
     @Override 
     public void call(final Subscriber<? super Item> innerSubscriber) { 

      Schedulers.io().createWorker() 
         .schedulePeriodically(new Action0() { 
          @Override 
          public void call() { 
           searchObservable() 
             .doOnNext(new Action1<Item>() { 
              @Override 
              public void call(Item item) { 
               innerSubscriber.onNext(item); 
              } 
             }) 
             .doOnError(new Action1<Throwable>() { 
              @Override 
              public void call(Throwable throwable) { 
               if (throwable != null) { 
                innerSubscriber.onError(throwable); 
               } 
              } 
             }) 
             .doOnCompleted(new Action0() { 
              @Override 
              public void call() { 
               innerSubscriber.onCompleted(); 
              } 
             }).subscribe(); // Set subscriber? 
          } 
         }, initialDelay, pollingInterval, TimeUnit.MINUTES); 
     } 
    }) 
      .subscribeOn(Schedulers.io()) // performs networking on background thread 
      .observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread 
      .subscribe(subscriber); // The subscriber 

searchObservable выполняет запрос на обслуживание. Это нормально работает в первом запуске, то есть данные передаются на subscriber. Однако после ожидания pollingInterval данные возвращаются и выполняется doOnNext, но данные не передаются в пользовательский интерфейс. Нужно ли устанавливать какой-либо абонент в Action, который принимает schedulePeriodically?

ответ

0

Это то, что работает хорошо для меня и следует парадигме, что Руководство Рекурсия:

public void manualRecursionPollingStrategy(Subscriber<Item> subscriber, Scheduler observeOnScheduler, long initialDelay, long pollingInterval) { 
      Observable.create(new Observable.OnSubscribe<Item>() { 
       @Override 
       public void call(final Subscriber<? super Item> innerSubscriber) { 
        Schedulers.newThread().createWorker() 
           .schedulePeriodically(new Action0() { 
            @Override 
            public void call() { 
               searchByHashtagObservable() 
                 .subscribeOn(Schedulers.io()) // performs networking on background thread 
                 .observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the 
                   // UI thread 
                 .subscribe(
                   new Action1<Item>() { 
                    @Override 
                    public void call(Item item) { 
                     subscriber.onNext(item); 
                    } 
                   }, 
                   new Action1<Throwable>() { 
                    @Override 
                    public void call(Throwable throwable) { 
                     if (throwable != null) { 
                      subscriber.onError(throwable); 
                     } 
                    } 
                   }, 
                   new Action0() { 
                    @Override 
                    public void call() { 
                     subscriber.onCompleted(); 
                    } 
                   } 
                ) 
            } 
           }, initialDelay, pollingInterval, TimeUnit.MINUTES); 
       } 
      }) 
       .observeOn(observeOnScheduler) // sends notifications to a Scheduler, usually the UI thread 
       .subscribe(subscriber); 

Пожалуйста, обратите внимание, что я присоединяюсь к searchByHashtagObservable() и называют onNext, onError и onCompletedSubscriber прошел как параметр.

Спасибо!

0

Он останавливается, потому что вы вызываете innerSubscriber.onCompleted, который завершает последовательность при первом запуске. Есть стандартные операторы, которые вы можете получить тот же эффект, без необходимости создания пользовательского Observable:

Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io()) 
.onBackpressureBuffer() 
.concatMap(v -> searchObservable()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(subscriber); 

(Примечание: нет необходимости в subscribeOn() здесь, потому что интервал будет излучать на Schedulers.io() в любом случае.)

Смежные вопросы