From 0618f02f6f2fa326cef971522fc8d5066d7c38f3 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 17 Dec 2009 09:31:10 +0100 Subject: [PATCH] Implement half-closed streams --- lib/net.js | 302 ++++++++++++++++++++++++++++----------------- src/node_net2.cc | 4 +- test-net-server.js | 19 +++ 3 files changed, 209 insertions(+), 116 deletions(-) diff --git a/lib/net.js b/lib/net.js index 36416d243d..16826a5a4d 100644 --- a/lib/net.js +++ b/lib/net.js @@ -1,36 +1,37 @@ var debugLevel = 0; -if ("NODE_DEBUG" in process.ENV) debugLevel = 1; +if ('NODE_DEBUG' in process.ENV) debugLevel = 1; function debug (x) { if (debugLevel > 0) { - process.stdio.writeError(x + "\n"); + process.stdio.writeError(x + '\n'); } } -var socket = process.socket; -var bind = process.bind; -var listen = process.listen; -var accept = process.accept; -var close = process.close; -var shutdown = process.shutdown; -var read = process.read; -var write = process.write; -var toRead = process.toRead; - -var Stream = function (peerInfo) { + +var socket = process.socket; +var bind = process.bind; +var connect = process.connect; +var listen = process.listen; +var accept = process.accept; +var close = process.close; +var shutdown = process.shutdown; +var read = process.read; +var write = process.write; +var toRead = process.toRead; +var socketError = process.socketError; +var EINPROGRESS = process.EINPROGRESS; + + +function Stream (peerInfo) { process.EventEmitter.call(); var self = this; - self.fd = peerInfo.fd; - self.remoteAddress = peerInfo.remoteAddress; - self.remotePort = peerInfo.remotePort; - // Allocated on demand. self.recvBuffer = null; self.sendQueue = []; self.readWatcher = new process.IOWatcher(function () { - debug("\n" + self.fd + " readable"); + debug('\n' + self.fd + ' readable'); // If this is the first recv (recvBuffer doesn't exist) or we've used up // most of the recvBuffer, allocate a new one. @@ -39,36 +40,50 @@ var Stream = function (peerInfo) { self._allocateNewRecvBuf(); } - debug("recvBuffer.used " + self.recvBuffer.used); + debug('recvBuffer.used ' + self.recvBuffer.used); var bytesRead = read(self.fd, self.recvBuffer, self.recvBuffer.used, self.recvBuffer.length - self.recvBuffer.used); - debug("bytesRead " + bytesRead + "\n"); + debug('bytesRead ' + bytesRead + '\n'); if (bytesRead == 0) { self.readable = false; self.readWatcher.stop(); - self.emit("eof"); + self.emit('eof'); + if (!self.writable) self.forceClose(); } else { var slice = self.recvBuffer.slice(self.recvBuffer.used, self.recvBuffer.used + bytesRead); self.recvBuffer.used += bytesRead; - self.emit("receive", slice); + self.emit('receive', slice); } }); - self.readWatcher.set(self.fd, true, false); - self.readWatcher.start(); - self.writeWatcher = new process.IOWatcher(function () { + self._onWriteFlush = function () { self.flush(); - }); - self.writeWatcher.set(self.fd, false, true); + }; + + self.writeWatcher = new process.IOWatcher(self._onWriteFlush); + + self.readable = false; + self.writable = false; - self.readable = true; - self.writable = true; + if (peerInfo) { + self.fd = peerInfo.fd; + self.remoteAddress = peerInfo.remoteAddress; + self.remotePort = peerInfo.remotePort; + + self.readWatcher.set(self.fd, true, false); + self.readWatcher.start(); + self.writeWatcher.set(self.fd, false, true); + self.readable = true; + self.writable = true; + } }; process.inherits(Stream, process.EventEmitter); +exports.Stream = Stream; + Stream.prototype._allocateNewRecvBuf = function () { var self = this; @@ -95,6 +110,7 @@ Stream.prototype._allocateNewRecvBuf = function () { self.recvBuffer.used = 0; }; + Stream.prototype._allocateSendBuffer = function () { var b = new process.Buffer(1024); b.used = 0; @@ -103,56 +119,62 @@ Stream.prototype._allocateSendBuffer = function () { return b; }; -Stream.prototype.send = function (data, encoding) { + +Stream.prototype._sendString = function (data, encoding) { var self = this; - if (typeof(data) == "string") { - var buffer; - if (self.sendQueue.length == 0) { - buffer = self._allocateSendBuffer(); - } else { - // walk through the sendQueue, find the first empty buffer - for (var i = 0; i < self.sendQueue.length; i++) { - if (self.sendQueue[i].used == 0) { - buffer = self.sendQueue[i]; - break; - } - } - // if we didn't find one, take the last - if (!buffer) { - buffer = self.sendQueue[self.sendQueue.length-1]; - // if last buffer is empty - if (buffer.length == buffer.used) buffer = self._allocateSendBuffer(); + var buffer; + if (self.sendQueue.length == 0) { + buffer = self._allocateSendBuffer(); + } else { + // walk through the sendQueue, find the buffer with free space + for (var i = 0; i < self.sendQueue.length; i++) { + if (self.sendQueue[i].used == 0) { + buffer = self.sendQueue[i]; + break; } } + // if we didn't find one, take the last + if (!buffer) { + buffer = self.sendQueue[self.sendQueue.length-1]; + // if last buffer is used up + if (buffer.length == buffer.used) buffer = self._allocateSendBuffer(); + } + } - encoding = encoding || "ascii"; // default to ascii since it's faster + encoding = encoding || 'ascii'; // default to ascii since it's faster - var charsWritten; + var charsWritten; - if (encoding.toLowerCase() == "utf8") { - charsWritten = buffer.utf8Write(data, - buffer.used, - buffer.length - buffer.used); - buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten)); - } else { - // ascii - charsWritten = buffer.asciiWrite(data, - buffer.used, - buffer.length - buffer.used); - buffer.used += charsWritten; - debug("ascii charsWritten " + charsWritten); - debug("ascii buffer.used " + buffer.used); - } + if (encoding.toLowerCase() == 'utf8') { + charsWritten = buffer.utf8Write(data, + buffer.used, + buffer.length - buffer.used); + buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten)); + } else { + // ascii + charsWritten = buffer.asciiWrite(data, + buffer.used, + buffer.length - buffer.used); + buffer.used += charsWritten; + debug('ascii charsWritten ' + charsWritten); + debug('ascii buffer.used ' + buffer.used); + } - // If we didn't finish, then recurse with the rest of the string. - if (charsWritten < data.length) { - debug("recursive send"); - self.send(data.slice(charsWritten), encoding); - } + // If we didn't finish, then recurse with the rest of the string. + if (charsWritten < data.length) { + debug('recursive send'); + self._sendString(data.slice(charsWritten), encoding); + } +}; + + +Stream.prototype.send = function (data, encoding) { + var self = this; + if (typeof(data) == 'string') { + self._sendString(data, encoding); } else { // data is a process.Buffer - // walk through the sendQueue, find the first empty buffer var inserted = false; data.sent = 0; @@ -171,6 +193,7 @@ Stream.prototype.send = function (data, encoding) { this.flush(); }; + // returns true if flushed without getting EAGAIN // false if it got EAGAIN Stream.prototype.flush = function () { @@ -194,51 +217,124 @@ Stream.prototype.flush = function () { return false; } b.sent += bytesWritten; - debug("bytes sent: " + b.sent); + debug('bytes sent: ' + b.sent); } this.writeWatcher.stop(); return true; }; -Stream.prototype.close = function () { - this.readable = false; - this.writable = false; - this.writeWatcher.stop(); - this.readWatcher.stop(); - close(this.fd); - debug("close peer " + this.fd); - this.fd = null; +// var stream = new Stream(); +// stream.connect(80) - TCP connect to port 80 on the localhost +// stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org +// stream.connect('/tmp/socket') - UNIX connect to socket specified by path +Stream.prototype.connect = function () { + var self = this; + if (self.fd) throw new Error('Stream already opened'); + + if (typeof(arguments[0]) == 'string' && arguments.length == 1) { + self.fd = process.socket('UNIX'); + // TODO check if sockfile exists? + } else { + self.fd = process.socket('TCP'); + // TODO dns resolution on arguments[1] + } + + try { + connect(self.fd, arguments[0], arguments[1]); + } catch (e) { + close(self.fd); + throw e; + } + + // Don't start the read watcher until connection is established + self.readWatcher.set(self.fd, true, false); + + // How to connect on POSIX: Wait for fd to become writable, then call + // socketError() if there isn't an error, we're connected. AFAIK this a + // platform independent way determining when a non-blocking connection + // is established, but I have only seen it documented in the Linux + // Manual Page connect(2) under the error code EINPROGRESS. + self.writeWatcher.set(self.fd, false, true); + self.writeWatcher.start(); + self.writeWatcher.callback = function () { + var errno = socketError(self.fd); + if (errno == 0) { + // connection established + self.emit('connect'); + self.readWatcher.start(); + self.readable = true; + self.writable = true; + self.writeWatcher.callback = self._onWriteFlush; + } else if (errno != EINPROGRESS) { + var e = new Error('connection error'); + e.errno = errno; + self.readWatcher.stop(); + self.writeWatcher.stop(); + close(self.fd); + } + }; +}; + + +Stream.prototype.forceClose = function (exception) { + if (this.fd) { + this.readable = false; + this.writable = false; + + this.writeWatcher.stop(); + this.readWatcher.stop(); + close(this.fd); + debug('close peer ' + this.fd); + this.fd = null; + this.emit('close', exception); + } }; -var Server = function (listener) { + +Stream.prototype.close = function () { + if (this.readable && this.writable) { + this.writable = false; + shutdown(this.fd, "write"); + } else if (!this.readable && this.writable) { + // already got EOF + this.forceClose(this.fd); + } + // In the case we've already shutdown write side, + // but haven't got EOF: ignore. In the case we're + // fully closed already: ignore. +}; + + +function Server (listener) { var self = this; if (listener) { - self.addListener("connection", listener); + self.addListener('connection', listener); } self.watcher = new process.IOWatcher(function (readable, writeable) { while (self.fd) { var peerInfo = accept(self.fd); - debug("accept: " + JSON.stringify(peerInfo)); + debug('accept: ' + JSON.stringify(peerInfo)); if (!peerInfo) return; var peer = new Stream(peerInfo); - self.emit("connection", peer); + self.emit('connection', peer); } }); }; process.inherits(Server, process.EventEmitter); +exports.Server = Server; + Server.prototype.listen = function () { var self = this; - - if (self.fd) throw new Error("Already running"); + if (self.fd) throw new Error('Server already opened'); var backlogIndex; - if (typeof(arguments[0]) == "string" && arguments.length == 1) { + if (typeof(arguments[0]) == 'string' && arguments.length == 1) { // the first argument specifies a path - self.fd = process.socket("UNIX"); + self.fd = process.socket('UNIX'); // TODO unlink sockfile if exists? // if (lstat(SOCKFILE, &tstat) == 0) { // assert(S_ISSOCK(tstat.st_mode)); @@ -248,46 +344,24 @@ Server.prototype.listen = function () { backlogIndex = 1; } else { // the first argument is the port, the second an IP - self.fd = process.socket("TCP"); + self.fd = process.socket('TCP'); // TODO dns resolution on arguments[1] bind(self.fd, arguments[0], arguments[1]); - backlogIndex = typeof(arguments[1]) == "string" ? 2 : 1; + backlogIndex = typeof(arguments[1]) == 'string' ? 2 : 1; } listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128); + self.emit("listening"); + self.watcher.set(self.fd, true, false); self.watcher.start(); }; + Server.prototype.close = function () { var self = this; - if (!self.fd) throw new Error("Not running"); + if (!self.fd) throw new Error('Not running'); self.watcher.stop(); close(self.fd); self.fd = null; }; - -/////////////////////////////////////////////////////// - -process.Buffer.prototype.toString = function () { - return this.utf8Slice(0, this.length); -}; - -var sys = require("sys"); - -var server = new Server(function (peer) { - sys.puts("connection (" + peer.fd + "): " - + peer.remoteAddress - + " port " - + peer.remotePort - ); - sys.puts("server fd: " + server.fd); - - peer.addListener("receive", function (b) { - sys.puts("recv (" + b.length + "): " + b); - peer.send("pong\r\n"); - }); -}); -//server.listen(8000); -server.listen(8000); -sys.puts("server fd: " + server.fd); diff --git a/src/node_net2.cc b/src/node_net2.cc index 7b4a788ca7..af96216e20 100644 --- a/src/node_net2.cc +++ b/src/node_net2.cc @@ -377,7 +377,7 @@ static Handle Accept(const Arguments& args) { } -static Handle GetSocketError(const Arguments& args) { +static Handle SocketError(const Arguments& args) { HandleScope scope; FD_ARG(args[0]) @@ -515,7 +515,7 @@ void InitNet2(Handle target) { NODE_SET_METHOD(target, "bind", Bind); NODE_SET_METHOD(target, "listen", Listen); NODE_SET_METHOD(target, "accept", Accept); - NODE_SET_METHOD(target, "getSocketError", GetSocketError); + NODE_SET_METHOD(target, "socketError", SocketError); NODE_SET_METHOD(target, "toRead", ToRead); diff --git a/test-net-server.js b/test-net-server.js index 5b9fbd085c..aabd7f2b2e 100644 --- a/test-net-server.js +++ b/test-net-server.js @@ -18,6 +18,25 @@ var server = new net.Server(function (stream) { stream.send(b); stream.send("pong utf8\r\n", "utf8"); }); + + stream.addListener("eof", function () { + sys.puts("server peer eof"); + stream.close(); + }); }); server.listen(8000); sys.puts("server fd: " + server.fd); + + +var stream = new net.Stream(); +stream.addListener('connect', function () { + sys.puts("!!!client connected"); + stream.send("hello\n"); +}); + +stream.addListener('receive', function (d) { + sys.puts("!!!client got: " + JSON.stringify(d.toString())); +}); + +stream.connect(8000); +