2016-12-26 5 views
0

У меня есть бесконечный поток данных из раздвоенного процесса. Я хочу, чтобы этот поток обрабатывался модулем, и иногда я хочу дублировать данные из этого потока для обработки другим модулем (например, отслеживать поток данных, но если что-то интересное происходит, я хочу записать следующие n байтов в файл для дальнейшее расследование).NodeJS Разделение потока

Итак, давайте предположим следующий сценарий:

  1. Я запустить программу и начать потреблять читаемый поток
  2. 2 секунды позже я хочу обрабатывать одни и те же данные в течение 1 сек другим потоком чтения
  3. Как только время закончится, я хочу закрыть второго потребителя, но первоначальный потребитель должен оставаться нетронутым.

Вот фрагмент кода для этого:

var stream = process.stdout; 

stream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new PassThrough(); 
    stream.pipe(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    stream.unpipe(stream2); 
} 

Моя проблема в том, что unpiping в stream2 не получает его закрытия. Если я позвоню stream.end() после команды unpipe, то он падает с ошибкой:

events.js:160 
     throw er; // Unhandled 'error' event 
    ^

Error: write after end 
    at writeAfterEnd (_stream_writable.js:192:12) 
    at PassThrough.Writable.write (_stream_writable.js:243:5) 
    at Socket.ondata (_stream_readable.js:555:20) 
    at emitOne (events.js:101:20) 
    at Socket.emit (events.js:188:7) 
    at readableAddChunk (_stream_readable.js:176:18) 
    at Socket.Readable.push (_stream_readable.js:134:10) 
    at Pipe.onread (net.js:548:20) 

Я даже попытался приостановить поток источника, чтобы помочь буферу промывать из второго потока, но это не сработало:

function stopAnotherConsumer() { 
    stream.pause(); 
    stream2.once('unpipe', function() { 
     stream.resume(); 
     stream2.end(); 
    }); 
    stream.unpipe(stream2); 
} 

Такая же ошибка, как раньше (напишите после окончания).

Как решить проблему? Мое первоначальное намерение состоит в том, чтобы дублировать потоковые данные из одной точки, а затем закрыть второй поток через некоторое время.

Note: I tried to use this answer to make it work.

ответ

0

Поскольку не было ответов, я размещаю свое (лоскутное) решение. Если у кого-то есть лучший, не держите его обратно.

Новый поток:

const Writable = require('stream').Writable; 
const Transform = require('stream').Transform; 

class DuplicatorStream extends Transform { 
    constructor(options) { 
     super(options); 

     this.otherStream = null; 
    } 

    attachStream(stream) { 
     if (!stream instanceof Writable) { 
      throw new Error('DuplicatorStream argument is not a writeable stream!'); 
     } 

     if (this.otherStream) { 
      throw new Error('A stream is already attached!'); 
     } 

     this.otherStream = stream; 
     this.emit('attach', stream); 
    } 

    detachStream() { 
     if (!this.otherStream) { 
      throw new Error('No stream to detach!'); 
     } 

     let stream = this.otherStream; 
     this.otherStream = null; 
     this.emit('detach', stream); 
    } 

    _transform(chunk, encoding, callback) { 
     if (this.otherStream) { 
      this.otherStream.write(chunk); 
     } 

     callback(null, chunk); 
    } 
} 

module.exports = DuplicatorStream; 

И использование:

var stream = process.stdout; 
var stream2; 

duplicatorStream = new DuplicatorStream(); 
stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain 
duplicatorStream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new stream.PassThrough(); 
    duplicatorStream.attachStream(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    duplicatorStream.once('detach', function() { 
     stream2.end(); 
    }); 
    duplicatorStream.detachStream(); 
} 
Смежные вопросы