Browse Source

Do load balancing test in test-child-process-fork2.

v0.7.4-release
Ryan Dahl 13 years ago
parent
commit
26c08a3f35
  1. 43
      lib/child_process_uv.js
  2. 52
      lib/net_uv.js
  3. 20
      test/fixtures/fork2.js
  4. 38
      test/simple/test-child-process-fork2.js

43
lib/child_process_uv.js

@ -20,6 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var EventEmitter = require('events').EventEmitter;
var net = require('net');
var Process = process.binding('process_wrap').Process;
var inherits = require('util').inherits;
var constants; // if (!constants) constants = process.binding('constants');
@ -39,8 +40,7 @@ function createPipe(ipc) {
}
function createSocket(pipe, readable) {
var Socket = require('net').Socket;
var s = new Socket({ handle: pipe });
var s = new net.Socket({ handle: pipe });
if (readable) {
s.writable = false;
@ -75,16 +75,24 @@ function setupChannel(target, channel) {
channel.onread = function(pool, offset, length, recvStream) {
if (pool) {
for (var i = 0; i < length; i++) {
if (pool[offset + i] === LF) {
jsonBuffer += pool.toString('ascii', offset, offset + i);
var message = JSON.parse(jsonBuffer);
jsonBuffer = pool.toString('ascii', i, length);
offset = i + 1;
target.emit('message', message, recvStream);
jsonBuffer += pool.toString('ascii', offset, offset + length);
var i;
while ((i = jsonBuffer.indexOf('\n')) >= 0) {
var json = jsonBuffer.slice(0, i);
var message = JSON.parse(json);
jsonBuffer = jsonBuffer.slice(i + 1);
if (recvStream) {
// TODO support other types of stream.
// TODO probably need a queue of recvStreams
var server = new net.Server();
server._handle = recvStream;
}
target.emit('message', message, server);
}
} else {
channel.close();
target._channel = null;
@ -94,13 +102,12 @@ function setupChannel(target, channel) {
target.send = function(message, sendStream) {
if (!target._channel) throw new Error("channel closed");
// Open up net.Socket instances
if (sendStream instanceof require('net').Socket ||
sendStream instanceof require('net').Server) {
sendStream = sendStream._handle;
if (!sendStream) {
throw new Error("sendStream handle not yet opened");
// Open up net.Server instances
if (sendStream) {
if (false == sendStream instanceof net.Server) {
throw new Error("sendStream must be instance of net.Server");
}
sendStream = sendStream._handle;
}
// For overflow protection don't write if channel queue is too deep.
@ -114,10 +121,10 @@ function setupChannel(target, channel) {
if (!writeReq) {
throw new Error(errno + " cannot write to IPC channel.");
} else {
writeReq.oncomplete = nop;
}
writeReq.oncomplete = nop;
return true;
};

52
lib/net_uv.js

@ -621,24 +621,38 @@ function toPort(x) { return (x = Number(x)) >= 0 ? x : false; }
function listen(self, address, port, addressType) {
var r = 0;
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (!self._handle) {
// assign handle in listen, and clean up if bind or listen fails
self._handle =
(port == -1 && addressType == -1) ? createPipe() : createTCP();
}
self._handle.socket = self;
self._handle.onconnection = onconnection;
if (address || port) {
debug('bind to ' + address);
if (addressType == 6) {
r = self._handle.bind6(address, port);
} else {
r = self._handle.bind(address, port);
}
}
if (address || port) {
debug('bind to ' + address);
if (addressType == 6) {
r = self._handle.bind6(address, port);
} else {
r = self._handle.bind(address, port);
if (r) {
self._handle.close();
self._handle = null;
process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
return;
}
}
self._handle.onconnection = onconnection;
self._handle.socket = self;
r = self._handle.listen(self._backlog || 128);
if (r) {
self._handle.close();
self._handle = null;
@ -646,23 +660,15 @@ function listen(self, address, port, addressType) {
process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
} else {
r = self._handle.listen(self._backlog || 128);
if (r) {
self._handle.close();
self._handle = null;
process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
} else {
process.nextTick(function() {
self.emit('listening');
});
}
return;
}
process.nextTick(function() {
self.emit('listening');
});
}
Server.prototype.listen = function() {
var self = this;

20
test/fixtures/fork2.js

@ -1,9 +1,27 @@
var assert = require('assert');
var net = require('net');
var connections = 0;
process.on('message', function(m, server) {
console.log('CHILD got message:', m);
assert.ok(m.hello);
assert.ok(server);
process.send({ gotHandle: true });
assert.ok(server instanceof net.Server);
// TODO need better API for this.
server._backlog = 9;
server.listen(function() {
process.send({ gotHandle: true });
});
server.on('connection', function(c) {
connections++;
console.log('CHILD got connection');
c.destroy();
process.send({ childConnections: connections });
});
});

38
test/simple/test-child-process-fork2.js

@ -3,25 +3,53 @@ var common = require('../common');
var fork = require('child_process').fork;
var net = require('net');
var socketCloses = 0;
var N = 10;
var n = fork(common.fixturesDir + '/fork2.js');
var messageCount = 0;
var server = new net.Server();
var server = new net.Server(function(c) {
console.log('PARENT got connection');
c.destroy();
});
// TODO need better API for this.
server._backlog = 9;
server.listen(common.PORT, function() {
console.log('PARENT send child server handle');
n.send({ hello: 'world' }, server);
});
function makeConnections() {
for (var i = 0; i < N; i++) {
var socket = net.connect(common.PORT, function() {
console.log("CLIENT connected");
});
socket.on("close", function() {
socketCloses++;
console.log("CLIENT closed " + socketCloses);
if (socketCloses == N) {
n.kill();
server.close();
}
});
}
}
n.on('message', function(m) {
console.log('PARENT got message:', m);
assert.ok(m.gotHandle);
if (m.gotHandle) {
makeConnections();
}
messageCount++;
n.kill();
server.close();
});
process.on('exit', function() {
assert.equal(1, messageCount);
assert.equal(10, socketCloses);
assert.ok(messageCount > 1);
});

Loading…
Cancel
Save