0

Как я могу вывести результат агрегации MongoDB в коллекцию, не заменяя сборку из другого результата агрегации?Совокупность из нескольких коллекций

мне нужно получить данные только с $ из: «tempCollection», потому что у меня есть 500 млн документов, а также получение pipeline stage limit

var q = [ 
    {$match: query}, 
    {$group: {_id: '$hash'}}, 
    {$out: 'tempCollection'} 
]; 

async.parallel([ 
    function(callback) { 
    firstCollection.aggregate(q, callback); 
    }, 
    function(callback) { 
    secondCollection.aggregate(q, callback); 
    }, 

    ... 

], function() { 

    // I want to get all from tempCollection (with pagination) here 

}); 
+1

Неправильная конструкция вашего вопроса. '$ out' ** всегда ** заменяется. Что вы хотите сделать здесь на самом деле? «Добавить» оба результата в одной коллекции? Или «слияние» на основе определенных общих значений «накапливает» другие значения из обоих результатов? Также укажите, является ли это основным драйвером узла или чем-то другим, например, мангустом или монахом или чем-то еще. –

+0

Я использую Mongoose. Мне нужно каким-либо образом получить все отличительные значения хэша (слияние или запись в одной коллекции или т. Д.). –

+0

Выберите один. «merge» - означает, что у вас есть общий «ключ» или поля, составляющие «ключ», и вы намерены «увеличивать» другое значение, когда тот же ключ найден. «concatenating» - означает, что вы просто хотите, чтобы оба набора результатов попадали в одну коллекцию. Обратите внимание, что в последнем случае «ключ» действительно должен быть другим или искусственным. –

ответ

1

Суть здесь в том, что $out вариант только когда-либо «заменяет» вывода в целевой коллекции. Поэтому, чтобы делать что-либо еще, вы должны работать через клиентское соединение, а не просто выводить на сервер.

Ваш лучший вариант здесь с mongoose заключается в том, чтобы перейти прямо в базовый драйвер и получить доступ к node stream interface, как поддерживается водителем.

Trival пример, но он показывает основной путь к структуре:

var async = require('async'), 
    mongoose = require('mongoose'), 
    Schema = mongoose.Schema; 

mongoose.connect('mongodb://localhost/aggtest'); 

var testSchema = new Schema({},{ "_id": false, strict: false }); 


var ModelA = mongoose.model('ModelA', testSchema), 
    ModelB = mongoose.model('ModelB', testSchema), 
    ModelC = mongoose.model('ModelC', testSchema); 

function processCursor(cursor,target,callback) { 

    cursor.on("end",callback); 
    cursor.on("error",callback); 

    cursor.on("data",function(data) { 
    cursor.pause(); 
    target.update(
     { "_id": data._id }, 
     { "$setOnInsert": { "_id": data._id } }, 
     { "upsert": true }, 
     function(err) { 
     if (err) callback(err); 
     cursor.resume(); 
     } 
    ); 
    }); 
} 

async.series(
    [ 
    // Clean data 
    function(callback) { 
     async.each([ModelA,ModelB,ModelC],function(model,callback) { 
     model.remove({},callback); 
     },callback); 
    }, 

    // Sample data 
    function(callback) { 
     async.each([ModelA,ModelB],function(model,callback) { 
     async.each([1,2,3],function(id,callback) { 
      model.create({ "_id": id },callback); 
     },callback); 
     },callback); 
    }, 

    // Run merge 
    function(callback) { 
     async.parallel(
     [ 
      function(callback) { 
      var cursor = ModelA.collection.aggregate(
       [ 
       { "$group": { "_id": "$_id" } } 
       ], 
       { "batchSize": 25 } 
      ); 

      processCursor(cursor,ModelC,callback) 
      }, 
      function(callback) { 

      var cursor = ModelB.collection.aggregate(
       [ 
       { "$group": { "_id": "$_id" } } 
       ], 
       { "batchSize": 25 } 
      ); 

      processCursor(cursor,ModelC,callback) 
      } 
     ], 
     callback 
    ); 
    }, 

    // Get merged 
    function(callback) { 
     ModelC.find({},function(err,results) { 
     console.log(results); 
     callback(err); 
     }); 
    } 
    ], 
    function(err) { 
    if (err) throw err; 
    mongoose.disconnect(); 
    } 
); 

Oustide этого, то вы будете нуждаться в $out на «отдельные» коллекции, а затем объединить их с такой же .update() процесс, но чтобы сохранить его «серверной стороной», вам необходимо использовать .eval().

Это нехорошо, но это единственный способ сохранить работу на сервере. Вы также можете изменить это с помощью операций "Bulk" (опять же через тот же собственный интерфейс .collection) для большей пропускной способности. Но варианты сводятся к «чтению через клиента» или «eval».

Смежные вопросы