Что я хочу достичь: Я хочу опросить некоторые ресурсы из Интернета путем опроса каждые 5 минут, но только при наличии подписчика на наблюдателя. Я использую BehaviorSubject и наблюдаемый интервал для объединения. Мне удалось реализовать его, но я новичок в Rx, и я думаю, что это можно сделать лучше.Смарт-опрос с использованием RxAndroid
Это, как я сделал это:
private BehaviorSubject<String> observable;
private Subscription> subscription;
public Subscription subscribe(final Action1<String> action) {
if (observable == null) {
observable = BehaviorSubject.create();
}
if (subscription == null) {
Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return getDataFromServer();
}
}).observeOn(AndroidSchedulers.mainThread());
subscription = source.subscribe(new Action1<String>() {
@Override
public void call(String s) {
if (observable.hasObservers()) {
observable.onNext(s);
} else {
subscription.unsubscribe();
subscription = null;
}
}
});
}
return observable.subscribe(action);
}
Идея: - У меня есть наблюдаемый источник для голосования и другого наблюдаемым, к которому клиенты могут подписаться (реализованы с использованием BehaviourSubject - так они всегда получают последние данные). Когда источник наблюдаемого испускает что-то, если у behavioursubject есть подписчики, он передается, иначе ничего не передается и я отписываюсь от источника, чтобы он остановился.
Посмотрите, как я сделал опрос здесь: http://orbyt.github.io/RxCookbook/. после оператора '.interval()' я использовал оператор '.takeWhile (boolean)' для его включения и выключения. Не уверен, что он подходит для того, что вы пытаетесь выполнить, но дайте мне знать, если это поможет. – Orbit
i ca'nt найти оператор takeWhile в вашем примере – amarkovits
Thats, потому что его там нет, я перечислил, как использовать оператор '.takeWhile()' в своем комментарии. – Orbit