2017-02-20 7 views
0

Я знаком с потоками узлов, но я борюсь за лучшие методы абстрагирования кода, которые я многократно использую в одном шаге.Узел - этапы этапов абстрагирования в функции

Вот усеченную версию о том, что я пишу сегодня:

inputStream 
.pipe(csv.parse({columns:true}) 
.pipe(csv.transform(function(row) {return transform(row); })) 
.pipe(csv.stringify({header: true}) 
.pipe(outputStream); 

Реальная работа происходит в transform(). Единственные вещи, которые действительно меняются: inputStream, transform() и outputStream. Как я уже сказал, это урезанная версия того, что я на самом деле использую. У меня много обработки ошибок и регистрации на каждом этапе шага, что в конечном итоге, почему я пытаюсь абстрагироваться от кода.

Что я ищу, чтобы написать это один шаг трубы, например, так:

inputStream 
.pipe(csvFunction(transform(row))) 
.pipe(outputStream); 

То, что я изо всех сил, чтобы понять, как превратить эти шаги труб в одну функцию, которая принимает поток и возвращает поток. Я смотрел на библиотеки, как через2, но я не уверен, как это получается, когда я пытаюсь идти.

ответ

0

Вот что я в конечном итоге происходит с. Я использовал библиотеку through2 и streaming API of the csv library, чтобы создать функцию трубы, которую я искал.

var csv = require('csv'); 
    through = require('through2'); 

module.exports = function(transformFunc) { 
    parser = csv.parse({columns:true, relax_column_count:true}), 
    transformer = csv.transform(function(row) { 
     return transformFunc(row); 
    }), 
    stringifier = csv.stringify({header: true}); 

    return through(function(chunk,enc,cb){ 
     var stream = this; 

      parser.on('data', function(data){ 
       transformer.write(data); 
      }); 

      transformer.on('data', function(data){ 
       stringifier.write(data); 
      }); 

      stringifier.on('data', function(data){ 
       stream.push(data); 
      }); 

      parser.write(chunk); 

      parser.removeAllListeners('data'); 
      transformer.removeAllListeners('data'); 
      stringifier.removeAllListeners('data'); 
      cb(); 
    }) 
} 

Стоит отметить ту часть, где я извлекаю слушателей событий к концу, это было связано с нарваться ошибок памяти, где я создал слишком много слушателей событий. Сначала я попытался решить эту проблему, прослушивая события с once, но это предотвратило чтение последующих блоков и переход к следующему этапу.

Дайте мне знать, если у кого есть отзывы или дополнительные идеи.

2

Вы можете использовать PassThrough класс как это:

var PassThrough = require('stream').PassThrough; 

var csvStream = new PassThrough(); 
csvStream.on('pipe', function (source) { 
    // undo piping of source 
    source.unpipe(this); 
    // build own pipe-line and store internally 
    this.combinedStream = 
    source.pipe(csv.parse({columns: true})) 
     .pipe(csv.transform(function (row) { 
     return transform(row); 
     })) 
     .pipe(csv.stringify({header: true})); 
}); 

csvStream.pipe = function (dest, options) { 
    // pipe internal combined stream to dest 
    return this.combinedStream.pipe(dest, options); 
}; 

inputStream 
    .pipe(csvStream) 
    .pipe(outputStream); 
+0

Как передать ссылку преобразования (строки) в поток? Разве мне не лучше использовать поток трансформации? Я читал, что PassThrough больше предназначен для мониторинга потока, чем на самом деле работает над ним? – AdamPat

+0

Зависит от того, как вы упаковываете обработку потока. Просто поставьте функцию в том месте, где она видна, или передайте ее в качестве ссылки. И, конечно, вы можете реализовать свое решение в потоке преобразования (посмотрите здесь https://nodejs.org/api/stream.html#stream_implementing_a_transform_stream). Показ идеи с потоком PassThrough был самым простым способом, который я нашел. – Marc

Смежные вопросы