From 87339a22b1cc539259c02119bd7381006d56396d Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 12 Oct 2011 02:56:29 -0700 Subject: [PATCH] introduce node cluster --- lib/cluster.js | 199 +++++++++++++++++++++++++++++++++++++++++++++++++ lib/net.js | 72 ++++++++++++------ src/node.cc | 3 +- src/node.js | 11 +++ 4 files changed, 260 insertions(+), 25 deletions(-) create mode 100644 lib/cluster.js diff --git a/lib/cluster.js b/lib/cluster.js new file mode 100644 index 0000000000..75bcfa560a --- /dev/null +++ b/lib/cluster.js @@ -0,0 +1,199 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var assert = require('assert'); +var fork = require('child_process').fork; +var net = require('net'); +var amMaster; // Used for asserts + + +var debug; +if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { + debug = function(x) { + var prefix = process.pid + ',' + + (process.env.NODE_WORKER_ID ? 'Worker' : 'Master'); + console.error(prefix, x); + }; +} else { + debug = function() { }; +} + + +// Used in the master: +var ids = 0; +var workers = []; +var servers = {}; + +// Used in the worker: +var workerId = 0; +var queryIds = 0; +var queryCallbacks = {}; + + +exports.start = function() { + amMaster = true; + + if (process.argv.length < 1) { + console.error('Usage: node cluster script.js'); + process.exit(1); + } + + var args = process.argv.slice(2); + var scriptFilename = args.shift(); + + var cpus = require('os').cpus().length; + console.error("Detected " + cpus + " cpus"); + + for (var i = 0; i < cpus; i++) { + forkWorker(scriptFilename, args); + } + + process.on('uncaughtException', function(e) { + // Quickly try to kill all the workers. + // TODO: be session leader - will cause auto SIGHUP to the children. + for (var id in workers) { + if (workers[id]) { + debug("kill worker " + id); + workers[id].kill(); + } + } + + console.error("Exception in cluster master process: " + + e.message + '\n' + e.stack); + console.error("Please report this bug."); + process.exit(1); + }); +} + + +function handleWorkerMessage(worker, message) { + assert.ok(amMaster); + + debug("recv " + JSON.stringify(message)); + + switch (message.cmd) { + case 'online': + console.log("Worker " + worker.pid + " online"); + workers[message._workerId] = worker; + break; + + case 'queryServer': + var key = message.address + ":" + + message.port + ":" + + message.addressType; + var response = { _queryId: message._queryId }; + + if (key in servers == false) { + // Create a new server. + debug('create new server ' + key); + servers[key] = net._createServerHandle(message.address, + message.port, + message.addressType); + } + worker.send(response, servers[key]); + break; + + default: + // Ignore. + break; + } +} + + +function forkWorker(scriptFilename, args) { + var id = ++ids; + var envCopy = {}; + + for (var x in process.env) { + envCopy[x] = process.env[x]; + } + + envCopy['NODE_WORKER_ID'] = id; + + var worker = fork(scriptFilename, args, { + env: envCopy + }); + + worker.on('message', function(message) { + handleWorkerMessage(worker, message); + }); + + worker.on('exit', function() { + debug('worker id=' + id + ' died'); + delete workers[id]; + }); + + return worker; +} + + +exports.startWorker = function() { + assert.ok(!amMaster); + amMaster = false; + workerId = parseInt(process.env.NODE_WORKER_ID); + + queryMaster({ cmd: 'online' }); + + // Make callbacks from queryMaster() + process.on('message', function(msg, handle) { + debug("recv " + JSON.stringify(msg)); + if (msg._queryId && msg._queryId in queryCallbacks) { + var cb = queryCallbacks[msg._queryId]; + if (typeof cb == 'function') { + cb(msg, handle); + } + delete queryCallbacks[msg._queryId] + } + }); +}; + + +function queryMaster(msg, cb) { + assert.ok(!amMaster); + + debug('send ' + JSON.stringify(msg)); + + // Grab some random queryId + msg._queryId = (++queryIds); + msg._workerId = workerId; + + // Store callback for later. Callback called in startWorker. + if (cb) { + queryCallbacks[msg._queryId] = cb; + } + + // Send message to master. + process.send(msg); +} + + +exports.getServer = function(address, port, addressType, cb) { + assert.ok(!amMaster); + + queryMaster({ + cmd: "queryServer", + address: address, + port: port, + addressType: addressType + }, function(msg, handle) { + cb(handle); + }); +}; diff --git a/lib/net.js b/lib/net.js index 0768de4b34..75d5e542b7 100644 --- a/lib/net.js +++ b/lib/net.js @@ -606,34 +606,45 @@ exports.Server = Server; function toPort(x) { return (x = Number(x)) >= 0 ? x : false; } -function listen(self, address, port, addressType) { +var createServerHandle = exports._createServerHandle = + function(address, port, addressType) { + var r = 0; + // assign handle in listen, and clean up if bind or listen fails + var handle = + (port == -1 && addressType == -1) ? createPipe() : createTCP(); + + if (address || port) { + debug('bind to ' + address); + if (addressType == 6) { + r = handle.bind6(address, port); + } else { + r = handle.bind(address, port); + } + } + + if (r) { + handle.close(); + handle = null; + + process.nextTick(function() { + self.emit('error', errnoException(errno, 'listen')); + }); + return; + } + + return handle; +}; + + +Server.prototype._listen2 = function(address, port, addressType) { + var self = this; var r = 0; // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (!self._handle) { - // assign handle in listen, and clean up if bind or listen fails - self._handle = - (port == -1 && addressType == -1) ? createPipe() : createTCP(); - - if (address || port) { - debug('bind to ' + address); - if (addressType == 6) { - r = self._handle.bind6(address, port); - } else { - r = self._handle.bind(address, port); - } - } - - if (r) { - self._handle.close(); - self._handle = null; - - process.nextTick(function() { - self.emit('error', errnoException(errno, 'listen')); - }); - return; - } + self._handle = createServerHandle(address, port, addressType); + if (!self._handle) return; } self._handle.onconnection = onconnection; @@ -644,7 +655,6 @@ function listen(self, address, port, addressType) { if (r) { self._handle.close(); self._handle = null; - process.nextTick(function() { self.emit('error', errnoException(errno, 'listen')); }); @@ -657,6 +667,20 @@ function listen(self, address, port, addressType) { } +function listen(self, address, port, addressType) { + if (process.env.NODE_WORKER_ID) { + require('cluster').getServer(address, port, addressType, function(handle) { + self._handle = handle; + self._listen2(address, port, addressType); + }); + } else { + process.nextTick(function() { + self._listen2(address, port, addressType); + }); + } +} + + Server.prototype.listen = function() { var self = this; diff --git a/src/node.cc b/src/node.cc index 08f9ba3f43..170a169c68 100644 --- a/src/node.cc +++ b/src/node.cc @@ -2292,7 +2292,8 @@ static void ParseDebugOpt(const char* arg) { static void PrintHelp() { printf("Usage: node [options] [ -e script | script.js ] [arguments] \n" - " node debug [ -e script | script.js ] [arguments] \n" + " node debug script.js [arguments] \n" + " node cluster script.js [arguments] \n" "\n" "Options:\n" " -v, --version print node's version\n" diff --git a/src/node.js b/src/node.js index 93fc6b5fb0..8f725677f8 100644 --- a/src/node.js +++ b/src/node.js @@ -68,6 +68,10 @@ var d = NativeModule.require('_debugger'); d.start(); + } else if (process.argv[1] == 'cluster') { + var cluster = NativeModule.require('cluster'); + cluster.start(); + } else if (process._eval != null) { // User passed '-e' or '--eval' arguments to Node. var Module = NativeModule.require('module'); @@ -84,6 +88,13 @@ var path = NativeModule.require('path'); process.argv[1] = path.resolve(process.argv[1]); + // If this is a worker in cluster mode, start up the communiction + // channel. + if (process.env.NODE_WORKER_ID) { + var cluster = NativeModule.require('cluster'); + cluster.startWorker(); + } + var Module = NativeModule.require('module'); // REMOVEME: nextTick should not be necessary. This hack to get // test/simple/test-exception-handler2.js working.