From db5ee0b3decace9b5d80ee535ce53183aff02909 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Mon, 14 Jan 2013 21:08:20 +0400 Subject: [PATCH] child_process: do not keep list of sent sockets Keeping list of all sockets that were sent to child process causes memory leak and thus unacceptable (see #4587). However `server.close()` should still work properly. This commit introduces two options: * child.send(socket, { track: true }) - will send socket and track its status. You should use it when you want `server.connections` to be a reliable number, and receive `close` event on sent sockets. * child.send(socket) - will send socket without tracking it status. This performs much better, because of smaller number of RTT between master and child. With both of these options `server.close()` will wait for all sent sockets to get closed. --- doc/api/child_process.markdown | 8 +- doc/api/net.markdown | 11 +- lib/child_process.js | 191 +++++++++++++----- lib/net.js | 57 ++++-- .../test-child-process-fork-getconnections.js | 107 ++++++++++ test/simple/test-child-process-fork-net2.js | 59 +++--- test/simple/test-child-process-fork-track.js | 110 ++++++++++ 7 files changed, 448 insertions(+), 95 deletions(-) create mode 100644 test/simple/test-child-process-fork-getconnections.js create mode 100644 test/simple/test-child-process-fork-track.js diff --git a/doc/api/child_process.markdown b/doc/api/child_process.markdown index d803cffe9b..343030d53f 100644 --- a/doc/api/child_process.markdown +++ b/doc/api/child_process.markdown @@ -124,10 +124,11 @@ process may not actually kill it. `kill` really just sends a signal to a proces See `kill(2)` -### child.send(message, [sendHandle]) +### child.send(message, [sendHandle], [options]) * `message` {Object} * `sendHandle` {Handle object} +* `options` {Object} When using `child_process.fork()` you can write to the child using `child.send(message, [sendHandle])` and messages are received by @@ -166,6 +167,11 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or socket object to another process. The child will receive the object as its second argument to the `message` event. +The `options` object may have the following properties: + + * `track` - Notify master process when `sendHandle` will be closed in child + process. (`false` by default) + **send server object** Here is an example of sending a server: diff --git a/doc/api/net.markdown b/doc/api/net.markdown index 2b54dbeb0f..804935e39e 100644 --- a/doc/api/net.markdown +++ b/doc/api/net.markdown @@ -231,10 +231,19 @@ with `child_process.fork()`. The number of concurrent connections on the server. -This becomes `null` when sending a socket to a child with `child_process.fork()`. +This becomes `null` when sending a socket to a child with +`child_process.fork()`. To poll forks and get current number of active +connections use asynchronous `server.getConnections` instead. `net.Server` is an [EventEmitter][] with the following events: +### server.getConnections(callback) + +Asynchronously get the number of concurrent connections on the server. Works +when sockets were sent to forks. + +Callback should take two arguments `err` and `count`. + ### Event: 'listening' Emitted when the server has been bound after calling `server.listen`. diff --git a/lib/child_process.js b/lib/child_process.js index f69c47d390..1130af75ed 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -107,29 +107,31 @@ var handleConversion = { }, 'net.Socket': { - send: function(message, socket) { - // pause socket so no data is lost, will be resumed later - - // if the socket wsa created by net.Server + send: function(message, socket, options) { + // if the socket was created by net.Server if (socket.server) { // the slave should keep track of the socket message.key = socket.server._connectionKey; var firstTime = !this._channel.sockets.send[message.key]; - - // add socket to connections list var socketList = getSocketList('send', this, message.key); - socketList.add(socket); - // the server should no longer expose a .connection property - // and when asked to close it should query the socket status from slaves - if (firstTime) { - socket.server._setupSlave(socketList); + if (options && options.track) { + // Keep track of socket's status + message.id = socketList.add(socket); + } else { + // the server should no longer expose a .connection property + // and when asked to close it should query the socket status from + // the slaves + if (firstTime) socket.server._setupSlave(socketList); + + // Act like socket is detached + socket.server._connections--; } } // remove handle from socket object, it will be closed when the socket - // has been send + // will be sent var handle = socket._handle; handle.onread = function() {}; socket._handle = null; @@ -137,6 +139,11 @@ var handleConversion = { return handle; }, + postSend: function(handle) { + // Close the Socket handle after sending it + handle.close(); + }, + got: function(message, handle, emit) { var socket = new net.Socket({handle: handle}); socket.readable = socket.writable = true; @@ -146,7 +153,10 @@ var handleConversion = { // add socket to connections list var socketList = getSocketList('got', this, message.key); - socketList.add(socket); + socketList.add({ + id: message.id, + socket: socket + }); } emit(socket); @@ -161,39 +171,98 @@ function SocketListSend(slave, key) { var self = this; this.key = key; - this.list = []; this.slave = slave; + // These two arrays are used to store the list of sockets and the freelist of + // indexes in this list. After insertion, item will have persistent index + // until it'll be removed. This way we can use this index as an identifier for + // sockets. + this.list = []; + this.freelist = []; + slave.once('disconnect', function() { self.flush(); }); this.slave.on('internalMessage', function(msg) { if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return; - self.flush(); + self.remove(msg.id); }); } util.inherits(SocketListSend, EventEmitter); SocketListSend.prototype.add = function(socket) { - this.list.push(socket); + var index; + + // Pick one of free indexes, or insert in the end of the list + if (this.freelist.length > 0) { + index = this.freelist.pop(); + this.list[index] = socket; + } else { + index = this.list.push(socket) - 1; + } + + return index; +}; + +SocketListSend.prototype.remove = function(index) { + var socket = this.list[index]; + if (!socket) return; + + // Create a hole in the list and move index to the freelist + this.list[index] = null; + this.freelist.push(index); + + socket.destroy(); }; SocketListSend.prototype.flush = function() { var list = this.list; this.list = []; + this.freelist = []; list.forEach(function(socket) { - socket.destroy(); + if (socket) socket.destroy(); }); }; -SocketListSend.prototype.update = function() { - if (this.slave.connected === false) return; +SocketListSend.prototype._request = function request(msg, cmd, callback) { + var self = this; + + if (!this.slave.connected) return onslaveclose(); + this.slave.send(msg); + + function onclose() { + self.slave.removeListener('internalMessage', onreply); + callback(new Error('Slave closed before reply')); + }; + + function onreply(msg) { + if (msg.cmd !== cmd || msg.key !== self.key) return; + self.slave.removeListener('disconnect', onclose); + self.slave.removeListener('internalMessage', onreply); + + callback(null, msg); + }; + + this.slave.once('disconnect', onclose); + this.slave.on('internalMessage', onreply); +}; + +SocketListSend.prototype.close = function close(callback) { + this._request({ + cmd: 'NODE_SOCKET_NOTIFY_CLOSE', + key: this.key + }, 'NODE_SOCKET_ALL_CLOSED', callback); +}; - this.slave.send({ - cmd: 'NODE_SOCKET_FETCH', +SocketListSend.prototype.getConnections = function getConnections(callback) { + this._request({ + cmd: 'NODE_SOCKET_GET_COUNT', key: this.key + }, 'NODE_SOCKET_COUNT', function(err, msg) { + if (err) return callback(err); + callback(null, msg.count); }); }; @@ -203,45 +272,59 @@ function SocketListReceive(slave, key) { var self = this; + this.connections = 0; this.key = key; - this.list = []; this.slave = slave; - slave.on('internalMessage', function(msg) { - if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return; - - if (self.list.length === 0) { - self.flush(); - return; - } + function onempty() { + if (!self.slave.connected) return; - self.on('itemRemoved', function removeMe() { - if (self.list.length !== 0) return; - self.removeListener('itemRemoved', removeMe); - self.flush(); + self.slave.send({ + cmd: 'NODE_SOCKET_ALL_CLOSED', + key: self.key }); + } + + this.slave.on('internalMessage', function(msg) { + if (msg.key !== self.key) return; + + if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') { + // Already empty + if (self.connections === 0) return onempty(); + + // Wait for sockets to get closed + self.once('empty', onempty); + } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') { + if (!self.slave.connected) return; + self.slave.send({ + cmd: 'NODE_SOCKET_COUNT', + key: self.key, + count: self.connections + }); + } }); } util.inherits(SocketListReceive, EventEmitter); -SocketListReceive.prototype.flush = function() { - this.list = []; +SocketListReceive.prototype.add = function(obj) { + var self = this; - if (this.slave.connected) { - this.slave.send({ - cmd: 'NODE_SOCKET_CLOSED', - key: this.key - }); - } -}; + this.connections++; -SocketListReceive.prototype.add = function(socket) { - var self = this; - this.list.push(socket); + // Notify previous owner of socket about its state change + obj.socket.once('close', function() { + self.connections--; - socket.on('close', function() { - self.list.splice(self.list.indexOf(socket), 1); - self.emit('itemRemoved'); + if (obj.id !== undefined && self.slave.connected) { + // Master wants to keep eye on socket status + self.slave.send({ + cmd: 'NODE_SOCKET_CLOSED', + key: self.key, + id: obj.id + }); + } + + if (self.connections === 0) self.emit('empty'); }); }; @@ -366,17 +449,16 @@ function setupChannel(target, channel) { var string = JSON.stringify(message) + '\n'; var writeReq = channel.writeUtf8String(string, handle); - // Close the Socket handle after sending it - if (message && message.type === 'net.Socket') { - handle.close(); - } - if (!writeReq) { var er = errnoException(errno, 'write', 'cannot write to IPC channel.'); this.emit('error', er); } - writeReq.oncomplete = nop; + if (obj && obj.postSend) { + writeReq.oncomplete = obj.postSend.bind(null, handle); + } else { + writeReq.oncomplete = nop; + } /* If the master is > 2 read() calls behind, please stop sending. */ return channel.writeQueueSize < (65536 * 2); @@ -656,6 +738,7 @@ function ChildProcess() { this._closesNeeded = 1; this._closesGot = 0; + this.connected = false; this.signalCode = null; this.exitCode = null; diff --git a/lib/net.js b/lib/net.js index 41e28195ae..4c79161908 100644 --- a/lib/net.js +++ b/lib/net.js @@ -874,8 +874,6 @@ function Server(/* [ options, ] listener */) { this._connections = 0; - // when server is using slaves .connections is not reliable - // so null will be return if thats the case Object.defineProperty(this, 'connections', { get: function() { if (self._usingSlaves) { @@ -890,6 +888,8 @@ function Server(/* [ options, ] listener */) { }); this._handle = null; + this._usingSlaves = false; + this._slaves = []; this.allowHalfOpen = options.allowHalfOpen || false; } @@ -1122,7 +1122,37 @@ function onconnection(clientHandle) { } +Server.prototype.getConnections = function(cb) { + if (!this._usingSlaves) return cb(null, this.connections); + + // Poll slaves + var left = this._slaves.length, + total = this._connections; + + function oncount(err, count) { + if (err) { + left = -1; + return cb(err); + } + + total += count; + if (--left === 0) return cb(null, total); + } + + this._slaves.forEach(function(slave) { + slave.getConnections(oncount); + }); +}; + + Server.prototype.close = function(cb) { + function onSlaveClose() { + if (--left !== 0) return; + + self._connections = 0; + self._emitCloseIfDrained(); + } + if (!this._handle) { // Throw error. Follows net_legacy behaviour. throw new Error('Not running'); @@ -1133,14 +1163,21 @@ Server.prototype.close = function(cb) { } this._handle.close(); this._handle = null; - this._emitCloseIfDrained(); - // fetch new socket lists if (this._usingSlaves) { - this._slaves.forEach(function(socketList) { - if (socketList.list.length === 0) return; - socketList.update(); + var self = this, + left = this._slaves.length; + + // Increment connections to be sure that, even if all sockets will be closed + // during polling of slaves, `close` event will be emitted only once. + this._connections++; + + // Poll slaves + this._slaves.forEach(function(slave) { + slave.close(onSlaveClose); }); + } else { + this._emitCloseIfDrained(); } return this; @@ -1167,12 +1204,8 @@ Server.prototype.listenFD = util.deprecate(function(fd, type) { return this.listen({ fd: fd }); }, 'listenFD is deprecated. Use listen({fd: }).'); -// when sending a socket using fork IPC this function is executed Server.prototype._setupSlave = function(socketList) { - if (!this._usingSlaves) { - this._usingSlaves = true; - this._slaves = []; - } + this._usingSlaves = true; this._slaves.push(socketList); }; diff --git a/test/simple/test-child-process-fork-getconnections.js b/test/simple/test-child-process-fork-getconnections.js new file mode 100644 index 0000000000..476dbc34cc --- /dev/null +++ b/test/simple/test-child-process-fork-getconnections.js @@ -0,0 +1,107 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var assert = require('assert'); +var common = require('../common'); +var fork = require('child_process').fork; +var net = require('net'); +var count = 12; + +if (process.argv[2] === 'child') { + var sockets = []; + var id = process.argv[3]; + + process.on('message', function(m, socket) { + if (socket) { + sockets.push(socket); + } + + if (m.cmd === 'close') { + sockets[m.id].once('close', function() { + process.send({ id: m.id, status: 'closed' }); + }); + sockets[m.id].destroy(); + } + }); +} else { + var child = fork(process.argv[1], ['child']); + + var server = net.createServer(); + var sockets = []; + var sent = 0; + + server.on('connection', function(socket) { + child.send({ cmd: 'new' }, socket, { track: false }); + sockets.push(socket); + + if (sockets.length === count) { + closeSockets(); + server.close(); + } + }); + + var disconnected = 0; + server.on('listening', function() { + + var j = count, client; + while (j--) { + client = net.connect(common.PORT, '127.0.0.1'); + client.on('close', function() { + console.error('[m] CLIENT: close event'); + disconnected += 1; + }); + // XXX This resume() should be unnecessary. + // a stream high water mark should be enough to keep + // consuming the input. + client.resume(); + } + }); + + function closeSockets(i) { + if (!i) i = 0; + if (i === count) return; + + sent++; + child.send({ id: i, cmd: 'close' }); + child.once('message', function(m) { + assert(m.status === 'closed'); + server.getConnections(function(err, num) { + closeSockets(i + 1); + }); + }); + }; + + var closeEmitted = false; + server.on('close', function() { + console.error('[m] server close'); + closeEmitted = true; + + child.kill(); + }); + + server.listen(common.PORT, '127.0.0.1'); + + process.on('exit', function() { + assert.equal(sent, count); + assert.equal(disconnected, count); + assert.ok(closeEmitted); + }); +} diff --git a/test/simple/test-child-process-fork-net2.js b/test/simple/test-child-process-fork-net2.js index be749fe319..a8e8039b27 100644 --- a/test/simple/test-child-process-fork-net2.js +++ b/test/simple/test-child-process-fork-net2.js @@ -26,13 +26,13 @@ var net = require('net'); var count = 12; if (process.argv[2] === 'child') { - var needEnd = []; + var id = process.argv[3]; process.on('message', function(m, socket) { if (!socket) return; - console.error('got socket', m); + console.error('[%d] got socket', id, m); // will call .end('end') or .write('write'); socket[m](m); @@ -40,11 +40,11 @@ if (process.argv[2] === 'child') { socket.resume(); socket.on('data', function() { - console.error('%d socket.data', process.pid, m); + console.error('[%d] socket.data', id, m); }); socket.on('end', function() { - console.error('%d socket.end', process.pid, m); + console.error('[%d] socket.end', id, m); }); // store the unfinished socket @@ -53,58 +53,62 @@ if (process.argv[2] === 'child') { } socket.on('close', function() { - console.error('%d socket.close', process.pid, m); + console.error('[%d] socket.close', id, m); }); socket.on('finish', function() { - console.error('%d socket finished', process.pid, m); + console.error('[%d] socket finished', id, m); }); }); process.on('message', function(m) { if (m !== 'close') return; - console.error('got close message'); + console.error('[%d] got close message', id); needEnd.forEach(function(endMe, i) { - console.error('%d ending %d', process.pid, i); + console.error('[%d] ending %d', id, i); endMe.end('end'); }); }); process.on('disconnect', function() { - console.error('%d process disconnect, ending', process.pid); + console.error('[%d] process disconnect, ending', id); needEnd.forEach(function(endMe, i) { - console.error('%d ending %d', process.pid, i); + console.error('[%d] ending %d', id, i); endMe.end('end'); }); - endMe = null; }); } else { - var child1 = fork(process.argv[1], ['child']); - var child2 = fork(process.argv[1], ['child']); - var child3 = fork(process.argv[1], ['child']); + var child1 = fork(process.argv[1], ['child', '1']); + var child2 = fork(process.argv[1], ['child', '2']); + var child3 = fork(process.argv[1], ['child', '3']); var server = net.createServer(); - var connected = 0; + var connected = 0, + closed = 0; server.on('connection', function(socket) { switch (connected % 6) { case 0: - child1.send('end', socket); break; + child1.send('end', socket, { track: false }); break; case 1: - child1.send('write', socket); break; + child1.send('write', socket, { track: true }); break; case 2: - child2.send('end', socket); break; + child2.send('end', socket, { track: true }); break; case 3: - child2.send('write', socket); break; + child2.send('write', socket, { track: false }); break; case 4: - child3.send('end', socket); break; + child3.send('end', socket, { track: false }); break; case 5: - child3.send('write', socket); break; + child3.send('write', socket, { track: false }); break; } connected += 1; + socket.once('close', function() { + console.log('[m] socket closed, total %d', ++closed); + }); + if (connected === count) { closeServer(); } @@ -117,7 +121,7 @@ if (process.argv[2] === 'child') { while (j--) { client = net.connect(common.PORT, '127.0.0.1'); client.on('close', function() { - console.error('CLIENT: close event in master'); + console.error('[m] CLIENT: close event'); disconnected += 1; }); // XXX This resume() should be unnecessary. @@ -129,7 +133,7 @@ if (process.argv[2] === 'child') { var closeEmitted = false; server.on('close', function() { - console.error('server close'); + console.error('[m] server close'); closeEmitted = true; child1.kill(); @@ -141,18 +145,19 @@ if (process.argv[2] === 'child') { var timeElasped = 0; var closeServer = function() { - console.error('closeServer'); + console.error('[m] closeServer'); var startTime = Date.now(); server.on('close', function() { - console.error('emit(close)'); + console.error('[m] emit(close)'); timeElasped = Date.now() - startTime; }); - console.error('calling server.close'); + console.error('[m] calling server.close'); server.close(); setTimeout(function() { - console.error('sending close to children'); + assert(!closeEmitted); + console.error('[m] sending close to children'); child1.send('close'); child2.send('close'); child3.disconnect(); diff --git a/test/simple/test-child-process-fork-track.js b/test/simple/test-child-process-fork-track.js new file mode 100644 index 0000000000..2382f64d89 --- /dev/null +++ b/test/simple/test-child-process-fork-track.js @@ -0,0 +1,110 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var assert = require('assert'); +var common = require('../common'); +var fork = require('child_process').fork; +var net = require('net'); +var count = 12; + +if (process.argv[2] === 'child') { + var sockets = []; + var id = process.argv[3]; + + process.on('message', function(m, socket) { + if (socket) { + sockets.push(socket); + } + + if (m.cmd === 'close') { + sockets[m.id].once('close', function() { + process.send({ id: m.id, status: 'closed' }); + }); + sockets[m.id].destroy(); + } + }); +} else { + var child = fork(process.argv[1], ['child']); + + var server = net.createServer(); + var sockets = []; + var closed = 0; + + server.on('connection', function(socket) { + child.send({ cmd: 'new' }, socket, { track: true }); + sockets.push(socket); + + socket.once('close', function() { + console.error('[m] socket closed'); + closed++; + assert.equal(closed + server.connections, count); + if (server.connections === 0) server.close(); + }); + + if (sockets.length === count) { + closeSockets(); + } + }); + + var disconnected = 0; + server.on('listening', function() { + + var j = count, client; + while (j--) { + client = net.connect(common.PORT, '127.0.0.1'); + client.on('close', function() { + console.error('[m] CLIENT: close event'); + disconnected += 1; + }); + // XXX This resume() should be unnecessary. + // a stream high water mark should be enough to keep + // consuming the input. + client.resume(); + } + }); + + function closeSockets(i) { + if (!i) i = 0; + if (i === count) return; + + child.send({ id: i, cmd: 'close' }); + child.once('message', function(m) { + assert(m.status === 'closed'); + closeSockets(i + 1); + }); + }; + + var closeEmitted = false; + server.on('close', function() { + console.error('[m] server close'); + closeEmitted = true; + + child.kill(); + }); + + server.listen(common.PORT, '127.0.0.1'); + + process.on('exit', function() { + assert.equal(disconnected, count); + assert.equal(closed, count); + assert.ok(closeEmitted); + }); +}