2016-02-09 2 views
1

Я довольно новичок в RxJS. Таким образом, у меня есть поток, который создает полезную нагрузку для различных вызовов ajax, затем я использую flatMap для извлечения необходимых данных, и он отлично работает. Легко.RxJS Promises Chaining

const streamA = Rx.Observable.from(array); 

const streamB = streamA 
    .map(val => /* build payload */) 
    .flatMap(payload => Rx.Observable.fromPromise($.ajax(payload)) 

streamB.subscribe(result => /* got it */) 

Теперь я хотел бы создать массив полезной нагрузки для каждого элемента, но проблема в том, что теперь, когда я подписываюсь поток я получаю каждый запрос обратно, но я бы только начальные элементы должны быть возвращены после завершения ,

const streamC = streamA 
    .flatMap(payloads => { 
    return Rx.Observable.from(payloads) 
     .flatMap(payload => Rx.Observable.fromPromise($.ajax(payload)) 

streamC.subscribe(result => /* executed for every payload */) 

Я попытался с groupBy, который возвращает правильный сгруппированный массив и показал мне, что я могу цепь Observable, но я до сих пор не могу понять, как правильно подписаться наблюдатели, чтобы получить элементы выполнены.

const streamWLF = streamA 
    .flatMap(payloads => { 
    return Rx.Observable.from(payloads) 
     .flatMap(payload => Rx.Observable.fromPromise($.ajax(payload)) 
     .groupBy((obs) => obs.key, (obs) => obs) 

streamWLF.subscribe(result => { 
    result.subscribe(/* did my magic here*/); 
}) 

Так что мой вопрос в том, что это лучший способ сделать это?

Включен ли основной поток, подписанный, когда принимается элемент subStream?

И, если возможно, как я могу подписаться на subStream, чтобы запустить mainStream, подписанный только после завершения subStream?

+0

Возможно, вы должны использовать concatMap вместо flatMap. – xgrommx

+0

Я не понимаю вашу проблему. Что вы имеете в виду, что «у вас будут возвращены только начальные элементы, когда они будут завершены»? Обещания возвращают только один элемент. Вы пытаетесь связать результаты каждого набора полезных данных вместе? – paulpdaniels

+0

@paulpdaniels Да, у меня есть элементы, содержащие массив полезных нагрузок, и я хочу вернуть обратно, а не все полезные нагрузки в потоке, но элементы с возвращаемыми полезными нагрузками внутри. – user2131283

ответ

0

Я думаю, что вы ищете оператора forkJoin.

const streamC = streamA 
    .flatMap(payloads => Rx.Observable.forkJoin(payloads)); 

streamC.subscribe(results => /*An array containing the results from all payloads*/);