Browse Source

Add cluster.setupMaster

Fixes #2470
v0.7.4-release
Andreas Madsen 13 years ago
committed by Ryan Dahl
parent
commit
f9a47debfc
  1. 34
      doc/api/cluster.markdown
  2. 59
      lib/cluster.js
  3. 84
      test/simple/test-cluster-kill-workers.js
  4. 132
      test/simple/test-cluster-master-error.js
  5. 89
      test/simple/test-cluster-setup-master.js

34
doc/api/cluster.markdown

@ -101,6 +101,40 @@ This can be used to restart the worker by calling `fork()` again.
cluster.fork(); cluster.fork();
}); });
### Event 'setup'
When the `.setupMaster()` function has been executed this event emits. If `.setupMaster()`
was not executed before `fork()` or `.autoFork()`, they will execute the function with no
arguments.
### cluster.setupMaster([options])
The `setupMaster` is used to change the default 'fork' behavior. It takes one option
object argument.
Example:
var cluster = require("cluster");
cluster.setupMaster({
exec : "worker.js",
args : ["--use", "https"],
silent : true
});
cluster.autoFork();
The options argument can contain 3 different properties.
- `exec` are the file path to the worker file, by default this is the same file as the master.
- `args` are a array of arguments send along with the worker, by default this is `process.argv.slice(2)`.
- `silent`, if this option is true the output of a worker won't propagate to the master, by default this is false.
### cluster.settings
All settings set by the `.setupMaster` is stored in this settings object.
This object is not supposed to be change or set manually, by you.
All propertys are `undefined` if they are not yet set.
### cluster.fork([env]) ### cluster.fork([env])
Spawn a new worker process. This can only be called from the master process. Spawn a new worker process. This can only be called from the master process.

59
lib/cluster.js

@ -61,8 +61,6 @@ var cluster = module.exports = new cluster();
var masterStarted = false; var masterStarted = false;
var ids = 0; var ids = 0;
var serverHandlers = {}; var serverHandlers = {};
var workerFilename;
var workerArgs;
// Used in the worker: // Used in the worker:
var serverLisenters = {}; var serverLisenters = {};
@ -78,6 +76,9 @@ cluster.worker = cluster.isWorker ? {} : null;
// The workers array is oly used in the naster // The workers array is oly used in the naster
cluster.workers = cluster.isMaster ? {} : null; cluster.workers = cluster.isMaster ? {} : null;
// Settings object
var settings = cluster.settings = {};
// Simple function there call a function on each worker // Simple function there call a function on each worker
function eachWorker(cb) { function eachWorker(cb) {
// Go througe all workers // Go througe all workers
@ -88,36 +89,44 @@ function eachWorker(cb) {
} }
} }
// Call this from the master process. It will start child workers. cluster.setupMaster = function(options) {
//
// options.workerFilename
// Specifies the script to execute for the child processes. Default is
// process.argv[1]
//
// options.args
// Specifies program arguments for the workers. The Default is
// process.argv.slice(2)
//
// options.workers
// The number of workers to start. Defaults to os.cpus().length.
function startMaster() {
// This can only be called from the master. // This can only be called from the master.
assert(cluster.isMaster); assert(cluster.isMaster);
// Don't allow this function to run more that once
if (masterStarted) return; if (masterStarted) return;
masterStarted = true; masterStarted = true;
workerFilename = process.argv[1]; // Get filename and arguments
workerArgs = process.argv.slice(2); options = options || {};
// Set settings object
settings = cluster.settings = {
exec: options.exec || process.argv[1],
args: options.args || process.argv.slice(2),
silent: options.silent || false
};
// Kill workers when a uncaught exception is received
process.on('uncaughtException', function(err) {
// Did the user install a listener? If so, it overrides this one.
if (process.listeners('uncaughtException').length > 1) return;
process.on('uncaughtException', function(e) { // Output the error stack, and create on if non exist
console.error('Exception in cluster master process: ' + if (!(err instanceof Error)) {
e.message + '\n' + e.stack); err = new Error(err);
}
console.error(err.stack);
// quick destroy cluster
quickDestroyCluster(); quickDestroyCluster();
// when done exit process with error code: 1
process.exit(1); process.exit(1);
}); });
}
// emit setup event
cluster.emit('setup');
};
// Check if a message is internal only // Check if a message is internal only
var INTERNAL_PREFIX = 'NODE_CLUSTER_'; var INTERNAL_PREFIX = 'NODE_CLUSTER_';
@ -275,10 +284,10 @@ function Worker(customEnv) {
} }
// fork worker // fork worker
this.process = fork(workerFilename, workerArgs, { this.process = fork(settings.exec, settings.args, {
'env': envCopy 'env': envCopy,
'silent': settings.silent
}); });
} else { } else {
this.process = process; this.process = process;
} }
@ -426,7 +435,7 @@ cluster.fork = function(env) {
assert(cluster.isMaster); assert(cluster.isMaster);
// Make sure that the master has been initalized // Make sure that the master has been initalized
startMaster(); cluster.setupMaster();
return (new cluster.Worker(env)); return (new cluster.Worker(env));
}; };

84
test/simple/test-cluster-kill-workers.js

@ -1,84 +0,0 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// This test checks that if we kill a cluster master immediately after fork,
// before the worker has time to register itself, that the master will still
// clean up the worker.
// https://github.com/joyent/node/issues/2047
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
var fork = require('child_process').fork;
var isTestRunner = process.argv[2] != 'child';
if (isTestRunner) {
console.log("starting master...");
var master = fork(__filename, [ 'child' ]);
console.log("master pid =", master.pid);
var workerPID;
master.on("message", function(m) {
console.log("got message from master:", m);
if (m.workerPID) {
console.log("worker pid =", m.workerPID);
workerPID = m.workerPID;
}
});
var gotExit = false;
var gotKillException = false;
master.on('exit', function(code) {
gotExit = true;
assert(code != 0);
assert(workerPID > 0);
try {
process.kill(workerPID, 0);
} catch(e) {
// workerPID is no longer running
console.log(e)
assert(e.code == 'ESRCH');
gotKillException = true;
}
})
process.on('exit', function() {
assert(gotExit);
assert(gotKillException);
});
} else {
// Cluster stuff.
if (cluster.isMaster) {
var worker = cluster.fork();
process.send({ workerPID: worker.process.pid });
// should kill the worker too
throw new Error('kill master');
} else {
setTimeout(function() {
assert(false, 'worker should have been killed');
}, 2500);
}
}

132
test/simple/test-cluster-master-error.js

@ -0,0 +1,132 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
// Cluster setup
if (cluster.isWorker) {
var http = require('http');
http.Server(function() {
}).listen(common.PORT, '127.0.0.1');
} else if (process.argv[2] === 'cluster') {
var totalWorkers = 2;
// Send PID to testcase process
var forkNum = 0;
cluster.on('fork', function forkEvent(worker) {
// Send PID
process.send({
cmd: 'worker',
workerPID: worker.process.pid
});
// Stop listening when done
if (++forkNum === totalWorkers) {
cluster.removeListener('fork', forkEvent);
}
});
// Throw accidently error when all workers are listening
var listeningNum = 0;
cluster.on('listening', function listeningEvent() {
// When all workers are listening
if (++listeningNum === totalWorkers) {
// Stop listening
cluster.removeListener('listening', listeningEvent);
// throw accidently error
process.nextTick(function() {
throw 'accidently error';
});
}
});
// Startup a basic cluster
cluster.fork();
cluster.fork();
} else {
// This is the testcase
var fork = require('child_process').fork;
var isAlive = function(pid) {
try {
//this will throw an error if the process is dead
process.kill(pid, 0);
return true;
} catch (e) {
return false;
}
};
var existMaster = false;
var existWorker = false;
// List all workers
var workers = [];
// Spawn a cluster process
var master = fork(process.argv[1], ['cluster'], {silent: true});
// Handle messages from the cluster
master.on('message', function(data) {
// Add worker pid to list and progress tracker
if (data.cmd === 'worker') {
workers.push(data.workerPID);
}
});
// When cluster is dead
master.on('exit', function(code) {
// Check that the cluster died accidently
existMaster = (code === 1);
// When master is dead all workers should be dead to
var alive = false;
workers.forEach(function(pid) {
if (isAlive(pid)) {
alive = true;
}
});
// If a worker was alive this did not act as expected
existWorker = !alive;
});
process.once('exit', function() {
assert.ok(existMaster, 'The master did not die after an error was throwed');
assert.ok(existWorker, 'The workers did not die after an error in the master');
});
}

89
test/simple/test-cluster-setup-master.js

@ -0,0 +1,89 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
if (cluster.isWorker) {
// Just keep the worker alive
process.send(process.argv[2]);
} else if (cluster.isMaster) {
var checks = {
args: false,
setupEvent: false,
settingsObject: false
};
var totalWorkers = 2;
cluster.once('setup', function() {
checks.setupEvent = true;
var settings = cluster.settings;
if (settings &&
settings.args && settings.args[0] === 'custom argument' &&
settings.silent === true &&
settings.exec === process.argv[1]) {
checks.settingsObject = true;
}
});
// Setup master
cluster.setupMaster({
args: ['custom argument'],
silent: true
});
var correctIn = 0;
cluster.on('online', function lisenter(worker) {
worker.once('message', function(data) {
correctIn += (data === 'custom argument' ? 1 : 0);
if (correctIn === totalWorkers) {
checks.args = true;
}
worker.destroy();
});
// All workers are online
if (cluster.onlineWorkers === totalWorkers) {
checks.workers = true;
}
});
// Start all workers
cluster.fork();
cluster.fork();
// Check all values
process.once('exit', function() {
assert.ok(checks.args, 'The arguments was noy send to the worker');
assert.ok(checks.setupEvent, 'The setup event was never emitted');
assert.ok(checks.settingsObject, 'The settingsObject do not have correct properties');
});
}
Loading…
Cancel
Save