From bdf7ac2c5dcdbb59d9a4db7e8eaa66462b1bed26 Mon Sep 17 00:00:00 2001 From: Andreas Madsen Date: Wed, 27 Feb 2013 19:31:24 +0100 Subject: [PATCH] child_process: support sending dgram socket child.send can send net servers and sockets. Now that we have support for dgram clusters this functionality should be extended to include dgram sockets. --- doc/api/child_process.markdown | 4 + lib/child_process.js | 21 ++++ lib/dgram.js | 39 ++++--- test/simple/test-child-process-fork-dgram.js | 103 +++++++++++++++++++ 4 files changed, 154 insertions(+), 13 deletions(-) create mode 100644 test/simple/test-child-process-fork-dgram.js diff --git a/doc/api/child_process.markdown b/doc/api/child_process.markdown index 76d42d5701..b652e8846e 100644 --- a/doc/api/child_process.markdown +++ b/doc/api/child_process.markdown @@ -200,6 +200,10 @@ And the child would the receive the server object as: Note that the server is now shared between the parent and child, this means that some connections will be handled by the parent and some by the child. +For `dgram` servers the workflow is exactly the same. Here you listen on +a `message` event instead of `connection` and use `server.bind` instead of +`server.listen`. + #### Example: sending socket object Here is an example of sending a socket. It will spawn two children and handle diff --git a/lib/child_process.js b/lib/child_process.js index a4a5d2b3a9..80735d4d0c 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -21,6 +21,7 @@ var EventEmitter = require('events').EventEmitter; var net = require('net'); +var dgram = require('dgram'); var Process = process.binding('process_wrap').Process; var util = require('util'); var constants; // if (!constants) constants = process.binding('constants'); @@ -167,6 +168,24 @@ var handleConversion = { got: function(message, handle, emit) { emit(handle); } + }, + + 'dgram.Socket': { + simultaneousAccepts: false, + + send: function(message, socket) { + message.dgramType = socket.type; + + return socket._handle; + }, + + got: function(message, handle, emit) { + var socket = new dgram.Socket(message.dgramType); + + socket.bind(handle, function() { + emit(socket); + }); + } } }; @@ -377,6 +396,8 @@ function setupChannel(target, channel) { } else if (handle instanceof process.binding('tcp_wrap').TCP || handle instanceof process.binding('pipe_wrap').Pipe) { message.type = 'net.Native'; + } else if (handle instanceof dgram.Socket) { + message.type = 'dgram.Socket'; } else if (handle instanceof process.binding('udp_wrap').UDP) { message.type = 'dgram.Native'; } else { diff --git a/lib/dgram.js b/lib/dgram.js index 91c2243681..e13f066710 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -141,8 +141,20 @@ function startListening(socket) { socket.emit('listening'); } +function replaceHandle(self, newHandle) { -Socket.prototype.bind = function(port, address, callback) { + // Set up the handle that we got from master. + newHandle.lookup = self._handle.lookup; + newHandle.bind = self._handle.bind; + newHandle.send = self._handle.send; + newHandle.owner = self; + + // Replace the existing handle by the handle we got from master. + self._handle.close(); + self._handle = newHandle; +} + +Socket.prototype.bind = function(/*port, address, callback*/) { var self = this; self._healthCheck(); @@ -152,8 +164,18 @@ Socket.prototype.bind = function(port, address, callback) { this._bindState = BIND_STATE_BINDING; - if (typeof callback === 'function') - self.once('listening', callback); + if (typeof arguments[arguments.length - 1] === 'function') + self.once('listening', arguments[arguments.length - 1]); + + var UDP = process.binding('udp_wrap').UDP; + if (arguments[0] instanceof UDP) { + replaceHandle(self, arguments[0]); + startListening(self); + return; + } + + var port = arguments[0]; + var address = arguments[1]; // resolve address first self._handle.lookup(address, function(err, ip) { @@ -172,16 +194,7 @@ Socket.prototype.bind = function(port, address, callback) { // handle has been closed in the mean time. return handle.close(); - // Set up the handle that we got from master. - handle.lookup = self._handle.lookup; - handle.bind = self._handle.bind; - handle.send = self._handle.send; - handle.owner = self; - - // Replace the existing handle by the handle we got from master. - self._handle.close(); - self._handle = handle; - + replaceHandle(self, handle); startListening(self); }); diff --git a/test/simple/test-child-process-fork-dgram.js b/test/simple/test-child-process-fork-dgram.js new file mode 100644 index 0000000000..039310fc11 --- /dev/null +++ b/test/simple/test-child-process-fork-dgram.js @@ -0,0 +1,103 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var dgram = require('dgram'); +var fork = require('child_process').fork; +var assert = require('assert'); +var common = require('../common'); + +if (process.argv[2] === 'child') { + var childCollected = 0; + var server; + + process.on('message', function removeMe(msg, clusterServer) { + if (msg === 'server') { + server = clusterServer; + + server.on('message', function () { + childCollected += 1; + }); + + } else if (msg === 'stop') { + server.close(); + process.send(childCollected); + process.removeListener('message', removeMe); + } + }); + +} else { + var server = dgram.createSocket('udp4'); + var client = dgram.createSocket('udp4'); + var child = fork(__filename, ['child']); + var msg = new Buffer('Some bytes'); + + var parentCollected = 0; + var childCollected = 0; + server.on('message', function (msg, rinfo) { + parentCollected += 1; + }); + + server.on('listening', function () { + child.send('server', server); + + sendMessages(); + }); + + var sendMessages = function () { + var wait = 0; + var send = 0; + var total = 100; + + var timer = setInterval(function () { + send += 1; + if (send === total) { + clearInterval(timer); + } + + client.send(msg, 0, msg.length, common.PORT, '127.0.0.1', function(err) { + if (err) throw err; + + wait += 1; + if (wait === total) { + shutdown(); + } + } + ); + }, 1); + }; + + var shutdown = function () { + child.send('stop'); + child.once('message', function (collected) { + childCollected = collected; + }); + + server.close(); + client.close(); + }; + + server.bind(common.PORT, '127.0.0.1'); + + process.once('exit', function () { + assert(childCollected > 0); + assert(parentCollected > 0); + }); +}