2015-12-04 6 views
1

Я использую Rx.js для обработки содержимого файла, создания HTTP-запроса для каждой строки и последующего объединения результатов. Однако исходный файл содержит тысячи строк, и я перегружаю удаленный http api, с которым я выполняю HTTP-запрос. Мне нужно убедиться, что я ожидаю, что существующий запрос HTTP будет обратный вызов, прежде чем запускать другой. Я был бы открыт для пакетной обработки и выполнения n запросов одновременно, но для этого скрипта, выполняющего запросы в последовательном порядке, достаточно.Rx.js ждет завершения обратного вызова

У меня есть следующие:

const fs = require('fs'); 
const rx = require('rx'); 
const rxNode = require('rx-node'); 

const doHttpRequest = rx.Observable.fromCallback((params, callback) => { 
    process.nextTick(() => { 
    callback('http response'); 
    }); 
}); 

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt')) 
    .flatMap(t => t.toString().split('\r\n')) 
    .take(5) 
    .concatMap(t => { 
    console.log('Submitting request'); 

    return doHttpRequest(t); 
    }) 
    .subscribe(results => { 
    console.log(results); 
    }, err => { 
    console.error('Error', err); 
    },() => { 
    console.log('Completed'); 
    }); 

Однако это не выполняет запросы HTTP в последовательный. Она выводит:

 
Submitting request 
Submitting request 
Submitting request 
Submitting request 
Submitting request 
http response 
http response 
http response 
http response 
http response 
Completed 

Если удалить вызов concatAll() то запросы в последовательный, но моя подписываться функция видит наблюдаемые прежде, чем запросы HTTP вернулись.

Как я могу выполнять HTTP-запросы серийно, чтобы результат был следующим?

 
Submitting request 
http response 
Submitting request 
http response 
Submitting request 
http response 
Submitting request 
http response 
Submitting request 
http response 
Completed 
+2

В качестве побочного примечания вы можете уменьшить сложность путем слияния операторов. 'map' +' flatMap' => 'flatMap',' map' + 'concatAll' =>' concatMap'. – paulpdaniels

+0

Спасибо, я обновил пример, чтобы отразить это – toby

ответ

1

Проблема здесь, вероятно, что при использовании rx.Observable.fromCallback, функция, которую вы прошли в аргументе выполняется немедленно. Возвращаемое наблюдение будет содержать значение, переданное обратному вызову в более поздний момент времени. Чтобы лучше понять, что происходит, вы должны использовать несколько более сложное моделирование: количество ваших запросов, чтобы они возвращали фактический (по-разному для каждого запроса) результат, который вы можете наблюдать через подписку.

Что я постулировать, что здесь происходит:

  • take(5) вопросы 5 значений
  • map вопросы 5 сообщений журнала, выполняет 5 функций и проходит по 5 наблюдаемым
  • этих 5 наблюдаемых обрабатываются с помощью concatAll и значений выпущенные этими наблюдаемыми, будут в порядке, как ожидалось. То, что вы заказываете здесь, является результатом вызова функций, а не вызовов самих функций.

Для достижения своей цели, вам нужно вызвать ваш наблюдаемый завод (rx.Observable.fromCallback) только тогда, когда concatAll выписывает ему, а не в момент создания. Для этого вы можете использовать defer: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md

Так что ваш код превратится в:

rxNode.fromReadableStream(fs.createReadStream('./path-to-file')) 
    .map(t => t.toString().split('\r\n')) 
    .flatMap(t => t) 
    .take(5) 
    .map(t => { 
    console.log('Submitting request'); 

    return Observable.defer(function(){return doHttpRequest(t);}) 
    }) 
    .concatAll() 
    .subscribe(results => { 
    console.log(results); 
    }, err => { 
    console.error('Error', err); 
    },() => { 
    console.log('Completed'); 
    }); 

Вы можете увидеть подобный вопрос с отличным объяснением здесь: How to start second observable *only* after first is *completely* done in rxjs

Ваш журнал, вероятно, до сих пор показывают 5 последовательных сообщений «Отправка запроса». Но ваш запрос должен быть выполнен один за другим, если вы хотите.

Смежные вопросы