2015-12-03 6 views
0

У меня есть поток наблюдаемых, который в основном эмулирует диаграмму состояния. Например:Цитирование через наблюдаемый поток

whenThisHappens() 
    .switchMap(i -> { 
     if (i.isThisThing()) { 
      return whenThatHappens(); 
     } else { 
      return nothingHappened(); 
     } 
    } 
    .subscribe(thing -> { 
     if (thing.isThatThing()) { 
      log("Got that thing"); 
     } else { 
      log("Got nothing"); 
     } 
    }); 

Вопрос заключается в том, что я хочу, чтобы перебрать входа, пока какое-то событие не происходит (это в течение длительного работающей службы на Android). Сейчас я в состоянии сделать это, сохраняя наблюдаемым в переменной, подписавшись на него, а затем отписки и Повторная подписка на него в это onComplete

obs = whenThisHappens() 
    .switchMap(i -> { 
     if (i.isThisThing()) { 
      return whenThatHappens(); 
     } else { 
      return nothingHappened(); 
     } 
    } 
    .doOnNext(thing -> { 
     if (thing.isThatThing()) { 
      log("Got that thing"); 
     } else { 
      log("Got nothing"); 
     } 
    }) 
    .doOnComplete(i -> { 
     obs.unsubscribe(); 
     obs.subscribe(); 
    } 
    obs.subscribe(); 

Но я вроде чувствую, что я делаю что-то действительно неправильно Вот. Есть ли лучший способ сделать это? Я посмотрел на retry, но ошибки при отправке, чтобы повторить попытку, выглядят так же плохо, как и то, что я делаю сейчас.

+0

ли isThatThing() добытчиками? Если это так, сеттеры должны возбуждать события, которые уведомляют, что имущество изменилось. В .net у нас есть шаблон/интерфейс INotifyPropertyChanged. – Aron

+0

Не слишком хорошо знаком с этим рисунком. Я буду исследовать. Благодарю. –

+0

Извините, ваш вопрос мне непонятен. Вы говорите о диаграмме состояния, но ваш код ничего не показывает. Не могли бы вы включить мраморную диаграмму того, что вы хотите достичь? В любом случае диаграммы состояний обычно легко реализуются с помощью 'scan'. –

ответ

0

Чтения кода это выглядит, как вы хотите filter:

whenThisHappens() 
     # ignore uninteresting things 
     .filter(i -> i.isThisThing()) 
     # do stuff on interesting things 
     .subscribe(item -> log("Got: " + item.toString())); 

Есть еще два дополнительных пылкие этот основной subscribe которые являются on-error функции и on-complete функции, которую вы можете использовать, если вам нужны - но подписки здесь управляется автоматом.

+0

Эта часть не является проблемой. Диаграмма фактического состояния не так проста. Проблема ждет первого события после того, как я закончил. В вашем коде тоже, если в нем есть 'switchMap', как только мы получим элемент, поток завершится, потому что whenThisHappens наблюдаемые отписки и потоковые переключатели. Я хочу, чтобы он начинался с 'whenThisHappens()' снова, как только мы получим 'item', когда там есть' switchMap'. –

+0

Чтобы уточнить, когда это происходит только один раз в подписке из-за состояния. В реальной жизни это может быть что-то вроде события распознавания активности, в котором говорится, что пользователь работает. Как только пользователь работает, я больше не забочусь о запущенных событиях и начинаю искать другие события. Напр. Я переключаюсь на другое наблюдаемое, которое получает калорийность от данных датчика и уведомляет пользователя, как только они сжигали х калорий. Но как только я закончу, я буду заботиться о том, как в следующий раз я снова заработаю событие, но не раньше. Имеет ли это смысл? –

1

Я думаю, что вы пытаетесь сделать лучше, с PublishSubject или BehaviorSubject.

Поток будет публиковать предметы по этому вопросу, которые будут вызывать вашу подписку.

Вот поток событий класса я написал некоторое время назад:

public class SubjectEventStream implements IEventStream { 
    private final BehaviorSubject<IEvent> stream = BehaviorSubject.create(); 

    @Override 
    public void publish(Observable<IEvent> event) { 
     event.doOnNext(stream::onNext).subscribe(); 
    } 

    @Override 
    public Observable<IEvent> observe() { 
     return stream; 
    } 

    @Override 
    public <T> Observable<T> observe(Class<T> eventClass) { 
     return stream.ofType(eventClass); 
    } 
} 

Смотреть еще некоторую информацию здесь:

http://reactivex.io/documentation/subject.html

http://akarnokd.blogspot.com/2015/06/subjects-part-1.html

Смежные вопросы