2015-07-02 3 views
0

Основываясь на хорошую схему из официальной документации, combineLatest http://reactivex.io/documentation/operators/combinelatest.htmlRxJava Наблюдаемого combineLatest сброс состояния

Интересно, если есть способ, скажем, когда я получил «C» из второго Observable, я хотел бы, чтобы очистить состояний внутри первых и вторых наблюдаемых. Так что, когда «D» прибывает со второго Observable, он больше не будет сочетаться с «2» от первого Observable, но дождитесь, пока «3» не прибудет.

ответ

1

Решение вашей конкретной проблемы заключается в публикации каждого источника, не принимают до порогового значения, а затем повторно подписать вашу цепь источников:

PublishSubject<Integer> a = PublishSubject.create(); 
PublishSubject<String> b = PublishSubject.create(); 

ConnectableObservable<Integer> ap = a.publish(); 
ConnectableObservable<String> bp = b.publish(); 
Observable.combineLatest(
     ap, bp, 
     Pair::of 
) 
.takeUntil(p -> "C".equals(p.second)) 
.repeat(2) 
.subscribe(System.out::println) 
; 

ap.connect(); 
bp.connect(); 

a.onNext(1); 
b.onNext("A"); 
a.onNext(2); 
b.onNext("B"); 
b.onNext("C"); 

a.onNext(3); 
b.onNext("D"); 

Таким образом, вы завершить первый «сеанс» комбинации и запустите новый.

+0

Я не понимаю использование 'repeat (2)' здесь. Он работает только для двух «С», но не для трех, не так ли? Не лучше ли (или более подходит) просто использовать 'repeat()'? –

Смежные вопросы