Browse Source

child_process: support sending dgram socket

child.send can send net servers and sockets. Now that we have support
for dgram clusters this functionality should be extended to include
dgram sockets.
v0.10.0-release
Andreas Madsen 12 years ago
committed by Ben Noordhuis
parent
commit
bdf7ac2c5d
  1. 4
      doc/api/child_process.markdown
  2. 21
      lib/child_process.js
  3. 39
      lib/dgram.js
  4. 103
      test/simple/test-child-process-fork-dgram.js

4
doc/api/child_process.markdown

@ -200,6 +200,10 @@ And the child would the receive the server object as:
Note that the server is now shared between the parent and child, this means
that some connections will be handled by the parent and some by the child.
For `dgram` servers the workflow is exactly the same. Here you listen on
a `message` event instead of `connection` and use `server.bind` instead of
`server.listen`.
#### Example: sending socket object
Here is an example of sending a socket. It will spawn two children and handle

21
lib/child_process.js

@ -21,6 +21,7 @@
var EventEmitter = require('events').EventEmitter;
var net = require('net');
var dgram = require('dgram');
var Process = process.binding('process_wrap').Process;
var util = require('util');
var constants; // if (!constants) constants = process.binding('constants');
@ -167,6 +168,24 @@ var handleConversion = {
got: function(message, handle, emit) {
emit(handle);
}
},
'dgram.Socket': {
simultaneousAccepts: false,
send: function(message, socket) {
message.dgramType = socket.type;
return socket._handle;
},
got: function(message, handle, emit) {
var socket = new dgram.Socket(message.dgramType);
socket.bind(handle, function() {
emit(socket);
});
}
}
};
@ -377,6 +396,8 @@ function setupChannel(target, channel) {
} else if (handle instanceof process.binding('tcp_wrap').TCP ||
handle instanceof process.binding('pipe_wrap').Pipe) {
message.type = 'net.Native';
} else if (handle instanceof dgram.Socket) {
message.type = 'dgram.Socket';
} else if (handle instanceof process.binding('udp_wrap').UDP) {
message.type = 'dgram.Native';
} else {

39
lib/dgram.js

@ -141,8 +141,20 @@ function startListening(socket) {
socket.emit('listening');
}
function replaceHandle(self, newHandle) {
Socket.prototype.bind = function(port, address, callback) {
// Set up the handle that we got from master.
newHandle.lookup = self._handle.lookup;
newHandle.bind = self._handle.bind;
newHandle.send = self._handle.send;
newHandle.owner = self;
// Replace the existing handle by the handle we got from master.
self._handle.close();
self._handle = newHandle;
}
Socket.prototype.bind = function(/*port, address, callback*/) {
var self = this;
self._healthCheck();
@ -152,8 +164,18 @@ Socket.prototype.bind = function(port, address, callback) {
this._bindState = BIND_STATE_BINDING;
if (typeof callback === 'function')
self.once('listening', callback);
if (typeof arguments[arguments.length - 1] === 'function')
self.once('listening', arguments[arguments.length - 1]);
var UDP = process.binding('udp_wrap').UDP;
if (arguments[0] instanceof UDP) {
replaceHandle(self, arguments[0]);
startListening(self);
return;
}
var port = arguments[0];
var address = arguments[1];
// resolve address first
self._handle.lookup(address, function(err, ip) {
@ -172,16 +194,7 @@ Socket.prototype.bind = function(port, address, callback) {
// handle has been closed in the mean time.
return handle.close();
// Set up the handle that we got from master.
handle.lookup = self._handle.lookup;
handle.bind = self._handle.bind;
handle.send = self._handle.send;
handle.owner = self;
// Replace the existing handle by the handle we got from master.
self._handle.close();
self._handle = handle;
replaceHandle(self, handle);
startListening(self);
});

103
test/simple/test-child-process-fork-dgram.js

@ -0,0 +1,103 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var dgram = require('dgram');
var fork = require('child_process').fork;
var assert = require('assert');
var common = require('../common');
if (process.argv[2] === 'child') {
var childCollected = 0;
var server;
process.on('message', function removeMe(msg, clusterServer) {
if (msg === 'server') {
server = clusterServer;
server.on('message', function () {
childCollected += 1;
});
} else if (msg === 'stop') {
server.close();
process.send(childCollected);
process.removeListener('message', removeMe);
}
});
} else {
var server = dgram.createSocket('udp4');
var client = dgram.createSocket('udp4');
var child = fork(__filename, ['child']);
var msg = new Buffer('Some bytes');
var parentCollected = 0;
var childCollected = 0;
server.on('message', function (msg, rinfo) {
parentCollected += 1;
});
server.on('listening', function () {
child.send('server', server);
sendMessages();
});
var sendMessages = function () {
var wait = 0;
var send = 0;
var total = 100;
var timer = setInterval(function () {
send += 1;
if (send === total) {
clearInterval(timer);
}
client.send(msg, 0, msg.length, common.PORT, '127.0.0.1', function(err) {
if (err) throw err;
wait += 1;
if (wait === total) {
shutdown();
}
}
);
}, 1);
};
var shutdown = function () {
child.send('stop');
child.once('message', function (collected) {
childCollected = collected;
});
server.close();
client.close();
};
server.bind(common.PORT, '127.0.0.1');
process.once('exit', function () {
assert(childCollected > 0);
assert(parentCollected > 0);
});
}
Loading…
Cancel
Save