Я хочу следующего Поведение:RXJava - буфер наблюдаемого 1 до наблюдаемой 2 испускает один деталь
observableMain
должен буфера всех элементов до тех пор, пока observableResumed
излучает значение. Затем observableMain
должен излучать все буферизацией и все значения функций ...
Что делать в activity's
onCreate
:
PublishSubject<T> subject = ...; // I create a subject to emit items to and to subscribe to
// 1) I create a main observable from my subject
final Observable<T> observableMain = subject
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// 2) I use a custom base class in which I can register listeners
// for the onResume event and which I can query the isResumed state!
// I call the object the pauseResumeProvider!
// 2.1) I create an observable, it emits a value ONLY if my activity is resumed
final Observable<Boolean> obsIsResumed = Observable
.defer(() -> Observable.just(pauseResumeProvider.isActivityResumed()))
.skipWhile(aBoolean -> aBoolean != true);
// 2.2) I create a second observable, it emits a value as soon as my activity is resumed
final Observable<Boolean> obsOnResumed = Observable.create(new Observable.OnSubscribe<Boolean>()
{
@Override
public void call(final Subscriber<? super Boolean> subscriber)
{
pauseResumeProvider.addPauseResumeListener(new IPauseResumeListener() {
@Override
public void onResume() {
pauseResumeProvider.removePauseResumeListener(this);
subscriber.onNext(true);
subscriber.onCompleted();
}
@Override
public void onPause() {
}
});
}
});
// 2.3) I combine the resumed observables and only emit the FIRST value I can get
final Observable<Boolean> observableResumed = Observable
.concat(obsIsResumed, obsOnResumed)
.first();
// 3) here I'm stuck
// 3 - Variant 1:
Observable<T> observable = observableMain
.buffer(observableResumed)
.concatMap(values -> Observable.from(values));
// 3 - Variant 2:
// Observable<T> observable = observableMain.delay(t -> observableResumed);
// 4) I emit events to my my subject...
// this event is LOST!
subject.onNext("Test in onCreate");
Проблема
Все элементы, которые посылают к subject
после activity
будут возобновлены, все элементы до этого будут потеряны (по крайней мере, с решением delay
). Я не могу заставить мое желание работать. Как я могу это правильно решить?
Это кажется излишне сложным. Почему существует требование о том, чтобы значения только испускались, когда ваша деятельность возобновляется? – JohnWowUs
Я хочу повторно использовать один наблюдаемый в своей деятельности, и я хочу обновить пользовательский интерфейс из фоновых задач, поэтому я хочу контролировать, когда элементы испускаются (я не хочу, чтобы пользовательский интерфейс был обновлен, если активность не возобновлена, и я также не хотите терять события обновления). Я хочу заменить существующую систему eventbus + queue, которую я использую довольно долгое время с помощью решения RXJava ... – prom85