2014-12-18 2 views
1

Я пытаюсь использовать Highland.js для сценария обновления базы данных на множестве Mongoose моделей, это кажется довольно идеально подходит для QueryStream вызова на Model.find(). У меня есть некоторые синхронные вещи (обновление моей модели для соответствия новой схеме, несколько операций очистки), и в конце я хочу, чтобы документ save(). У меня есть настроенные заранее подготовленные перехватчики, которые нужно запустить, и обновления на самом деле не совместимы с прямой Model.update(). Я сумел получить его сортировки работы с помощью комбинации Q.js и нагорья:Асинхронные преобразовывает на потоках в Highland.js

var sender_stream = Sender.find({}).stream(); 
var promise_save = function(document) { 
    var deferred = Q.defer(); 
    document.save(deferred.makeNodeResolver()); 
    return _(deferred.promise); 
} 

var sender_deferred = Q.defer(); 
_(sender_stream).map(function(sender) { 
    // set some fields on sender... 
    return sender; 
}).map(promise_save).series().on('done', sender_deferred.resolve).resume(); 

Однако, это, кажется, не решить обещание, и я не уверен, что это «право» способ держать вещи приятными и потоковыми ... кажется, странно сочетать Q.js и Highland.js так глубоко. Есть ли способ лучше?

+0

Обновление: все больше похоже на проблему с неработающим событием, чем данные, которые не возвращаются. Тем не менее, не уверен, что именно так используется Хайленд. –

ответ

3

Я не знаю много о Q или Highland. Но это похоже на простой пример использования функции преобразования на querystreams.

var stream = Sender.find({}).stream({ transform: manipulate }) 

function manipulate(document) { 
    // do stuff here 
    return document; 
} 

stream.on("data", function(document) { 
    stream.pause() 
    document.save(function(error) { 
     // error handle, maybe stream.destroy([err]) if you want it to stop immediately 
     stream.resume(); 
    }); 
}); 

stream.on("error", function(err){ 
    //error handle 
}); 

stream.on("close", function(){ 
    console.log("hopefully this worked for you"); 
}); 

Функция преобразования будет выполняться на документе перед тем, как излучать событие «данные». Как только функция преобразования сделала свой материал, это возвращаемое значение отправляется в функцию «данные». Затем вы просто приостанавливаете/сохраняете/возобновляете.

1

Вместо Promise вы можете использовать функции асинхронной функции Highland: http://highlandjs.org/#async. Mongoose также возвращает обещание, так что вы можете обернуть, что с нагорья вместо стиля функции асинхронной, но по-прежнему избегать добавления Q.

Я бы рекомендовал использовать .flatMap() вместо .map() и .series(), чтобы сгладить эти потоки обратно в один поток документов. Затем добавление .done() также можно использовать для создания Thunk вместо использования .resume() в сочетании с прослушивателем событий «done».

Честно говоря, не на 100% уверен, почему у вас возникли проблемы с вызываемым событием.

var sender_stream, set_fields, save, sender_deferred; 

sender_stream = Sender.find({}).stream(); 

save = function save(document) { 
    return _(function(push, next) { 
     document.save(function(err, result) { 
      push(err, document); 
      push(null, _.nil); 
     }); 
    }); 
}; 

set_fields = function setFields(sender) { 
    // set some fields on sender... 
    return sender; 
}; 

sender_deferred = Q.defer(); 

_(sender_stream) 
    .map(setFields) 
    .flatMap(save) 
    .done(function() { 
     sender_deferred.resolve(); 
    }); 
Смежные вопросы