2016-10-09 4 views
0

У меня есть центральный класс, назовите его Central. Он может добавить к нему 1-N наблюдаемых. Этим я должен добавить динамически, а затем узнать, когда был выполнен окончательный onComplete().RxJava: Динамический набор наблюдаемых

Как это сделать?

Пример кода:

public class Central { 
    public void addObservable(Observable o){ 
    // Add the new observable to subscriptions 
    } 
} 

Update.

Я уже общался с этим. Используя ответ @ DaveMoten, я подошел.

Вот мой метод, чтобы добавить новый наблюдаемым по желанию, и получать уведомления, когда все они закончили (псевдо-код):

class central { 
    PublishSubject<Observable<T>> subject = PublishSubject.create(); 
    int commandCount = 0; 
    int concatCount = 0; 
    addObservable(Observable newObs){ 
    commandCount++; 
    concatCount++; 
    subject.concatMap(o -> 
      o.doOnCompleted(
       () -> { 
        concatCount--; 
        LOG.warn("inner completed, concatCount: " + concatCount); 
        }) 
     ).doOnNext(System.out::println) 
      .doOnCompleted(() -> { 
       System.out.println("OUTER completed"); 
      }) .subscribe(); 

    onNext(newObs); 
    subject.onCompleted(); // If this is here, it ends prematurely (after first onComplete) 
    // if its not, OUTER onComplete never happens 

    newObs.subscribe((e) ->{}, 
    (error) -> {}, 
    () -> { 
     commandCount--; 
     LOG.warn("inner end: commandCount: " + commandCount); 
    }); 
    } 
} 
// ... somewhere else in the app: 
Observable ob1 = Observable.just(t1); 
addObservable.addObservable(ob1); 
// ... and possibly somewhere else: 
Observable ob2 = Observable.just(t2, t3); 
addObservable(ob2); 

// Now, possibly somewhere else: 
ob1.onNext(1); 
// Another place: 
ob2.onNext(2); 

Каротаж выглядит так:

19:51:16.248 [main] WARN execute: commandCount: 1 
19:51:17.340 [main] WARN execute: commandCount: 2 
19:51:23.290 [main] WARN execute: commandCount: 3 
9:51:26.969 [main] WARN inner completed, concatCount: 2 
19:51:27.004 [main] WARN inner end: commandCount: 2 
19:51:27.008 [main] WARN inner completed, concatCount: 1 
19:51:27.009 [main] WARN inner end: commandCount: 1 
19:51:51.745 [ProcessIoCompletion0] WARN inner completed, concatCount: 0 
19:51:51.750 [ProcessIoCompletion0] WARN inner completed, concatCount: -1 
19:51:51.751 [ProcessIoCompletion0] WARN inner end: commandCount: 0 
19:51:51.752 [ProcessIoCompletion0] WARN inner completed, concatCount: -2 
19:51:51.753 [ProcessIoCompletion0] WARN inner completed, concatCount: -3 

UPDATE: Я добавил несколько счетчиков, которые демонстрируют, что я не понимаю, что происходит с concatMap. Вы можете видеть, что лямбда-подписка на самом мониторе отсчитывается до нуля правильно, но concatMap oncomplete снижается до -3! И OUTER закончен никогда не бывает.

+0

Когда вы говорите, что преждевременно вы имеете в виду? Я не вижу проблемы. Я проверил тест, и он напечатал «1, внутренний завершен, 2, 3, внутренний завершен, внешний завершен». Это всего лишь билет. –

+0

@DaveMoten Проблема заключается в методе с неопределенным количеством новых наблюдаемых приходов. Я не могу так называть onComplete, потому что он заставляет объект заканчиваться, даже если новые наблюдаемые были добавлены. Обновленный пример, чтобы попытаться сделать более понятным. – mtyson

+0

Все в порядке. Это нормально никогда не называть 'subject.onCompleted()'! –

ответ

2

Используйте PublishSubject:

PublishSubject<Observable<T>> subject = 
    PublishSubject.create(); 
subject 
    // protect against concurrent calls to subject (optional) 
    .serialize() 
    .concatMap(o -> 
     o.doOnCompleted(() -> System.out.println("inner completed"))) 
    .doOnNext(System.out::println) 
    .doOnCompleted(() -> System.out.println("completed")) 
    .subscribe(subscriber); 

subject.onNext(Observable.just(t1)); 
subject.onNext(Observable.just(t2, t3)); 
subject.onCompleted(); 

    . 
+0

Это почти делает то, что я ищу ... эта часть, которая не есть, 'doOnCompleted' срабатывает только при ручном вызове' subject.onCompleted() '. Я хочу получить onComplete, когда исходные наблюдаемые уволили onComplete. – mtyson

+0

Я могу почти добраться туда с помощью 'observable.doOnCompleted (subject :: onCompleted);', но это работает только для одного наблюдаемого источника. Как это сделать для нескольких источников? – mtyson

+0

Легко сделать, добавив 'doOnCompleted' в' o' внутри 'concatMap'. Я изменил ответ, чтобы отразить это. –

Смежные вопросы