2014-01-24 4 views
5

Рассмотрим приведенный ниже код ... Я пытаюсь, чтобы приостановить поток после прочтения первых 5 строк:Приостановка Readline в Node.js

var fs   = require('fs'); 
var readline = require('readline'); 
var stream  = require('stream'); 
var numlines = 0; 
var instream = fs.createReadStream("myfile.json"); 
var outstream = new stream; 
var readStream = readline.createInterface(instream, outstream); 
readStream.on('line', function(line){ 
    numlines++; 
    console.log("Read " + numlines + " lines"); 
    if (numlines >= 5) { 
    console.log("Pausing stream"); 
    readStream.pause(); 
    } 
}); 

Выход (скопированный рядом) предполагает, что он держит чтения строки после пауза. Возможно, readline поставил в очередь еще несколько строк в буфере и все равно кормит их мне ... это имеет смысл, если он будет продолжать читать асинхронно в фоновом режиме, но на основе документации я не знаю, что должно быть надлежащее поведение. Любые рекомендации о том, как добиться желаемого эффекта?

Read 1 lines 
Read 2 lines 
Read 3 lines 
Read 4 lines 
Read 5 lines 
Pausing stream 
Read 6 lines 
Pausing stream 
Read 7 lines 

ответ

6

Таким образом, получается, что поток Readline имеет тенденцию к «капать» (то есть, утечка несколько дополнительных линий), даже после паузы(). В документации это не делается, но это правда.

Если вы хотите, чтобы переключатель pause() появлялся незамедлительно, вам нужно создать свой собственный буфер строки и самостоятельно собрать оставшиеся строки.

8

Несколько unintuitively, the pause methods does not stop queued up line events:

Вызов rl.pause() не сразу приостановите другие события (в том числе 'line') от излучаемого экземпляром readline.Interface.

Существует, однако модуль третьей стороной по имени line-by-line где pauseделает паузу line события, пока не будет возобновлена.

var LineByLineReader = require('line-by-line'), 
    lr = new LineByLineReader('big_file.txt'); 

lr.on('error', function (err) { 
    // 'err' contains error object 
}); 

lr.on('line', function (line) { 
    // pause emitting of lines... 
    lr.pause(); 

    // ...do your asynchronous line processing.. 
    setTimeout(function() { 

     // ...and continue emitting lines. 
     lr.resume(); 
    }, 100); 
}); 

lr.on('end', function() { 
    // All lines are read, file is closed now. 
}); 

(я не имею никакого отношения к модулю, только что нашел его полезным для решения этого вопроса.)

+0

Спасибо за этот ответ. Из интереса, насколько распространено такое требование? Я разбираю CSV на 80 ГБ, который нужно передавать на сервер. какие другие варианты использования? –

+1

@ZachSmith Я обнаружил, что возможность приостановки и возобновления во время очень полезна, когда обратный вызов не может или не должен выполняться синхронно (скажем, вставляя строки в базу данных).Если вы читаете строки быстрее, чем можете их обработать, вы можете поставить слишком много запросов и исчерпать память. –

0

добавить некоторые пункты:

.on('pause', function() { 
    console.log(numlines) 
}) 

Вы получите 5. Он упоминается в документе node.js document:

  • Входной поток не приостановлен и принимает событие SIGCONT. (См. События SIGTSTP и SIGCONT)

Итак, я создал буфер tmp в событии линии. Используйте флаг, чтобы определить, приостановлено ли оно.

.on('line', function(line) { 
    if (paused) { 
     putLineInBulkTmp(line); 
    } else { 
     putLineInBulk(line); 
    } 
} 

затем в на паузу и возобновлять:

.on('pause', function() { 
    paused = true; 
    doSomething(bulk, function(resp) { 
     // clean up bulk for the next. 
     bulk = []; 
     // clone tmp buffer. 
     bulk = clone(bulktmp); 
     bulktmp = []; 
     lr.resume(); 
    }); 
}) 
.on('resume',() => { 
    paused = false; 
}) 

Используйте этот способ справиться с такой ситуацией.

Смежные вопросы