From 085a09874b03a1e5401d911ef61e9451b6ea3b30 Mon Sep 17 00:00:00 2001 From: Andreas Madsen Date: Sat, 14 Jul 2012 09:38:00 +0200 Subject: [PATCH] cluster: do not use internal server API --- lib/child_process.js | 2 +- lib/cluster.js | 120 ++++++++++++++---------------- lib/net.js | 74 ++++++++---------- test/simple/test-cluster-basic.js | 3 +- 4 files changed, 88 insertions(+), 111 deletions(-) diff --git a/lib/child_process.js b/lib/child_process.js index 972f68ce27..0418d73c01 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -101,7 +101,7 @@ var handleConversion = { var self = this; var server = new net.Server(); - server.listen(handle, function() { + server._listen(handle, function() { emit(server); }); } diff --git a/lib/cluster.js b/lib/cluster.js index 8f7df3e126..0d6b01d33a 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -159,24 +159,20 @@ function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) { // Handle messages from both master and workers var messageHandler = {}; function handleMessage(worker, inMessage, inHandle) { + if (!isInternalMessage(inMessage)) return; // Remove internal prefix var message = util._extend({}, inMessage); message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length); - var respondUsed = false; function respond(outMessage, outHandler) { - respondUsed = true; handleResponse(outMessage, outHandler, inMessage, inHandle, worker); } // Run handler if it exists if (messageHandler[message.cmd]) { messageHandler[message.cmd](message, worker, respond); - } - - // Send respond if it hasn't been called yet - if (respondUsed === false) { + } else { respond(); } } @@ -185,11 +181,13 @@ function handleMessage(worker, inMessage, inHandle) { if (cluster.isMaster) { // Handle online messages from workers - messageHandler.online = function(message, worker) { + messageHandler.online = function(message, worker, send) { worker.state = 'online'; debug('Worker ' + worker.process.pid + ' online'); worker.emit('online'); cluster.emit('online', worker); + + send(); }; // Handle queryServer messages from workers @@ -197,46 +195,38 @@ if (cluster.isMaster) { // This sequence of information is unique to the connection // but not to the worker - var args = [message.address, - message.port, - message.addressType, - message.fd]; - var key = args.join(':'); - var handler; + var args = message.args; + var key = JSON.stringify(args); if (serverHandlers.hasOwnProperty(key)) { - handler = serverHandlers[key]; - } else { - handler = serverHandlers[key] = net._createServerHandle.apply(net, args); + send({}, serverHandlers[key]); + return; } - // echo callback with the fd handler associated with it - send({}, handler); + var server = serverHandlers[key] = net.Server(); + server.once('listening', function() { + send({}, server); + }); + server.listen.apply(server, args); }; // Handle listening messages from workers - messageHandler.listening = function(message, worker) { + messageHandler.listening = function(message, worker, send) { worker.state = 'listening'; // Emit listening, now that we know the worker is listening - worker.emit('listening', { - address: message.address, - port: message.port, - addressType: message.addressType, - fd: message.fd - }); - cluster.emit('listening', worker, { - address: message.address, - port: message.port, - addressType: message.addressType, - fd: message.fd - }); + worker.emit('listening', message.address); + cluster.emit('listening', worker, message.address); + + send(); }; // Handle suicide messages from workers - messageHandler.suicide = function(message, worker) { + messageHandler.suicide = function(message, worker, send) { worker.suicide = true; + + send(); }; } @@ -245,8 +235,9 @@ if (cluster.isMaster) { else if (cluster.isWorker) { // Handle worker.disconnect from master - messageHandler.disconnect = function(message, worker) { + messageHandler.disconnect = function(message, worker, send) { worker.disconnect(); + send(); }; } @@ -521,38 +512,37 @@ cluster._setupWorker = function() { }; // Internal function. Called by lib/net.js when attempting to bind a server. -cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) { - // This can only be called from a worker. - assert(cluster.isWorker); - - // Store tcp instance for later use - var key = [address, port, addressType, fd].join(':'); - serverListeners[key] = tcpSelf; - - // Send a listening message to the master - tcpSelf.once('listening', function() { - cluster.worker.state = 'listening'; - sendInternalMessage(cluster.worker, { - cmd: 'listening', - address: address, - port: port, - addressType: addressType, - fd: fd - }); - }); +if (cluster.isWorker) { + var localListen = net.Server.prototype.listen; + net.Server.prototype.listen = function() { + var self = this; - // Request the fd handler from the master process - var message = { - cmd: 'queryServer', - address: address, - port: port, - addressType: addressType, - fd: fd - }; + var args = new Array(arguments.length); + for (var i = 0; i < arguments.length; i++) { + args[i] = arguments[i]; + } - // The callback will be stored until the master has responded - sendInternalMessage(cluster.worker, message, function(msg, handle) { - cb(handle); - }); + // filter out callback + if (typeof args[args.length - 1] === 'function') { + this.once('listening', args.pop()); + } -}; + // add server (used by. dissconnect) + serverListeners[JSON.stringify(args)] = this; + + // send callback to master, telling that worker is listening + this.once('listening', function() { + cluster.worker.state = 'listening'; + + var message = { cmd: 'listening', address: this.address() }; + sendInternalMessage(cluster.worker, message); + }); + + // request server + var message = { cmd: 'queryServer', args: args }; + + sendInternalMessage(cluster.worker, message, function(msg, server) { + localListen.call(self, server); + }); + }; +} diff --git a/lib/net.js b/lib/net.js index 30746d42e8..28e733bc13 100644 --- a/lib/net.js +++ b/lib/net.js @@ -834,8 +834,7 @@ exports.Server = Server; function toNumber(x) { return (x = Number(x)) >= 0 ? x : false; } -var createServerHandle = exports._createServerHandle = - function(address, port, addressType, fd) { +var createServerHandle = function(address, port, addressType, fd) { var r = 0; // assign handle in listen, and clean up if bind or listen fails var handle; @@ -922,56 +921,43 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) { function listen(self, address, port, addressType, backlog, fd) { - if (!cluster) cluster = require('cluster'); - - if (cluster.isWorker) { - cluster._getServer(self, address, port, addressType, fd, function(handle) { - self._handle = handle; - var r = self._listen2(address, port, addressType, backlog, fd); - if (r === 0) { - self.emit('listening'); - } - }); - } else { - // If there is not yet a handle, we need to create one and bind. - // In the case of a server sent via IPC, we don't need to do this. + // If there is not yet a handle, we need to create one and bind. + // In the case of a server sent via IPC, we don't need to do this. + if (!self._handle) { + self._handle = createServerHandle(address, port, addressType, fd); if (!self._handle) { - self._handle = createServerHandle(address, port, addressType, fd); - if (!self._handle) { - process.nextTick(function() { - self.emit('error', errnoException(errno, 'listen')); - }); - return; - } - } - - // self._handle.listen will be called lazy - // if there are no connection listeners - if (self.listeners('connection').length === 0) { - self.on('newListener', function removeme(name) { - if (name !== 'connection') return; - - self.removeListener('newListener', removeme); - self._listen2(address, port, addressType, backlog, fd); - }); - process.nextTick(function() { - self.emit('listening'); + self.emit('error', errnoException(errno, 'listen')); }); return; } + } - var r = self._listen2(address, port, addressType, backlog, fd); - if (r === 0) { - process.nextTick(function() { - self.emit('listening'); - }); - } + // self._handle.listen will be called lazy + // if there are no connection listeners + if (self.listeners('connection').length === 0) { + self.on('newListener', function removeme(name) { + if (name !== 'connection') return; + + self.removeListener('newListener', removeme); + self._listen2(address, port, addressType, backlog, fd); + }); + + process.nextTick(function() { + self.emit('listening'); + }); + return; } -} + var r = self._listen2(address, port, addressType, backlog, fd); + if (r === 0) { + process.nextTick(function() { + self.emit('listening'); + }); + } +} -Server.prototype.listen = function() { +Server.prototype._listen = function() { var self = this; var lastArg = arguments[arguments.length - 1]; @@ -1030,6 +1016,8 @@ Server.prototype.listen = function() { return self; }; +Server.prototype.listen = Server.prototype._listen; + Server.prototype.address = function() { if (this._handle && this._handle.getsockname) { return this._handle.getsockname(); diff --git a/test/simple/test-cluster-basic.js b/test/simple/test-cluster-basic.js index 88ba6edfff..fe248b9413 100644 --- a/test/simple/test-cluster-basic.js +++ b/test/simple/test-cluster-basic.js @@ -136,8 +136,7 @@ else if (cluster.isMaster) { assert.equal(arguments.length, 1); var expect = { address: '127.0.0.1', port: common.PORT, - addressType: 4, - fd: undefined }; + family: 'IPv4'}; assert.deepEqual(arguments[0], expect); break;