2017-02-22 26 views
0

Я новичок в Rxjs, и я хотел бы знать, можно ли решить эту проблему.Сжатие нескольких наблюдаемых RxJS и выполнение задачи подсчета

Дан массив из 5 наблюдаемой, каждый уже излучаемый некоторые данные, как показано ниже:

ичных [0]: { 'A', 'B', 'C'}

ичных [1] : { 'г', NULL}

ичных [2]: { '1', 'е', '3', NULL}

ичных [3]: { 'х', 'у', 'z'}

ary [4]: ​​{'h', 'v'}

Я хотел бы иметь новое наблюдаемое, которое испускает значение для подсчета ненулевых последних элементов этих 5 наблюдаемых. В приведенном выше случае первое значение, излучаемое новым наблюдаемым, равно 3 (c, z, v).

Всякий раз, когда новые данные испускаются из одного из наблюдаемых, новый наблюдаемый также испускает результат подсчета. Например, когда

ичных [4]: ​​{ 'ч', 'v'} становится

ичных [4]: ​​{ 'ч', 'v', NULL}

новая наблюдаемая будет излучать 2 (c, z).

+0

Можем ли мы описать источник данных, как это: https://plnkr.co/edit/e30MpsacNahHy0VUdQc7?p=preview? – Maxime

+0

Если вы можете обновить свой вопрос с помощью небольшой мраморной диаграммы, я могу помочь :) – Maxime

ответ

1

Вы можете использовать combineLatest с последующим map:

const baseObs = [ 
 
    new Rx.Subject(), 
 
    new Rx.Subject(), 
 
    new Rx.Subject(), 
 
    new Rx.Subject(), 
 
    new Rx.Subject(), 
 
] 
 

 
const counter$ = Rx.Observable 
 
    .combineLatest(baseObs) 
 
    .map(dataList => dataList.filter(d => d != null).length); 
 
    
 
counter$.subscribe(
 
    num => console.log(`Num is ${num}`) 
 
); 
 

 
baseObs[0].next("a"); 
 
baseObs[1].next(null); 
 
baseObs[2].next(null); 
 
baseObs[3].next("z"); 
 
baseObs[4].next("v"); 
 

 
setTimeout(() => baseObs[4].next(null), 300);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Смежные вопросы