Browse Source

Further net2 compatibilities

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
c1a0ade7e7
  1. 158
      lib/net.js
  2. 12
      src/node_buffer.cc
  3. 30
      test/pummel/test-tcp-many-clients.js
  4. 6
      test/pummel/test-tcp-pause.js
  5. 23
      test/simple/test-tcp-reconnect.js

158
lib/net.js

@ -1,4 +1,5 @@
var sys = require("./sys"); var sys = require("./sys");
var fs = require("./fs");
var debugLevel = 0; var debugLevel = 0;
if ('NODE_DEBUG' in process.ENV) debugLevel = 1; if ('NODE_DEBUG' in process.ENV) debugLevel = 1;
function debug (x) { function debug (x) {
@ -43,13 +44,13 @@ function FreeList (name, max, constructor) {
} }
FreeList.prototype.alloc = function () { FreeList.prototype.alloc = function () {
debug("alloc " + this.name + " " + this.list.length); //debug("alloc " + this.name + " " + this.list.length);
return this.list.length ? this.list.shift() return this.list.length ? this.list.shift()
: this.constructor.apply(this, arguments); : this.constructor.apply(this, arguments);
} }
FreeList.prototype.free = function (obj) { FreeList.prototype.free = function (obj) {
debug("free " + this.name + " " + this.list.length); //debug("free " + this.name + " " + this.list.length);
if (this.list.length < this.max) { if (this.list.length < this.max) {
this.list.push(obj); this.list.push(obj);
} }
@ -88,7 +89,7 @@ function initSocket (self) {
allocRecvBuffer(); allocRecvBuffer();
} }
debug('recvBuffer.used ' + recvBuffer.used); //debug('recvBuffer.used ' + recvBuffer.used);
var bytesRead; var bytesRead;
try { try {
@ -97,7 +98,7 @@ function initSocket (self) {
recvBuffer, recvBuffer,
recvBuffer.used, recvBuffer.used,
recvBuffer.length - recvBuffer.used); recvBuffer.length - recvBuffer.used);
debug('recvMsg.fd ' + recvMsg.fd); //debug('recvMsg.fd ' + recvMsg.fd);
if (recvMsg.fd) { if (recvMsg.fd) {
self.emit('fd', recvMsg.fd); self.emit('fd', recvMsg.fd);
} }
@ -112,7 +113,7 @@ function initSocket (self) {
return; return;
} }
debug('bytesRead ' + bytesRead + '\n'); //debug('bytesRead ' + bytesRead + '\n');
if (!recvMsg.fd && bytesRead == 0) { if (!recvMsg.fd && bytesRead == 0) {
self.readable = false; self.readable = false;
@ -126,6 +127,7 @@ function initSocket (self) {
var start = recvBuffer.used; var start = recvBuffer.used;
var end = recvBuffer.used + bytesRead; var end = recvBuffer.used + bytesRead;
if (!self._encoding) {
if (self._events && self._events['data']) { if (self._events && self._events['data']) {
// emit a slice // emit a slice
self.emit('data', recvBuffer.slice(start, end)); self.emit('data', recvBuffer.slice(start, end));
@ -133,6 +135,23 @@ function initSocket (self) {
// Optimization: emit the original buffer with end points // Optimization: emit the original buffer with end points
if (self.ondata) self.ondata(recvBuffer, start, end); if (self.ondata) self.ondata(recvBuffer, start, end);
} else {
// TODO remove me - we should only output Buffer
var string;
switch (self._encoding) {
case 'utf8':
string = recvBuffer.utf8Slice(start, end);
break;
case 'ascii':
string = recvBuffer.asciiSlice(start, end);
break;
default:
throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer');
}
self.emit('data', string);
}
recvBuffer.used += bytesRead; recvBuffer.used += bytesRead;
} }
@ -173,8 +192,7 @@ function Socket (peerInfo) {
this.remoteAddress = peerInfo.remoteAddress; this.remoteAddress = peerInfo.remoteAddress;
this.remotePort = peerInfo.remotePort; this.remotePort = peerInfo.remotePort;
this._readWatcher.set(this.fd, true, false); this.resume();
this._readWatcher.start();
this.readable = true; this.readable = true;
this._writeWatcher.set(this.fd, false, true); this._writeWatcher.set(this.fd, false, true);
@ -206,6 +224,7 @@ Socket.prototype._writeString = function (data, encoding) {
var self = this; var self = this;
if (!self.writable) throw new Error('Socket is not writable'); if (!self.writable) throw new Error('Socket is not writable');
var buffer; var buffer;
if (self._writeQueue.length == 0) { if (self._writeQueue.length == 0) {
buffer = self._allocateSendBuffer(); buffer = self._allocateSendBuffer();
} else { } else {
@ -229,16 +248,12 @@ Socket.prototype._writeString = function (data, encoding) {
bytesWritten = charsWritten; bytesWritten = charsWritten;
} else if (encoding.toLowerCase() == 'utf8') { } else if (encoding.toLowerCase() == 'utf8') {
buffer.isFd = false; buffer.isFd = false;
charsWritten = buffer.utf8Write(data, charsWritten = buffer.utf8Write(data, buffer.used);
buffer.used,
buffer.length - buffer.used);
bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten)); bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten));
} else { } else {
// ascii // ascii
buffer.isFd = false; buffer.isFd = false;
charsWritten = buffer.asciiWrite(data, charsWritten = buffer.asciiWrite(data, buffer.used);
buffer.used,
buffer.length - buffer.used);
bytesWritten = charsWritten; bytesWritten = charsWritten;
} }
@ -249,12 +264,12 @@ Socket.prototype._writeString = function (data, encoding) {
self._writeQueueSize += bytesWritten; self._writeQueueSize += bytesWritten;
} }
debug('charsWritten ' + charsWritten); //debug('charsWritten ' + charsWritten);
debug('buffer.used ' + buffer.used); //debug('buffer.used ' + buffer.used);
// If we didn't finish, then recurse with the rest of the string. // If we didn't finish, then recurse with the rest of the string.
if (charsWritten < data.length) { if (charsWritten < data.length) {
debug('recursive write'); //debug('recursive write');
self._writeString(data.slice(charsWritten), encoding); self._writeString(data.slice(charsWritten), encoding);
} }
}; };
@ -270,6 +285,10 @@ Socket.prototype.send = function () {
throw new Error('send renamed to write'); throw new Error('send renamed to write');
}; };
Socket.prototype.setEncoding = function (enc) {
// TODO check values, error out on bad, and deprecation message?
this._encoding = enc.toLowerCase();
};
// Returns true if all the data was flushed to socket. Returns false if // Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will // something was queued. If data was queued, then the "drain" event will
@ -368,11 +387,11 @@ Socket.prototype.flush = function () {
if (b.isFd) { if (b.isFd) {
b.sent = b.used; b.sent = b.used;
self._writeMessageQueueSize -= 1; self._writeMessageQueueSize -= 1;
debug('sent fd: ' + fdToSend); //debug('sent fd: ' + fdToSend);
} else { } else {
b.sent += bytesWritten; b.sent += bytesWritten;
self._writeQueueSize -= bytesWritten; self._writeQueueSize -= bytesWritten;
debug('bytes sent: ' + b.sent); //debug('bytes sent: ' + b.sent);
} }
} }
if (self._writeWatcher) self._writeWatcher.stop(); if (self._writeWatcher) self._writeWatcher.stop();
@ -380,65 +399,71 @@ Socket.prototype.flush = function () {
}; };
// var stream = new Socket(); function doConnect (socket, port, host) {
// stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
// stream.connect('/tmp/socket') - UNIX connect to socket specified by path
Socket.prototype.connect = function () {
initSocket(this);
var self = this;
if (self.fd) throw new Error('Socket already opened');
function doConnect () {
try { try {
connect(self.fd, arguments[0], arguments[1]); connect(socket.fd, port, host);
} catch (e) { } catch (e) {
close(self.fd); socket.forceClose(e);
throw e;
} }
// Don't start the read watcher until connection is established // Don't start the read watcher until connection is established
self._readWatcher.set(self.fd, true, false); socket._readWatcher.set(socket.fd, true, false);
// How to connect on POSIX: Wait for fd to become writable, then call // How to connect on POSIX: Wait for fd to become writable, then call
// socketError() if there isn't an error, we're connected. AFAIK this a // socketError() if there isn't an error, we're connected. AFAIK this a
// platform independent way determining when a non-blocking connection // platform independent way determining when a non-blocking connection
// is established, but I have only seen it documented in the Linux // is established, but I have only seen it documented in the Linux
// Manual Page connect(2) under the error code EINPROGRESS. // Manual Page connect(2) under the error code EINPROGRESS.
self._writeWatcher.set(self.fd, false, true); socket._writeWatcher.set(socket.fd, false, true);
self._writeWatcher.start(); socket._writeWatcher.start();
self._writeWatcher.callback = function () { socket._writeWatcher.callback = function () {
var errno = socketError(self.fd); var errno = socketError(socket.fd);
if (errno == 0) { if (errno == 0) {
// connection established // connection established
self._readWatcher.start(); socket._readWatcher.start();
self.readable = true; socket.readable = true;
self.writable = true; socket.writable = true;
self._writeWatcher.callback = self._doFlush; socket._writeWatcher.callback = socket._doFlush;
self.emit('connect'); socket.emit('connect');
} else if (errno != EINPROGRESS) { } else if (errno != EINPROGRESS) {
self.forceClose(errnoException(errno, 'connect')); socket.forceClose(errnoException(errno, 'connect'));
} }
}; };
} }
// var stream = new Socket();
// stream.connect(80) - TCP connect to port 80 on the localhost
// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
// stream.connect('/tmp/socket') - UNIX connect to socket specified by path
Socket.prototype.connect = function () {
initSocket(this);
var self = this;
if (self.fd) throw new Error('Socket already opened');
if (typeof(arguments[0]) == 'string') { if (typeof(arguments[0]) == 'string') {
self.fd = socket('unix'); self.fd = socket('unix');
self.type = 'unix'; self.type = 'unix';
// TODO check if sockfile exists? // TODO check if sockfile exists?
doConnect(arguments[0]); doConnect(self, arguments[0]);
} else { } else {
self.fd = socket('tcp'); self.fd = socket('tcp');
debug('new fd = ' + self.fd);
self.type = 'tcp'; self.type = 'tcp';
// TODO dns resolution on arguments[1] // TODO dns resolution on arguments[1]
var port = arguments[0]; var port = arguments[0];
var yyy = xxx++;
lookupDomainName(arguments[1], function (ip) { lookupDomainName(arguments[1], function (ip) {
doConnect(port, ip); debug('doConnect ' + self.fd + ' yyy=' + yyy);
doConnect(self, port, ip);
debug('doConnect done ' + self.fd + ' yyy=' + yyy);
}); });
} }
}; };
var xxx = 0;
Socket.prototype.address = function () { Socket.prototype.address = function () {
return getsockname(this.fd); return getsockname(this.fd);
@ -449,8 +474,19 @@ Socket.prototype.setNoDelay = function (v) {
}; };
Socket.prototype.pause = function () {
this._readWatcher.stop();
};
Socket.prototype.resume = function () {
if (!this.fd) throw new Error('Cannot resume() closed Socket.');
this._readWatcher.set(this.fd, true, false);
this._readWatcher.start();
};
Socket.prototype.forceClose = function (exception) { Socket.prototype.forceClose = function (exception) {
// recvBuffer is shared between sockets, so don't need to free it here. // recvBuffer is shared between sockets, so don't need to free it here.
var self = this;
var b; var b;
while (this._writeQueue.length) { while (this._writeQueue.length) {
@ -472,8 +508,12 @@ Socket.prototype.forceClose = function (exception) {
if (this.fd) { if (this.fd) {
close(this.fd); close(this.fd);
debug('close ' + this.fd);
this.fd = null; this.fd = null;
this.emit('close', exception); process.nextTick(function () {
if (exception) self.emit('error', exception);
self.emit('close', exception ? true : false);
});
} }
}; };
@ -517,12 +557,14 @@ function Server (listener) {
self.watcher.callback = function (readable, writeable) { self.watcher.callback = function (readable, writeable) {
while (self.fd) { while (self.fd) {
var peerInfo = accept(self.fd); var peerInfo = accept(self.fd);
debug('accept: ' + JSON.stringify(peerInfo));
if (!peerInfo) return; if (!peerInfo) return;
var peer = new Socket(peerInfo); var peer = new Socket(peerInfo);
peer.type = self.type; peer.type = self.type;
peer.server = self; peer.server = self;
self.emit('connection', peer); self.emit('connection', peer);
// The 'connect' event probably should be removed for server-side
// sockets. It's redundent.
peer.emit('connect');
} }
}; };
} }
@ -539,7 +581,13 @@ exports.createServer = function (listener) {
*/ */
function lookupDomainName (dn, callback) { function lookupDomainName (dn, callback) {
if (!needsLookup(dn)) { if (!needsLookup(dn)) {
callback(dn); // Always wait until the next tick this is so people can do
//
// server.listen(8000);
// server.addListener('listening', fn);
//
// Marginally slower, but a lot fewer WTFs.
process.nextTick(function () { callback(dn); })
} else { } else {
debug("getaddrinfo 4 " + dn); debug("getaddrinfo 4 " + dn);
getaddrinfo(dn, 4, function (r4) { getaddrinfo(dn, 4, function (r4) {
@ -589,9 +637,9 @@ Server.prototype.listen = function () {
var path = arguments[0]; var path = arguments[0];
self.path = path; self.path = path;
// unlink sockfile if it exists // unlink sockfile if it exists
process.fs.stat(path, function (r) { fs.stat(path, function (err, r) {
if (r instanceof Error) { if (err) {
if (r.errno == ENOENT) { if (err.errno == ENOENT) {
bind(self.fd, path); bind(self.fd, path);
doListen(); doListen();
} else { } else {
@ -601,7 +649,7 @@ Server.prototype.listen = function () {
if (!r.isFile()) { if (!r.isFile()) {
throw new Error("Non-file exists at " + path); throw new Error("Non-file exists at " + path);
} else { } else {
process.fs.unlink(path, function (err) { fs.unlink(path, function (err) {
if (err) { if (err) {
throw err; throw err;
} else { } else {
@ -623,9 +671,7 @@ Server.prototype.listen = function () {
self.fd = socket('tcp'); self.fd = socket('tcp');
self.type = 'tcp'; self.type = 'tcp';
var port = arguments[0]; var port = arguments[0];
debug("starting tcp server on port " + port);
lookupDomainName(arguments[1], function (ip) { lookupDomainName(arguments[1], function (ip) {
debug("starting tcp server on ip " + ip);
bind(self.fd, port, ip); bind(self.fd, port, ip);
doListen(); doListen();
}); });
@ -648,7 +694,7 @@ Server.prototype.close = function () {
self.fd = null; self.fd = null;
if (self.type === "unix") { if (self.type === "unix") {
process.fs.unlink(self.path, function () { fs.unlink(self.path, function () {
self.emit("close"); self.emit("close");
}); });
} else { } else {

12
src/node_buffer.cc

@ -8,6 +8,8 @@
#include <node.h> #include <node.h>
#define MIN(a,b) ((a) < (b) ? (a) : (b))
namespace node { namespace node {
using namespace v8; using namespace v8;
@ -226,7 +228,8 @@ Handle<Value> Buffer::Utf8Write(const Arguments &args) {
"Not enough space in Buffer for string"))); "Not enough space in Buffer for string")));
} }
int written = s->WriteUtf8((char*)p); int written = s->WriteUtf8((char*)p, buffer->length_ - offset);
return scope.Close(Integer::New(written)); return scope.Close(Integer::New(written));
} }
@ -253,12 +256,9 @@ Handle<Value> Buffer::AsciiWrite(const Arguments &args) {
const char *p = buffer->data_ + offset; const char *p = buffer->data_ + offset;
if (s->Length() + offset > buffer->length_) { size_t towrite = MIN(s->Length(), buffer->length_ - offset);
return ThrowException(Exception::TypeError(String::New(
"Not enough space in Buffer for string")));
}
int written = s->WriteAscii((char*)p); int written = s->WriteAscii((char*)p, 0, towrite);
return scope.Close(Integer::New(written)); return scope.Close(Integer::New(written));
} }

30
test/pummel/test-tcp-many-clients.js

@ -1,5 +1,5 @@
require("../common"); require("../common");
tcp = require("tcp"); net = require("net");
// settings // settings
var bytes = 1024*40; var bytes = 1024*40;
var concurrency = 100; var concurrency = 100;
@ -13,7 +13,7 @@ for (var i = 0; i < bytes; i++) {
body += "C"; body += "C";
} }
var server = tcp.createServer(function (c) { var server = net.createServer(function (c) {
c.addListener("connect", function () { c.addListener("connect", function () {
total_connections++; total_connections++;
print("#"); print("#");
@ -24,8 +24,10 @@ var server = tcp.createServer(function (c) {
server.listen(PORT); server.listen(PORT);
function runClient (callback) { function runClient (callback) {
var client = tcp.createConnection(PORT); var client = net.createConnection(PORT);
client.connections = 0; client.connections = 0;
client.setEncoding("utf8"); client.setEncoding("utf8");
client.addListener("connect", function () { client.addListener("connect", function () {
@ -38,14 +40,25 @@ function runClient (callback) {
this.recved += chunk; this.recved += chunk;
}); });
client.addListener("end", function (had_error) { client.addListener("end", function () {
client.close(); client.close();
}); });
client.addListener("error", function (e) {
puts("\n\nERROOOOOr");
throw e;
});
client.addListener("close", function (had_error) { client.addListener("close", function (had_error) {
print("."); print(".");
assert.equal(false, had_error); assert.equal(false, had_error);
assert.equal(bytes, client.recved.length); assert.equal(bytes, client.recved.length);
if (client.fd) {
puts(client.fd);
}
assert.ok(!client.fd);
if (this.connections < connections_per_client) { if (this.connections < connections_per_client) {
this.connect(PORT); this.connect(PORT);
} else { } else {
@ -54,13 +67,14 @@ function runClient (callback) {
}); });
} }
server.addListener('listening', function () {
var finished_clients = 0; var finished_clients = 0;
for (var i = 0; i < concurrency; i++) { for (var i = 0; i < concurrency; i++) {
runClient(function () { runClient(function () {
if (++finished_clients == concurrency) server.close(); if (++finished_clients == concurrency) server.close();
}); });
} }
});
process.addListener("exit", function () { process.addListener("exit", function () {
assert.equal(connections_per_client * concurrency, total_connections); assert.equal(connections_per_client * concurrency, total_connections);

6
test/pummel/test-tcp-pause.js

@ -1,8 +1,8 @@
require("../common"); require("../common");
tcp = require("tcp"); net = require("net");
N = 200; N = 200;
server = tcp.createServer(function (connection) { server = net.createServer(function (connection) {
function write (j) { function write (j) {
if (j >= N) { if (j >= N) {
connection.close(); connection.close();
@ -21,7 +21,7 @@ server.listen(PORT);
recv = ""; recv = "";
chars_recved = 0; chars_recved = 0;
client = tcp.createConnection(PORT); client = net.createConnection(PORT);
client.setEncoding("ascii"); client.setEncoding("ascii");
client.addListener("data", function (d) { client.addListener("data", function (d) {
print(d); print(d);

23
test/simple/test-tcp-reconnect.js

@ -1,12 +1,12 @@
require("../common"); require("../common");
tcp = require("tcp"); net = require('net');
var N = 50; var N = 50;
var c = 0; var c = 0;
var client_recv_count = 0; var client_recv_count = 0;
var disconnect_count = 0; var disconnect_count = 0;
var server = tcp.createServer(function (socket) { var server = net.createServer(function (socket) {
socket.addListener("connect", function () { socket.addListener("connect", function () {
socket.write("hello\r\n"); socket.write("hello\r\n");
}); });
@ -20,33 +20,38 @@ var server = tcp.createServer(function (socket) {
assert.equal(false, had_error); assert.equal(false, had_error);
}); });
}); });
server.listen(PORT); server.listen(PORT);
var client = tcp.createConnection(PORT); server.addListener('listening', function () {
puts('listening');
var client = net.createConnection(PORT);
client.setEncoding("UTF8"); client.setEncoding("UTF8");
client.addListener("connect", function () { client.addListener("connect", function () {
puts("client connected."); puts("client connected.");
}); });
client.addListener("data", function (chunk) { client.addListener("data", function (chunk) {
client_recv_count += 1; client_recv_count += 1;
puts("client_recv_count " + client_recv_count); puts("client_recv_count " + client_recv_count);
assert.equal("hello\r\n", chunk); assert.equal("hello\r\n", chunk);
client.close(); client.close();
}); });
client.addListener("close", function (had_error) { client.addListener("close", function (had_error) {
puts("disconnect"); puts("disconnect");
assert.equal(false, had_error); assert.equal(false, had_error);
if (disconnect_count++ < N) if (disconnect_count++ < N)
client.connect(PORT); // reconnect client.connect(PORT); // reconnect
else else
server.close(); server.close();
});
}); });
process.addListener("exit", function () { process.addListener("exit", function () {
assert.equal(N+1, disconnect_count); assert.equal(N+1, disconnect_count);
assert.equal(N+1, client_recv_count); assert.equal(N+1, client_recv_count);
}); });

Loading…
Cancel
Save