// Copyright Joyent, Inc. and other Node contributors. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the // "Software"), to deal in the Software without restriction, including // without limitation the rights to use, copy, modify, merge, publish, // distribute, sublicense, and/or sell copies of the Software, and to permit // persons to whom the Software is furnished to do so, subject to the // following conditions: // // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. var assert = require('assert'); var fork = require('child_process').fork; var net = require('net'); var EventEmitter = require('events').EventEmitter; var util = require('util'); function isObject(o) { return (typeof o === 'object' && o !== null); } var debug; if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { debug = function(x) { var prefix = process.pid + ',' + (process.env.NODE_UNIQUE_ID ? 'Worker' : 'Master'); console.error(prefix, x); }; } else { debug = function() { }; } // cluster object: function Cluster() { EventEmitter.call(this); } util.inherits(Cluster, EventEmitter); var cluster = module.exports = new Cluster(); // Used in the master: var masterStarted = false; var ids = 0; var serverHandlers = {}; // Used in the worker: var serverListeners = {}; var queryIds = 0; var queryCallbacks = {}; // Define isWorker and isMaster cluster.isWorker = 'NODE_UNIQUE_ID' in process.env; cluster.isMaster = ! cluster.isWorker; // The worker object is only used in a worker cluster.worker = cluster.isWorker ? {} : null; // The workers array is only used in the master cluster.workers = cluster.isMaster ? {} : null; // Settings object var settings = cluster.settings = {}; // Simple function to call a function on each worker function eachWorker(cb) { // Go through all workers for (var id in cluster.workers) { if (cluster.workers.hasOwnProperty(id)) { cb(cluster.workers[id]); } } } // Extremely simple progress tracker function ProgressTracker(missing, callback) { this.missing = missing; this.callback = callback; } ProgressTracker.prototype.done = function() { this.missing -= 1; this.check(); }; ProgressTracker.prototype.check = function() { if (this.missing === 0) this.callback(); }; cluster.setupMaster = function(options) { // This can only be called from the master. assert(cluster.isMaster); // Don't allow this function to run more than once if (masterStarted) return; masterStarted = true; // Get filename and arguments options = options || {}; // By default, V8 writes the profile data of all processes to a single // v8.log. // // Running that log file through a tick processor produces bogus numbers // because many events won't match up with the recorded memory mappings // and you end up with graphs where 80+% of ticks is unaccounted for. // // Fixing the tick processor to deal with multi-process output is not very // useful because the processes may be running wildly disparate workloads. // // That's why we fix up the command line arguments to include // a "--logfile=v8-%p.log" argument (where %p is expanded to the PID) // unless it already contains a --logfile argument. var execArgv = options.execArgv || process.execArgv; if (execArgv.some(function(s) { return /^--prof/.test(s); }) && !execArgv.some(function(s) { return /^--logfile=/.test(s); })) { execArgv = execArgv.slice(); execArgv.push('--logfile=v8-%p.log'); } // Set settings object settings = cluster.settings = { exec: options.exec || process.argv[1], execArgv: execArgv, args: options.args || process.argv.slice(2), silent: options.silent || false }; // emit setup event cluster.emit('setup'); }; // Check if a message is internal only var INTERNAL_PREFIX = 'NODE_CLUSTER_'; function isInternalMessage(message) { return isObject(message) && typeof message.cmd === 'string' && message.cmd.length > INTERNAL_PREFIX.length && message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX; } // Modify message object to be internal function internalMessage(inMessage) { var outMessage = util._extend({}, inMessage); // Add internal prefix to cmd outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || ''); return outMessage; } // Handle callback messages function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) { // The message there will be sent var message = internalMessage(outMessage); // callback id - will be undefined if not set message._queryEcho = inMessage._requestEcho; // Call callback if a query echo is received if (inMessage._queryEcho) { queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle); delete queryCallbacks[inMessage._queryEcho]; } // Send if outWrap contains something useful if (!(outMessage === undefined && message._queryEcho === undefined)) { sendInternalMessage(worker, message, outHandle); } } // Handle messages from both master and workers var messageHandler = {}; function handleMessage(worker, inMessage, inHandle) { // Remove internal prefix var message = util._extend({}, inMessage); message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length); var respondUsed = false; function respond(outMessage, outHandler) { respondUsed = true; handleResponse(outMessage, outHandler, inMessage, inHandle, worker); } // Run handler if it exists if (messageHandler[message.cmd]) { messageHandler[message.cmd](message, worker, respond); } // Send respond if it hasn't been called yet if (respondUsed === false) { respond(); } } // Messages to the master will be handled using these methods if (cluster.isMaster) { // Handle online messages from workers messageHandler.online = function(message, worker) { worker.state = 'online'; debug('Worker ' + worker.process.pid + ' online'); worker.emit('online'); cluster.emit('online', worker); }; // Handle queryServer messages from workers messageHandler.queryServer = function(message, worker, send) { // This sequence of information is unique to the connection // but not to the worker var args = [message.address, message.port, message.addressType, message.fd]; var key = args.join(':'); var handler; if (serverHandlers.hasOwnProperty(key)) { handler = serverHandlers[key]; } else { if (message.addressType === 'udp4' || message.addressType === 'udp6') { var dgram = require('dgram'); handler = dgram._createSocketHandle.apply(net, args); } else { handler = net._createServerHandle.apply(net, args); } if (!handler) { send({ content: { error: process._errno } }, null); return; } serverHandlers[key] = handler; } // echo callback with the fd handler associated with it send({}, handler); }; // Handle listening messages from workers messageHandler.listening = function(message, worker) { worker.state = 'listening'; // Emit listening, now that we know the worker is listening worker.emit('listening', { address: message.address, port: message.port, addressType: message.addressType, fd: message.fd }); cluster.emit('listening', worker, { address: message.address, port: message.port, addressType: message.addressType, fd: message.fd }); }; // Handle suicide messages from workers messageHandler.suicide = function(message, worker) { worker.suicide = true; }; } // Messages to a worker will be handled using these methods else if (cluster.isWorker) { // Handle worker.disconnect from master messageHandler.disconnect = function(message, worker) { worker.disconnect(); }; } function toDecInt(value) { value = parseInt(value, 10); return isNaN(value) ? null : value; } // Create a worker object, that works both for master and worker function Worker(customEnv) { if (!(this instanceof Worker)) return new Worker(); EventEmitter.call(this); var self = this; var env = process.env; // Assign a unique id, default null this.id = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID); // XXX: Legacy. Remove in 0.9 this.workerID = this.uniqueID = this.id; // Assign state this.state = 'none'; // Create or get process if (cluster.isMaster) { // Create env object // first: copy and add id property var envCopy = util._extend({}, env); envCopy['NODE_UNIQUE_ID'] = this.id; // second: extend envCopy with the env argument if (isObject(customEnv)) { envCopy = util._extend(envCopy, customEnv); } // fork worker this.process = fork(settings.exec, settings.args, { 'env': envCopy, 'silent': settings.silent, 'execArgv': settings.execArgv }); } else { this.process = process; } if (cluster.isMaster) { // Save worker in the cluster.workers array cluster.workers[this.id] = this; // Emit a fork event, on next tick // There is no worker.fork event since this has no real purpose process.nextTick(function() { cluster.emit('fork', self); }); } // handle internalMessage, exit and disconnect event this.process.on('internalMessage', handleMessage.bind(null, this)); this.process.once('exit', function(exitCode, signalCode) { prepareExit(self, 'dead'); self.emit('exit', exitCode, signalCode); cluster.emit('exit', self, exitCode, signalCode); }); this.process.once('disconnect', function() { prepareExit(self, 'disconnected'); self.emit('disconnect'); cluster.emit('disconnect', self); }); // relay message and error this.process.on('message', this.emit.bind(this, 'message')); this.process.on('error', this.emit.bind(this, 'error')); } util.inherits(Worker, EventEmitter); cluster.Worker = Worker; function prepareExit(worker, state) { // set state to disconnect worker.state = state; // Make suicide a boolean worker.suicide = !!worker.suicide; // Remove from workers in the master if (cluster.isMaster) { delete cluster.workers[worker.id]; } } // Send internal message function sendInternalMessage(worker, message/*, handler, callback*/) { // Exist callback var callback = arguments[arguments.length - 1]; if (typeof callback !== 'function') { callback = undefined; } // exist handler var handler = arguments[2] !== callback ? arguments[2] : undefined; if (!isInternalMessage(message)) { message = internalMessage(message); } // Store callback for later if (callback) { message._requestEcho = worker.id + ':' + (++queryIds); queryCallbacks[message._requestEcho] = callback; } worker.send(message, handler); } // Send message to worker or master Worker.prototype.send = function() { // You could also just use process.send in a worker this.process.send.apply(this.process, arguments); }; // Kill the worker without restarting Worker.prototype.kill = Worker.prototype.destroy = function(signal) { if (!signal) signal = 'SIGTERM'; var self = this; this.suicide = true; if (cluster.isMaster) { // Disconnect IPC channel // this way the worker won't need to propagate suicide state to master if (self.process.connected) { self.process.once('disconnect', function() { self.process.kill(signal); }); self.process.disconnect(); } else { self.process.kill(signal); } } else { // Channel is open if (this.process.connected) { // Inform master to suicide and then kill sendInternalMessage(this, {cmd: 'suicide'}, function() { process.exit(0); }); // When channel is closed, terminate the process this.process.once('disconnect', function() { process.exit(0); }); } else { process.exit(0); } } }; // The .disconnect function will close all servers // and then disconnect the IPC channel. if (cluster.isMaster) { // Used in master Worker.prototype.disconnect = function() { this.suicide = true; sendInternalMessage(this, {cmd: 'disconnect'}); }; } else { // Used in workers Worker.prototype.disconnect = function() { var self = this; this.suicide = true; // keep track of open servers var servers = Object.keys(serverListeners).length; var progress = new ProgressTracker(servers, function() { // There are no more servers open so we will close the IPC channel. // Closing the IPC channel will emit a disconnect event // in both master and worker on the process object. // This event will be handled by prepareExit. self.process.disconnect(); }); // depending on where this function was called from (master or worker) // The suicide state has already been set, // but it doesn't really matter if we set it again. sendInternalMessage(this, {cmd: 'suicide'}, function() { // in case there are no servers progress.check(); // closing all servers gracefully var server; for (var key in serverListeners) { server = serverListeners[key]; // in case the server is closed we won't close it again if (server._handle === null) { progress.done(); continue; } server.on('close', progress.done.bind(progress)); server.close(); } }); }; } // Fork a new worker cluster.fork = function(env) { // This can only be called from the master. assert(cluster.isMaster); // Make sure that the master has been initialized cluster.setupMaster(); return (new cluster.Worker(env)); }; // execute .disconnect on all workers and close handlers when done cluster.disconnect = function(callback) { // This can only be called from the master. assert(cluster.isMaster); // Close all TCP handlers when all workers are disconnected var workers = Object.keys(cluster.workers).length; var progress = new ProgressTracker(workers, function() { for (var key in serverHandlers) { serverHandlers[key].close(); delete serverHandlers[key]; } // call callback when done if (callback) callback(); }); // begin disconnecting all workers eachWorker(function(worker) { worker.once('disconnect', progress.done.bind(progress)); worker.disconnect(); }); // in case there weren't any workers progress.check(); }; // Internal function. Called from src/node.js when worker process starts. cluster._setupWorker = function() { // Get worker class var worker = cluster.worker = new Worker(); // we will terminate the worker // when the worker is disconnected from the parent accidentally process.once('disconnect', function() { if (worker.suicide !== true) { process.exit(0); } }); // Tell master that the worker is online worker.state = 'online'; sendInternalMessage(worker, { cmd: 'online' }); }; // Internal function. Called by net.js and dgram.js when attempting to bind a // TCP server or UDP socket. cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) { // This can only be called from a worker. assert(cluster.isWorker); // Store tcp instance for later use var key = [address, port, addressType, fd].join(':'); serverListeners[key] = tcpSelf; // Send a listening message to the master tcpSelf.once('listening', function() { cluster.worker.state = 'listening'; sendInternalMessage(cluster.worker, { cmd: 'listening', address: address, port: tcpSelf.address().port || port, addressType: addressType, fd: fd }); }); // Request the fd handler from the master process var message = { cmd: 'queryServer', address: address, port: port, addressType: addressType, fd: fd }; // The callback will be stored until the master has responded sendInternalMessage(cluster.worker, message, function(msg, handle) { cb(handle, msg && msg.error); }); };