2016-02-17 3 views
0

Я получаю поток файлов из Busboy, а затем я подключаю его к настраиваемому потоку преобразования для проверки и очистки. Он работает с небольшими файлами, но по мере того, как они становятся больше, мой пользовательский поток не дожидается завершения потока бутбоя, и он становится усеченным.Node transform stream not waiting

Вот код помощник официанта:

busboy 
.on("file", function(fieldname, file, filename, encoding, mimetype) { 
    //Creating a mongo doc first 
    Dataset.create(dataset, function (err, ds) { 
     if(err) {...} 
     else { 
      file.pipe(validateCSV)); 
     } 
    }); 

    validateCSV 
     .on("finish", function() { 
      // Send to Data Import 
      datasetService.import(validateCSV, dataset, function (err, result) { 
       ... 
      }); 
     }); 
}); 

И мой преобразование потока:

module.exports.ValidateCSV = ValidateCSV; 
function ValidateCSV(options) { 
    if (!(this instanceof ValidateCSV)) return new ValidateCSV(options); 

    if (!options) options = {}; 
    options.objectMode = true; 
    Transform.call(this, options); 
} 

util.inherits(ValidateCSV, Transform); 

ValidateCSV.prototype._transform = function (chunk, encoding, done) { 
    if (this._checked) { 
     this.push(chunk); 
    } else { 
     //Do some validation 
     var data = chunk.toString(); 
     var lines = data.match(/[^\r\n]+/g); 
     var headerline = lines[0] || ""; 
     var header = headerline.split(","); 
     ... 
     this._checked = true; 
     this.push(chunk); 
    } 
    done() 
} 

ответ

0

Оказалось, это была проблема с противодавлением и seeting вариант HighWaterMark на поток преобразования фиксированного его. В идеале это woold устанавливается в соответствии с размером загрузки, но это исправлено для меня:

function ValidateCSV(options) { 
    if (!(this instanceof ValidateCSV)) return new ValidateCSV(options); 

    if (!options) options = {}; 
    options.objectMode = true; 
    options.highWaterMark = 100000; 
    Transform.call(this, options); 
}