У меня есть центральный класс, назовите его Central
. Он может добавить к нему 1-N наблюдаемых. Этим я должен добавить динамически, а затем узнать, когда был выполнен окончательный onComplete().RxJava: Динамический набор наблюдаемых
Как это сделать?
Пример кода:
public class Central {
public void addObservable(Observable o){
// Add the new observable to subscriptions
}
}
Update.
Я уже общался с этим. Используя ответ @ DaveMoten, я подошел.
Вот мой метод, чтобы добавить новый наблюдаемым по желанию, и получать уведомления, когда все они закончили (псевдо-код):
class central {
PublishSubject<Observable<T>> subject = PublishSubject.create();
int commandCount = 0;
int concatCount = 0;
addObservable(Observable newObs){
commandCount++;
concatCount++;
subject.concatMap(o ->
o.doOnCompleted(
() -> {
concatCount--;
LOG.warn("inner completed, concatCount: " + concatCount);
})
).doOnNext(System.out::println)
.doOnCompleted(() -> {
System.out.println("OUTER completed");
}) .subscribe();
onNext(newObs);
subject.onCompleted(); // If this is here, it ends prematurely (after first onComplete)
// if its not, OUTER onComplete never happens
newObs.subscribe((e) ->{},
(error) -> {},
() -> {
commandCount--;
LOG.warn("inner end: commandCount: " + commandCount);
});
}
}
// ... somewhere else in the app:
Observable ob1 = Observable.just(t1);
addObservable.addObservable(ob1);
// ... and possibly somewhere else:
Observable ob2 = Observable.just(t2, t3);
addObservable(ob2);
// Now, possibly somewhere else:
ob1.onNext(1);
// Another place:
ob2.onNext(2);
Каротаж выглядит так:
19:51:16.248 [main] WARN execute: commandCount: 1
19:51:17.340 [main] WARN execute: commandCount: 2
19:51:23.290 [main] WARN execute: commandCount: 3
9:51:26.969 [main] WARN inner completed, concatCount: 2
19:51:27.004 [main] WARN inner end: commandCount: 2
19:51:27.008 [main] WARN inner completed, concatCount: 1
19:51:27.009 [main] WARN inner end: commandCount: 1
19:51:51.745 [ProcessIoCompletion0] WARN inner completed, concatCount: 0
19:51:51.750 [ProcessIoCompletion0] WARN inner completed, concatCount: -1
19:51:51.751 [ProcessIoCompletion0] WARN inner end: commandCount: 0
19:51:51.752 [ProcessIoCompletion0] WARN inner completed, concatCount: -2
19:51:51.753 [ProcessIoCompletion0] WARN inner completed, concatCount: -3
UPDATE: Я добавил несколько счетчиков, которые демонстрируют, что я не понимаю, что происходит с concatMap. Вы можете видеть, что лямбда-подписка на самом мониторе отсчитывается до нуля правильно, но concatMap oncomplete снижается до -3! И OUTER закончен никогда не бывает.
Когда вы говорите, что преждевременно вы имеете в виду? Я не вижу проблемы. Я проверил тест, и он напечатал «1, внутренний завершен, 2, 3, внутренний завершен, внешний завершен». Это всего лишь билет. –
@DaveMoten Проблема заключается в методе с неопределенным количеством новых наблюдаемых приходов. Я не могу так называть onComplete, потому что он заставляет объект заканчиваться, даже если новые наблюдаемые были добавлены. Обновленный пример, чтобы попытаться сделать более понятным. – mtyson
Все в порядке. Это нормально никогда не называть 'subject.onCompleted()'! –