diff --git a/src/node_buffer.cc b/src/node_buffer.cc index be1020d621..f211283c43 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -39,7 +39,7 @@ static inline struct buffer* buffer_root(buffer *buffer) { /* Determines the absolute position for a relative offset */ static inline size_t buffer_abs_off(buffer *buffer, size_t off) { struct buffer *root = buffer_root(buffer); - off += root->offset; + off += buffer->offset; return MIN(root->length, off); } diff --git a/src/node_net2.cc b/src/node_net2.cc index cf2433e5b3..3d8a1fadd6 100644 --- a/src/node_net2.cc +++ b/src/node_net2.cc @@ -13,6 +13,13 @@ #include #include /* inet_pton */ +#include +#include + +#include +#include + + #include namespace node { @@ -427,7 +434,7 @@ static Handle Read(const Arguments& args) { return ThrowException(ErrnoException(errno, "read")); } - return Integer::New(bytes_read); + return scope.Close(Integer::New(bytes_read)); } // var bytesWritten = t.write(fd, buffer, offset, length); @@ -470,9 +477,28 @@ static Handle Write(const Arguments& args) { return ThrowException(ErrnoException(errno, "write")); } - return Integer::New(written); + return scope.Close(Integer::New(written)); } + +// Probably only works for Linux TCP sockets? +// Returns the amount of data on the read queue. +static Handle ToRead(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + + int value; + int r = ioctl(fd, SIOCINQ, &value); + + if (r < 0) { + return ThrowException(ErrnoException(errno, "ioctl")); + } + + return scope.Close(Integer::New(value)); +} + + void InitNet2(Handle target) { HandleScope scope; @@ -490,6 +516,7 @@ void InitNet2(Handle target) { NODE_SET_METHOD(target, "listen", Listen); NODE_SET_METHOD(target, "accept", Accept); NODE_SET_METHOD(target, "getSocketError", GetSocketError); + NODE_SET_METHOD(target, "toRead", ToRead); target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS)); diff --git a/tcp.js b/tcp.js index eafa144870..6dd29f8688 100644 --- a/tcp.js +++ b/tcp.js @@ -12,18 +12,47 @@ var listen = process.listen; var accept = process.accept; var close = process.close; var shutdown = process.shutdown; +var read = process.read; +var write = process.write; +var toRead = process.toRead; var Peer = function (peerInfo) { process.EventEmitter.call(); var self = this; - self.fd = peerInfo.fd; - self.remoteAddress = peerInfo.remoteAddress; - self.remotePort = peerInfo.remotePort; + process.mixin(self, peerInfo); + + // Allocated on demand. + self.recvBuffer = null; self.readWatcher = new process.IOWatcher(function () { - debug(self.fd + " readable"); + debug("\n" + self.fd + " readable"); + + // If this is the first recv (recvBuffer doesn't exist) or we've used up + // most of the recvBuffer, allocate a new one. + if (!self.recvBuffer || + self.recvBuffer.length - self.recvBufferBytesUsed < 128) { + self._allocateNewRecvBuf(); + } + + debug("recvBufferBytesUsed " + self.recvBufferBytesUsed); + var bytesRead = read(self.fd, + self.recvBuffer, + self.recvBufferBytesUsed, + self.recvBuffer.length - self.recvBufferBytesUsed); + debug("bytesRead " + bytesRead + "\n"); + + if (bytesRead == 0) { + self.readable = false; + self.readWatcher.stop(); + self.emit("eof"); + } else { + var slice = self.recvBuffer.slice(self.recvBufferBytesUsed, + self.recvBufferBytesUsed + bytesRead); + self.recvBufferBytesUsed += bytesRead; + self.emit("receive", slice); + } }); self.readWatcher.set(self.fd, true, false); self.readWatcher.start(); @@ -35,9 +64,36 @@ var Peer = function (peerInfo) { self.readable = true; self.writable = true; + + self._out = []; }; process.inherits(Peer, process.EventEmitter); +Peer.prototype._allocateNewRecvBuf = function () { + var self = this; + + var newBufferSize = 1024; // TODO make this adjustable from user API + + if (toRead) { + // Note: only Linux supports toRead(). + // Is the extra system call even worth it? + var bytesToRead = toRead(self.fd); + if (bytesToRead > 1024) { + newBufferSize = 4*1024; + } else if (bytesToRead == 0) { + // Probably getting an EOF - so let's not allocate so much. + if (self.recvBuffer && + self.recvBuffer.length - self.recvBufferBytesUsed > 0) { + return; // just recv the eof on the old buf. + } + newBufferSize = 128; + } + } + + self.recvBuffer = new process.Buffer(newBufferSize); + self.recvBufferBytesUsed = 0; +}; + Peer.prototype.close = function () { this.readable = false; this.writable = false; @@ -49,8 +105,6 @@ Peer.prototype.close = function () { this.fd = null; }; - - var Server = function (listener) { var self = this; @@ -73,14 +127,26 @@ var Server = function (listener) { }; process.inherits(Server, process.EventEmitter); -Server.prototype.listen = function (port, host) { +Server.prototype.listen = function () { var self = this; if (self.fd) throw new Error("Already running"); - self.fd = process.socket("TCP"); - // TODO dns resolution - bind(self.fd, port, host); + if (typeof(arguments[0]) == "string" && arguments.length == 1) { + // the first argument specifies a path + self.fd = process.socket("UNIX"); + // TODO unlink sockfile if exists? + // if (lstat(SOCKFILE, &tstat) == 0) { + // assert(S_ISSOCK(tstat.st_mode)); + // unlink(SOCKFILE); + // } + bind(self.fd, arguments[0]); + } else { + // the first argument is the port, the second an IP + self.fd = process.socket("TCP"); + // TODO dns resolution on arguments[1] + bind(self.fd, arguments[0], arguments[1]); + } listen(self.fd, 128); // TODO configurable backlog self.watcher.set(self.fd, true, false); @@ -97,7 +163,12 @@ Server.prototype.close = function () { /////////////////////////////////////////////////////// +process.Buffer.prototype.toString = function () { + return this.utf8Slice(0, this.length); +}; + var sys = require("sys"); + var server = new Server(function (peer) { sys.puts("connection (" + peer.fd + "): " + peer.remoteAddress @@ -105,7 +176,11 @@ var server = new Server(function (peer) { + peer.remotePort ); sys.puts("server fd: " + server.fd); - peer.close(); + + peer.addListener("receive", function (b) { + sys.puts("recv (" + b.length + "): " + b); + }); }); +//server.listen(8000); server.listen(8000); sys.puts("server fd: " + server.fd);