2017-01-19 4 views
11

я выдвигаю наблюдаемые в массив, как, например ...RXJS Подождите все наблюдаемые в массиве для завершения (или ошибка)

var tasks$ = []; 
tasks$.push(Observable.timer(1000)); 
tasks$.push(Observable.timer(3000)); 
tasks$.push(Observable.timer(10000)); 

Я хочу наблюдаемый, который излучает, когда все задачи $ завершены. Имейте в виду, что на практике задачи $ не имеют известного числа наблюдаемых.

Я пробовал Observable.zip(tasks$).subscribe(), но это кажется неудачным в случае, если есть только одна задача, и заставляет меня полагать, что для ZIP требуется четное количество элементов, чтобы работать так, как я ожидал.

Я пробовал Observable.concat(tasks$).subscribe(), но результат оператора concat просто кажется массивом наблюдаемых ... например. в основном то же, что и вход. Вы даже не можете подписаться на него.

в C# это будет сродни Task.WhenAll(). в ES6 обещал, что это будет похоже на Promise.all().

Я столкнулся с рядом вопросов, но все они, похоже, имеют дело с ожиданием на известном количестве потоков (например, их сопоставление).

+1

Это зависит от того, что вы хотите сделать, когда какое-либо из Наблюдателей отправит уведомление об ошибке. Вы хотите просто проигнорировать ошибку или это означает, что весь результат будет отброшен, и вы получите только ошибку. – martin

ответ

26

Если вы хотите, чтобы составить наблюдаемый, который излучает, когда все источник наблюдаемым завершено, вы можете использовать forkJoin:

import { Observable } from 'rxjs/Observable'; 
import 'rxjs/add/observable/forkJoin'; 
import 'rxjs/add/operator/first'; 

var tasks$ = []; 
tasks$.push(Observable.timer(1000).first()); 
tasks$.push(Observable.timer(3000).first()); 
tasks$.push(Observable.timer(10000).first()); 
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); }); 
+6

Будьте осторожны, если 'tasks' построен динамически и пуст, forkJoin остановит наблюдаемую последовательность. См. Мой ответ здесь для получения дополнительной информации. Http://stackoverflow.com/a/42622968/1224564 – bgondy

+0

O третьего наблюдаемого не было в капитале, как было объявлено, поэтому оно стало видимым, не определено. Кроме того, задачам не хватает знака окончания доллара перед подпиской. Я не могу отредактировать ответ, потому что: Редактирование должно быть не менее 6 символов; есть ли что-то еще для улучшения этой должности? –

+0

Я не хочу называть вас навсегда, но я думаю, что результат не будет отображаться, потому что поток наблюдаемых не завершен. Возможно, просто добавьте .first() после вызова таймера или .take (n), чтобы получить более интересный результат. forkJoin был оператором, которого я искал, спасибо @cartant! –

0

Для меня это sample был лучшим решением.

const source = Observable.interval(500); 
const example = source.sample(Observable.interval(2000)); 
const subscribe = example.subscribe(val => console.log('sample', val)); 

Так .. только тогда, когда второй (пример) испускают - вы увидите последнее emited значение первого (источник).

В моей задаче я жду подтверждения формы и другого события DOM.

Смежные вопросы