2014-09-30 2 views
5

Я пытаюсь использовать RxJS для написания сценария для обработки нескольких сотен файлов журналов, каждый из которых составляет около 1 ГБ. Скелет сценария выглядитКак ограничить параллелизм flatMap?

Rx.Observable.from(arrayOfLogFilePath) 
.flatMap(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

код работает, но обратите внимание, что стадия фильтрации всех файлов журналов начнется одновременно. Однако с точки зрения производительности IO файловой системы предпочтительнее обрабатывать один файл за другим (или, по крайней мере, ограничивать параллелизм несколькими файлами, а не открывать все сотни файлов за одно и то же время). В этом отношении, как я могу реализовать его «функциональным реактивным способом»?

Я думал о планировщике, но не мог понять, как это может помочь здесь.

+0

У меня есть один и тот же вопрос, но с Rx.NET. Является ли это возможным? http://stackoverflow.com/questions/37345516/limiting-concurrent-requests-using-rx-and-selectmany – SuperJMN

ответ

12

Вы можете использовать .merge(maxConcurrent), чтобы ограничить параллелизм. Поскольку .merge(maxConcurrent) выравнивает метаобследование (наблюдаемый наблюдаемых) в наблюдаемый, вам необходимо заменить .flatMap на .map, чтобы выход был метаобслуживаемым («unflat»), затем вы вызываете .merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath) 
.map(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Этот код не был проверен (так как у меня нет доступа к среде разработки у вас есть), но это, как действовать. RxJS не имеет много операторов с параметрами параллелизма, но вы почти всегда можете делать то, что вам нужно, с помощью .merge(maxConcurrent).

+1

Это именно то, что я пытаюсь работать. У меня есть список 500 URL-адресов для загрузки и не хочу одновременно запускать все запросы. Я использовал карту (5), но она не работает ... Все запросы сделаны одновременно. – Roaders

+0

@Roaders вы получили это решение? Я пытаюсь сделать то же самое. Но каждый запрос уволен одновременно. Я везде гугл, и ничего не нашел. – Diego

+0

Если вы делаете асинхронный HTTP-вызов, например, вам нужно обернуть его в Rx.defer(), чтобы Rx мог решить, когда будет выполнен вызов (и повторите попытку, если он не сработает, например) – Roaders

0

Я только что решил аналогичную проблему с RxJs 5, поэтому я надеюсь, что решение может помочь другим с аналогичной проблемой.

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more), 
 
// retry two times, push error on stream if retry fails. 
 

 
//const Rx = require('rxjs-es6/Rx'); 
 

 
// -- Global variabel just to show that it works. -- 
 
let parallelRequests = 0; 
 
// -------------------------------------------------- 
 

 
function simulateRequest(req) { 
 
    console.log("Request " + req); 
 
    // --- To log retries --- 
 
    var retry = 0; 
 
    // ---------------------- 
 

 
    // Can't retry a promise, need to restart before the promise is made. 
 
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => { 
 

 
     var random = Math.floor(Math.random() * 2000); 
 
     // -- To show that it works -- 
 
     if (retry) { 
 
      console.log("Retrying request " + req + " ,retry " + retry); 
 
     } else { 
 

 
      parallelRequests++; 
 
     } 
 
     // --------------------------- 
 
     setTimeout(() => { 
 
      if (random < 900) { 
 
       retry++; 
 
       return reject(req + " !!!FAILED!!!"); 
 
      } 
 

 
      return resolve(req); 
 
     }, random); 
 
    })).retry(2).catch(e => Rx.Observable.of(e)); 
 
} 
 

 
Rx.Observable.range(1, 10) 
 
    .flatMap(e => simulateRequest(e), null, 2) 
 
    // -- To show that it works -- 
 
    .do(() => { 
 
     console.log("ParallelRequests " + parallelRequests); 
 
     parallelRequests--; 
 
    }) 
 
    // --------------------------- 
 
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>

Смежные вопросы