2013-08-08 5 views
36

Использование mongoskin, я могу сделать запрос, как это, который будет возвращать курсор:Перебор MongoDB курсора последовательно (ожидание обратного вызова, прежде чем перейти к следующему документу)

myCollection.find({}, function(err, resultCursor) { 
     resultCursor.each(function(err, result) { 

     } 
} 

Однако, я хотел бы вызывать некоторые асинхронные функции для каждого документа и только перейти к следующему элементу курсора после того, как он перезвонил (аналогично структуре eachSeries в модуле async.js). Например:

myCollection.find({}, function(err, resultCursor) { 
     resultCursor.each(function(err, result) { 

      externalAsyncFunction(result, function(err) { 
       //externalAsyncFunction completed - now want to move to next doc 
      }); 

     } 
} 

Как я могу это сделать?

Благодаря

UPDATE:

Я не wan't использовать toArray(), поскольку это большая партия операция, и результаты могут не поместиться в памяти на одном дыхании.

+0

Если вы блокируете и ожидаете завершения функции async перед перемещением, то в чем смысл асинхронного вызова? –

+0

@RotemHermon У меня нет выбора! Это не моя функция, и это асинхронно. (Переименует myAsyncFunction в externalAsyncFunction ...) – UpTheCreek

+0

Почему вы не используете 'toArray()', а затем рекурсивную функцию для итерации результата? –

ответ

45

Если вы не хотите, чтобы загрузить все результаты в памяти с помощью ToArray, вы можете перемещаться с помощью курсора с чем-то вроде следующего.

myCollection.find({}, function(err, resultCursor) { 
    function processItem(err, item) { 
    if(item === null) { 
     return; // All done! 
    } 

    externalAsyncFunction(item, function(err) { 
     resultCursor.nextObject(processItem); 
    }); 

    } 

    resultCursor.nextObject(processItem); 
} 
+0

А я вижу, спасибо тимофеям! – UpTheCreek

+11

Этот метод не работал для меня для большого набора данных. Я получаю «RangeError: Максимальный размер стека вызовов». –

+0

@SoichiHayashi Существует ряд вещей, которые вызовут RangeError, но приведенный выше пример не должен его бросать. Возможно, если вы предоставите более подробную информацию в качестве отдельного вопроса, я могу помочь вам разобраться, где это происходит. –

0

Вы можете получить результат в Array и итерации с использованием рекурсивной функции, что-то вроде этого.

myCollection.find({}).toArray(function (err, items) { 
    var count = items.length; 
    var fn = function() { 
     externalAsyncFuntion(items[count], function() { 
      count -= 1; 
      if (count) fn(); 
     }) 
    } 

    fn(); 
}); 
+0

Извините, я слишком медленно отвечал на ваш вопрос в комментариях - я не могу использовать toArray, поскольку набор результатов слишком велик. – UpTheCreek

+0

О, хорошо. Тогда другой ответ подходит для вас. –

+0

Хотя это правда, лучше избегать этого шаблона, поскольку он может сломаться по мере роста данных. –

1

Вы можете сделать что-то подобное с помощью async lib. Ключевым моментом здесь является проверка того, является ли текущий документ нулевым. Если это так, значит, вы закончили.

async.series([ 
     function (cb) { 
      cursor.each(function (err, doc) { 
       if (err) { 
        cb(err); 
       } else if (doc === null) { 
        cb(); 
       } else { 
        console.log(doc); 
        array.push(doc); 
       } 
      }); 
     } 
    ], function (err) { 
     callback(err, array); 
    }); 
+0

Привет Антуан - проблема, с которой я столкнулась с этим подходом, состоит в том, что если вам нужно сделать что-то для каждой записи асинхронно, тогда цикл курсора не будет ждать, пока это не будет выполнено. (Cursor.each не обеспечивает «следующий» обратный вызов, поэтому в нем возможны только операции синхронизации). – UpTheCreek

-2

Вы можете использовать простые функции setTimeOut. Это пример в машинописном беге на nodejs (я использую обещания через «когда» модуль, но это может быть сделано без них, а):

 import mongodb = require("mongodb"); 

     var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {}); 
     var db = new mongodb.Db('myDb', dbServer); 

     var util = require('util'); 
     var when = require('when'); //npm install when 

     var dbDefer = when.defer(); 
     db.open(function() { 
      console.log('db opened...'); 
      dbDefer.resolve(db); 
     }); 

     dbDefer.promise.then(function(db : mongodb.Db){ 
      db.collection('myCollection', function (error, dataCol){ 
       if(error) { 
        console.error(error); return; 
       } 

       var doneReading = when.defer(); 

       var processOneRecordAsync = function(record) : When.Promise{ 
        var result = when.defer(); 

        setTimeout (function() { 
         //simulate a variable-length operation 
         console.log(util.inspect(record)); 
         result.resolve('record processed'); 
        }, Math.random()*5); 

        return result.promise; 
       } 

       var runCursor = function (cursor : MongoCursor){ 
        cursor.next(function(error : any, record : any){ 
         if (error){ 
          console.log('an error occurred: ' + error); 
          return; 
         } 
         if (record){ 
          processOneRecordAsync(record).then(function(r){ 
           setTimeout(function() {runCursor(cursor)}, 1); 
          }); 
         } 
         else{ 
          //cursor up 
          doneReading.resolve('done reading data.'); 
         } 
        }); 
       } 

       dataCol.find({}, function(error, cursor : MongoCursor){ 
        if (!error) 
        { 
         setTimeout(function() {runCursor(cursor)}, 1); 
        } 
       }); 

       doneReading.promise.then(function(message : string){ 
        //message='done reading data' 
        console.log(message); 
       }); 
      }); 
     }); 
0

Вы можете использовать будущий:

myCollection.find({}, function(err, resultCursor) { 
    resultCursor.count(Meteor.bindEnvironment(function(err,count){ 
     for(var i=0;i<count;i++) 
     { 
      var itemFuture=new Future(); 

      resultCursor.nextObject(function(err,item)){ 
       itemFuture.result(item); 
      } 

      var item=itemFuture.wait(); 
      //do what you want with the item, 
      //and continue with the loop if so 

     } 
    })); 
}); 
4

Если кто-то ищет способ Promise сделать это (в отличие от использования обратных вызовов nextObject), вот оно. Я использую Node v4.2.2 и драйвер mongo v2.1.7. Это своего рода версия asyncSeries из Cursor.forEach():

function forEachSeries(cursor, iterator) { 
    return new Promise(function(resolve, reject) { 
    var count = 0; 
    function processDoc(doc) { 
     if (doc != null) { 
     count++; 
     return iterator(doc).then(function() { 
      return cursor.next().then(processDoc); 
     }); 
     } else { 
     resolve(count); 
     } 
    } 
    cursor.next().then(processDoc); 
    }); 
} 

Чтобы использовать эту функцию, передать курсор и итератор, который работает на каждом документе асинхронно (как вы бы для Cursor.forEach). Итератору нужно возвратить обещание, как и большинство встроенных функций драйвера mongodb.

Скажите, вы хотите обновить все документы в коллекции test.Это, как вы могли бы сделать это:

var theDb; 
MongoClient.connect(dbUrl).then(function(db) { 
    theDb = db;  // save it, we'll need to close the connection when done. 
    var cur = db.collection('test').find(); 

    return forEachSeries(cur, function(doc) { // this is the iterator 
    return db.collection('test').updateOne(
     {_id: doc._id}, 
     {$set: {updated: true}}  // or whatever else you need to change 
    ); 
    // updateOne returns a promise, if not supplied a callback. Just return it. 
    }); 
}) 
.then(function(count) { 
    console.log("All Done. Processed", count, "records"); 
    theDb.close(); 
}) 
+0

Я не вижу, где вызывается 'forEachSeries'. – chovy

+0

Переполнение стека вызовов. – chovy

6

Это работает с большим набором данных с помощью setImmediate:

var cursor = collection.find({filter...}).cursor(); 

cursor.nextObject(function fn(err, item) { 
    if (err || !item) return; 

    setImmediate(fnAction, item, arg1, arg2, function() { 
     cursor.nextObject(fn); 
    }); 
}); 

function fnAction(item, arg1, arg2, callback) { 
    // Here you can do whatever you want to do with your item. 
    return callback(); 
} 
14

более современный подход, который использует async/await:

const cursor = db.collection("foo").find({}); 
while(await cursor.hasNext()) { 
    const doc = await cursor.next(); 
    // process doc here 
} 

Примечания :

  • Это может быть еще больше простой в использовании, если async iterators прибыть.
  • Возможно, вам захочется добавить try/catch для проверки ошибок.
  • Содержащая функция должна быть async или код должен быть обернут в (async function() { ... })(), так как он использует await.
  • Если вы хотите, добавьте await new Promise(resolve => setTimeout(resolve, 1000)); (пауза в течение 1 секунды) в конце цикла while, чтобы показать, что он обрабатывает документы один за другим.
+0

Работал отлично, спасибо. Любая идея, если есть какие-то подводные камни с большими наборами данных? – FireBrand

+1

отлично, это лучшее решение, в отличие от выбранного, который просто сработает –

Смежные вопросы