From 44cd121c63b7a4d647631f14631485ec725d7cde Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Fri, 18 Jan 2013 03:13:10 +0400 Subject: [PATCH] Revert "child_process: do not keep list of sent sockets" This reverts commit db5ee0b3decace9b5d80ee535ce53183aff02909. --- doc/api/child_process.markdown | 8 +- doc/api/net.markdown | 11 +- lib/child_process.js | 191 +++++------------- lib/net.js | 57 ++---- .../test-child-process-fork-getconnections.js | 107 ---------- test/simple/test-child-process-fork-net2.js | 59 +++--- test/simple/test-child-process-fork-track.js | 110 ---------- 7 files changed, 95 insertions(+), 448 deletions(-) delete mode 100644 test/simple/test-child-process-fork-getconnections.js delete mode 100644 test/simple/test-child-process-fork-track.js diff --git a/doc/api/child_process.markdown b/doc/api/child_process.markdown index 343030d53f..d803cffe9b 100644 --- a/doc/api/child_process.markdown +++ b/doc/api/child_process.markdown @@ -124,11 +124,10 @@ process may not actually kill it. `kill` really just sends a signal to a proces See `kill(2)` -### child.send(message, [sendHandle], [options]) +### child.send(message, [sendHandle]) * `message` {Object} * `sendHandle` {Handle object} -* `options` {Object} When using `child_process.fork()` you can write to the child using `child.send(message, [sendHandle])` and messages are received by @@ -167,11 +166,6 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or socket object to another process. The child will receive the object as its second argument to the `message` event. -The `options` object may have the following properties: - - * `track` - Notify master process when `sendHandle` will be closed in child - process. (`false` by default) - **send server object** Here is an example of sending a server: diff --git a/doc/api/net.markdown b/doc/api/net.markdown index 804935e39e..2b54dbeb0f 100644 --- a/doc/api/net.markdown +++ b/doc/api/net.markdown @@ -231,19 +231,10 @@ with `child_process.fork()`. The number of concurrent connections on the server. -This becomes `null` when sending a socket to a child with -`child_process.fork()`. To poll forks and get current number of active -connections use asynchronous `server.getConnections` instead. +This becomes `null` when sending a socket to a child with `child_process.fork()`. `net.Server` is an [EventEmitter][] with the following events: -### server.getConnections(callback) - -Asynchronously get the number of concurrent connections on the server. Works -when sockets were sent to forks. - -Callback should take two arguments `err` and `count`. - ### Event: 'listening' Emitted when the server has been bound after calling `server.listen`. diff --git a/lib/child_process.js b/lib/child_process.js index 1130af75ed..f69c47d390 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -107,31 +107,29 @@ var handleConversion = { }, 'net.Socket': { - send: function(message, socket, options) { - // if the socket was created by net.Server + send: function(message, socket) { + // pause socket so no data is lost, will be resumed later + + // if the socket wsa created by net.Server if (socket.server) { // the slave should keep track of the socket message.key = socket.server._connectionKey; var firstTime = !this._channel.sockets.send[message.key]; + + // add socket to connections list var socketList = getSocketList('send', this, message.key); + socketList.add(socket); - if (options && options.track) { - // Keep track of socket's status - message.id = socketList.add(socket); - } else { - // the server should no longer expose a .connection property - // and when asked to close it should query the socket status from - // the slaves - if (firstTime) socket.server._setupSlave(socketList); - - // Act like socket is detached - socket.server._connections--; + // the server should no longer expose a .connection property + // and when asked to close it should query the socket status from slaves + if (firstTime) { + socket.server._setupSlave(socketList); } } // remove handle from socket object, it will be closed when the socket - // will be sent + // has been send var handle = socket._handle; handle.onread = function() {}; socket._handle = null; @@ -139,11 +137,6 @@ var handleConversion = { return handle; }, - postSend: function(handle) { - // Close the Socket handle after sending it - handle.close(); - }, - got: function(message, handle, emit) { var socket = new net.Socket({handle: handle}); socket.readable = socket.writable = true; @@ -153,10 +146,7 @@ var handleConversion = { // add socket to connections list var socketList = getSocketList('got', this, message.key); - socketList.add({ - id: message.id, - socket: socket - }); + socketList.add(socket); } emit(socket); @@ -171,14 +161,8 @@ function SocketListSend(slave, key) { var self = this; this.key = key; - this.slave = slave; - - // These two arrays are used to store the list of sockets and the freelist of - // indexes in this list. After insertion, item will have persistent index - // until it'll be removed. This way we can use this index as an identifier for - // sockets. this.list = []; - this.freelist = []; + this.slave = slave; slave.once('disconnect', function() { self.flush(); @@ -186,83 +170,30 @@ function SocketListSend(slave, key) { this.slave.on('internalMessage', function(msg) { if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return; - self.remove(msg.id); + self.flush(); }); } util.inherits(SocketListSend, EventEmitter); SocketListSend.prototype.add = function(socket) { - var index; - - // Pick one of free indexes, or insert in the end of the list - if (this.freelist.length > 0) { - index = this.freelist.pop(); - this.list[index] = socket; - } else { - index = this.list.push(socket) - 1; - } - - return index; -}; - -SocketListSend.prototype.remove = function(index) { - var socket = this.list[index]; - if (!socket) return; - - // Create a hole in the list and move index to the freelist - this.list[index] = null; - this.freelist.push(index); - - socket.destroy(); + this.list.push(socket); }; SocketListSend.prototype.flush = function() { var list = this.list; this.list = []; - this.freelist = []; list.forEach(function(socket) { - if (socket) socket.destroy(); + socket.destroy(); }); }; -SocketListSend.prototype._request = function request(msg, cmd, callback) { - var self = this; - - if (!this.slave.connected) return onslaveclose(); - this.slave.send(msg); - - function onclose() { - self.slave.removeListener('internalMessage', onreply); - callback(new Error('Slave closed before reply')); - }; - - function onreply(msg) { - if (msg.cmd !== cmd || msg.key !== self.key) return; - self.slave.removeListener('disconnect', onclose); - self.slave.removeListener('internalMessage', onreply); - - callback(null, msg); - }; - - this.slave.once('disconnect', onclose); - this.slave.on('internalMessage', onreply); -}; - -SocketListSend.prototype.close = function close(callback) { - this._request({ - cmd: 'NODE_SOCKET_NOTIFY_CLOSE', - key: this.key - }, 'NODE_SOCKET_ALL_CLOSED', callback); -}; +SocketListSend.prototype.update = function() { + if (this.slave.connected === false) return; -SocketListSend.prototype.getConnections = function getConnections(callback) { - this._request({ - cmd: 'NODE_SOCKET_GET_COUNT', + this.slave.send({ + cmd: 'NODE_SOCKET_FETCH', key: this.key - }, 'NODE_SOCKET_COUNT', function(err, msg) { - if (err) return callback(err); - callback(null, msg.count); }); }; @@ -272,59 +203,45 @@ function SocketListReceive(slave, key) { var self = this; - this.connections = 0; this.key = key; + this.list = []; this.slave = slave; - function onempty() { - if (!self.slave.connected) return; - - self.slave.send({ - cmd: 'NODE_SOCKET_ALL_CLOSED', - key: self.key - }); - } + slave.on('internalMessage', function(msg) { + if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return; - this.slave.on('internalMessage', function(msg) { - if (msg.key !== self.key) return; - - if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') { - // Already empty - if (self.connections === 0) return onempty(); - - // Wait for sockets to get closed - self.once('empty', onempty); - } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') { - if (!self.slave.connected) return; - self.slave.send({ - cmd: 'NODE_SOCKET_COUNT', - key: self.key, - count: self.connections - }); + if (self.list.length === 0) { + self.flush(); + return; } + + self.on('itemRemoved', function removeMe() { + if (self.list.length !== 0) return; + self.removeListener('itemRemoved', removeMe); + self.flush(); + }); }); } util.inherits(SocketListReceive, EventEmitter); -SocketListReceive.prototype.add = function(obj) { - var self = this; - - this.connections++; +SocketListReceive.prototype.flush = function() { + this.list = []; - // Notify previous owner of socket about its state change - obj.socket.once('close', function() { - self.connections--; + if (this.slave.connected) { + this.slave.send({ + cmd: 'NODE_SOCKET_CLOSED', + key: this.key + }); + } +}; - if (obj.id !== undefined && self.slave.connected) { - // Master wants to keep eye on socket status - self.slave.send({ - cmd: 'NODE_SOCKET_CLOSED', - key: self.key, - id: obj.id - }); - } +SocketListReceive.prototype.add = function(socket) { + var self = this; + this.list.push(socket); - if (self.connections === 0) self.emit('empty'); + socket.on('close', function() { + self.list.splice(self.list.indexOf(socket), 1); + self.emit('itemRemoved'); }); }; @@ -449,16 +366,17 @@ function setupChannel(target, channel) { var string = JSON.stringify(message) + '\n'; var writeReq = channel.writeUtf8String(string, handle); + // Close the Socket handle after sending it + if (message && message.type === 'net.Socket') { + handle.close(); + } + if (!writeReq) { var er = errnoException(errno, 'write', 'cannot write to IPC channel.'); this.emit('error', er); } - if (obj && obj.postSend) { - writeReq.oncomplete = obj.postSend.bind(null, handle); - } else { - writeReq.oncomplete = nop; - } + writeReq.oncomplete = nop; /* If the master is > 2 read() calls behind, please stop sending. */ return channel.writeQueueSize < (65536 * 2); @@ -738,7 +656,6 @@ function ChildProcess() { this._closesNeeded = 1; this._closesGot = 0; - this.connected = false; this.signalCode = null; this.exitCode = null; diff --git a/lib/net.js b/lib/net.js index 4c79161908..41e28195ae 100644 --- a/lib/net.js +++ b/lib/net.js @@ -874,6 +874,8 @@ function Server(/* [ options, ] listener */) { this._connections = 0; + // when server is using slaves .connections is not reliable + // so null will be return if thats the case Object.defineProperty(this, 'connections', { get: function() { if (self._usingSlaves) { @@ -888,8 +890,6 @@ function Server(/* [ options, ] listener */) { }); this._handle = null; - this._usingSlaves = false; - this._slaves = []; this.allowHalfOpen = options.allowHalfOpen || false; } @@ -1122,37 +1122,7 @@ function onconnection(clientHandle) { } -Server.prototype.getConnections = function(cb) { - if (!this._usingSlaves) return cb(null, this.connections); - - // Poll slaves - var left = this._slaves.length, - total = this._connections; - - function oncount(err, count) { - if (err) { - left = -1; - return cb(err); - } - - total += count; - if (--left === 0) return cb(null, total); - } - - this._slaves.forEach(function(slave) { - slave.getConnections(oncount); - }); -}; - - Server.prototype.close = function(cb) { - function onSlaveClose() { - if (--left !== 0) return; - - self._connections = 0; - self._emitCloseIfDrained(); - } - if (!this._handle) { // Throw error. Follows net_legacy behaviour. throw new Error('Not running'); @@ -1163,21 +1133,14 @@ Server.prototype.close = function(cb) { } this._handle.close(); this._handle = null; + this._emitCloseIfDrained(); + // fetch new socket lists if (this._usingSlaves) { - var self = this, - left = this._slaves.length; - - // Increment connections to be sure that, even if all sockets will be closed - // during polling of slaves, `close` event will be emitted only once. - this._connections++; - - // Poll slaves - this._slaves.forEach(function(slave) { - slave.close(onSlaveClose); + this._slaves.forEach(function(socketList) { + if (socketList.list.length === 0) return; + socketList.update(); }); - } else { - this._emitCloseIfDrained(); } return this; @@ -1204,8 +1167,12 @@ Server.prototype.listenFD = util.deprecate(function(fd, type) { return this.listen({ fd: fd }); }, 'listenFD is deprecated. Use listen({fd: }).'); +// when sending a socket using fork IPC this function is executed Server.prototype._setupSlave = function(socketList) { - this._usingSlaves = true; + if (!this._usingSlaves) { + this._usingSlaves = true; + this._slaves = []; + } this._slaves.push(socketList); }; diff --git a/test/simple/test-child-process-fork-getconnections.js b/test/simple/test-child-process-fork-getconnections.js deleted file mode 100644 index 476dbc34cc..0000000000 --- a/test/simple/test-child-process-fork-getconnections.js +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -var assert = require('assert'); -var common = require('../common'); -var fork = require('child_process').fork; -var net = require('net'); -var count = 12; - -if (process.argv[2] === 'child') { - var sockets = []; - var id = process.argv[3]; - - process.on('message', function(m, socket) { - if (socket) { - sockets.push(socket); - } - - if (m.cmd === 'close') { - sockets[m.id].once('close', function() { - process.send({ id: m.id, status: 'closed' }); - }); - sockets[m.id].destroy(); - } - }); -} else { - var child = fork(process.argv[1], ['child']); - - var server = net.createServer(); - var sockets = []; - var sent = 0; - - server.on('connection', function(socket) { - child.send({ cmd: 'new' }, socket, { track: false }); - sockets.push(socket); - - if (sockets.length === count) { - closeSockets(); - server.close(); - } - }); - - var disconnected = 0; - server.on('listening', function() { - - var j = count, client; - while (j--) { - client = net.connect(common.PORT, '127.0.0.1'); - client.on('close', function() { - console.error('[m] CLIENT: close event'); - disconnected += 1; - }); - // XXX This resume() should be unnecessary. - // a stream high water mark should be enough to keep - // consuming the input. - client.resume(); - } - }); - - function closeSockets(i) { - if (!i) i = 0; - if (i === count) return; - - sent++; - child.send({ id: i, cmd: 'close' }); - child.once('message', function(m) { - assert(m.status === 'closed'); - server.getConnections(function(err, num) { - closeSockets(i + 1); - }); - }); - }; - - var closeEmitted = false; - server.on('close', function() { - console.error('[m] server close'); - closeEmitted = true; - - child.kill(); - }); - - server.listen(common.PORT, '127.0.0.1'); - - process.on('exit', function() { - assert.equal(sent, count); - assert.equal(disconnected, count); - assert.ok(closeEmitted); - }); -} diff --git a/test/simple/test-child-process-fork-net2.js b/test/simple/test-child-process-fork-net2.js index a8e8039b27..be749fe319 100644 --- a/test/simple/test-child-process-fork-net2.js +++ b/test/simple/test-child-process-fork-net2.js @@ -26,13 +26,13 @@ var net = require('net'); var count = 12; if (process.argv[2] === 'child') { + var needEnd = []; - var id = process.argv[3]; process.on('message', function(m, socket) { if (!socket) return; - console.error('[%d] got socket', id, m); + console.error('got socket', m); // will call .end('end') or .write('write'); socket[m](m); @@ -40,11 +40,11 @@ if (process.argv[2] === 'child') { socket.resume(); socket.on('data', function() { - console.error('[%d] socket.data', id, m); + console.error('%d socket.data', process.pid, m); }); socket.on('end', function() { - console.error('[%d] socket.end', id, m); + console.error('%d socket.end', process.pid, m); }); // store the unfinished socket @@ -53,62 +53,58 @@ if (process.argv[2] === 'child') { } socket.on('close', function() { - console.error('[%d] socket.close', id, m); + console.error('%d socket.close', process.pid, m); }); socket.on('finish', function() { - console.error('[%d] socket finished', id, m); + console.error('%d socket finished', process.pid, m); }); }); process.on('message', function(m) { if (m !== 'close') return; - console.error('[%d] got close message', id); + console.error('got close message'); needEnd.forEach(function(endMe, i) { - console.error('[%d] ending %d', id, i); + console.error('%d ending %d', process.pid, i); endMe.end('end'); }); }); process.on('disconnect', function() { - console.error('[%d] process disconnect, ending', id); + console.error('%d process disconnect, ending', process.pid); needEnd.forEach(function(endMe, i) { - console.error('[%d] ending %d', id, i); + console.error('%d ending %d', process.pid, i); endMe.end('end'); }); + endMe = null; }); } else { - var child1 = fork(process.argv[1], ['child', '1']); - var child2 = fork(process.argv[1], ['child', '2']); - var child3 = fork(process.argv[1], ['child', '3']); + var child1 = fork(process.argv[1], ['child']); + var child2 = fork(process.argv[1], ['child']); + var child3 = fork(process.argv[1], ['child']); var server = net.createServer(); - var connected = 0, - closed = 0; + var connected = 0; server.on('connection', function(socket) { switch (connected % 6) { case 0: - child1.send('end', socket, { track: false }); break; + child1.send('end', socket); break; case 1: - child1.send('write', socket, { track: true }); break; + child1.send('write', socket); break; case 2: - child2.send('end', socket, { track: true }); break; + child2.send('end', socket); break; case 3: - child2.send('write', socket, { track: false }); break; + child2.send('write', socket); break; case 4: - child3.send('end', socket, { track: false }); break; + child3.send('end', socket); break; case 5: - child3.send('write', socket, { track: false }); break; + child3.send('write', socket); break; } connected += 1; - socket.once('close', function() { - console.log('[m] socket closed, total %d', ++closed); - }); - if (connected === count) { closeServer(); } @@ -121,7 +117,7 @@ if (process.argv[2] === 'child') { while (j--) { client = net.connect(common.PORT, '127.0.0.1'); client.on('close', function() { - console.error('[m] CLIENT: close event'); + console.error('CLIENT: close event in master'); disconnected += 1; }); // XXX This resume() should be unnecessary. @@ -133,7 +129,7 @@ if (process.argv[2] === 'child') { var closeEmitted = false; server.on('close', function() { - console.error('[m] server close'); + console.error('server close'); closeEmitted = true; child1.kill(); @@ -145,19 +141,18 @@ if (process.argv[2] === 'child') { var timeElasped = 0; var closeServer = function() { - console.error('[m] closeServer'); + console.error('closeServer'); var startTime = Date.now(); server.on('close', function() { - console.error('[m] emit(close)'); + console.error('emit(close)'); timeElasped = Date.now() - startTime; }); - console.error('[m] calling server.close'); + console.error('calling server.close'); server.close(); setTimeout(function() { - assert(!closeEmitted); - console.error('[m] sending close to children'); + console.error('sending close to children'); child1.send('close'); child2.send('close'); child3.disconnect(); diff --git a/test/simple/test-child-process-fork-track.js b/test/simple/test-child-process-fork-track.js deleted file mode 100644 index 2382f64d89..0000000000 --- a/test/simple/test-child-process-fork-track.js +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -var assert = require('assert'); -var common = require('../common'); -var fork = require('child_process').fork; -var net = require('net'); -var count = 12; - -if (process.argv[2] === 'child') { - var sockets = []; - var id = process.argv[3]; - - process.on('message', function(m, socket) { - if (socket) { - sockets.push(socket); - } - - if (m.cmd === 'close') { - sockets[m.id].once('close', function() { - process.send({ id: m.id, status: 'closed' }); - }); - sockets[m.id].destroy(); - } - }); -} else { - var child = fork(process.argv[1], ['child']); - - var server = net.createServer(); - var sockets = []; - var closed = 0; - - server.on('connection', function(socket) { - child.send({ cmd: 'new' }, socket, { track: true }); - sockets.push(socket); - - socket.once('close', function() { - console.error('[m] socket closed'); - closed++; - assert.equal(closed + server.connections, count); - if (server.connections === 0) server.close(); - }); - - if (sockets.length === count) { - closeSockets(); - } - }); - - var disconnected = 0; - server.on('listening', function() { - - var j = count, client; - while (j--) { - client = net.connect(common.PORT, '127.0.0.1'); - client.on('close', function() { - console.error('[m] CLIENT: close event'); - disconnected += 1; - }); - // XXX This resume() should be unnecessary. - // a stream high water mark should be enough to keep - // consuming the input. - client.resume(); - } - }); - - function closeSockets(i) { - if (!i) i = 0; - if (i === count) return; - - child.send({ id: i, cmd: 'close' }); - child.once('message', function(m) { - assert(m.status === 'closed'); - closeSockets(i + 1); - }); - }; - - var closeEmitted = false; - server.on('close', function() { - console.error('[m] server close'); - closeEmitted = true; - - child.kill(); - }); - - server.listen(common.PORT, '127.0.0.1'); - - process.on('exit', function() { - assert.equal(disconnected, count); - assert.equal(closed, count); - assert.ok(closeEmitted); - }); -}