2015-12-03 2 views
3

Это краткий пример реализации пользовательского читаемого потока. Класс называется MyStream. Поток получает имя файла/папки из каталога и толкает значения в событие данных.Как вызвать асинхронную функцию внутри читаемого потока node.js

Для сравнения я реализовал (в этом примере) два разных способа/функций. Один синхронный, а другой - асинхронный. Второй аргумент конструктора позволяет решить, какой путь используется (правда, для асинхронных и ложным для синхронного.

readcounter подсчитывает количество раз метод _read называется. Просто чтобы дать обратную связь.

var Readable = require('stream').Readable; 
var util = require('util'); 
var fs = require('fs'); 
util.inherits(MyStream, Readable); 

function MyStream(dirpath, async, opt) { 
    Readable.call(this, opt); 
    this.async = async; 
    this.dirpath = dirpath; 
    this.counter = 0; 
    this.readcounter = 0; 
} 

MyStream.prototype._read = function() { 
    this.readcounter++; 
    if (this.async === true){ 
    console.log("Readcounter: " + this.readcounter); 
    that = this; 
    fs.readdir(this.dirpath,function(err, files){ 
     that.counter ++; 
     console.log("Counter: " + that.counter); 
     for (var i = 0; i < files.length; i++){ 
     that.push(files[i]); 
     } 
     that.push(null); 
    }); 
    } else { 
    console.log("Readcounter: " + this.readcounter); 
    files = fs.readdirSync(this.dirpath) 
    for (var i = 0; i < files.length; i++){ 
     this.push(files[i]); 
    }; 
    this.push(null); 
    } 
}; 
//Instance for a asynchronous call 
mystream = new MyStream('C:\\Users', true); 
mystream.on('data', function(chunk){ 
    console.log(chunk.toString()); 
}); 

синхронный способ работает, как ожидалось, но что-то интересное происходит, когда я называю его асинхронно. Everytime имя файла проталкивается через that.push(files[i]) _read метод вызывается снова. Что приводит к ошибкам, когда первый асинхронный цикл завершается и that.push(null) определяет конец потока.

Окружающая среда, которую я использую для проверки этого: узел 4.1.1, электрон 0.35.2.

Я не понимаю, почему _read называется так из-за этого и почему это происходит. Может быть, это ошибка? Или есть то, чего я сейчас не вижу. Есть ли способ создать читаемый поток, используя асинхронные функции? Нажатие кусков асинхронно было бы действительно круто, потому что это был бы неблокирующий поток. Специально, когда у вас больше данных.

ответ

1

_read вызывается всякий раз, когда «читателю» нужны данные, и это обычно происходит сразу после нажатия данных.

У меня были такие же «проблемы» с реализацией _read прямо сейчас, я пишу функцию, возвращающую объект потока. Он работает неплохо, и данные не могут быть «вытащены» из моего потока, данные доступны/отталкиваются, когда я это решаю. С вашим примером, я хотел бы сделать это следующим образом:

var Readable = require('stream').Readable; 
var fs = require('fs'); 

function MyStream(dirpath, async, opt) { 
    var rs = new Readable(); 
    // needed to avoid "Not implemented" exception 
    rs._read = function() { 
    // console.log('give me data!'); // << this will print after every console.log(folder); 
    }; 

    var counter = 0; 
    var readcounter = 0; 

    if (async) { 
    console.log("Readcounter: " + readcounter); 
    fs.readdir(dirpath, function (err, files) { 
     counter++; 
     console.log("Counter: " + counter); 
     for (var i = 0; i < files.length; i++) { 
     rs.push(files[i]); 
     } 
     rs.push(null); 
    }); 
    } else { 
    console.log("Readcounter: " + readcounter); 
    files = fs.readdirSync(dirpath) 
    for (var i = 0; i < files.length; i++) { 
     rs.push(files[i]); 
    }; 
    rs.push(null); 
    } 

    return rs; 
} 

var mystream = MyStream('C:\\Users', true); 
mystream.on('data', function (chunk) { 
    console.log(chunk.toString()); 
}); 

Он не сразу ответить на ваш вопрос, но это способ получить рабочий код.

+0

В этом случае метод _read не вызывается каждый раз при нажатии данных. Если вы используете один и тот же класс только с синхронным readdir, счетчик остается 1. Ваш код wokring использует только читаемый класс потока, но не создает новый поток. Но я попытаюсь работать с функцией async не внутри метода _read. – apxp

+1

'rs' - это поток, разница в том, что вы расширяете' Readable'. Когда вы это сделаете, вы должны называть 'push()' только один раз в любом пути выполнения в '_read'. Ваш код может работать, если вы храните 'файлы' на уровне объекта (this.files) и сохраняете индекс, чтобы узнать, какой элемент вы должны нажать дальше. Я могу опубликовать «исправленную» версию вашего кода, если вы хотите. – Shanoor

+0

Нет, вам не нужно исправлять код. Это просто пример. Предложение с индексом работает с моим примером, но что, если функция занимает некоторое время, возвращает 10.000 значений и нуждается в некоторых ресурсах процессора. Чем не оптимально называть _read 10 000 раз и фильтровать только одно значение из этого. – apxp

Смежные вопросы