2017-02-09 2 views
10

я с небольшой проблемой при использовании async.queue с FileStreamочередь Асинхронный, FileStream конец как знать, когда оба закончили

  1. У меня есть сценарий, где мой FileStream завершит
  2. я поставил FILEREAD в правда
  3. однако очередь будет пустой и уже называют стоком
  4. это приводит тогда мое «сделано», чтобы никогда не называли

Каков правильный способ сказать «закончить очередь» после того, как мой поток «end», и очередь пуста?

var fs = require('fs') 
 
    , util = require('util') 
 
    , stream = require('stream') 
 
    , es = require('event-stream'); 
 

 
var async = require('async'); 
 

 
var fileRead = false; 
 
var lineNr = 0; 
 

 
var q = async.queue(function(task, callback) { 
 
    task(function(err, lineData){ 
 
     responseLines.push(lineData); 
 
     callback(); 
 
     }); 
 
    }, 5); 
 

 
var q.drain = function() { 
 
    if(fileRead){ 
 
    done(null, responseLines); 
 
    } 
 
} 
 

 
var s = fs.createReadStream('very-large-file.csv') 
 
    .pipe(es.split()) 
 
    .pipe(es.mapSync(function(line){ 
 
     s.pause(); 
 
     q.push(async.apply(insertIntoDb, line)) 
 
     s.resume(); 
 
    }) 
 
    .on('error', function(err){ 
 
     done(err); 
 
    }) 
 
    .on('end', function(){ 
 
     fileRead = true; 
 
    }) 
 
);

или есть более эффективное использование асинхронных, который позволит мне это сделать? асинхронной технологическая линия по линии с возможностью выхода в начале, если одна из линий имеет ошибки

+0

Вы можете просто добавить другую задачу сразу же после того, как вы установили 'fileRead' в значение true. Я думаю, ваша проблема в том, что функция 'task', которую вы вызываете с каждым элементом очереди, вызывается и завершается до того, как событие' end' вызывается в вашем потоке. – forrestmid

ответ

1

Во-первых, я не знаю, сколько из вашего кода является псевдо-код, но var q.drain = ... не является действительным яваскрипт и должна ошибка. Это должно быть только q.drain =, поскольку вы определяете свойство на существующем объекте, не объявляя новую переменную. Это может быть причиной того, что ваша функция стока не срабатывает, если она не является псевдокодом.

Есть несколько способов добиться того, что, как я думаю, вы пытаетесь сделать. Можно было бы проверить длину очереди в вашем конечном обработчике и установить функцию стока, если есть все еще обрабатываемые элементы.

.on('end', function(){ 
    if(!q.length){ 
    callDone(); 
    } 
    else { 
    q.drain = callDone; 
    } 
}); 

function callDone(){ 
    done(null, responseLines); 
} 

Это эффективно говоря «если очередь будет обработано сделан вызов, если нет, то вызова сделан, когда у него есть!» Я уверен, что есть много способов убрать ваш код, но, надеюсь, это решение вашей конкретной проблемы.

Смежные вопросы