2016-08-20 2 views
5

Я предполагаю, что это должно быть легко достигнуто, но у меня проблемы (концептуально, я думаю), выясняя, как справиться с этим.Дозирование с использованием RxJS?

У меня есть API, который возвращает массив объектов JSON. Мне нужно пройти через эти объекты, и для каждого объекта сделать еще один вызов AJAX. Проблема в том, что система, которая обрабатывает каждый вызов AJAX, может обрабатывать только два активных вызова за раз (поскольку это довольно сложная для ЦП задача, которая подключается к настольному приложению).

Мне было интересно, как я мог бы достичь этого, используя RxJS (используя версию 5 или 4)?

РЕДАКТИРОВАТЬ: Кроме того, возможно ли одновременное выполнение цепочки шагов. т.е.

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

Я пытался сделать что-то вроде:

Rx.Observable.fromPromise(start()) 
    .concatMap(arr => Rx.Observable.from(arr)) 
    .concatMap(x => downloadFile(x)) 
    .concatMap((entry) => processFile(entry)) 
    .concatMap((entry) => convertFile(entry)) 
    .concatMap((entry) => UploadFile(entry)) 
    .subscribe(
     data => console.log('data', new Date().getTime(), data), 
     error => logger.warn('err', error), 
     complete => logger.info('complete') 
    ); 

Однако это не похоже на работу. DownloadFile, например, не дожидается завершения процесса processFile, convertFile и uploadFile, скорее, следующий будет запускаться снова, как только предыдущий завершится.

+0

http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/subscribe.html – cport1

+0

@ cport1 - связанный пример выполняется один за другим, не дожидаясь какой-либо операции async. Это отличается от того, что я прошу. – NRaf

ответ

4

Вот 2 подхода, если вы хотите, чтобы последовательность запросов точно, как этот

Downloading File: 1 
Processing File: 1 
Converting File: 1 
Uploading File: 1 
Downloading File: 2 
Processing File: 2 
... 

Вы должны решить все обещания внутри один метод concatMap, например,

Rx.Observable.fromPromise(getJSONOfAjaxRequests()) 
    .flatMap(function(x) { return x;}) 
    .concatMap(function(item) { 
    return downloadFile(item) 
     .then(processFile) 
     .then(convertFile); 
    }) 
    .subscribe(function(data) { 
    console.log(data); 
    }); 

см. Рабочий plunkr здесь: https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview Таким образом, новый вызов ajax будет отправлен только в том случае, если предыдущий закончен.

Другим подходом является то, что разрешать отправку файлов параллельно, но загрузка, обработка, конвертация, загрузка операций выполняется в последовательности.Для этого вы можете заставить его работать на

Rx.Observable.fromPromise(getJSONOfAjaxRequests()) 
    .flatMap(function(x) { return x;}) 
    .merge(2) // in case maximum concurrency required is 2 
    .concatMap(function(item) { 
    return downloadFile(item); 
    }) 
    .concatMap(function(item) { 
    return processFile(item); 
    }) 
    .concatMap(function(item) { 
    return convertFile(item) 
    }) 
    .subscribe(function(data) { 
    //console.log(data); 
    }); 

см plunkr здесь: https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

+0

Спасибо, первый из тех, с чем я пошел, хотя второй был бы идеальным, если бы я мог ограничить количество параллельных операций (т.е. max 2 за раз). – NRaf

+0

Кстати, это первое решение, похоже, работает только с rxjs5. Поведение с использованием версии 4 запускает их все параллельно. – NRaf

+0

странно! теоретически поведение должно оставаться неизменным на rxjs4 и rxjs5. – FarazShuja

1

Как насчет чего-то подобного? Вы можете использовать from, чтобы разбить массив на куски размером с укусом и обработать их один за другим, используя concatMap.

function getArr() { 
    return Rx.Observable.of([1, 2, 3, 4, 5, 6, 7, 8]); 
} 


function processElement(element) { 
    return Rx.Observable.of(element) 
     .delay(500); 
} 


getArr() 
    .concatMap(arr => { 
     return Rx.Observable.from(arr); 
    }) 
    .concatMap(element => { 
     return processElement(element); 
    }) 
    .subscribe(res => { 
     console.log(res); 
    }); 
+0

Это выглядит многообещающим, но можно ли его расширить? то есть, если я хотел бы добавить еще один шаг? – NRaf

+0

Я считаю, что вы можете продлить его, да. Вы можете продолжать добавлять concatMaps или любой другой оператор, который вам нужен. Здесь есть много хорошей информации http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html – jcolemang

2

Вы можете использовать merge оператор с maxConcurrency перегрузки (Rxjs v4), так что-то вроде:

Rx.Observable.fromArray(aJSONs) 
    .map (function (JSONObject) { 
    return ajaxRequest(JSONObject) // that would be an observable (or a promise) 
    }) 
    .merge(2) 

Вы можете посмотреть, чтобы увидеть другие примеры использования в:

Официальная документация:

Смежные вопросы