diff --git a/lib/net.js b/lib/net.js index 656f824765..820a9a8eb0 100644 --- a/lib/net.js +++ b/lib/net.js @@ -1,4 +1,5 @@ var sys = require("./sys"); +var fs = require("./fs"); var debugLevel = 0; if ('NODE_DEBUG' in process.ENV) debugLevel = 1; function debug (x) { @@ -43,13 +44,13 @@ function FreeList (name, max, constructor) { } FreeList.prototype.alloc = function () { - debug("alloc " + this.name + " " + this.list.length); + //debug("alloc " + this.name + " " + this.list.length); return this.list.length ? this.list.shift() : this.constructor.apply(this, arguments); } FreeList.prototype.free = function (obj) { - debug("free " + this.name + " " + this.list.length); + //debug("free " + this.name + " " + this.list.length); if (this.list.length < this.max) { this.list.push(obj); } @@ -88,7 +89,7 @@ function initSocket (self) { allocRecvBuffer(); } - debug('recvBuffer.used ' + recvBuffer.used); + //debug('recvBuffer.used ' + recvBuffer.used); var bytesRead; try { @@ -97,7 +98,7 @@ function initSocket (self) { recvBuffer, recvBuffer.used, recvBuffer.length - recvBuffer.used); - debug('recvMsg.fd ' + recvMsg.fd); + //debug('recvMsg.fd ' + recvMsg.fd); if (recvMsg.fd) { self.emit('fd', recvMsg.fd); } @@ -112,7 +113,7 @@ function initSocket (self) { return; } - debug('bytesRead ' + bytesRead + '\n'); + //debug('bytesRead ' + bytesRead + '\n'); if (!recvMsg.fd && bytesRead == 0) { self.readable = false; @@ -125,14 +126,32 @@ function initSocket (self) { } else if (bytesRead > 0) { var start = recvBuffer.used; var end = recvBuffer.used + bytesRead; - - if (self._events && self._events['data']) { - // emit a slice - self.emit('data', recvBuffer.slice(start, end)); + + if (!self._encoding) { + if (self._events && self._events['data']) { + // emit a slice + self.emit('data', recvBuffer.slice(start, end)); + } + + // Optimization: emit the original buffer with end points + if (self.ondata) self.ondata(recvBuffer, start, end); + } else { + // TODO remove me - we should only output Buffer + + var string; + switch (self._encoding) { + case 'utf8': + string = recvBuffer.utf8Slice(start, end); + break; + case 'ascii': + string = recvBuffer.asciiSlice(start, end); + break; + default: + throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer'); + } + self.emit('data', string); } - // Optimization: emit the original buffer with end points - if (self.ondata) self.ondata(recvBuffer, start, end); recvBuffer.used += bytesRead; } @@ -173,8 +192,7 @@ function Socket (peerInfo) { this.remoteAddress = peerInfo.remoteAddress; this.remotePort = peerInfo.remotePort; - this._readWatcher.set(this.fd, true, false); - this._readWatcher.start(); + this.resume(); this.readable = true; this._writeWatcher.set(this.fd, false, true); @@ -206,6 +224,7 @@ Socket.prototype._writeString = function (data, encoding) { var self = this; if (!self.writable) throw new Error('Socket is not writable'); var buffer; + if (self._writeQueue.length == 0) { buffer = self._allocateSendBuffer(); } else { @@ -229,16 +248,12 @@ Socket.prototype._writeString = function (data, encoding) { bytesWritten = charsWritten; } else if (encoding.toLowerCase() == 'utf8') { buffer.isFd = false; - charsWritten = buffer.utf8Write(data, - buffer.used, - buffer.length - buffer.used); + charsWritten = buffer.utf8Write(data, buffer.used); bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten)); } else { // ascii buffer.isFd = false; - charsWritten = buffer.asciiWrite(data, - buffer.used, - buffer.length - buffer.used); + charsWritten = buffer.asciiWrite(data, buffer.used); bytesWritten = charsWritten; } @@ -249,12 +264,12 @@ Socket.prototype._writeString = function (data, encoding) { self._writeQueueSize += bytesWritten; } - debug('charsWritten ' + charsWritten); - debug('buffer.used ' + buffer.used); + //debug('charsWritten ' + charsWritten); + //debug('buffer.used ' + buffer.used); // If we didn't finish, then recurse with the rest of the string. if (charsWritten < data.length) { - debug('recursive write'); + //debug('recursive write'); self._writeString(data.slice(charsWritten), encoding); } }; @@ -270,6 +285,10 @@ Socket.prototype.send = function () { throw new Error('send renamed to write'); }; +Socket.prototype.setEncoding = function (enc) { + // TODO check values, error out on bad, and deprecation message? + this._encoding = enc.toLowerCase(); +}; // Returns true if all the data was flushed to socket. Returns false if // something was queued. If data was queued, then the "drain" event will @@ -368,11 +387,11 @@ Socket.prototype.flush = function () { if (b.isFd) { b.sent = b.used; self._writeMessageQueueSize -= 1; - debug('sent fd: ' + fdToSend); + //debug('sent fd: ' + fdToSend); } else { b.sent += bytesWritten; self._writeQueueSize -= bytesWritten; - debug('bytes sent: ' + b.sent); + //debug('bytes sent: ' + b.sent); } } if (self._writeWatcher) self._writeWatcher.stop(); @@ -380,6 +399,39 @@ Socket.prototype.flush = function () { }; +function doConnect (socket, port, host) { + try { + connect(socket.fd, port, host); + } catch (e) { + socket.forceClose(e); + } + + // Don't start the read watcher until connection is established + socket._readWatcher.set(socket.fd, true, false); + + // How to connect on POSIX: Wait for fd to become writable, then call + // socketError() if there isn't an error, we're connected. AFAIK this a + // platform independent way determining when a non-blocking connection + // is established, but I have only seen it documented in the Linux + // Manual Page connect(2) under the error code EINPROGRESS. + socket._writeWatcher.set(socket.fd, false, true); + socket._writeWatcher.start(); + socket._writeWatcher.callback = function () { + var errno = socketError(socket.fd); + if (errno == 0) { + // connection established + socket._readWatcher.start(); + socket.readable = true; + socket.writable = true; + socket._writeWatcher.callback = socket._doFlush; + socket.emit('connect'); + } else if (errno != EINPROGRESS) { + socket.forceClose(errnoException(errno, 'connect')); + } + }; +} + + // var stream = new Socket(); // stream.connect(80) - TCP connect to port 80 on the localhost // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org @@ -390,55 +442,28 @@ Socket.prototype.connect = function () { var self = this; if (self.fd) throw new Error('Socket already opened'); - function doConnect () { - try { - connect(self.fd, arguments[0], arguments[1]); - } catch (e) { - close(self.fd); - throw e; - } - - // Don't start the read watcher until connection is established - self._readWatcher.set(self.fd, true, false); - - // How to connect on POSIX: Wait for fd to become writable, then call - // socketError() if there isn't an error, we're connected. AFAIK this a - // platform independent way determining when a non-blocking connection - // is established, but I have only seen it documented in the Linux - // Manual Page connect(2) under the error code EINPROGRESS. - self._writeWatcher.set(self.fd, false, true); - self._writeWatcher.start(); - self._writeWatcher.callback = function () { - var errno = socketError(self.fd); - if (errno == 0) { - // connection established - self._readWatcher.start(); - self.readable = true; - self.writable = true; - self._writeWatcher.callback = self._doFlush; - self.emit('connect'); - } else if (errno != EINPROGRESS) { - self.forceClose(errnoException(errno, 'connect')); - } - }; - } - if (typeof(arguments[0]) == 'string') { self.fd = socket('unix'); self.type = 'unix'; // TODO check if sockfile exists? - doConnect(arguments[0]); + doConnect(self, arguments[0]); } else { self.fd = socket('tcp'); + debug('new fd = ' + self.fd); self.type = 'tcp'; // TODO dns resolution on arguments[1] var port = arguments[0]; + var yyy = xxx++; lookupDomainName(arguments[1], function (ip) { - doConnect(port, ip); + debug('doConnect ' + self.fd + ' yyy=' + yyy); + doConnect(self, port, ip); + debug('doConnect done ' + self.fd + ' yyy=' + yyy); }); } }; +var xxx = 0; + Socket.prototype.address = function () { return getsockname(this.fd); @@ -449,8 +474,19 @@ Socket.prototype.setNoDelay = function (v) { }; +Socket.prototype.pause = function () { + this._readWatcher.stop(); +}; + +Socket.prototype.resume = function () { + if (!this.fd) throw new Error('Cannot resume() closed Socket.'); + this._readWatcher.set(this.fd, true, false); + this._readWatcher.start(); +}; + Socket.prototype.forceClose = function (exception) { // recvBuffer is shared between sockets, so don't need to free it here. + var self = this; var b; while (this._writeQueue.length) { @@ -472,8 +508,12 @@ Socket.prototype.forceClose = function (exception) { if (this.fd) { close(this.fd); + debug('close ' + this.fd); this.fd = null; - this.emit('close', exception); + process.nextTick(function () { + if (exception) self.emit('error', exception); + self.emit('close', exception ? true : false); + }); } }; @@ -517,12 +557,14 @@ function Server (listener) { self.watcher.callback = function (readable, writeable) { while (self.fd) { var peerInfo = accept(self.fd); - debug('accept: ' + JSON.stringify(peerInfo)); if (!peerInfo) return; var peer = new Socket(peerInfo); peer.type = self.type; peer.server = self; self.emit('connection', peer); + // The 'connect' event probably should be removed for server-side + // sockets. It's redundent. + peer.emit('connect'); } }; } @@ -539,7 +581,13 @@ exports.createServer = function (listener) { */ function lookupDomainName (dn, callback) { if (!needsLookup(dn)) { - callback(dn); + // Always wait until the next tick this is so people can do + // + // server.listen(8000); + // server.addListener('listening', fn); + // + // Marginally slower, but a lot fewer WTFs. + process.nextTick(function () { callback(dn); }) } else { debug("getaddrinfo 4 " + dn); getaddrinfo(dn, 4, function (r4) { @@ -589,9 +637,9 @@ Server.prototype.listen = function () { var path = arguments[0]; self.path = path; // unlink sockfile if it exists - process.fs.stat(path, function (r) { - if (r instanceof Error) { - if (r.errno == ENOENT) { + fs.stat(path, function (err, r) { + if (err) { + if (err.errno == ENOENT) { bind(self.fd, path); doListen(); } else { @@ -601,7 +649,7 @@ Server.prototype.listen = function () { if (!r.isFile()) { throw new Error("Non-file exists at " + path); } else { - process.fs.unlink(path, function (err) { + fs.unlink(path, function (err) { if (err) { throw err; } else { @@ -623,9 +671,7 @@ Server.prototype.listen = function () { self.fd = socket('tcp'); self.type = 'tcp'; var port = arguments[0]; - debug("starting tcp server on port " + port); lookupDomainName(arguments[1], function (ip) { - debug("starting tcp server on ip " + ip); bind(self.fd, port, ip); doListen(); }); @@ -648,7 +694,7 @@ Server.prototype.close = function () { self.fd = null; if (self.type === "unix") { - process.fs.unlink(self.path, function () { + fs.unlink(self.path, function () { self.emit("close"); }); } else { diff --git a/src/node_buffer.cc b/src/node_buffer.cc index d5b68aa7c5..70820dd49b 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -8,6 +8,8 @@ #include +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + namespace node { using namespace v8; @@ -226,7 +228,8 @@ Handle Buffer::Utf8Write(const Arguments &args) { "Not enough space in Buffer for string"))); } - int written = s->WriteUtf8((char*)p); + int written = s->WriteUtf8((char*)p, buffer->length_ - offset); + return scope.Close(Integer::New(written)); } @@ -253,12 +256,9 @@ Handle Buffer::AsciiWrite(const Arguments &args) { const char *p = buffer->data_ + offset; - if (s->Length() + offset > buffer->length_) { - return ThrowException(Exception::TypeError(String::New( - "Not enough space in Buffer for string"))); - } + size_t towrite = MIN(s->Length(), buffer->length_ - offset); - int written = s->WriteAscii((char*)p); + int written = s->WriteAscii((char*)p, 0, towrite); return scope.Close(Integer::New(written)); } diff --git a/test/pummel/test-tcp-many-clients.js b/test/pummel/test-tcp-many-clients.js index f6de67bc2d..1c27169af4 100644 --- a/test/pummel/test-tcp-many-clients.js +++ b/test/pummel/test-tcp-many-clients.js @@ -1,5 +1,5 @@ require("../common"); -tcp = require("tcp"); +net = require("net"); // settings var bytes = 1024*40; var concurrency = 100; @@ -13,7 +13,7 @@ for (var i = 0; i < bytes; i++) { body += "C"; } -var server = tcp.createServer(function (c) { +var server = net.createServer(function (c) { c.addListener("connect", function () { total_connections++; print("#"); @@ -24,8 +24,10 @@ var server = tcp.createServer(function (c) { server.listen(PORT); function runClient (callback) { - var client = tcp.createConnection(PORT); + var client = net.createConnection(PORT); + client.connections = 0; + client.setEncoding("utf8"); client.addListener("connect", function () { @@ -38,14 +40,25 @@ function runClient (callback) { this.recved += chunk; }); - client.addListener("end", function (had_error) { + client.addListener("end", function () { client.close(); }); + client.addListener("error", function (e) { + puts("\n\nERROOOOOr"); + throw e; + }); + client.addListener("close", function (had_error) { print("."); assert.equal(false, had_error); assert.equal(bytes, client.recved.length); + + if (client.fd) { + puts(client.fd); + } + assert.ok(!client.fd); + if (this.connections < connections_per_client) { this.connect(PORT); } else { @@ -54,13 +67,14 @@ function runClient (callback) { }); } - -var finished_clients = 0; -for (var i = 0; i < concurrency; i++) { - runClient(function () { - if (++finished_clients == concurrency) server.close(); - }); -} +server.addListener('listening', function () { + var finished_clients = 0; + for (var i = 0; i < concurrency; i++) { + runClient(function () { + if (++finished_clients == concurrency) server.close(); + }); + } +}); process.addListener("exit", function () { assert.equal(connections_per_client * concurrency, total_connections); diff --git a/test/pummel/test-tcp-pause.js b/test/pummel/test-tcp-pause.js index 71b83df9e2..adb6154401 100644 --- a/test/pummel/test-tcp-pause.js +++ b/test/pummel/test-tcp-pause.js @@ -1,8 +1,8 @@ require("../common"); -tcp = require("tcp"); +net = require("net"); N = 200; -server = tcp.createServer(function (connection) { +server = net.createServer(function (connection) { function write (j) { if (j >= N) { connection.close(); @@ -21,7 +21,7 @@ server.listen(PORT); recv = ""; chars_recved = 0; -client = tcp.createConnection(PORT); +client = net.createConnection(PORT); client.setEncoding("ascii"); client.addListener("data", function (d) { print(d); diff --git a/test/simple/test-tcp-reconnect.js b/test/simple/test-tcp-reconnect.js index 2a1ce652a5..66d93e30d6 100644 --- a/test/simple/test-tcp-reconnect.js +++ b/test/simple/test-tcp-reconnect.js @@ -1,12 +1,12 @@ require("../common"); -tcp = require("tcp"); +net = require('net'); var N = 50; var c = 0; var client_recv_count = 0; var disconnect_count = 0; -var server = tcp.createServer(function (socket) { +var server = net.createServer(function (socket) { socket.addListener("connect", function () { socket.write("hello\r\n"); }); @@ -20,33 +20,38 @@ var server = tcp.createServer(function (socket) { assert.equal(false, had_error); }); }); + server.listen(PORT); -var client = tcp.createConnection(PORT); +server.addListener('listening', function () { + puts('listening'); + var client = net.createConnection(PORT); -client.setEncoding("UTF8"); + client.setEncoding("UTF8"); -client.addListener("connect", function () { - puts("client connected."); -}); + client.addListener("connect", function () { + puts("client connected."); + }); -client.addListener("data", function (chunk) { - client_recv_count += 1; - puts("client_recv_count " + client_recv_count); - assert.equal("hello\r\n", chunk); - client.close(); -}); + client.addListener("data", function (chunk) { + client_recv_count += 1; + puts("client_recv_count " + client_recv_count); + assert.equal("hello\r\n", chunk); + client.close(); + }); -client.addListener("close", function (had_error) { - puts("disconnect"); - assert.equal(false, had_error); - if (disconnect_count++ < N) - client.connect(PORT); // reconnect - else - server.close(); + client.addListener("close", function (had_error) { + puts("disconnect"); + assert.equal(false, had_error); + if (disconnect_count++ < N) + client.connect(PORT); // reconnect + else + server.close(); + }); }); process.addListener("exit", function () { assert.equal(N+1, disconnect_count); assert.equal(N+1, client_recv_count); }); +