diff --git a/lib/cluster.js b/lib/cluster.js index 44dd2e5d2a..6503cb5f90 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -19,572 +19,304 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +var EventEmitter = require('events').EventEmitter; var assert = require('assert'); +var dgram = require('dgram'); var fork = require('child_process').fork; var net = require('net'); -var EventEmitter = require('events').EventEmitter; var util = require('util'); -function isObject(o) { - return (typeof o === 'object' && o !== null); -} +var cluster = new EventEmitter; +module.exports = cluster; +cluster.Worker = Worker; +cluster.isWorker = ('NODE_UNIQUE_ID' in process.env); +cluster.isMaster = (cluster.isWorker === false); -var debug; -if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { - debug = function(x) { - var prefix = process.pid + ',' + - (process.env.NODE_UNIQUE_ID ? 'Worker' : 'Master'); - console.error(prefix, x); - }; -} else { - debug = function() { }; -} -// cluster object: -function Cluster() { +function Worker() { + if (!(this instanceof Worker)) return new Worker; EventEmitter.call(this); + this.suicide = undefined; + this.state = 'none'; + this.id = 0; } +util.inherits(Worker, EventEmitter); -util.inherits(Cluster, EventEmitter); - -var cluster = module.exports = new Cluster(); - -// Used in the master: -var masterStarted = false; -var ids = 0; -var serverHandlers = {}; - -// Used in the worker: -var serverListeners = {}; -var queryIds = 0; -var queryCallbacks = {}; - -// Define isWorker and isMaster -cluster.isWorker = 'NODE_UNIQUE_ID' in process.env; -cluster.isMaster = ! cluster.isWorker; - -// The worker object is only used in a worker -cluster.worker = cluster.isWorker ? {} : null; -// The workers array is only used in the master -cluster.workers = cluster.isMaster ? {} : null; - -// Settings object -var settings = cluster.settings = {}; - -// Simple function to call a function on each worker -function eachWorker(cb) { - // Go through all workers - for (var id in cluster.workers) { - if (cluster.workers.hasOwnProperty(id)) { - cb(cluster.workers[id]); - } - } -} - -// Extremely simple progress tracker -function ProgressTracker(missing, callback) { - this.missing = missing; - this.callback = callback; -} -ProgressTracker.prototype.done = function() { - this.missing -= 1; - this.check(); +Worker.prototype.kill = function() { + this.destroy.apply(this, arguments); }; -ProgressTracker.prototype.check = function() { - if (this.missing === 0) this.callback(); -}; - -cluster.setupMaster = function(options) { - // This can only be called from the master. - assert(cluster.isMaster); - - // Don't allow this function to run more than once - if (masterStarted) return; - masterStarted = true; - - // Get filename and arguments - options = options || {}; - - // By default, V8 writes the profile data of all processes to a single - // v8.log. - // - // Running that log file through a tick processor produces bogus numbers - // because many events won't match up with the recorded memory mappings - // and you end up with graphs where 80+% of ticks is unaccounted for. - // - // Fixing the tick processor to deal with multi-process output is not very - // useful because the processes may be running wildly disparate workloads. - // - // That's why we fix up the command line arguments to include - // a "--logfile=v8-%p.log" argument (where %p is expanded to the PID) - // unless it already contains a --logfile argument. - var execArgv = options.execArgv || process.execArgv; - if (execArgv.some(function(s) { return /^--prof/.test(s); }) && - !execArgv.some(function(s) { return /^--logfile=/.test(s); })) - { - execArgv = execArgv.slice(); - execArgv.push('--logfile=v8-%p.log'); - } - - // Set settings object - settings = cluster.settings = { - exec: options.exec || process.argv[1], - execArgv: execArgv, - args: options.args || process.argv.slice(2), - silent: options.silent || false - }; - // emit setup event - cluster.emit('setup'); +Worker.prototype.send = function() { + this.process.send.apply(this.process, arguments); }; -// Check if a message is internal only -var INTERNAL_PREFIX = 'NODE_CLUSTER_'; -function isInternalMessage(message) { - return isObject(message) && - typeof message.cmd === 'string' && - message.cmd.length > INTERNAL_PREFIX.length && - message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX; -} +// Master/worker specific methods are defined in the *Init() functions. -// Modify message object to be internal -function internalMessage(inMessage) { - var outMessage = util._extend({}, inMessage); - // Add internal prefix to cmd - outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || ''); +if (cluster.isMaster) + masterInit(); +else + workerInit(); - return outMessage; -} - -// Handle callback messages -function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) { - - // The message there will be sent - var message = internalMessage(outMessage); - // callback id - will be undefined if not set - message._queryEcho = inMessage._requestEcho; +function masterInit() { + cluster.workers = {}; - // Call callback if a query echo is received - if (inMessage._queryEcho) { - queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle); - delete queryCallbacks[inMessage._queryEcho]; - } + var intercom = new EventEmitter; + var settings = { + args: process.argv.slice(2), + exec: process.argv[1], + execArgv: process.execArgv, + silent: false + }; + cluster.settings = settings; + + // Indexed by address:port:etc key. Its entries are dicts with handle and + // workers keys. That second one is a list of workers that hold a reference + // to the handle. When a worker dies, we scan the dicts and close the handle + // when its reference count drops to zero. Yes, that means we're doing an + // O(n*m) scan but n and m are small and worker deaths are rare events anyway. + var handles = {}; + + var initialized = false; + cluster.setupMaster = function(options) { + if (initialized === true) return; + initialized = true; + settings = util._extend(settings, options || {}); + // Tell V8 to write profile data for each process to a separate file. + // Without --logfile=v8-%p.log, everything ends up in a single, unusable + // file. (Unusable because what V8 logs are memory addresses and each + // process has its own memory mappings.) + if (settings.execArgv.some(function(s) { return /^--prof/.test(s); }) && + !settings.execArgv.some(function(s) { return /^--logfile=/.test(s); })) + { + settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']); + } + cluster.settings = settings; + cluster.emit('setup'); + }; - // Send if outWrap contains something useful - if (!(outMessage === undefined && message._queryEcho === undefined)) { - sendInternalMessage(worker, message, outHandle); - } -} + var ids = 0; + cluster.fork = function(env) { + cluster.setupMaster(); + var worker = new Worker; + worker.id = ++ids; + var workerEnv = util._extend({}, process.env); + workerEnv = util._extend(workerEnv, env); + workerEnv.NODE_UNIQUE_ID = '' + worker.id; + worker.process = fork(settings.exec, settings.args, { + env: workerEnv, + silent: settings.silent, + execArgv: settings.execArgv + }); + worker.process.once('exit', function(exitCode, signalCode) { + worker.suicide = !!worker.suicide; + worker.state = 'dead'; + worker.emit('exit', exitCode, signalCode); + cluster.emit('exit', worker, exitCode, signalCode); + delete cluster.workers[worker.id]; + }); + worker.process.once('disconnect', function() { + worker.suicide = !!worker.suicide; + worker.state = 'disconnected'; + worker.emit('disconnect'); + cluster.emit('disconnect', worker); + delete cluster.workers[worker.id]; + }); + worker.process.on('error', worker.emit.bind(worker, 'error')); + worker.process.on('message', worker.emit.bind(worker, 'message')); + worker.process.on('internalMessage', internal(worker, onmessage)); + process.nextTick(function() { + cluster.emit('fork', worker); + }); + cluster.workers[worker.id] = worker; + return worker; + }; -// Handle messages from both master and workers -var messageHandler = {}; -function handleMessage(worker, inMessage, inHandle) { + cluster.disconnect = function(cb) { + for (var key in cluster.workers) { + var worker = cluster.workers[key]; + worker.disconnect(); + } + if (cb) intercom.once('disconnect', cb); + }; - // Remove internal prefix - var message = util._extend({}, inMessage); - message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length); + cluster.on('disconnect', function(worker) { + delete cluster.workers[worker.id]; + // O(n*m) scan but for small values of n and m. + for (var key in handles) { + var e = handles[key]; + var i = e.workers.indexOf(worker); + if (i === -1) continue; + e.workers.splice(i, 1); + if (e.workers.length !== 0) continue; + e.handle.close(); + delete handles[key]; + } + if (Object.keys(handles).length === 0) { + intercom.emit('disconnect'); + } + }); - var respondUsed = false; - function respond(outMessage, outHandler) { - respondUsed = true; - handleResponse(outMessage, outHandler, inMessage, inHandle, worker); - } + Worker.prototype.disconnect = function() { + this.suicide = true; + send(this, { act: 'disconnect' }); + }; - // Run handler if it exists - if (messageHandler[message.cmd]) { - messageHandler[message.cmd](message, worker, respond); - } + Worker.prototype.destroy = function(signo) { + signo = signo || 'SIGTERM'; + var proc = this.process; + if (proc.connected) { + proc.once('disconnect', proc.kill.bind(proc, signo)); + proc.disconnect(); + return; + } + proc.kill(signo); + }; - // Send respond if it hasn't been called yet - if (respondUsed === false) { - respond(); + function onmessage(message, handle) { + var worker = this; + if (message.act === 'online') + online(worker); + else if (message.act === 'queryServer') + queryServer(worker, message); + else if (message.act === 'listening') + listening(worker, message); + else if (message.act === 'suicide') + worker.suicide = true; } -} - -// Messages to the master will be handled using these methods -if (cluster.isMaster) { - // Handle online messages from workers - messageHandler.online = function(message, worker) { + function online(worker) { worker.state = 'online'; - debug('Worker ' + worker.process.pid + ' online'); worker.emit('online'); cluster.emit('online', worker); - }; - - // Handle queryServer messages from workers - messageHandler.queryServer = function(message, worker, send) { + } - // This sequence of information is unique to the connection - // but not to the worker + function queryServer(worker, message) { var args = [message.address, message.port, message.addressType, message.fd]; var key = args.join(':'); - var handler; - - if (serverHandlers.hasOwnProperty(key)) { - handler = serverHandlers[key]; - } else if (message.addressType === 'udp4' || - message.addressType === 'udp6') { - var dgram = require('dgram'); - handler = dgram._createSocketHandle.apply(net, args); - serverHandlers[key] = handler; - } else { - handler = net._createServerHandle.apply(net, args); - serverHandlers[key] = handler; + var e = handles[key]; + if (typeof e === 'undefined') { + e = { workers: [] }; + if (message.addressType === 'udp4' || message.addressType === 'udp6') + e.handle = dgram._createSocketHandle.apply(null, args); + else + e.handle = net._createServerHandle.apply(null, args); + handles[key] = e; } + e.workers.push(worker); + send(worker, { ack: message.seq }, e.handle); + } - // echo callback with the fd handler associated with it - send({}, handler); - }; - - // Handle listening messages from workers - messageHandler.listening = function(message, worker) { - - worker.state = 'listening'; - - // Emit listening, now that we know the worker is listening - worker.emit('listening', { - address: message.address, - port: message.port, + function listening(worker, message) { + var info = { addressType: message.addressType, - fd: message.fd - }); - cluster.emit('listening', worker, { address: message.address, port: message.port, - addressType: message.addressType, fd: message.fd - }); - }; - - // Handle suicide messages from workers - messageHandler.suicide = function(message, worker) { - worker.suicide = true; - }; -} - - -// Messages to a worker will be handled using these methods -else if (cluster.isWorker) { - - // Handle worker.disconnect from master - messageHandler.disconnect = function(message, worker) { - worker.disconnect(); - }; -} - -function toDecInt(value) { - value = parseInt(value, 10); - return isNaN(value) ? null : value; -} - -// Create a worker object, that works both for master and worker -function Worker(customEnv) { - if (!(this instanceof Worker)) return new Worker(); - EventEmitter.call(this); - - var self = this; - var env = process.env; - - // Assign a unique id, default null - this.id = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID); - - // XXX: Legacy. Remove in 0.9 - this.workerID = this.uniqueID = this.id; - - // Assign state - this.state = 'none'; - - // Create or get process - if (cluster.isMaster) { - - // Create env object - // first: copy and add id property - var envCopy = util._extend({}, env); - envCopy['NODE_UNIQUE_ID'] = this.id; - // second: extend envCopy with the env argument - if (isObject(customEnv)) { - envCopy = util._extend(envCopy, customEnv); - } - - // fork worker - this.process = fork(settings.exec, settings.args, { - 'env': envCopy, - 'silent': settings.silent, - 'execArgv': settings.execArgv - }); - } else { - this.process = process; - } - - if (cluster.isMaster) { - // Save worker in the cluster.workers array - cluster.workers[this.id] = this; - - // Emit a fork event, on next tick - // There is no worker.fork event since this has no real purpose - process.nextTick(function() { - cluster.emit('fork', self); - }); - } - - // handle internalMessage, exit and disconnect event - this.process.on('internalMessage', handleMessage.bind(null, this)); - this.process.once('exit', function(exitCode, signalCode) { - prepareExit(self, 'dead'); - self.emit('exit', exitCode, signalCode); - cluster.emit('exit', self, exitCode, signalCode); - }); - this.process.once('disconnect', function() { - prepareExit(self, 'disconnected'); - self.emit('disconnect'); - cluster.emit('disconnect', self); - }); - - // relay message and error - this.process.on('message', this.emit.bind(this, 'message')); - this.process.on('error', this.emit.bind(this, 'error')); - -} -util.inherits(Worker, EventEmitter); -cluster.Worker = Worker; - -function prepareExit(worker, state) { - - // set state to disconnect - worker.state = state; - - // Make suicide a boolean - worker.suicide = !!worker.suicide; - - // Remove from workers in the master - if (cluster.isMaster) { - delete cluster.workers[worker.id]; - } -} - -// Send internal message -function sendInternalMessage(worker, message/*, handler, callback*/) { - - // Exist callback - var callback = arguments[arguments.length - 1]; - if (typeof callback !== 'function') { - callback = undefined; - } - - // exist handler - var handler = arguments[2] !== callback ? arguments[2] : undefined; - - if (!isInternalMessage(message)) { - message = internalMessage(message); + }; + worker.state = 'listening'; + worker.emit('listening', info); + cluster.emit('listening', worker, info); } - // Store callback for later - if (callback) { - message._requestEcho = worker.id + ':' + (++queryIds); - queryCallbacks[message._requestEcho] = callback; + function send(worker, message, handle, cb) { + sendHelper(worker.process, message, handle, cb); } - - - worker.send(message, handler); } -// Send message to worker or master -Worker.prototype.send = function() { - // You could also just use process.send in a worker - this.process.send.apply(this.process, arguments); -}; - -// Kill the worker without restarting -Worker.prototype.kill = Worker.prototype.destroy = function(signal) { - if (!signal) - signal = 'SIGTERM'; - - var self = this; - - this.suicide = true; - - if (cluster.isMaster) { - // Disconnect IPC channel - // this way the worker won't need to propagate suicide state to master - if (self.process.connected) { - self.process.once('disconnect', function() { - self.process.kill(signal); - }); - self.process.disconnect(); - } else { - self.process.kill(signal); - } +function workerInit() { + var handles = []; - } else { - // Channel is open - if (this.process.connected) { - - // Inform master to suicide and then kill - sendInternalMessage(this, {cmd: 'suicide'}, function() { - process.exit(0); - }); - - // When channel is closed, terminate the process - this.process.once('disconnect', function() { - process.exit(0); - }); - } else { - process.exit(0); + // Called from src/node.js + cluster._setupWorker = function() { + var worker = new Worker; + cluster.worker = worker; + worker.id = +process.env.NODE_UNIQUE_ID | 0; + worker.state = 'online'; + worker.process = process; + process.once('disconnect', process.exit.bind(null, 0)); + process.on('internalMessage', internal(worker, onmessage)); + send({ act: 'online' }); + function onmessage(message, handle) { + if (message.act === 'disconnect') worker.disconnect(); } - } -}; - -// The .disconnect function will close all servers -// and then disconnect the IPC channel. -if (cluster.isMaster) { - // Used in master - Worker.prototype.disconnect = function() { - this.suicide = true; - - sendInternalMessage(this, {cmd: 'disconnect'}); }; -} else { - // Used in workers - Worker.prototype.disconnect = function() { - var self = this; - - this.suicide = true; - - // keep track of open servers - var servers = Object.keys(serverListeners).length; - var progress = new ProgressTracker(servers, function() { - // There are no more servers open so we will close the IPC channel. - // Closing the IPC channel will emit a disconnect event - // in both master and worker on the process object. - // This event will be handled by prepareExit. - self.process.disconnect(); + // obj is a net#Server or a dgram#Socket object. + cluster._getServer = function(obj, address, port, addressType, fd, cb) { + var message = { + addressType: addressType, + address: address, + port: port, + act: 'queryServer', + fd: fd + }; + send(message, function(_, handle) { + // Monkey-patch the close() method so we can keep track of when it's + // closed. Avoids resource leaks when the handle is short-lived. + var close = handle.close; + handle.close = function() { + var index = handles.indexOf(handle); + if (index !== -1) handles.splice(index, 1); + return close.apply(this, arguments); + }; + handles.push(handle); + cb(handle); }); - - // depending on where this function was called from (master or worker) - // The suicide state has already been set, - // but it doesn't really matter if we set it again. - sendInternalMessage(this, {cmd: 'suicide'}, function() { - // in case there are no servers - progress.check(); - - // closing all servers gracefully - var server; - for (var key in serverListeners) { - server = serverListeners[key]; - - // in case the server is closed we won't close it again - if (server._handle === null) { - progress.done(); - continue; - } - - server.on('close', progress.done.bind(progress)); - server.close(); - } + obj.once('listening', function() { + cluster.worker.state = 'listening'; + message.act = 'listening'; + message.port = obj.address().port || port; + send(message); }); - }; -} - -// Fork a new worker -cluster.fork = function(env) { - // This can only be called from the master. - assert(cluster.isMaster); - - // Make sure that the master has been initialized - cluster.setupMaster(); - return (new cluster.Worker(env)); -}; - -// execute .disconnect on all workers and close handlers when done -cluster.disconnect = function(callback) { - // This can only be called from the master. - assert(cluster.isMaster); - - // Close all TCP handlers when all workers are disconnected - var workers = Object.keys(cluster.workers).length; - var progress = new ProgressTracker(workers, function() { - for (var key in serverHandlers) { - serverHandlers[key].close(); - delete serverHandlers[key]; - } + Worker.prototype.disconnect = function() { + for (var handle; handle = handles.shift(); handle.close()); + process.disconnect(); + }; - // call callback when done - if (callback) callback(); - }); + Worker.prototype.destroy = function() { + if (!process.connected) process.exit(0); + var exit = process.exit.bind(null, 0); + send({ act: 'suicide' }, exit); + process.once('disconnect', exit); + process.disconnect(); + }; - // begin disconnecting all workers - eachWorker(function(worker) { - worker.once('disconnect', progress.done.bind(progress)); - worker.disconnect(); - }); + function send(message, cb) { + sendHelper(process, message, null, cb); + } +} - // in case there weren't any workers - progress.check(); -}; -// Internal function. Called from src/node.js when worker process starts. -cluster._setupWorker = function() { +var seq = 0; +var callbacks = {}; +function sendHelper(proc, message, handle, cb) { + // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js + message = util._extend({ cmd: 'NODE_CLUSTER' }, message); + if (cb) callbacks[seq] = cb; + message.seq = seq; + seq += 1; + proc.send(message, handle); +} - // Get worker class - var worker = cluster.worker = new Worker(); - // we will terminate the worker - // when the worker is disconnected from the parent accidentally - process.once('disconnect', function() { - if (worker.suicide !== true) { - process.exit(0); +// Returns an internalMessage listener that hands off normal messages +// to the callback but intercepts and redirects ACK messages. +function internal(worker, cb) { + return function(message, handle) { + if (message.cmd !== 'NODE_CLUSTER') return; + var fn = cb; + if (typeof message.ack !== 'undefined') { + fn = callbacks[message.ack]; + delete callbacks[message.ack]; } - }); - - // Tell master that the worker is online - worker.state = 'online'; - sendInternalMessage(worker, { cmd: 'online' }); -}; - -// Internal function. Called by net.js and dgram.js when attempting to bind a -// TCP server or UDP socket. -cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) { - // This can only be called from a worker. - assert(cluster.isWorker); - - // Store tcp instance for later use - var key = [address, port, addressType, fd].join(':'); - serverListeners[key] = tcpSelf; - - // Send a listening message to the master - tcpSelf.once('listening', function() { - cluster.worker.state = 'listening'; - sendInternalMessage(cluster.worker, { - cmd: 'listening', - address: address, - port: tcpSelf.address().port || port, - addressType: addressType, - fd: fd - }); - }); - - // Request the fd handler from the master process - var message = { - cmd: 'queryServer', - address: address, - port: port, - addressType: addressType, - fd: fd + fn.apply(worker, arguments); }; - - // The callback will be stored until the master has responded - sendInternalMessage(cluster.worker, message, function(msg, handle) { - cb(handle); - }); - -}; +}