2014-10-19 11 views
3

В RxJS, когда вы хотите последовательно запускать HTTP-запросы, вы их связываете. Но я не понимаю, как я могу запускать запросы параллельно? Я видел в примерах на http://reactive-extensions.github.io/learnrx/, что они используют Observable.zip() для одновременного запуска 2 запросов. Но как вы будете запускать 5 запросов параллельно? В частности, как настроить, чтобы моя функция вызывалась:Выполнение задач async параллельно

  • когда все 5 в комплекте?
  • если сначала полный?

ответ

0

Это довольно старый вопрос, но без принятого ответа. Ответ, который вы ищете, может быть на удивление простым: concatMap.

Когда создается обещание, оно начинает выполнение немедленно, поэтому они выполняются параллельно; в то время, когда значения испускаются из одного наблюдаемого, они находятся в серийном режиме.

Итак, объедините эти два, для следующего фрагмента кода, наблюдаемые от обещания выполняются параллельно, и результат их испускается в последовательном порядке, поскольку concatMap помещает их в один поток в том порядке, в котором они созданы.

Rx.Observable.from(urls_array) 
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) }) 
.subscribe(
    function(jsonObj) { 
    // first result will arrive first 
    }, 
    function(err) { }, 
    function() { 
    // all completed 
    } 
) 
-4

Вы можете посмотреть на https://www.npmjs.org/package/async

Это модуль узел, который может быть использован в браузере тоже.

+0

Я специально попросил решение RxJS. Я знаю об async.js и большинстве других асинхронных вспомогательных библиотек для javascript. –

0

Может быть, эта ссылка будет полезна для Вас http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_methods/forkjoin.html

+1

ссылка сломана - почему бы не написать основные шаги в ответе? – silverfighter

+0

Извините, меня переработала моя книга. Прямая ссылка http://xgrommx.github.io/rx-book/content/observable/observable_methods/forkjoin.html – xgrommx

+2

Ссылка выше действительно ужасная, самостоятельная реклама? Но да, функция 'forkJoin', вроде эквивалента' Promise.all', которая может быть лучше видна на официальной Rx rep: https://github.com/Reactive-Extensions/RxJS/blob/master/doc /api/core/operators/forkjoin.md –

0

.zip() может помочь вам в этом!

const a$ = Observable.interval(200).take(6) 
const b$ = Observable.interval(300).take(10) 
const c$ = Observable.interval(400).take(3) 
    .zip(b$,a$) 
    .subscribe(v=>console.log(v)) 


// marble 
-0-1-2-3-4-5| (a$) 
--0--1--2--3--4| (b$) 
---0---1---2| (c$) 
    zip(a$, b$) 
---[0,0,0]---[1,1,1]---[2,2,2]| 

// console.log 
[0,0,0] 
pause(400ms) 
[1,1,1] 
pause(400ms) 
[2,2,3] 

.zip (arg1, arg2, (сам, arg1, arg2) => йоЗотеЬЫпд())

const a$ = Observable.interval(200).take(6) 
const b$ = Observable.interval(300).take(10) 
const c$ = Observable.interval(400).take(3) 
    .zip(b$,a$, (c,b,a)=>a+b+c) 
    .subscribe(v=>console.log(v)) 

// console.log() 
0 
pause(400ms) 
3 = (1+1+1) 
pause(400ms) 
9 = (3+3+3) 

Или

слияния() + flatMap()

import Rx, { Observable } from 'rxjs' 
import axios from 'axios' 

const promiseA = axios.get('https://jsonplaceholder.typicode.com/users/1') 
    , promiseB = axios.get('https://jsonplaceholder.typicode.com/users/2') 
    , promiseC = axios.get('https://jsonplaceholder.typicode.com/users/3') 

Observable.interval(0).take(1) 
    .flatMap(()=>Observable.merge(promiseA, promiseB, promiseC)) 
    // flatMap will resolve the promise for you! 
    .map(res=>res.data.username) 
    .reduce((arr,item)=>arr.concat(item),[]) 
    .subscribe(v=>console.log(v)) // [ 'Samantha', 'Antonette', 'Bret' ] 
0

Используйте combineLatest или enter link description here!

// Assume you have an array of urls 
const urls = [ 
    "twitter.com/puppies.json", 
    "google.com/puppies.json", 
    "facebook.com/puppies.json" 
]; 

// Let's map these urls to Ajax Observables 
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url)) 

// Now combine the result from each request into an observable 
// Here's combineLatest: 
const allThePuppies$ = Rx.Observable.combineLatest(...urls) 
// Alternatively, here's forkJoin: 
const allThePuppies$ = Rx.Observable.forkJoin(urls) 


// When you subscribe to `allThePuppies$`, you'll kick off all your requests in parallel, and your response will contain an array with the results from each request: 
allThePuppies$.subscribe(results => { 
    const twitterPuppies, googlePuppies, facebookPuppies = results; 
    // Do what you must with the respective responses 
    // (Presumably in this example you'd show your users some adorable pics of puppies) 
}) 

combineLatest принимает произвольное число наблюдаемых, и как только каждый из них испускается по крайней мере одно значение, то оно будет излучать массив последнего значения из каждой наблюдаемой, когда любой из этих наблюдаемых пожаров.

Это ужасно абстрактно. Для наших целей мы знаем, что несколько запросов ajax будут реалистично выделяться только один раз. Итак, если мы используем combineLatest для нескольких наблюдаемых ajax, мы получим наблюдаемое, которое испускает массив результатов от каждого из запросов ajax.

forkJoin похож на combineLatest, но он только излучает свой массив ответов, как только каждая из его составляющих наблюдаемых завершена.

Смежные вопросы