From 26c08a3f35f251c86a881aa70a8e709216feb03d Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 7 Oct 2011 13:58:49 -0700 Subject: [PATCH] Do load balancing test in test-child-process-fork2. --- lib/child_process_uv.js | 43 +++++++++++--------- lib/net_uv.js | 52 ++++++++++++++----------- test/fixtures/fork2.js | 20 +++++++++- test/simple/test-child-process-fork2.js | 38 +++++++++++++++--- 4 files changed, 106 insertions(+), 47 deletions(-) diff --git a/lib/child_process_uv.js b/lib/child_process_uv.js index a0f23edbb0..2e6c914178 100644 --- a/lib/child_process_uv.js +++ b/lib/child_process_uv.js @@ -20,6 +20,7 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. var EventEmitter = require('events').EventEmitter; +var net = require('net'); var Process = process.binding('process_wrap').Process; var inherits = require('util').inherits; var constants; // if (!constants) constants = process.binding('constants'); @@ -39,8 +40,7 @@ function createPipe(ipc) { } function createSocket(pipe, readable) { - var Socket = require('net').Socket; - var s = new Socket({ handle: pipe }); + var s = new net.Socket({ handle: pipe }); if (readable) { s.writable = false; @@ -75,16 +75,24 @@ function setupChannel(target, channel) { channel.onread = function(pool, offset, length, recvStream) { if (pool) { - for (var i = 0; i < length; i++) { - if (pool[offset + i] === LF) { - jsonBuffer += pool.toString('ascii', offset, offset + i); - var message = JSON.parse(jsonBuffer); - jsonBuffer = pool.toString('ascii', i, length); - offset = i + 1; - - target.emit('message', message, recvStream); + jsonBuffer += pool.toString('ascii', offset, offset + length); + + var i; + while ((i = jsonBuffer.indexOf('\n')) >= 0) { + var json = jsonBuffer.slice(0, i); + var message = JSON.parse(json); + jsonBuffer = jsonBuffer.slice(i + 1); + + if (recvStream) { + // TODO support other types of stream. + // TODO probably need a queue of recvStreams + var server = new net.Server(); + server._handle = recvStream; } + + target.emit('message', message, server); } + } else { channel.close(); target._channel = null; @@ -94,13 +102,12 @@ function setupChannel(target, channel) { target.send = function(message, sendStream) { if (!target._channel) throw new Error("channel closed"); - // Open up net.Socket instances - if (sendStream instanceof require('net').Socket || - sendStream instanceof require('net').Server) { - sendStream = sendStream._handle; - if (!sendStream) { - throw new Error("sendStream handle not yet opened"); + // Open up net.Server instances + if (sendStream) { + if (false == sendStream instanceof net.Server) { + throw new Error("sendStream must be instance of net.Server"); } + sendStream = sendStream._handle; } // For overflow protection don't write if channel queue is too deep. @@ -114,10 +121,10 @@ function setupChannel(target, channel) { if (!writeReq) { throw new Error(errno + " cannot write to IPC channel."); - } else { - writeReq.oncomplete = nop; } + writeReq.oncomplete = nop; + return true; }; diff --git a/lib/net_uv.js b/lib/net_uv.js index ea4aa49b54..1caacf2a58 100644 --- a/lib/net_uv.js +++ b/lib/net_uv.js @@ -621,24 +621,38 @@ function toPort(x) { return (x = Number(x)) >= 0 ? x : false; } function listen(self, address, port, addressType) { var r = 0; + // If there is not yet a handle, we need to create one and bind. + // In the case of a server sent via IPC, we don't need to do this. if (!self._handle) { // assign handle in listen, and clean up if bind or listen fails self._handle = (port == -1 && addressType == -1) ? createPipe() : createTCP(); - } - self._handle.socket = self; - self._handle.onconnection = onconnection; + if (address || port) { + debug('bind to ' + address); + if (addressType == 6) { + r = self._handle.bind6(address, port); + } else { + r = self._handle.bind(address, port); + } + } - if (address || port) { - debug('bind to ' + address); - if (addressType == 6) { - r = self._handle.bind6(address, port); - } else { - r = self._handle.bind(address, port); + if (r) { + self._handle.close(); + self._handle = null; + + process.nextTick(function() { + self.emit('error', errnoException(errno, 'listen')); + }); + return; } } + self._handle.onconnection = onconnection; + self._handle.socket = self; + + r = self._handle.listen(self._backlog || 128); + if (r) { self._handle.close(); self._handle = null; @@ -646,23 +660,15 @@ function listen(self, address, port, addressType) { process.nextTick(function() { self.emit('error', errnoException(errno, 'listen')); }); - } else { - r = self._handle.listen(self._backlog || 128); - if (r) { - self._handle.close(); - self._handle = null; - - process.nextTick(function() { - self.emit('error', errnoException(errno, 'listen')); - }); - } else { - process.nextTick(function() { - self.emit('listening'); - }); - } + return; } + + process.nextTick(function() { + self.emit('listening'); + }); } + Server.prototype.listen = function() { var self = this; diff --git a/test/fixtures/fork2.js b/test/fixtures/fork2.js index 8958a7915a..8aa579c31b 100644 --- a/test/fixtures/fork2.js +++ b/test/fixtures/fork2.js @@ -1,9 +1,27 @@ var assert = require('assert'); +var net = require('net'); + +var connections = 0; process.on('message', function(m, server) { console.log('CHILD got message:', m); assert.ok(m.hello); + assert.ok(server); - process.send({ gotHandle: true }); + assert.ok(server instanceof net.Server); + + // TODO need better API for this. + server._backlog = 9; + + server.listen(function() { + process.send({ gotHandle: true }); + }); + + server.on('connection', function(c) { + connections++; + console.log('CHILD got connection'); + c.destroy(); + process.send({ childConnections: connections }); + }); }); diff --git a/test/simple/test-child-process-fork2.js b/test/simple/test-child-process-fork2.js index 619ba1d154..ef4867da76 100644 --- a/test/simple/test-child-process-fork2.js +++ b/test/simple/test-child-process-fork2.js @@ -3,25 +3,53 @@ var common = require('../common'); var fork = require('child_process').fork; var net = require('net'); +var socketCloses = 0; +var N = 10; + var n = fork(common.fixturesDir + '/fork2.js'); var messageCount = 0; -var server = new net.Server(); +var server = new net.Server(function(c) { + console.log('PARENT got connection'); + c.destroy(); +}); + +// TODO need better API for this. +server._backlog = 9; + server.listen(common.PORT, function() { console.log('PARENT send child server handle'); n.send({ hello: 'world' }, server); }); +function makeConnections() { + for (var i = 0; i < N; i++) { + var socket = net.connect(common.PORT, function() { + console.log("CLIENT connected"); + }); + + socket.on("close", function() { + socketCloses++; + console.log("CLIENT closed " + socketCloses); + if (socketCloses == N) { + n.kill(); + server.close(); + } + }); + } +} n.on('message', function(m) { console.log('PARENT got message:', m); - assert.ok(m.gotHandle); + if (m.gotHandle) { + makeConnections(); + } messageCount++; - n.kill(); - server.close(); }); process.on('exit', function() { - assert.equal(1, messageCount); + assert.equal(10, socketCloses); + assert.ok(messageCount > 1); }); +