2016-06-30 5 views
1

У меня есть приложение, в котором пользователи могут получать уведомления по темам, на которые они подписываются. Последовательность действий следующее:Изменение потока RxJava во время его выполнения

  1. журналы пользователей в
  2. АРР, регистров на сервер уведомлений
  3. Пользователей выбирают подписаться/отписаться от различных тем

Я хочу, чтобы все сетевые запросы для сериализации. Если бы я знал, что при инициализации какие именно темы собираются быть подписаны/отписался, я мог бы написать поток, как показано ниже:

loginObservable.subscribeOn(Schedulers.io()) 
      .flatMap(user -> registerApp(appId)) 
      .flatMap(o -> subscribeToTopic("topic1")) 
      .flatMap(o -> unsubscribeFromTopic("topic2")) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe() 

Дело в том, что пользователи могут подписаться/отказаться от подписки в любой точке жизненного цикла приложения, возможно, даже прежде чем регистрация будет успешной. Я мог бы сохранить список наблюдаемых и сериализовать все запросы вручную в onComplete(), но это не звучит очень Rx-ish. Есть ли шанс, что я смогу сделать это более кратким образом? Что-то вроде:

observable = loginObservable.subscribeOn(Schedulers.io()) 
      .flatMap(user -> registerApp(appId)) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe() 

    //later 
    observable.flatMap(o -> subscribeToTopic("topic1")) 
      .subscribe() 

    //even later 
    observable.flatMap(o -> unsubscribeFromTopic("topic2")) 
      .subscribe() 

ответ

0

Что относительно использования doOnSubscribe? Где вы можете зарегистрировать свое приложение до публикации своих тем.

Посмотрите этот пример.

boolean onSubscribe = false; 

@Test 
public void observableDoOnSubscribe() { 
    String val = "test"; 
    Observable.just(val) 
       .doOnSubscribe(() -> onSubscribe = true) 
       .filter(s -> onSubscribe) 
       .subscribe(s -> System.out.printf(s)); 
} 

Вы можете увидеть больше примеров здесь https://github.com/politrons/reactive

0

Вы можете ввести новые темы в наблюдаемый через PublishSubject. Я не понял ваше использование switchMap в вашем примере, но такой подход может быть полезным для Вас:

PublishSubject<Topic> newTopics = PublishSubject.create(); 
Observable<Topic> topics = ...; 
newTopics 
    .mergeWith(topics) 
    .flatMap(topic -> subscribeToTopic(topic)) 
    ... 
    .subscribe(subscriber); 

взаимодействие UI:

newTopics.onNext(topic); 
+0

Нет необходимости 'switchMap' на самом деле, я просто редактировал вопрос заменить 'switchMap' на' flatMap' – mbonnin

+0

Относится ли это к взаимодействию пользовательского интерфейса, которое происходит до того, как приложение зарегистрировано в службе уведомлений? Я думал об использовании ReplaySubject. – mbonnin

Смежные вопросы