2011-12-16 3 views
19

я работаю (складочном) скрипт nodeNodeJS | Кластер: Как отправить данные от мастера ко всем или одному ребенку/работникам?

var cluster = require('cluster'); 
var http = require('http'); 
var numReqs = 0; 

if (cluster.isMaster) { 
    // Fork workers. 
    for (var i = 0; i < 2; i++) { 
    var worker = cluster.fork(); 

    worker.on('message', function(msg) { 
     if (msg.cmd && msg.cmd == 'notifyRequest') { 
     numReqs++; 
     } 
    }); 
    } 

    setInterval(function() { 
    console.log("numReqs =", numReqs); 
    }, 1000); 
} else { 
    // Worker processes have a http server. 
    http.Server(function(req, res) { 
    res.writeHead(200); 
    res.end("hello world\n"); 
    // Send message to master process 
    process.send({ cmd: 'notifyRequest' }); 
    }).listen(8000); 
} 

В приведенном выше сценарии я могу отправить данные от рабочего до главного процесса с легкостью. Но как отправить данные от мастера к работнику/работникам? С примерами, если это возможно.

ответ

1

Вы должны быть в состоянии послать сообщение от ведущего работника, как это:

worker.send({message:'hello'}) 

, потому что «cluster.fork реализован на вершине child_process.fork» (cluster.fork реализуется на вершине из child_process.fork)

+0

Да, это работает, спасибо! Другими словами: при работе с рабочими я должен хранить их в массиве. И итерации этого массива для отправки данных каждому ребенку. Есть ли другой способ отправить данные всем работникам без сохранения и итерации. – htonus

+0

Если вы не хотите хранить рабочих в массиве и перебирать их для отправки сообщений, вы можете использовать сокет домена unix для передачи сообщений от мастера к работникам. – alessioalex

+0

Я предполагаю, что вы можете создать EventEmitter в главном устройстве, и они будут генерировать событие всякий раз, когда будет получено сообщение. После создания каждого рабочего вам просто нужно добавить слушателя к EventEmitter, который отправит сообщение работнику. Конечно, это все еще реализовано, сохраняя ссылки слушателей (таким образом, и рабочего) в EventEmitter, но эй, по крайней мере, вам не нужно смотреть на него. – cheng81

36

Поскольку cluster.fork реализуется на вершине child_process.fork, вы можете отправлять сообщения от мастера к работнику с помощью worker.send({ msg: 'test' }), и от рабочего до мастера по process.send({ msg: 'test' });. Вы получаете такие сообщения: worker.on('message', callback) (от рабочего до мастера) и process.on('message', callback); (от мастера к работнику).

Вот мой полный пример, вы можете проверить его, перейдя по ссылке http://localhost:8000/ Тогда работник будет посылать сообщение хозяину и мастер ответит:

var cluster = require('cluster'); 
var http = require('http'); 
var numReqs = 0; 
var worker; 

if (cluster.isMaster) { 
    // Fork workers. 
    for (var i = 0; i < 2; i++) { 
    worker = cluster.fork(); 

    worker.on('message', function(msg) { 
     // we only want to intercept messages that have a chat property 
     if (msg.chat) { 
     console.log('Worker to master: ', msg.chat); 
     worker.send({ chat: 'Ok worker, Master got the message! Over and out!' }); 
     } 
    }); 

    } 
} else { 
    process.on('message', function(msg) { 
    // we only want to intercept messages that have a chat property 
    if (msg.chat) { 
     console.log('Master to worker: ', msg.chat); 
    } 
    }); 
    // Worker processes have a http server. 
    http.Server(function(req, res) { 
    res.writeHead(200); 
    res.end("hello world\n"); 
    // Send message to master process 
    process.send({ chat: 'Hey master, I got a new request!' }); 
    }).listen(8000); 
} 
+0

На самом деле я хочу создать push-сервер для (web/flash). Текущая версия стеков на 1000 одновременных подключений. Поэтому я решил создать несколько рабочих с помощью socket.io-слушателей. Это означает, что мне нужно передавать данные работникам асинхронным способом. – htonus

+2

Звучит нормально, убедитесь, что вы используете Socket.IO с RedisStore. – alessioalex

+1

Это не сработает. 'var' внутри' for'? «Рабочий» будет удерживать последнего работника раздвоенным, а не каждого (особенно внутри обратного вызова события). Либо вы не заботитесь обо всех, а просто включаете свой обратный вызов или удерживаете всех работников в Массив. – dresende

8

Я нашел эту нить, ища способ отправить сообщение всем дочерним процессам и, к счастью, могло понять это благодаря комментариям о массивах. Просто хотел проиллюстрировать потенциальное решение для отправки сообщения всем дочерним процессам, использующим этот подход.

var cluster = require('cluster'); 
var http = require('http'); 
var numReqs = 0; 
var workers = []; 

if (cluster.isMaster) { 
    // Broadcast a message to all workers 
    var broadcast = function() { 
    for (var i in workers) { 
     var worker = workers[i]; 
     worker.send({ cmd: 'broadcast', numReqs: numReqs }); 
    } 
    } 

    // Fork workers. 
    for (var i = 0; i < 2; i++) { 
    var worker = cluster.fork(); 

    worker.on('message', function(msg) { 
     if (msg.cmd) { 
     switch (msg.cmd) { 
      case 'notifyRequest': 
      numReqs++; 
      break; 
      case 'broadcast': 
      broadcast(); 
      break; 
     } 
    }); 

    // Add the worker to an array of known workers 
    workers.push(worker); 
    } 

    setInterval(function() { 
    console.log("numReqs =", numReqs); 
    }, 1000); 
} else { 
    // React to messages received from master 
    process.on('message', function(msg) { 
    switch(msg.cmd) { 
     case 'broadcast': 
     if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs); 
     break; 
    } 
    }); 

    // Worker processes have a http server. 
    http.Server(function(req, res) { 
    res.writeHead(200); 
    res.end("hello world\n"); 
    // Send message to master process 
    process.send({ cmd: 'notifyRequest' }); 
    process.send({ cmd: 'broadcast' }); 
    }).listen(8000); 
} 
3

Вот как я реализовал решение аналогичной проблемы. При подключении к cluster.on('fork') вы можете прикреплять обработчики сообщений к рабочим, поскольку они разветвляются (а не хранить их в массиве), что имеет дополнительное преимущество в отношении случаев, когда работники умирают или разъединяются, а новый работник разветвляется.

Этот фрагмент отправит сообщение от мастера на все работников.

if (cluster.isMaster) { 
    for (var i = 0; i < require('os').cpus.length; i++) { 
     cluster.fork(); 
    } 

    cluster.on('disconnect', function(worker) { 
     cluster.fork(); 
    } 

    // When a new worker process is forked, attach the handler 
    // This handles cases where new worker processes are forked 
    // on disconnect/exit, as above. 
    cluster.on('fork', function(worker) { 
     worker.on('message', messageRelay); 
    } 

    var messageRelay = function(msg) { 
     Object.keys(cluster.workers).forEach(function(id) { 
      cluster.workers[id].send(msg); 
     }); 
    }; 
} 
else { 
    process.on('message', messageHandler); 

    var messageHandler = function messageHandler(msg) { 
     // Worker received message--do something 
    }; 
} 
1

Я понимаю вашу цель вещания на все процессы узла рабочих в кластере, хотя вы не можете отправить компонент сокета, как таковой, но есть работа вокруг цели, чтобы служить. Я буду попробовать объяснить на примере:

Шаг 1: Когда действие клиент требует трансляции:

Child.js (Process that has been forked) : 

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
{ 
    process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message}); 
}) 

Шаг 2: На стороне создания кластера

Server.js (Place where cluster forking happens): 

if (cluster.isMaster) { 

    for (var i = 0; i < numCPUs; i++) { 

var worker = cluster.fork();

worker.on('message', function (data) { 
    if (data.cmd === "BROADCAST_TO_ALL_WORKERS") { 
     console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id); 
     Object.keys(cluster.workers).forEach(function(id) { 
      cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message}); 
     }); 
     } 
    }); 
    } 

    cluster.on('exit', function (worker, code, signal) { 
    var newWorker = cluster.fork(); 
    newWorker.on('message', function (data) { 
     console.log(data); 
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") { 
     console.log(data.cmd,data); 
     Object.keys(cluster.workers).forEach(function(id) { 
      cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message}); 
     }); 
     } 
    }); 
    }); 
} 
else { 
    //Node Js App Entry 
    require("./Child.js"); 
} 

Шаг 3: Для того, чтобы Вещание в дочернем процессе -

-> Поместите это перед io.on ("соединение") в ребенка.js

process.on("message", function(data){ 
    if(data.cmd === "BROADCAST_TO_WORKER"){ 
     io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id }); 
    } 
}); 

Надеюсь, это поможет. Пожалуйста, дайте мне знать, если требуется дополнительное разъяснение.

Смежные вопросы