2016-02-16 4 views
2

Что я хочу достичь: Я хочу опросить некоторые ресурсы из Интернета путем опроса каждые 5 минут, но только при наличии подписчика на наблюдателя. Я использую BehaviorSubject и наблюдаемый интервал для объединения. Мне удалось реализовать его, но я новичок в Rx, и я думаю, что это можно сделать лучше.Смарт-опрос с использованием RxAndroid

Это, как я сделал это:

private BehaviorSubject<String> observable; 
private Subscription> subscription; 

public Subscription subscribe(final Action1<String> action) { 
    if (observable == null) { 
     observable = BehaviorSubject.create(); 
    } 
    if (subscription == null) { 
     Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() { 
      @Override 
      public String call(Long aLong) { 
       return getDataFromServer(); 
      } 
     }).observeOn(AndroidSchedulers.mainThread()); 
     subscription = source.subscribe(new Action1<String>() { 

      @Override 
      public void call(String s) { 
       if (observable.hasObservers()) { 
        observable.onNext(s); 
       } else { 
        subscription.unsubscribe(); 
        subscription = null; 
       } 
      } 
     }); 
    } 
    return observable.subscribe(action); 
} 

Идея: - У меня есть наблюдаемый источник для голосования и другого наблюдаемым, к которому клиенты могут подписаться (реализованы с использованием BehaviourSubject - так они всегда получают последние данные). Когда источник наблюдаемого испускает что-то, если у behavioursubject есть подписчики, он передается, иначе ничего не передается и я отписываюсь от источника, чтобы он остановился.

+0

Посмотрите, как я сделал опрос здесь: http://orbyt.github.io/RxCookbook/. после оператора '.interval()' я использовал оператор '.takeWhile (boolean)' для его включения и выключения. Не уверен, что он подходит для того, что вы пытаетесь выполнить, но дайте мне знать, если это поможет. – Orbit

+0

i ca'nt найти оператор takeWhile в вашем примере – amarkovits

+0

Thats, потому что его там нет, я перечислил, как использовать оператор '.takeWhile()' в своем комментарии. – Orbit

ответ

5

насчет:

Observable<String> observable = Observable.interval(0, 5, TimeUnit.SECONDS) 
    .doOnNext(new LoggingAction1<Long>("doOnNext")) 
    .flatMap(new Func1<Long, Observable<String>>() { 
     @Override 
     public Observable<String> call(Long pulse) { 
      return Observable.just(String.format("Request %d", pulse)); 
     } 
    }) 
    .replay(1) 
    .refCount(); 

Я думаю, что он делает все, что вы хотите, чтобы ваши настройки, чтобы сделать:

  • Пока нет Subscribers он ничего не делает.
  • Когда подписывается первый Subscriber, запускается interval и выдает одно значение сразу, а затем один раз в 5 секунд.
  • Новый Subscriber получит последний товар сразу, а затем все последующие.
  • Будет запущен только один interval - и поэтому только один сетевой запрос будет выполняться каждые 5 секунд - независимо от того, сколько Subscribers есть.
  • Когда все Subscribers отказались от подписки, interval перестанет испускать элементы.
+0

отлично подходит для моего сценария. – amarkovits

Смежные вопросы