Это краткий пример реализации пользовательского читаемого потока. Класс называется MyStream. Поток получает имя файла/папки из каталога и толкает значения в событие данных.Как вызвать асинхронную функцию внутри читаемого потока node.js
Для сравнения я реализовал (в этом примере) два разных способа/функций. Один синхронный, а другой - асинхронный. Второй аргумент конструктора позволяет решить, какой путь используется (правда, для асинхронных и ложным для синхронного.
readcounter подсчитывает количество раз метод _read называется. Просто чтобы дать обратную связь.
var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);
function MyStream(dirpath, async, opt) {
Readable.call(this, opt);
this.async = async;
this.dirpath = dirpath;
this.counter = 0;
this.readcounter = 0;
}
MyStream.prototype._read = function() {
this.readcounter++;
if (this.async === true){
console.log("Readcounter: " + this.readcounter);
that = this;
fs.readdir(this.dirpath,function(err, files){
that.counter ++;
console.log("Counter: " + that.counter);
for (var i = 0; i < files.length; i++){
that.push(files[i]);
}
that.push(null);
});
} else {
console.log("Readcounter: " + this.readcounter);
files = fs.readdirSync(this.dirpath)
for (var i = 0; i < files.length; i++){
this.push(files[i]);
};
this.push(null);
}
};
//Instance for a asynchronous call
mystream = new MyStream('C:\\Users', true);
mystream.on('data', function(chunk){
console.log(chunk.toString());
});
синхронный способ работает, как ожидалось, но что-то интересное происходит, когда я называю его асинхронно. Everytime имя файла проталкивается через that.push(files[i])
_read метод вызывается снова. Что приводит к ошибкам, когда первый асинхронный цикл завершается и that.push(null)
определяет конец потока.
Окружающая среда, которую я использую для проверки этого: узел 4.1.1, электрон 0.35.2.
Я не понимаю, почему _read называется так из-за этого и почему это происходит. Может быть, это ошибка? Или есть то, чего я сейчас не вижу. Есть ли способ создать читаемый поток, используя асинхронные функции? Нажатие кусков асинхронно было бы действительно круто, потому что это был бы неблокирующий поток. Специально, когда у вас больше данных.
В этом случае метод _read не вызывается каждый раз при нажатии данных. Если вы используете один и тот же класс только с синхронным readdir, счетчик остается 1. Ваш код wokring использует только читаемый класс потока, но не создает новый поток. Но я попытаюсь работать с функцией async не внутри метода _read. – apxp
'rs' - это поток, разница в том, что вы расширяете' Readable'. Когда вы это сделаете, вы должны называть 'push()' только один раз в любом пути выполнения в '_read'. Ваш код может работать, если вы храните 'файлы' на уровне объекта (this.files) и сохраняете индекс, чтобы узнать, какой элемент вы должны нажать дальше. Я могу опубликовать «исправленную» версию вашего кода, если вы хотите. – Shanoor
Нет, вам не нужно исправлять код. Это просто пример. Предложение с индексом работает с моим примером, но что, если функция занимает некоторое время, возвращает 10.000 значений и нуждается в некоторых ресурсах процессора. Чем не оптимально называть _read 10 000 раз и фильтровать только одно значение из этого. – apxp