2013-02-19 4 views
1

Мой основной экземпляр узла вызывает рабочий процесс, который принимает сообщения через IPC (используя встроенный узел process.send() и process.on('message'...), которые представляют собой объекты, содержащие информацию о новых заданиях для добавления в Kue. Затем он обрабатывает эти задания.Обратный вызов Kue при завершении задания

Мой главный экземпляр узла вызывает что-то вроде этого:

worker.send({jobType:'filesystem', operation: 'delete', path: fileDir}); 

и экземпляр работник делает что-то вроде этого:

jobs.create(message.jobType, message).save(); 

jobs.process('filesystem', function(job, done) { 
    fs.delete(job.data.path, function(err) { 
     done(err); 
    }); 
}); 

и задание завершается успешно.

Как я могу получить функцию обратного вызова в моем основном экземпляре узла, когда задание завершено? Как я могу вернуть некоторые результаты в основной экземпляр узла из экземпляра рабочего?

ответ

1

Я считаю, что я решил это, но я оставлю нерешенный вопрос, если кто-то может улучшить мое решение или обеспечить лучший.

Когда вы используете Kue для обработки заданий в отдельном процессе, вы не можете просто выполнить обратный вызов, когда задание будет завершено. Это пример взаимодействия между двумя процессами. Мне бы хотелось использовать идентификатор, который автоматически выполняет каждое задание Kue (что, по моему мнению, является тем же самым идентификатором, который он получает в Redis), но app.js должен знать идентификатор задания, прежде чем он будет отправлен работнику, чтобы он может совпадать с идентификатором, когда он получает сообщение.

app.js

var child = require('child_process'); 
var async = require('async'); 

var worker = child.fork("./worker.js"); 

//When a message is received, search activeJobs for it, call finished callback, and delete the job 
worker.on('message', function(m) { 
    for(var i = 0; i < activeJobs.length; i++) { 
     if(m.jobId == activeJobs[i].jobId) { 
      activeJobs[i].finished(m.err, m.results); 
      activeJobs.splice(i,1); 
      break; 
     } 
    } 
}); 

// local job system 
var newJobId = 0; 
var activeJobs = []; 

function Job(input, callback) { 
    this.jobId = newJobId; 
    input.jobId = newJobId; 
    newJobId++; 
    activeJobs.push(this); 

    worker.send(input); 

    this.finished = function(err, results) { 
     callback(err, results); 
    } 
} 


var deleteIt = function(req, res) { 
    async.series([ 
     function(callback) { 
      // An *EXAMPLE* asynchronous task that is passed off to the worker to be processed 
      // and requires a callback (because of async.series) 
      new Job({ 
       jobType:'filesystem', 
       title:'delete project directory', 
       operation: 'delete', 
       path: '/deleteMe' 
      }, function(err) { 
       callback(err); 
      }); 
     }, 
     //Delete it from the database 
     function(callback) { 
      someObject.remove(function(err) { 
       callback(err); 
      }); 
     }, 
    ], 
    function(err) { 
     if(err) console.log(err); 
    }); 
}; 

worker.js

var kue = require('kue'); 
var fs = require('fs-extra'); 

var jobs = kue.createQueue(); 

//Jobs that are sent arrive here 
process.on('message', function(message) { 
    if(message.jobType) { 
     var job = jobs.create(message.jobType, message).save(); 
    } else { 
     console.error("Worker:".cyan + " [ERROR] No jobType specified, message ignored".red); 
    } 
}); 

jobs.process('filesystem', function(job, done) { 

    if(job.data.operation == 'delete') { 
     fs.delete(job.data.path, function(err) { 
      notifyFinished(job.data.jobId, err); 
      done(err); 
     }); 
    } 
}); 

function notifyFinished(id, error, results) { 
    process.send({jobId: id, status: 'finished', error: error, results: results}); 
} 

https://gist.github.com/winduptoy/4991718

+0

Спасибо - я была такая же проблема. Было бы хорошо, если бы было более простое решение. – hoju

Смежные вопросы