2016-04-25 7 views
4

Я хочу следующего Поведение:RXJava - буфер наблюдаемого 1 до наблюдаемой 2 испускает один деталь

observableMain должен буфера всех элементов до тех пор, пока observableResumed излучает значение. Затем observableMain должен излучать все буферизацией и все значения функций ...

Что делать в activity'sonCreate:

PublishSubject<T> subject = ...; // I create a subject to emit items to and to subscribe to 

// 1) I create a main observable from my subject 
final Observable<T> observableMain = subject 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

// 2) I use a custom base class in which I can register listeners 
// for the onResume event and which I can query the isResumed state! 
// I call the object the pauseResumeProvider! 

// 2.1) I create an observable, it emits a value ONLY if my activity is resumed 
final Observable<Boolean> obsIsResumed = Observable 
      .defer(() -> Observable.just(pauseResumeProvider.isActivityResumed())) 
      .skipWhile(aBoolean -> aBoolean != true); 

// 2.2) I create a second observable, it emits a value as soon as my activity is resumed 
final Observable<Boolean> obsOnResumed = Observable.create(new Observable.OnSubscribe<Boolean>() 
    { 
     @Override 
     public void call(final Subscriber<? super Boolean> subscriber) 
     { 
      pauseResumeProvider.addPauseResumeListener(new IPauseResumeListener() { 
       @Override 
       public void onResume() { 
        pauseResumeProvider.removePauseResumeListener(this); 
        subscriber.onNext(true); 
        subscriber.onCompleted(); 
       } 

       @Override 
       public void onPause() { 

       } 
      }); 
     } 
    }); 

// 2.3) I combine the resumed observables and only emit the FIRST value I can get 
final Observable<Boolean> observableResumed = Observable 
     .concat(obsIsResumed, obsOnResumed) 
     .first(); 

// 3) here I'm stuck 
// 3 - Variant 1: 
Observable<T> observable = observableMain 
      .buffer(observableResumed) 
      .concatMap(values -> Observable.from(values)); 
// 3 - Variant 2: 
// Observable<T> observable = observableMain.delay(t -> observableResumed); 

// 4) I emit events to my my subject... 
// this event is LOST! 
subject.onNext("Test in onCreate"); 

Проблема

Все элементы, которые посылают к subject после activity будут возобновлены, все элементы до этого будут потеряны (по крайней мере, с решением delay). Я не могу заставить мое желание работать. Как я могу это правильно решить?

+0

Это кажется излишне сложным. Почему существует требование о том, чтобы значения только испускались, когда ваша деятельность возобновляется? – JohnWowUs

+0

Я хочу повторно использовать один наблюдаемый в своей деятельности, и я хочу обновить пользовательский интерфейс из фоновых задач, поэтому я хочу контролировать, когда элементы испускаются (я не хочу, чтобы пользовательский интерфейс был обновлен, если активность не возобновлена, и я также не хотите терять события обновления). Я хочу заменить существующую систему eventbus + queue, которую я использую довольно долгое время с помощью решения RXJava ... – prom85

ответ

5

Попросите источник переименовать и использовать задержку подписки для активации реальной подписки.

PublishSubject<Integer> emitNow = PublishSubject.create(); 

ConnectableObservable<T> co = source.replay(); 

co.subscribe(); 

co.connect(); 

co.delaySubscription(emitNow).subscribe(...); 

emitNow.onNext(1); 

Edit:

Here is a gist с оператором вы можете lift в последовательности, которая может приостановить и возобновить эмиссию от вверх по течению.

+0

Это выглядит очень многообещающим. Будет проверять его немного больше. Вспоминая «ConnectableObservable» и отменив подписку на него, а также переадресацию на нее, вы должны сделать паузу/возобновить мою основную подписку и никогда не потерять испущенный элемент (это то, что я хочу в конце). – prom85

+0

Вы не можете приостановить эту настройку. Он будет работать в фоновом режиме, и как только вы будете сигнализировать emitNow, вы получите все данные. – akarnokd

+0

Я думаю, что отказ от подписки на 'co' и повторная подписка на него таким же образом будет работать как пауза? – prom85

0

Другой способ сделать это заключается в задержке выбросов источника (горячей) наблюдаемом до «клапан» открывает

source.delay(new Func1<Integer, Observable<Boolean>>() { 
    @Override 
    public Observable<Boolean> call(Integer integer) { 
     return valve.filter(new Func1<Boolean, Boolean>() { 
      @Override 
      public Boolean call(Boolean isOpen) { 
       //emits only when isOpen is true 
       return isOpen; 
      } 
     }); 
    } 
}) 
.subscribe(new Action1<Integer>() { 
    @Override 
    public void call(Integer integer) { 
     System.out.println("out: " + integer); 
    } 
}); 

GIST: Rx Valve

Смежные вопросы