У меня есть бесконечный поток данных из раздвоенного процесса. Я хочу, чтобы этот поток обрабатывался модулем, и иногда я хочу дублировать данные из этого потока для обработки другим модулем (например, отслеживать поток данных, но если что-то интересное происходит, я хочу записать следующие n байтов в файл для дальнейшее расследование).NodeJS Разделение потока
Итак, давайте предположим следующий сценарий:
- Я запустить программу и начать потреблять читаемый поток
- 2 секунды позже я хочу обрабатывать одни и те же данные в течение 1 сек другим потоком чтения
- Как только время закончится, я хочу закрыть второго потребителя, но первоначальный потребитель должен оставаться нетронутым.
Вот фрагмент кода для этого:
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
function startAnotherConsumer() {
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
Моя проблема в том, что unpiping в stream2 не получает его закрытия. Если я позвоню stream.end()
после команды unpipe
, то он падает с ошибкой:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
Я даже попытался приостановить поток источника, чтобы помочь буферу промывать из второго потока, но это не сработало:
function stopAnotherConsumer() {
stream.pause();
stream2.once('unpipe', function() {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
Такая же ошибка, как раньше (напишите после окончания).
Как решить проблему? Мое первоначальное намерение состоит в том, чтобы дублировать потоковые данные из одной точки, а затем закрыть второй поток через некоторое время.
Note: I tried to use this answer to make it work.