2016-09-23 1 views
1

Я использую highland.js для обработки файла с использованием потока для чтения содержимого между двумя разделителями. Я также использую async.js для последовательного запуска последовательности запросов HTTP.Обработка потоков высокогорного потока с использованием async

В идеале я хотел бы передать вывод x из высокогорья в качестве первой функции в цепочку async (цепочку), чтобы HTTP-запросы выполнялись для каждого фрагмента, извлеченного из потока.

Возможно ли это? Если да, то как это можно достичь?

var async = require('async'); 
var _ = require('highland'); 


_(fs.createReadStream(files[0], { encoding: 'utf8' })) 
     .splitBy('-----BEGIN-----\n') 
     .splitBy('\n-----END-----\n') 
     .filter(chunk => chunk !== '') 
     .each(function (x) { 
     }).done(function() { 

    async.series([ 
     function(callback) { 
      setTimeout(function() { 
       console.log('Task 1'); 
       callback(null, 1); 
      }, 300); 
     }, 
     function(callback) { 
      setTimeout(function() { 
       console.log('Task 2'); 
       callback(null, 2); 
      }, 200); 
     }, 
    ], function(error, results) { 
     console.log(results); 
    }); 

});; 
+0

Что такое 'done' в этом случае? – abdulbarik

+0

Почему 'x' не передается функции на' javascript' на вопрос? – guest271314

+0

'done' http://highlandjs.org/#done был похмелье из моего рабочего кода и сообщает процессу потока закончить. Я мог бы, вероятно, удалить его. – user1513388

ответ

1

Вы можете избавиться от звонков each и done. После фильтрации вы можете следить за ним с помощью .toArray(callback). Обратный вызов передается массивом, который содержит результаты с высоты. Вы можете реорганизовать как это

var Q = require('q'); 
var _ = require('highland'); 


_(fs.createReadStream(files[0], { encoding: 'utf8' })) 
     .splitBy('-----BEGIN-----\n') 
     .splitBy('\n-----END-----\n') 
     .filter(chunk => chunk !== '') 
     .each(asyncTasks); 

function asyncTasks(x) { // here, x will be each of the results from highland 
    async.series([ 
     // do something with x results 
     function(callback) { 
      console.log('Task 1'); 
      callback(null, 1); 
     }, 
     // do something else with x results 
     function(callback) { 
      console.log('Task 2'); 
      callback(null, 2); 
     }, 
    ], function(error, results) { 
     console.log(results); 
    }); 
} 

here является ссылка на документацию для toArray. toArray потребляет поток, точно так же, как done также. Дайте знать, если у вас появятся вопросы.

Хотя, честно говоря, я думаю, вам лучше использовать обещания. Хотя часть этого является лишь личным предпочтением, отчасти потому, что он делает код более читаемым. Начиная с what I've read, async более эффективен, чем обещания, но приятная часть обещаний заключается в том, что вы можете передавать результаты от одной функции к другой. Поэтому в вашем примере вы можете сделать некоторые вещи до x в первой части, а затем передать измененный результат следующей функции, следующей функции и т. Д. Где, когда вы используете async.series, вы завершаете каждую функцию, вызывая callback(null, result), и вы не получите результаты, пока не закончите в самом конце серии, когда получите все результаты от всех вызовов до callback. Теперь вы всегда можете сохранить свои результаты в какой-то переменной за пределами async.series, но это сделает ваш код более грязным. Если вы хотите переписать его с обещаниями, это будет выглядеть следующим образом. Я использую здесь q, но это всего лишь одна из многих библиотек обещаний, которые вы могли бы использовать.

var async = require('async'); 
    var _ = require('highland'); 


    _(fs.createReadStream(files[0], { encoding: 'utf8' })) 
      .splitBy('-----BEGIN-----\n') 
      .splitBy('\n-----END-----\n') 
      .filter(chunk => chunk !== '') 
      .each(asyncTasks); 

    function asyncTasks(x) { // here, x will be an array of the results from highland 
     return asyncTask1(x) 
       .then(asyncTask2) 
       .then(asyncTask3) 
    } 

    function asyncTask1(x) { 
     var deferred = Q.defer(); 

     // do some stuff 

     if (// some error condition) { 
     deferred.reject(); 
     } else { 
     deferred.resolve(x); // or pass along some modified version of x 
     } 

     return deferred.promise; 
    } 

    function asyncTask2(x) { 
     // same structure as above 
    } 

    function asyncTask3(x) { 
     // same structure as above 
    } 

Некоторого асинхронный API, в эти дни начали возвращать обещания, в дополнении к принимающему обратному вызову, или иногда вместо. Так что было бы хорошо, когда вам было бы удобно. Обещания очень полезны. Вы можете узнать о них подробнее here и here.

+0

Спасибо, я пробую это сейчас. Почему Q лучше и как он будет выглядеть, используя его. Я был бы счастлив переключиться, если бы это было лучше, – user1513388

+0

Похоже, что x - это массив всех кусков, которые выступают против всего фрагмента, который необходимо передать в цепочку асинхронов. – user1513388

+0

О, хорошо. Извините, я неправильно понял вашу потребность. Я обновил код. Просто небольшое изменение. Теперь вы можете позвонить каждому, а затем передать ту же функцию, что и раньше. Теперь асинхронные вызовы будут вызываться по каждому результату из высокогорья. Каждый из них потребляет поток, поэтому вам не нужно также вызывать 'done(). –

Смежные вопросы