Browse Source

child_process: allow sending a net Socket and Server object using child.send

child_process.fork() support sending native hander object, this patch add support for sending
net.Server and net.Socket object by converting the object to a native handle object and back
to a useful object again.

Note when sending a Socket there was emitted by a net Server object, the server.connections
property becomes null, because it is no longer possible to known when it is destroyed.
v0.9.1-release
Andreas Madsen 13 years ago
committed by isaacs
parent
commit
dceebbfa31
  1. 76
      doc/api/child_process.markdown
  2. 4
      doc/api/net.markdown
  3. 270
      lib/child_process.js
  4. 46
      lib/net.js
  5. 198
      test/simple/test-child-process-fork-net.js
  6. 131
      test/simple/test-child-process-fork-net2.js

76
doc/api/child_process.markdown

@ -55,7 +55,7 @@ An alternative way to check if you can send messages is to see if the
### Event: 'message'
* `message` {Object} a parsed JSON object or primitive value
* `sendHandle` {Handle object} a handle object
* `sendHandle` {Handle object} a Socket or Server object
Messages send by `.send(message, [sendHandle])` are obtained using the
`message` event.
@ -129,7 +129,7 @@ See `kill(2)`
* `message` {Object}
* `sendHandle` {Handle object}
When useing `child_process.fork()` an you can write to the child using
When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
a `'message'` event on the child.
@ -162,9 +162,73 @@ the `message` event, since they are internal messages used by node core.
Messages containing the prefix are emitted in the `internalMessage` event, you
should by all means avoid using this feature, it is subject to change without notice.
The `sendHandle` option to `child.send()` is for sending a handle object to
another process. The child will receive the object as its second argument to
the `message` event.
The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.
**send server object**
Here is an example of sending a server:
var child = require('child_process').fork('child.js');
// Open up the server object and send the handle.
var server = require('net').createServer();
server.on('connection', function (socket) {
socket.end('handled by parent');
});
server.listen(1337, function() {
child.send('server', server);
});
And the child would the recive the server object as:
process.on('message', function(m, server) {
if (m === 'server') {
server.on('connection', function (socket) {
socket.end('handled by child');
});
}
});
Note that the server is now shared between the parent and child, this means
that some connections will be handled by the parent and some by the child.
**send socket object**
Here is an example of sending a socket. It will spawn two childs and handle
connections with the remote address `74.125.127.100` as VIP by sending the
socket to a "special" child process. Other sockets will go to a "normal" process.
var normal = require('child_process').fork('child.js', ['normal']);
var special = require('child_process').fork('child.js', ['special']);
// Open up the server and send sockets to child
var server = require('net').createServer();
server.on('connection', function (socket) {
// if this is a VIP
if (socket.remoteAddress === '74.125.127.100') {
special.send('socket', socket);
return;
}
// just the usual dudes
normal.send('socket', socket);
});
server.listen(1337);
The `child.js` could look like this:
process.on('message', function(m, socket) {
if (m === 'socket') {
socket.end('You where handled as a ' + process.argv[2] + ' person');
}
});
Note that once a single socket has been sent to a child the parent can no
longer keep track of when the socket is destroyed. To indicate this condition
the `.connections` property becomes `null`.
It is also recomended not to use `.maxConnections` in this condition.
### child.disconnect()
@ -382,7 +446,7 @@ leaner than `child_process.exec`. It has the same options.
This is a special case of the `spawn()` functionality for spawning Node
processes. In addition to having all the methods in a normal ChildProcess
instance, the returned object has a communication channel built-in. Se
instance, the returned object has a communication channel built-in. See
`child.send(message, [sendHandle])` for details.
By default the spawned Node process will have the stdout, stderr associated

4
doc/api/net.markdown

@ -198,10 +198,14 @@ Don't call `server.address()` until the `'listening'` event has been emitted.
Set this property to reject connections when the server's connection count gets
high.
It is not recommended to use this option once a socket has been sent to a child
with `child_process.fork()`.
### server.connections
The number of concurrent connections on the server.
This becomes `null` when sending a socket to a child with `child_process.fork()`.
`net.Server` is an `EventEmitter` with the following events:

270
lib/child_process.js

@ -54,15 +54,207 @@ function createSocket(pipe, readable) {
}
// this object contain function to convert TCP objects to native handle objects
// and back again.
var handleConversion = {
'net.Native': {
simultaneousAccepts: true,
send: function(message, handle) {
return handle;
},
got: function(message, handle, emit) {
emit(handle);
}
},
'net.Server': {
simultaneousAccepts: true,
send: function(message, server) {
return server._handle;
},
got: function(message, handle, emit) {
var self = this;
var server = new net.Server();
server.listen(handle, function() {
emit(server);
});
}
},
'net.Socket': {
send: function(message, socket) {
// pause socket so no data is lost, will be resumed later
socket.pause();
// if the socket wsa created by net.Server
if (socket.server) {
// the slave should keep track of the socket
message.key = socket.server._connectionKey;
var firstTime = !this._channel.sockets.send[message.key];
// add socket to connections list
var socketList = getSocketList('send', this, message.key);
socketList.add(socket);
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from slaves
if (firstTime) {
socket.server._setupSlave(socketList);
}
}
// remove handle from socket object, it will be closed when the socket
// has been send
var handle = socket._handle;
handle.onread = function() {};
socket._handle = null;
return handle;
},
got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true;
socket.pause();
// if the socket was created by net.Server we will track the socket
if (message.key) {
// add socket to connections list
var socketList = getSocketList('got', this, message.key);
socketList.add(socket);
}
emit(socket);
socket.resume();
}
}
};
// This object keep track of the socket there are sended
function SocketListSend(slave, key) {
var self = this;
this.key = key;
this.list = [];
this.slave = slave;
slave.once('disconnect', function() {
self.flush();
});
this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
self.flush();
});
}
util.inherits(SocketListSend, EventEmitter);
SocketListSend.prototype.add = function(socket) {
this.list.push(socket);
};
SocketListSend.prototype.flush = function() {
var list = this.list;
this.list = [];
list.forEach(function(socket) {
socket.destroy();
});
};
SocketListSend.prototype.update = function() {
if (this.slave.connected === false) return;
this.slave.send({
cmd: 'NODE_SOCKET_FETCH',
key: this.key
});
};
// This object keep track of the socket there are received
function SocketListReceive(slave, key) {
var self = this;
this.key = key;
this.list = [];
this.slave = slave;
slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;
if (self.list.length === 0) {
self.flush();
return;
}
self.on('itemRemoved', function removeMe() {
if (self.list.length !== 0) return;
self.removeListener('itemRemoved', removeMe);
self.flush();
});
});
}
util.inherits(SocketListReceive, EventEmitter);
SocketListReceive.prototype.flush = function() {
this.list = [];
if (this.slave.connected) {
this.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: this.key
});
}
};
SocketListReceive.prototype.add = function(socket) {
var self = this;
this.list.push(socket);
socket.on('close', function() {
self.list.splice(self.list.indexOf(socket), 1);
self.emit('itemRemoved');
});
};
function getSocketList(type, slave, key) {
var sockets = slave._channel.sockets[type];
var socketList = sockets[key];
if (!socketList) {
var Construct = type === 'send' ? SocketListSend : SocketListReceive;
socketList = sockets[key] = new Construct(slave, key);
}
return socketList;
}
function handleMessage(target, message, handle) {
//Filter out internal messages
//if cmd property begin with "_NODE"
if (message !== null &&
typeof message === 'object' &&
typeof message.cmd === 'string' &&
message.cmd.indexOf('NODE_') === 0) {
target.emit('internalMessage', message, handle);
}
//Non-internal message
else {
target.emit('message', message, handle);
}
}
function setupChannel(target, channel) {
target._channel = channel;
var jsonBuffer = '';
channel.buffering = false;
channel.onread = function(pool, offset, length, recvHandle) {
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);
if (pool) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
@ -73,18 +265,7 @@ function setupChannel(target, channel) {
var json = jsonBuffer.slice(start, i);
var message = JSON.parse(json);
//Filter out internal messages
//if cmd property begin with "_NODE"
if (message !== null &&
typeof message === 'object' &&
typeof message.cmd === 'string' &&
message.cmd.indexOf('NODE_') === 0) {
target.emit('internalMessage', message, recvHandle);
}
//Non-internal message
else {
target.emit('message', message, recvHandle);
}
handleMessage(target, message, recvHandle);
start = i + 1;
}
@ -97,7 +278,27 @@ function setupChannel(target, channel) {
}
};
target.send = function(message, sendHandle) {
// object where socket lists will live
channel.sockets = { got: {}, send: {} };
// handlers will go through this
target.on('internalMessage', function(message, handle) {
if (message.cmd !== 'NODE_HANDLE') return;
var obj = handleConversion[message.type];
// Update simultaneous accepts on Windows
if (obj.simultaneousAccepts) {
net._setSimultaneousAccepts(handle);
}
// Convert handle object
obj.got.call(this, message, handle, function(handle) {
handleMessage(target, message.msg, handle);
});
});
target.send = function(message, handle) {
if (typeof message === 'undefined') {
throw new TypeError('message cannot be undefined');
}
@ -112,12 +313,43 @@ function setupChannel(target, channel) {
return false;
}
var string = JSON.stringify(message) + '\n';
// package messages with a handle object
if (handle) {
// this message will be handled by an internalMessage event handler
message = {
cmd: 'NODE_HANDLE',
type: 'net.',
msg: message
};
switch (handle.constructor.name) {
case 'Socket':
message.type += 'Socket'; break;
case 'Server':
message.type += 'Server'; break;
case 'Pipe':
case 'TCP':
message.type += 'Native'; break;
}
var obj = handleConversion[message.type];
// convert TCP object to native handle object
handle = handleConversion[message.type].send.apply(target, arguments);
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);
if (obj.simultaneousAccepts) {
net._setSimultaneousAccepts(handle);
}
}
var writeReq = channel.writeUtf8String(string, sendHandle);
var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle);
// Close the Socket handle after sending it
if (message && message.type === 'net.Socket') {
handle.close();
}
if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');

46
lib/net.js

@ -352,7 +352,7 @@ Socket.prototype._destroy = function(exception, cb) {
timers.unenroll(this);
if (this.server) {
this.server.connections--;
this.server._connections--;
this.server._emitCloseIfDrained();
}
@ -800,7 +800,23 @@ function Server(/* [ options, ] listener */) {
}
}
this.connections = 0;
this._connections = 0;
// when server is using slaves .connections is not reliable
// so null will be return if thats the case
Object.defineProperty(this, 'connections', {
get: function() {
if (self._usingSlaves) {
return null;
}
return self._connections;
},
set: function(val) {
return (self._connections = val);
},
configurable: true, enumerable: true
});
this.allowHalfOpen = options.allowHalfOpen || false;
this._handle = null;
@ -881,6 +897,9 @@ Server.prototype._listen2 = function(address, port, addressType, backlog) {
return;
}
// generate connection key, this should be unique to the connection
this._connectionKey = addressType + ':' + address + ':' + port;
process.nextTick(function() {
self.emit('listening');
});
@ -970,7 +989,7 @@ function onconnection(clientHandle) {
return;
}
if (self.maxConnections && self.connections >= self.maxConnections) {
if (self.maxConnections && self._connections >= self.maxConnections) {
clientHandle.close();
return;
}
@ -983,7 +1002,7 @@ function onconnection(clientHandle) {
socket.resume();
self.connections++;
self._connections++;
socket.server = self;
DTRACE_NET_SERVER_CONNECTION(socket);
@ -1005,13 +1024,21 @@ Server.prototype.close = function(cb) {
this._handle = null;
this._emitCloseIfDrained();
// fetch new socket lists
if (this._usingSlaves) {
this._slaves.forEach(function(socketList) {
if (socketList.list.length === 0) return;
socketList.update();
});
}
return this;
};
Server.prototype._emitCloseIfDrained = function() {
var self = this;
if (self._handle || self.connections) return;
if (self._handle || self._connections) return;
process.nextTick(function() {
self.emit('close');
@ -1023,6 +1050,15 @@ Server.prototype.listenFD = function(fd, type) {
throw new Error('This API is no longer supported. See child_process.fork');
};
// when sending a socket using fork IPC this function is executed
Server.prototype._setupSlave = function(socketList) {
if (!this._usingSlaves) {
this._usingSlaves = true;
this._slaves = [];
}
this._slaves.push(socketList);
};
// TODO: isIP should be moved to the DNS code. Putting it here now because
// this is what the legacy system did.

198
test/simple/test-child-process-fork-net.js

@ -0,0 +1,198 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var assert = require('assert');
var common = require('../common');
var fork = require('child_process').fork;
var net = require('net');
// progress tracker
function ProgressTracker(missing, callback) {
this.missing = missing;
this.callback = callback;
}
ProgressTracker.prototype.done = function() {
this.missing -= 1;
this.check();
};
ProgressTracker.prototype.check = function() {
if (this.missing === 0) this.callback();
};
if (process.argv[2] === 'child') {
var serverScope;
process.on('message', function onServer(msg, server) {
if (msg.what !== 'server') return;
process.removeListener('message', onServer);
serverScope = server;
server.on('connection', function(socket) {
console.log('CHILD: got connection');
process.send({what: 'connection'});
socket.destroy();
});
// start making connection from parent
console.log('CHILD: server listening');
process.send({what: 'listening'});
});
process.on('message', function onClose(msg) {
if (msg.what !== 'close') return;
process.removeListener('message', onClose);
serverScope.on('close', function() {
process.send({what: 'close'});
});
serverScope.close();
});
process.on('message', function onSocket(msg, socket) {
if (msg.what !== 'socket') return;
process.removeListener('message', onSocket);
socket.end('echo');
console.log('CHILD: got socket');
});
process.send({what: 'ready'});
} else {
var child = fork(process.argv[1], ['child']);
child.on('exit', function() {
console.log('CHILD: died');
});
// send net.Server to child and test by connecting
var testServer = function(callback) {
// destroy server execute callback when done
var progress = new ProgressTracker(2, function() {
server.on('close', function() {
console.log('PARENT: server closed');
child.send({what: 'close'});
});
server.close();
});
// we expect 10 connections and close events
var connections = new ProgressTracker(10, progress.done.bind(progress));
var closed = new ProgressTracker(10, progress.done.bind(progress));
// create server and send it to child
var server = net.createServer();
server.on('connection', function(socket) {
console.log('PARENT: got connection');
socket.destroy();
connections.done();
});
server.on('listening', function() {
console.log('PARENT: server listening');
child.send({what: 'server'}, server);
});
server.listen(common.PORT);
// handle client messages
var messageHandlers = function(msg) {
if (msg.what === 'listening') {
// make connections
var socket;
for (var i = 0; i < 10; i++) {
socket = net.connect(common.PORT, function() {
console.log('CLIENT: connected');
});
socket.on('close', function() {
closed.done();
console.log('CLIENT: closed');
});
}
} else if (msg.what === 'connection') {
// child got connection
connections.done();
} else if (msg.what === 'close') {
child.removeListener('message', messageHandlers);
callback();
}
};
child.on('message', messageHandlers);
};
// send net.Socket to child
var testSocket = function(callback) {
// create a new server and connect to it,
// but the socket will be handled by the child
var server = net.createServer();
server.on('connection', function(socket) {
socket.on('close', function() {
console.log('CLIENT: socket closed');
});
child.send({what: 'socket'}, socket);
});
server.on('close', function() {
console.log('PARENT: server closed');
callback();
});
server.listen(common.PORT, function() {
var connect = net.connect(common.PORT);
var store = '';
connect.on('data', function(chunk) {
store += chunk;
console.log('CLIENT: got data');
});
connect.on('close', function() {
console.log('CLIENT: closed');
assert.equal(store, 'echo');
server.close();
});
});
};
// create server and send it to child
var serverSucess = false;
var socketSucess = false;
child.on('message', function onReady(msg) {
if (msg.what !== 'ready') return;
child.removeListener('message', onReady);
testServer(function() {
serverSucess = true;
testSocket(function() {
socketSucess = true;
child.kill();
});
});
});
process.on('exit', function() {
assert.ok(serverSucess);
assert.ok(socketSucess);
});
}

131
test/simple/test-child-process-fork-net2.js

@ -0,0 +1,131 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var assert = require('assert');
var common = require('../common');
var fork = require('child_process').fork;
var net = require('net');
if (process.argv[2] === 'child') {
var endMe = null;
process.on('message', function(m, socket) {
if (!socket) return;
// will call .end('end') or .write('write');
socket[m](m);
// store the unfinished socket
if (m === 'write') {
endMe = socket;
}
});
process.on('message', function(m) {
if (m !== 'close') return;
endMe.end('end');
endMe = null;
});
process.on('disconnect', function() {
endMe.end('end');
endMe = null;
});
} else {
var child1 = fork(process.argv[1], ['child']);
var child2 = fork(process.argv[1], ['child']);
var child3 = fork(process.argv[1], ['child']);
var server = net.createServer();
var connected = 0;
server.on('connection', function(socket) {
switch (connected) {
case 0:
child1.send('end', socket); break;
case 1:
child1.send('write', socket); break;
case 2:
child2.send('end', socket); break;
case 3:
child2.send('write', socket); break;
case 4:
child3.send('end', socket); break;
case 5:
child3.send('write', socket); break;
}
connected += 1;
if (connected === 6) {
closeServer();
}
});
var disconnected = 0;
server.on('listening', function() {
var j = 6, client;
while (j--) {
client = net.connect(common.PORT, '127.0.0.1');
client.on('close', function() {
disconnected += 1;
});
}
});
var closeEmitted = false;
server.on('close', function() {
closeEmitted = true;
child1.kill();
child2.kill();
child3.kill();
});
server.listen(common.PORT, '127.0.0.1');
var timeElasped = 0;
var closeServer = function() {
var startTime = Date.now();
server.on('close', function() {
timeElasped = Date.now() - startTime;
});
server.close();
setTimeout(function() {
child1.send('close');
child2.send('close');
child3.disconnect();
}, 200);
};
process.on('exit', function() {
assert.equal(disconnected, 6);
assert.equal(connected, 6);
assert.ok(closeEmitted);
assert.ok(timeElasped >= 190 && timeElasped <= 1000,
'timeElasped was not between 190 and 1000 ms');
});
}
Loading…
Cancel
Save