Browse Source

cluster: add graceful disconnect support

This patch add a worker.disconnect() method there will stop the worker from accepting
new connections and then stop the IPC. This allow the worker to die graceful.
When the IPC has been disconnected a 'disconnect' event will emit.

The patch also add a cluster.disconnect() method, this will call worker.disconnect() on
all connected workers. When the workers are disconneted it will then close all server
handlers. This allow the cluster itself to self terminate in a graceful way.
v0.9.1-release
Andreas Madsen 13 years ago
committed by isaacs
parent
commit
d927fbc9ab
  1. 90
      doc/api/cluster.markdown
  2. 117
      lib/cluster.js
  3. 122
      test/simple/test-cluster-disconnect.js
  4. 110
      test/simple/test-cluster-worker-disconnect.js

90
doc/api/cluster.markdown

@ -118,6 +118,21 @@ where the 'listening' event is emitted.
console.log("We are now connected");
});
## Event: 'disconnect'
* `worker` {Worker object}
When a workers IPC channel has disconnected this event is emitted. This will happen
when the worker die, usually after calling `.destroy()`.
But also when calling `.disconnect()`, in this case it is possible there is delay
between the `disconnect` and `death` and the event can be used to detect if the
process is stuck in a cleanup or if there are long living connection.
cluster.on('disconnect', function(worker) {
console.log('The worker #' + worker.uniqueID + ' has disconnected');
});
## Event: 'death'
* `worker` {Worker object}
@ -179,6 +194,16 @@ Spawn a new worker process. This can only be called from the master process.
All settings set by the `.setupMaster` is stored in this settings object.
This object is not supposed to be change or set manually.
## cluster.disconnect([callback])
* `callback` {Function} called when all workers are disconnected and handlers are closed
When calling this method all workers will commit a graceful suicide. When they are
disconnected all internal handlers will be closed, allowing the master process to
die graceful if no other event is waiting.
The method takes an optional callback argument there will be called when finished.
## cluster.workers
* {Object}
@ -232,9 +257,8 @@ See: [Child Process module](child_process.html)
* {Boolean}
This property is a boolean. It is set when a worker dies, until then it is
`undefined`. It is true if the worker was killed using the `.destroy()`
method, and false otherwise.
This property is a boolean. It is set when a worker dies after calling `.destroy()`
or immediately after calling the `.disconnect()` method. Until then it is `undefined`.
### worker.send(message, [sendHandle])
@ -273,6 +297,55 @@ a suicide boolean is set to true.
// destroy worker
worker.destroy();
## Worker.disconnect()
When calling this function the worker will no longer accept new connections, but
they will be handled by any other listening worker. Existing connection will be
allowed to exit as usual. When no more connections exist, the IPC channel to the worker
will close allowing it to die graceful. When the IPC channel is closed the `disconnect`
event will emit, this is then followed by the `death` event, there is emitted when
the worker finally die.
Because there might be long living connections, it is useful to implement a timeout.
This example ask the worker to disconnect and after 2 seconds it will destroy the
server. An alternative wound be to execute `worker.destroy()` after 2 seconds, but
that would normally not allow the worker to do any cleanup if needed.
if (cluster.isMaster) {
var worker = cluser.fork();
var timeout;
worker.on('listening', function () {
worker.disconnect();
timeout = setTimeout(function () {
worker.send('force kill');
}, 2000);
});
worker.on('disconnect', function () {
clearTimeout(timeout);
});
} else if (cluster.isWorker) {
var net = require('net');
var server = net.createServer(function (socket) {
// connection never end
});
server.listen(8000);
server.on('close', function () {
// cleanup
});
process.on('message', function (msg) {
if (msg === 'force kill') {
server.destroy();
}
});
}
### Event: 'message'
* `message` {Object}
@ -342,6 +415,17 @@ on the specified worker.
// Worker is listening
};
## Event: 'disconnect'
* `worker` {Worker object}
Same as the `cluster.on('disconnect')` event, but emits only when the state change
on the specified worker.
cluster.fork().on('disconnect', function (worker) {
// Worker has disconnected
};
## Event: 'death'
* `worker` {Worker object}

117
lib/cluster.js

@ -77,6 +77,19 @@ function eachWorker(cb) {
}
}
// Extremely simple progress tracker
function ProgressTracker(missing, callback) {
this.missing = missing;
this.callback = callback;
}
ProgressTracker.prototype.done = function() {
this.missing -= 1;
this.check();
};
ProgressTracker.prototype.check = function() {
if (this.missing === 0) this.callback();
};
cluster.setupMaster = function(options) {
// This can only be called from the master.
assert(cluster.isMaster);
@ -239,7 +252,10 @@ if (cluster.isMaster) {
// Messages to a worker will be handled using this methods
else if (cluster.isWorker) {
// TODO: the disconnect step will use this
// Handle worker.disconnect from master
messageHandingObject.disconnect = function(message, worker) {
worker.disconnect();
};
}
function toDecInt(value) {
@ -293,9 +309,11 @@ function Worker(customEnv) {
});
}
// handle internalMessage and exit event
// handle internalMessage, exit and disconnect event
this.process.on('internalMessage', handleMessage.bind(null, this));
this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death'));
this.process.on('disconnect',
prepareDeath.bind(null, this, 'disconnected', 'disconnect'));
// relay message and error
this.process.on('message', this.emit.bind(this, 'message'));
@ -356,14 +374,6 @@ Worker.prototype.send = function() {
this.process.send.apply(this.process, arguments);
};
function closeWorkerChannel(worker, callback) {
//Apparently the .close method is async, but do not have a callback
worker.process._channel.close();
worker.process._channel = null;
process.nextTick(callback);
}
// Kill the worker without restarting
Worker.prototype.destroy = function() {
var self = this;
@ -373,9 +383,14 @@ Worker.prototype.destroy = function() {
if (cluster.isMaster) {
// Disconnect IPC channel
// this way the worker won't need to propagate suicide state to master
closeWorkerChannel(this, function() {
if (self.process.connected) {
self.process.once('disconnect', function() {
self.process.kill();
});
self.process.disconnect();
} else {
self.process.kill();
}
} else {
// Channel is open
@ -403,6 +418,59 @@ Worker.prototype.destroy = function() {
}
};
// The .disconnect function will close all server and then disconnect
// the IPC channel.
if (cluster.isMaster) {
// Used in master
Worker.prototype.disconnect = function() {
this.suicide = true;
sendInternalMessage(this, {cmd: 'disconnect'});
};
} else {
// Used in workers
Worker.prototype.disconnect = function() {
var self = this;
this.suicide = true;
// keep track of open servers
var servers = Object.keys(serverLisenters).length;
var progress = new ProgressTracker(servers, function() {
// there are no more servers open so we will close the IPC channel.
// Closeing the IPC channel will emit emit a disconnect event
// in both master and worker on the process object.
// This event will be handled by prepearDeath.
self.process.disconnect();
});
// depending on where this function was called from (master or worker)
// the suicide state has allready been set.
// But it dosn't really matter if we set it again.
sendInternalMessage(this, {cmd: 'suicide'}, function() {
// in case there are no servers
progress.check();
// closeing all servers graceful
var server;
for (var key in serverLisenters) {
server = serverLisenters[key];
// in case the server is closed we wont close it again
if (server._handle === null) {
progress.done();
continue;
}
server.on('close', progress.done.bind(progress));
server.close();
}
});
};
}
// Fork a new worker
cluster.fork = function(env) {
// This can only be called from the master.
@ -414,6 +482,33 @@ cluster.fork = function(env) {
return (new cluster.Worker(env));
};
// execute .disconnect on all workers and close handlers when done
cluster.disconnect = function(callback) {
// This can only be called from the master.
assert(cluster.isMaster);
// Close all TCP handlers when all workers are disconnected
var workers = Object.keys(cluster.workers).length;
var progress = new ProgressTracker(workers, function() {
for (var key in serverHandlers) {
serverHandlers[key].close();
delete serverHandlers[key];
}
// call callback when done
if (callback) callback();
});
// begin disconnecting all workers
eachWorker(function(worker) {
worker.once('disconnect', progress.done.bind(progress));
worker.disconnect();
});
// in case there wasn't any workers
progress.check();
};
// Sync way to quickly kill all cluster workers
// However the workers may not die instantly
function quickDestroyCluster() {

122
test/simple/test-cluster-disconnect.js

@ -0,0 +1,122 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
var net = require('net');
if (cluster.isWorker) {
net.createServer(function(socket) {
socket.end('echo');
}).listen(common.PORT, '127.0.0.1');
net.createServer(function(socket) {
socket.end('echo');
}).listen(common.PORT + 1, '127.0.0.1');
} else if (cluster.isMaster) {
// test a single TCP server
var testConnection = function(port, cb) {
var socket = net.connect(port, '127.0.0.1', function() {
// buffer result
var result = '';
socket.on('data', function(chunk) { result += chunk; });
// check result
socket.on('end', function() {
cb(result === 'echo');
});
});
};
// test both servers created in the cluster
var testCluster = function(cb) {
var servers = 2;
var done = 0;
for (var i = 0, l = servers; i < l; i++) {
testConnection(common.PORT + i, function(sucess) {
assert.ok(sucess);
done += 1;
if (done === servers) {
cb();
}
});
}
};
// start two workers and execute callback when both is listening
var startCluster = function(cb) {
var workers = 2;
var online = 0;
for (var i = 0, l = workers; i < l; i++) {
var worker = cluster.fork();
worker.on('listening', function() {
online += 1;
if (online === workers) {
cb();
}
});
}
};
var results = {
start: 0,
test: 0,
disconnect: 0
};
var test = function(again) {
//1. start cluster
startCluster(function() {
results.start += 1;
//2. test cluster
testCluster(function() {
results.test += 1;
//3. disconnect cluster
cluster.disconnect(function() {
results.disconnect += 1;
// run test again to confirm cleanup
if (again) {
test();
}
});
});
});
};
test(true);
process.once('exit', function() {
assert.equal(results.start, 2);
assert.equal(results.test, 2);
assert.equal(results.disconnect, 2);
});
}

110
test/simple/test-cluster-worker-disconnect.js

@ -0,0 +1,110 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
if (cluster.isWorker) {
var http = require('http');
http.Server(function() {
}).listen(common.PORT, '127.0.0.1');
} else if (cluster.isMaster) {
var checks = {
cluster: {
emitDisconnect: false,
emitDeath: false,
callback: false
},
worker: {
emitDisconnect: false,
emitDeath: false,
state: false,
suicideMode: false,
died: false
}
};
// helper function to check if a process is alive
var alive = function(pid) {
try {
process.kill(pid, 0);
return true;
} catch (e) {
return false;
}
};
// start worker
var worker = cluster.fork();
// Disconnect worker when it is ready
worker.once('listening', function() {
worker.disconnect();
});
// Check cluster events
cluster.once('disconnect', function() {
checks.cluster.emitDisconnect = true;
});
cluster.once('death', function() {
checks.cluster.emitDeath = true;
});
// Check worker eventes and properties
worker.once('disconnect', function() {
checks.worker.emitDisconnect = true;
checks.worker.suicideMode = worker.suicide;
checks.worker.state = worker.state;
});
// Check that the worker died
worker.once('death', function() {
checks.worker.emitDeath = true;
checks.worker.died = !alive(worker.process.pid);
process.nextTick(function() {
process.exit(0);
});
});
process.once('exit', function() {
var w = checks.worker;
var c = checks.cluster;
// events
assert.ok(w.emitDisconnect, 'Disconnect event did not emit');
assert.ok(c.emitDisconnect, 'Disconnect event did not emit');
assert.ok(w.emitDeath, 'Death event did not emit');
assert.ok(c.emitDeath, 'Death event did not emit');
// flags
assert.equal(w.state, 'disconnected', 'The state property was not set');
assert.equal(w.suicideMode, true, 'Suicide mode was not set');
// is process alive
assert.ok(w.died, 'The worker did not die');
});
}
Loading…
Cancel
Save