2014-06-07 2 views
4

Я пытаюсь обеспечить, чтобы один запрос mysql приводил к другому и не был завершен до тех пор, пока все его дочерние запросы не будут завершены. Так, например, я начинаю с одной строки select и stream и выполняю последующие запросы из результата этой строки. Это можно выполнить с обратными вызовами, но в конечном итоге у меня заканчивается нехватка памяти, поэтому я хотел бы замедлить процесс и запустить партии, но из-за асинхронного характера отправки я не могу удержать все в фазе и закончить соединение после того, как все строки обработаны.Создание синхронных запросов с узлом-mysql

Вот пример:

var query = conn.query('select id from table1 limit 10'); 

query.on('result', function(row){ 
    console.log('query1', row); 
    var query2 = conn.query('select id from books where id = ? ', [row.id]); 
    query2.on('result', function(row2){ 
     console.log('query2', row2); 
     var query3 = conn.query('insert into test (id) values (?)', [row2.id]); 
     query3.on('result', function(row3){ 
      console.log(row3); 
     }); 
    }); 
}); 

query.on('end', function(){ 
    conn.end(); 
}); 

выше терпит неудачу, потому что есть еще строки для обработки в query3 после первоначального запроса закончился.
Любые мысли? Фактический код еще сложнее, потому что я должен обработать xml из последующих запросов и сжечь еще больше вставок, когда я прохожу через пакет.

Спасибо!

ответ

2

Я хотел бы предложить это решение с async модуля:

var async = require("async"); 
// connection instance 
var conn; 

// here goes task serving logic 
// if any async function should be finished before drain callback, push them into q 
var solvers = { 
    query: function(q, task, row){ 
     console.log('query1', row); 
     q.push({ 
      solver: "query2", 
      req: "select id from books where id = ?", 
      reqArgs: [row.id] 
     }); 
    }, 
    query2: function(q, task, row){ 
     console.log('query2', row); 
     q.push({ 
      solver: "query3", 
      req: "insert into test (id) values (?)", 
      reqArgs: [row.id] 
     }); 
    }, 
    query3: function(q, task, row){ 
     console.log(row); 
    } 
} 

// here is a queue of tasks 
var q = async.queue(function(task, cb){ 
    var query = conn.query(task.req, task.reqArgs); 
    query.on("end", cb); 
    query.on("result",function(row){ 
     solvers[task.solver](q, task, row); 
    }); 
}, 2); // limit of parallel queries 

// when every request has reached "end" 
q.drain = function(){ 
    conn.end(); 
    // continue from here 
}; 

// initial task 
q.push({ 
    solver: "query", 
    req: "select id from table1 limit 10", 
    reqArgs: [] 
}); 

Но все же, я не уверен, что делает запросы ID по ID является хорошим решением.
Возможно, я просто не знаю полной проблемы.

2

@glukki, спасибо за отличный ответ и ссылку на async. Я пошел с перестановкой вашего кода и двумя асинхронными запросами, которые выполняют «chomp и chew», используя одно соединение и пул соединений для обработки более 100 тыс. Строк, вставляя вставки в строки 1.2M. Работала удивительно хорошо и заняла менее 10 минут. Вот полная реализация минус модуль и настройка соединения. Надеюсь, это тоже поможет кому-то другому. Еще раз спасибо!

function populateMesh(row, callback){  

    xmlParser.parseString('<root>'+row.mesh_heading_list+'</root>', function(err, result){ 

     var q2 = async.queue(function (task, cb) { 

      pool.getConnection(function(err, cnx){ 
       cnx.query('INSERT INTO abstract_mesh (mesh_id, abstract_id, major_topic) SELECT mesh_descriptor.id, ?, ? FROM mesh_descriptor WHERE mesh_descriptor.name = ?', [task.id, task.majorTopic, task.descriptorName], function(err, result){ 
        if (err) {throw err;} 
        cnx.release(); 
        cb(); 
       }); 
      }); 

     }, 50); 

     q2.drain = function() { 
      //console.log('all mesh processed'); 
      callback(); 
     } 

     if(!(result.root instanceof Object)){ 
      //console.log('its not obj!', row.id); 
      q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Null'}, function (err) {}); 
     } 

     for(var i in result.root.MeshHeading){ 
//   console.log('in loop',result.root.MeshHeading[i].DescriptorName); 
      if(typeof result.root.MeshHeading[i].DescriptorName === 'undefined'){ 
       q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Emergency'}, function(err){}); 
      } 

      for(var j in result.root.MeshHeading[i].DescriptorName){ 

       var descriptorName = result.root.MeshHeading[i].DescriptorName[j]._; 
       var majorTopic = result.root.MeshHeading[i].DescriptorName[j].$.MajorTopicYN; 

       q2.push({id: row.id, majorTopic: majorTopic, descriptorName: descriptorName}, function (err) {}); 

      } 
     } 
    });  

} 


// here goes task serving logic 
// if any async function should be finished before drain callback, push them into q 
var q = async.queue(function (row, callback) { 
     console.log('got id: ' + row.id); 
     populateMesh(row, function(){ 
      callback(); 
     }); 

    }, 10); 

    q.drain = function() { 
     console.log('all items have been processed'); 
     conn.end(function(err){ 
      console.log('connection ended'); 
     }); 
     pool.end(function(err){ 
      console.log('pool closed'); 
     }); 
    }; 

var truncate = conn.query('truncate abstract_mesh'); 

var select = conn.query('SELECT id, mesh_heading_list FROM pubtbl'); 

    select.on('result', function(result){ 
//  console.log(result); 
     q.push(result, function (err) { 
      //console.log('finished processing row'); 
     }); 
    }); 
+0

Довольно хорошо, вы поняли! ;) – glukki

2

На мой взгляд, лучшим решением является сделать код синхронно очень простым способом.

Вы можете использовать пакет «synchonize».

Просто

НПМ установить синхронизацию

Тогда var sync = require(synchronize);

Put логика, которая должна быть синхронным в волокно с помощью

sync.fiber(function() { //put your logic here }

В качестве примера для двух MySQL запросов:

var express = require('express'); 
var bodyParser = require('body-parser'); 
var mysql = require('mysql'); 
var sync = require('synchronize'); 

var db = mysql.createConnection({ 
    host  : 'localhost', 
    user  : 'user', 
    password : 'password', 
    database : 'database' 
}); 

db.connect(function(err) { 
    if (err) { 
     console.error('error connecting: ' + err.stack); 
     return; 
    } 
}); 

function saveSomething() { 
    var post = {id: newId}; 
    //no callback here; the result is in "query" 
    var query = sync.await(db.query('INSERT INTO mainTable SET ?', post, sync.defer())); 
    var newId = query.insertId; 
    post = {foreignKey: newId}; 
    //this query can be async, because it doesn't matter in this case 
    db.query('INSERT INTO subTable SET ?', post, function(err, result) { 
     if (err) throw err; 
    }); 
} 

Когда «saveSomething()» называется, он вставляет строку в главной таблице, и получает последний добавленный идентификатор. После этого будет выполнен код ниже. Не нужно влагать обещания или что-то подобное.

Смежные вопросы