Browse Source

child_process: do not keep list of sent sockets

Keeping list of all sockets that were sent to child process causes memory
leak and thus unacceptable (see #4587). However `server.close()` should
still work properly.

This commit introduces two options:

* child.send(socket, { track: true }) - will send socket and track its status.
  You should use it when you want `server.connections` to be a reliable
  number, and receive `close` event on sent sockets.
* child.send(socket) - will send socket without tracking it status. This
  performs much better, because of smaller number of RTT between master and
  child.

With both of these options `server.close()` will wait for all sent
sockets to get closed.
v0.9.7-release
Fedor Indutny 12 years ago
committed by isaacs
parent
commit
db5ee0b3de
  1. 8
      doc/api/child_process.markdown
  2. 11
      doc/api/net.markdown
  3. 191
      lib/child_process.js
  4. 57
      lib/net.js
  5. 107
      test/simple/test-child-process-fork-getconnections.js
  6. 59
      test/simple/test-child-process-fork-net2.js
  7. 110
      test/simple/test-child-process-fork-track.js

8
doc/api/child_process.markdown

@ -124,10 +124,11 @@ process may not actually kill it. `kill` really just sends a signal to a proces
See `kill(2)`
### child.send(message, [sendHandle])
### child.send(message, [sendHandle], [options])
* `message` {Object}
* `sendHandle` {Handle object}
* `options` {Object}
When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
@ -166,6 +167,11 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.
The `options` object may have the following properties:
* `track` - Notify master process when `sendHandle` will be closed in child
process. (`false` by default)
**send server object**
Here is an example of sending a server:

11
doc/api/net.markdown

@ -231,10 +231,19 @@ with `child_process.fork()`.
The number of concurrent connections on the server.
This becomes `null` when sending a socket to a child with `child_process.fork()`.
This becomes `null` when sending a socket to a child with
`child_process.fork()`. To poll forks and get current number of active
connections use asynchronous `server.getConnections` instead.
`net.Server` is an [EventEmitter][] with the following events:
### server.getConnections(callback)
Asynchronously get the number of concurrent connections on the server. Works
when sockets were sent to forks.
Callback should take two arguments `err` and `count`.
### Event: 'listening'
Emitted when the server has been bound after calling `server.listen`.

191
lib/child_process.js

@ -107,29 +107,31 @@ var handleConversion = {
},
'net.Socket': {
send: function(message, socket) {
// pause socket so no data is lost, will be resumed later
// if the socket wsa created by net.Server
send: function(message, socket, options) {
// if the socket was created by net.Server
if (socket.server) {
// the slave should keep track of the socket
message.key = socket.server._connectionKey;
var firstTime = !this._channel.sockets.send[message.key];
// add socket to connections list
var socketList = getSocketList('send', this, message.key);
socketList.add(socket);
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from slaves
if (firstTime) {
socket.server._setupSlave(socketList);
if (options && options.track) {
// Keep track of socket's status
message.id = socketList.add(socket);
} else {
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from
// the slaves
if (firstTime) socket.server._setupSlave(socketList);
// Act like socket is detached
socket.server._connections--;
}
}
// remove handle from socket object, it will be closed when the socket
// has been send
// will be sent
var handle = socket._handle;
handle.onread = function() {};
socket._handle = null;
@ -137,6 +139,11 @@ var handleConversion = {
return handle;
},
postSend: function(handle) {
// Close the Socket handle after sending it
handle.close();
},
got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true;
@ -146,7 +153,10 @@ var handleConversion = {
// add socket to connections list
var socketList = getSocketList('got', this, message.key);
socketList.add(socket);
socketList.add({
id: message.id,
socket: socket
});
}
emit(socket);
@ -161,39 +171,98 @@ function SocketListSend(slave, key) {
var self = this;
this.key = key;
this.list = [];
this.slave = slave;
// These two arrays are used to store the list of sockets and the freelist of
// indexes in this list. After insertion, item will have persistent index
// until it'll be removed. This way we can use this index as an identifier for
// sockets.
this.list = [];
this.freelist = [];
slave.once('disconnect', function() {
self.flush();
});
this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
self.flush();
self.remove(msg.id);
});
}
util.inherits(SocketListSend, EventEmitter);
SocketListSend.prototype.add = function(socket) {
this.list.push(socket);
var index;
// Pick one of free indexes, or insert in the end of the list
if (this.freelist.length > 0) {
index = this.freelist.pop();
this.list[index] = socket;
} else {
index = this.list.push(socket) - 1;
}
return index;
};
SocketListSend.prototype.remove = function(index) {
var socket = this.list[index];
if (!socket) return;
// Create a hole in the list and move index to the freelist
this.list[index] = null;
this.freelist.push(index);
socket.destroy();
};
SocketListSend.prototype.flush = function() {
var list = this.list;
this.list = [];
this.freelist = [];
list.forEach(function(socket) {
socket.destroy();
if (socket) socket.destroy();
});
};
SocketListSend.prototype.update = function() {
if (this.slave.connected === false) return;
SocketListSend.prototype._request = function request(msg, cmd, callback) {
var self = this;
if (!this.slave.connected) return onslaveclose();
this.slave.send(msg);
function onclose() {
self.slave.removeListener('internalMessage', onreply);
callback(new Error('Slave closed before reply'));
};
function onreply(msg) {
if (msg.cmd !== cmd || msg.key !== self.key) return;
self.slave.removeListener('disconnect', onclose);
self.slave.removeListener('internalMessage', onreply);
callback(null, msg);
};
this.slave.once('disconnect', onclose);
this.slave.on('internalMessage', onreply);
};
SocketListSend.prototype.close = function close(callback) {
this._request({
cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
key: this.key
}, 'NODE_SOCKET_ALL_CLOSED', callback);
};
this.slave.send({
cmd: 'NODE_SOCKET_FETCH',
SocketListSend.prototype.getConnections = function getConnections(callback) {
this._request({
cmd: 'NODE_SOCKET_GET_COUNT',
key: this.key
}, 'NODE_SOCKET_COUNT', function(err, msg) {
if (err) return callback(err);
callback(null, msg.count);
});
};
@ -203,45 +272,59 @@ function SocketListReceive(slave, key) {
var self = this;
this.connections = 0;
this.key = key;
this.list = [];
this.slave = slave;
slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;
if (self.list.length === 0) {
self.flush();
return;
}
function onempty() {
if (!self.slave.connected) return;
self.on('itemRemoved', function removeMe() {
if (self.list.length !== 0) return;
self.removeListener('itemRemoved', removeMe);
self.flush();
self.slave.send({
cmd: 'NODE_SOCKET_ALL_CLOSED',
key: self.key
});
}
this.slave.on('internalMessage', function(msg) {
if (msg.key !== self.key) return;
if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
// Already empty
if (self.connections === 0) return onempty();
// Wait for sockets to get closed
self.once('empty', onempty);
} else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
if (!self.slave.connected) return;
self.slave.send({
cmd: 'NODE_SOCKET_COUNT',
key: self.key,
count: self.connections
});
}
});
}
util.inherits(SocketListReceive, EventEmitter);
SocketListReceive.prototype.flush = function() {
this.list = [];
SocketListReceive.prototype.add = function(obj) {
var self = this;
if (this.slave.connected) {
this.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: this.key
});
}
};
this.connections++;
SocketListReceive.prototype.add = function(socket) {
var self = this;
this.list.push(socket);
// Notify previous owner of socket about its state change
obj.socket.once('close', function() {
self.connections--;
socket.on('close', function() {
self.list.splice(self.list.indexOf(socket), 1);
self.emit('itemRemoved');
if (obj.id !== undefined && self.slave.connected) {
// Master wants to keep eye on socket status
self.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: self.key,
id: obj.id
});
}
if (self.connections === 0) self.emit('empty');
});
};
@ -366,17 +449,16 @@ function setupChannel(target, channel) {
var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle);
// Close the Socket handle after sending it
if (message && message.type === 'net.Socket') {
handle.close();
}
if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
this.emit('error', er);
}
writeReq.oncomplete = nop;
if (obj && obj.postSend) {
writeReq.oncomplete = obj.postSend.bind(null, handle);
} else {
writeReq.oncomplete = nop;
}
/* If the master is > 2 read() calls behind, please stop sending. */
return channel.writeQueueSize < (65536 * 2);
@ -656,6 +738,7 @@ function ChildProcess() {
this._closesNeeded = 1;
this._closesGot = 0;
this.connected = false;
this.signalCode = null;
this.exitCode = null;

57
lib/net.js

@ -874,8 +874,6 @@ function Server(/* [ options, ] listener */) {
this._connections = 0;
// when server is using slaves .connections is not reliable
// so null will be return if thats the case
Object.defineProperty(this, 'connections', {
get: function() {
if (self._usingSlaves) {
@ -890,6 +888,8 @@ function Server(/* [ options, ] listener */) {
});
this._handle = null;
this._usingSlaves = false;
this._slaves = [];
this.allowHalfOpen = options.allowHalfOpen || false;
}
@ -1122,7 +1122,37 @@ function onconnection(clientHandle) {
}
Server.prototype.getConnections = function(cb) {
if (!this._usingSlaves) return cb(null, this.connections);
// Poll slaves
var left = this._slaves.length,
total = this._connections;
function oncount(err, count) {
if (err) {
left = -1;
return cb(err);
}
total += count;
if (--left === 0) return cb(null, total);
}
this._slaves.forEach(function(slave) {
slave.getConnections(oncount);
});
};
Server.prototype.close = function(cb) {
function onSlaveClose() {
if (--left !== 0) return;
self._connections = 0;
self._emitCloseIfDrained();
}
if (!this._handle) {
// Throw error. Follows net_legacy behaviour.
throw new Error('Not running');
@ -1133,14 +1163,21 @@ Server.prototype.close = function(cb) {
}
this._handle.close();
this._handle = null;
this._emitCloseIfDrained();
// fetch new socket lists
if (this._usingSlaves) {
this._slaves.forEach(function(socketList) {
if (socketList.list.length === 0) return;
socketList.update();
var self = this,
left = this._slaves.length;
// Increment connections to be sure that, even if all sockets will be closed
// during polling of slaves, `close` event will be emitted only once.
this._connections++;
// Poll slaves
this._slaves.forEach(function(slave) {
slave.close(onSlaveClose);
});
} else {
this._emitCloseIfDrained();
}
return this;
@ -1167,12 +1204,8 @@ Server.prototype.listenFD = util.deprecate(function(fd, type) {
return this.listen({ fd: fd });
}, 'listenFD is deprecated. Use listen({fd: <number>}).');
// when sending a socket using fork IPC this function is executed
Server.prototype._setupSlave = function(socketList) {
if (!this._usingSlaves) {
this._usingSlaves = true;
this._slaves = [];
}
this._usingSlaves = true;
this._slaves.push(socketList);
};

107
test/simple/test-child-process-fork-getconnections.js

@ -0,0 +1,107 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var assert = require('assert');
var common = require('../common');
var fork = require('child_process').fork;
var net = require('net');
var count = 12;
if (process.argv[2] === 'child') {
var sockets = [];
var id = process.argv[3];
process.on('message', function(m, socket) {
if (socket) {
sockets.push(socket);
}
if (m.cmd === 'close') {
sockets[m.id].once('close', function() {
process.send({ id: m.id, status: 'closed' });
});
sockets[m.id].destroy();
}
});
} else {
var child = fork(process.argv[1], ['child']);
var server = net.createServer();
var sockets = [];
var sent = 0;
server.on('connection', function(socket) {
child.send({ cmd: 'new' }, socket, { track: false });
sockets.push(socket);
if (sockets.length === count) {
closeSockets();
server.close();
}
});
var disconnected = 0;
server.on('listening', function() {
var j = count, client;
while (j--) {
client = net.connect(common.PORT, '127.0.0.1');
client.on('close', function() {
console.error('[m] CLIENT: close event');
disconnected += 1;
});
// XXX This resume() should be unnecessary.
// a stream high water mark should be enough to keep
// consuming the input.
client.resume();
}
});
function closeSockets(i) {
if (!i) i = 0;
if (i === count) return;
sent++;
child.send({ id: i, cmd: 'close' });
child.once('message', function(m) {
assert(m.status === 'closed');
server.getConnections(function(err, num) {
closeSockets(i + 1);
});
});
};
var closeEmitted = false;
server.on('close', function() {
console.error('[m] server close');
closeEmitted = true;
child.kill();
});
server.listen(common.PORT, '127.0.0.1');
process.on('exit', function() {
assert.equal(sent, count);
assert.equal(disconnected, count);
assert.ok(closeEmitted);
});
}

59
test/simple/test-child-process-fork-net2.js

@ -26,13 +26,13 @@ var net = require('net');
var count = 12;
if (process.argv[2] === 'child') {
var needEnd = [];
var id = process.argv[3];
process.on('message', function(m, socket) {
if (!socket) return;
console.error('got socket', m);
console.error('[%d] got socket', id, m);
// will call .end('end') or .write('write');
socket[m](m);
@ -40,11 +40,11 @@ if (process.argv[2] === 'child') {
socket.resume();
socket.on('data', function() {
console.error('%d socket.data', process.pid, m);
console.error('[%d] socket.data', id, m);
});
socket.on('end', function() {
console.error('%d socket.end', process.pid, m);
console.error('[%d] socket.end', id, m);
});
// store the unfinished socket
@ -53,58 +53,62 @@ if (process.argv[2] === 'child') {
}
socket.on('close', function() {
console.error('%d socket.close', process.pid, m);
console.error('[%d] socket.close', id, m);
});
socket.on('finish', function() {
console.error('%d socket finished', process.pid, m);
console.error('[%d] socket finished', id, m);
});
});
process.on('message', function(m) {
if (m !== 'close') return;
console.error('got close message');
console.error('[%d] got close message', id);
needEnd.forEach(function(endMe, i) {
console.error('%d ending %d', process.pid, i);
console.error('[%d] ending %d', id, i);
endMe.end('end');
});
});
process.on('disconnect', function() {
console.error('%d process disconnect, ending', process.pid);
console.error('[%d] process disconnect, ending', id);
needEnd.forEach(function(endMe, i) {
console.error('%d ending %d', process.pid, i);
console.error('[%d] ending %d', id, i);
endMe.end('end');
});
endMe = null;
});
} else {
var child1 = fork(process.argv[1], ['child']);
var child2 = fork(process.argv[1], ['child']);
var child3 = fork(process.argv[1], ['child']);
var child1 = fork(process.argv[1], ['child', '1']);
var child2 = fork(process.argv[1], ['child', '2']);
var child3 = fork(process.argv[1], ['child', '3']);
var server = net.createServer();
var connected = 0;
var connected = 0,
closed = 0;
server.on('connection', function(socket) {
switch (connected % 6) {
case 0:
child1.send('end', socket); break;
child1.send('end', socket, { track: false }); break;
case 1:
child1.send('write', socket); break;
child1.send('write', socket, { track: true }); break;
case 2:
child2.send('end', socket); break;
child2.send('end', socket, { track: true }); break;
case 3:
child2.send('write', socket); break;
child2.send('write', socket, { track: false }); break;
case 4:
child3.send('end', socket); break;
child3.send('end', socket, { track: false }); break;
case 5:
child3.send('write', socket); break;
child3.send('write', socket, { track: false }); break;
}
connected += 1;
socket.once('close', function() {
console.log('[m] socket closed, total %d', ++closed);
});
if (connected === count) {
closeServer();
}
@ -117,7 +121,7 @@ if (process.argv[2] === 'child') {
while (j--) {
client = net.connect(common.PORT, '127.0.0.1');
client.on('close', function() {
console.error('CLIENT: close event in master');
console.error('[m] CLIENT: close event');
disconnected += 1;
});
// XXX This resume() should be unnecessary.
@ -129,7 +133,7 @@ if (process.argv[2] === 'child') {
var closeEmitted = false;
server.on('close', function() {
console.error('server close');
console.error('[m] server close');
closeEmitted = true;
child1.kill();
@ -141,18 +145,19 @@ if (process.argv[2] === 'child') {
var timeElasped = 0;
var closeServer = function() {
console.error('closeServer');
console.error('[m] closeServer');
var startTime = Date.now();
server.on('close', function() {
console.error('emit(close)');
console.error('[m] emit(close)');
timeElasped = Date.now() - startTime;
});
console.error('calling server.close');
console.error('[m] calling server.close');
server.close();
setTimeout(function() {
console.error('sending close to children');
assert(!closeEmitted);
console.error('[m] sending close to children');
child1.send('close');
child2.send('close');
child3.disconnect();

110
test/simple/test-child-process-fork-track.js

@ -0,0 +1,110 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var assert = require('assert');
var common = require('../common');
var fork = require('child_process').fork;
var net = require('net');
var count = 12;
if (process.argv[2] === 'child') {
var sockets = [];
var id = process.argv[3];
process.on('message', function(m, socket) {
if (socket) {
sockets.push(socket);
}
if (m.cmd === 'close') {
sockets[m.id].once('close', function() {
process.send({ id: m.id, status: 'closed' });
});
sockets[m.id].destroy();
}
});
} else {
var child = fork(process.argv[1], ['child']);
var server = net.createServer();
var sockets = [];
var closed = 0;
server.on('connection', function(socket) {
child.send({ cmd: 'new' }, socket, { track: true });
sockets.push(socket);
socket.once('close', function() {
console.error('[m] socket closed');
closed++;
assert.equal(closed + server.connections, count);
if (server.connections === 0) server.close();
});
if (sockets.length === count) {
closeSockets();
}
});
var disconnected = 0;
server.on('listening', function() {
var j = count, client;
while (j--) {
client = net.connect(common.PORT, '127.0.0.1');
client.on('close', function() {
console.error('[m] CLIENT: close event');
disconnected += 1;
});
// XXX This resume() should be unnecessary.
// a stream high water mark should be enough to keep
// consuming the input.
client.resume();
}
});
function closeSockets(i) {
if (!i) i = 0;
if (i === count) return;
child.send({ id: i, cmd: 'close' });
child.once('message', function(m) {
assert(m.status === 'closed');
closeSockets(i + 1);
});
};
var closeEmitted = false;
server.on('close', function() {
console.error('[m] server close');
closeEmitted = true;
child.kill();
});
server.listen(common.PORT, '127.0.0.1');
process.on('exit', function() {
assert.equal(disconnected, count);
assert.equal(closed, count);
assert.ok(closeEmitted);
});
}
Loading…
Cancel
Save