2015-05-20 10 views
0

Как я могу избежать использования рекурсивной структуры, когда у меня появилось несколько потоков, и мне нужно получить абсолютное событие end, чтобы закончить логику.Избегайте обратного вызова ад с несколькими потоками

var someArray = ['file1', 'file2', 'file3']; 

someArray.forEach(function(file) { 
    fs 
     .createReadStream(file) 
     .pipe(/* do some stuff */) 
     .on('data', function(usageInfo) { 

      // done? 

     }); 
} 

У меня есть несколько файлов, которые я должен обрабатывать через tp некоторые процессы. Как настроить событие, которое сообщает мне, когда все они сделаны? В настоящее время я получаю каждый end событие индивидуально.

Я могу абсолютно запустить каждый поток в одно и то же время. Мне просто нужно как-то собрать конец?

Я мог бы вызвать вызов функции для каждого end события и считать это ... это звучит Hacky хотя? ...

Я чувствую, что есть способ сделать это с обещаниями, но я не знаю, как.

+2

Вы пробовали обещания? – laggingreflex

+0

да, так что я вроде как не упоминал об этом! :) Я хочу использовать обещания, но не могу понять, как с потоками здесь? – Dominik

+1

Две вещи, которые вы должны проверить: 1. Найдите способы использования [обещаний с потоками] (http://google.com/search?q=promises+with+stream+javascript), вы найдете много материала. [этот кажется хорошим] (http://www.bennadel.com/blog/2832-exposing-promise-deferred-functionality-on-streams-in-node-js.htm) и 2. [Продвижение Bluebird] (https : //github.com/petkaantonov/bluebird/blob/master/API.md#option-promisifier), в основном превратить любую асинхронную функцию в обещание упростить процесс – laggingreflex

ответ

3

Я чувствую, что есть способ сделать это с обещаниями, но я не знаю, как это сделать.

Да, есть. Как обещает сделать представляют асинхронные значения, вы получите обещание в конце одного потока:

var p = new Promise(function(resolve, reject) { 
    fs.createReadStream(file) 
    .on('error', reject) 
    .pipe(/* do some stuff */) 
    .on('end', resolve) 
    .on('error', reject); // call reject(err) when something goes wrong 
}); 

Он может быть использован как p.then(functio(usageInfo) { console.log("stream ended"); }).

Теперь, если вы создадите несколько обещаний, один для имени файла в вашем массиве, все потоки будут работать параллельно и разрешить их соответствующие обещания, когда это будет сделано. Затем вы можете использовать Promise.all для сбора - прочитайте «ожидание» - все результаты от каждого из них в новое обещание для массива результатов.

var promises = ['file1', 'file2', 'file3'].map(function(file) { 
    return new Promise(function(resolve, reject) { 
     … 
    }); 
}); 
Promise.all(promises).then(function(usageInfos) { 
    console.log("all of them done", usageInfos), 
}, function(err) { 
    console.error("(At least) one of them failed", err); 
}); 
+0

Спасибо. Это похоже на то, что я искал. Благодарю. Я все еще борюсь с данными. По сути, процесс pipe дает мне данные только на событие 'data' и теперь выплевывает« незавершенный бизнес »... Я думаю, что это может быть еще один вопрос :) – Dominik

+0

Возможно, посмотрите на [этот пример] (http: // stackoverflow. com/a/24945351/1048572), где несколько событий данных собираются в потоке до разрешения обещания, когда поток заканчивается. – Bergi

+0

Вы только прикрепляете обработчик ошибок к потоку назначения, труба не пересылает ошибки из источника в пункт назначения – Esailija

3

использовать счетчик:

var someArray = ['file1', 'file2', 'file3']; 
var still_processing = someArray.length; 

someArray.forEach(function(file) { 
    fs.createReadStream(file) 
     .pipe(/* do some stuff */) 
     .on('end', function() { 
      still_processing--; 

      if (!still_processing) { 
       // done 
      } 
     }); 
} 

Это является основным механизмом. Эта схема последовательности операций управления инкапсулируется с помощью функции async.parallel() в async.js:

var someArray = ['file1', 'file2', 'file3']; 
var streams_to_process = []; 

someArray.forEach(function(file) { 
    streams_to_process.push(function(callback) { 
     var result = ""; 
     fs.createReadStream(file) 
      .pipe(/* do some stuff */) 
      .on('end', function() { 
       callback(null, result); 
      }); 
    }); 
}); 

async.parallel(streams_to_process, function(err, results) { 
    // all done 
}); 

Внутри async.parallel использует счетчик захваченного в затворе, чтобы отслеживать, когда все процессы в асинхронных (этом случае «конец» события) сделано.

Для этого есть другие библиотеки. Большинство библиотек с обещаниями, например, предоставляют метод .all(), который работает одинаково - внутренне отслеживая значение счетчика и активируя обратный вызов .then(), когда все будет сделано.

+0

А, ну, по сути, мое первоначальное представление о счетчиках было правильным? Таким образом, 'callback вызову' parallel() будет вызываться, когда все файлы находятся в буфере? – Dominik

+1

Просто используйте 'async.map', когда у вас есть массив для работы с ... – Bergi

+0

@Bergi' map' имеет разные параметры и должен выглядеть иначе. Я пытался заставить это работать, но пока не удалось. – Dominik

Смежные вопросы