2015-03-27 2 views
2

Я не могу на всю жизнь понять, как реализовать stream, который правильно обрабатывает противодавление. Если вы никогда не используете паузу и не возобновляете?Как реализовать поток, который правильно обрабатывает противодавление в node.js?

У меня есть эта реализация Я пытаюсь получить, чтобы работать правильно:

var StreamPeeker = exports.StreamPeeker = function(myStream, callback) { 
    stream.Readable.call(this, {highWaterMark: highWaterMark}) 
    this.stream = myStream 

    myStream.on('readable', function() { 
     var data = myStream.read(5000) 
     //process.stdout.write("Eff: "+data) 
     if(data !== null) { 
      if(!this.push(data)) { 
       process.stdout.write("Pause") 
       this.pause() 
      } 
      callback(data) 
     } 
    }.bind(this)) 

    myStream.on('end', function() { 
     this.push(null) 
    }.bind(this)) 
} 
util.inherits(StreamPeeker, stream.Readable) 
StreamPeeker.prototype._read = function() { 
    process.stdout.write("resume") 
    //this.resume() // putting this in for some reason causes the stream to not output??? 
} 

Это правильно посылает выход, но не правильно производить противодавление. Как я могу изменить его для правильной поддержки противодавления?

ответ

2

Хорошо, я наконец понял это после многих проб и ошибок. Пара руководящих принципов:

  • Никогда не используйте паузы или резюме (в противном случае он будет идти в наследство «течет» режим)
  • Никогда не добавить «данных» слушателя событий (в противном случае он будет идти в наследство «течет "режим)
  • своей ответственности реализатора, чтобы отслеживать, когда источник является читаемым
  • Его ответственность реализатора, чтобы отслеживать, когда адресат хочет больше данных
  • реализация не должен прочитать все данные до метода _read называется
  • Аргумент read указывает источнику, что он содержит много байтов, вероятно, лучше передать аргумент, переданный this._read в метод источника read. Таким образом, вы должны иметь возможность настроить, сколько нужно читать за раз в пункте назначения, а остальная часть цепочки потоков должна быть автоматической.

Так это то, что я изменил его:

Update: Я создал читаемый, что гораздо проще реализовать с помощью правильного обратного давления и должны иметь столько же гибкость, как нативные потоки узла ,

var Readable = stream.Readable 
var util = require('util') 

// an easier Readable stream interface to implement 
// requires that subclasses: 
    // implement a _readSource function that 
     // * gets the same parameter as Readable._read (size) 
     // * should return either data to write, or null if the source doesn't have more data yet 
    // call 'sourceHasData(hasData)' when the source starts or stops having data available 
    // calls 'end()' when the source is out of data (forever) 
var Stream666 = {} 
Stream666.Readable = function() { 
    stream.Readable.apply(this, arguments) 
    if(this._readSource === undefined) { 
     throw new Error("You must define a _readSource function for an object implementing Stream666") 
    } 

    this._sourceHasData = false 
    this._destinationWantsData = false 
    this._size = undefined // can be set by _read 
} 
util.inherits(Stream666.Readable, stream.Readable) 
Stream666.Readable.prototype._read = function(size) { 
    this._destinationWantsData = true 
    if(this._sourceHasData) { 
     pushSourceData(this, size) 
    } else { 
     this._size = size 
    } 
} 
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) { 
    this._sourceHasData = _sourceHasData 
    if(_sourceHasData && this._destinationWantsData) { 
     pushSourceData(this, this._size) 
    } 
} 
Stream666.Readable.prototype.end = function() { 
    this.push(null) 
} 
function pushSourceData(stream666Readable, size) { 
    var data = stream666Readable._readSource(size) 
    if(data !== null) { 
     if(!stream666Readable.push(data)) { 
      stream666Readable._destinationWantsData = false 
     } 
    } else { 
     stream666Readable._sourceHasData = false 
    } 
}  

// creates a stream that can view all the data in a stream and passes the data through 
// correctly supports backpressure 
// parameters: 
    // stream - the stream to peek at 
    // callback - called when there's data sent from the passed stream 
var StreamPeeker = function(myStream, callback) { 
    Stream666.Readable.call(this) 
    this.stream = myStream 
    this.callback = callback 

    myStream.on('readable', function() { 
     this.sourceHasData(true) 
    }.bind(this)) 
    myStream.on('end', function() { 
     this.end() 
    }.bind(this)) 
} 
util.inherits(StreamPeeker, Stream666.Readable) 
StreamPeeker.prototype._readSource = function(size) { 
    var data = this.stream.read(size) 
    if(data !== null) { 
     this.callback(data) 
     return data 
    } else { 
     this.sourceHasData(false) 
     return null 
    } 
} 

Старый Ответ:

// creates a stream that can view all the data in a stream and passes the data through 
// correctly supports backpressure 
// parameters: 
    // stream - the stream to peek at 
    // callback - called when there's data sent from the passed stream 
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) { 
    stream.Readable.call(this) 
    this.stream = myStream 
    this.callback = callback 
    this.reading = false 
    this.sourceIsReadable = false 

    myStream.on('readable', function() { 
     this.sourceIsReadable = true 
     this._readMoreData() 
    }.bind(this)) 

    myStream.on('end', function() { 
     this.push(null) 
    }.bind(this)) 
} 
util.inherits(StreamPeeker, stream.Readable) 
StreamPeeker.prototype._read = function() { 
    this.reading = true 
    if(this.sourceIsReadable) { 
     this._readMoreData() 
    } 
} 
StreamPeeker.prototype._readMoreData = function() { 
    if(!this.reading) return; 

    var data = this.stream.read() 
    if(data !== null) { 
     if(!this.push(data)) { 
      this.reading = false 
     } 
     this.callback(data) 
    } 
} 
Смежные вопросы