diff --git a/lib/net.js b/lib/net.js index 16826a5a4d..cf2a8bf8a1 100644 --- a/lib/net.js +++ b/lib/net.js @@ -7,6 +7,7 @@ function debug (x) { } +var assert = process.assert; var socket = process.socket; var bind = process.bind; var connect = process.connect; @@ -18,6 +19,9 @@ var read = process.read; var write = process.write; var toRead = process.toRead; var socketError = process.socketError; +var getsockname = process.getsockname; +var getaddrinfo = process.getaddrinfo; +var needsLookup = process.needsLookup; var EINPROGRESS = process.EINPROGRESS; @@ -28,11 +32,8 @@ function Stream (peerInfo) { // Allocated on demand. self.recvBuffer = null; - self.sendQueue = []; self.readWatcher = new process.IOWatcher(function () { - debug('\n' + self.fd + ' readable'); - // If this is the first recv (recvBuffer doesn't exist) or we've used up // most of the recvBuffer, allocate a new one. if (!self.recvBuffer || @@ -59,14 +60,19 @@ function Stream (peerInfo) { self.emit('receive', slice); } }); + self.readable = false; - self._onWriteFlush = function () { - self.flush(); + self.sendQueue = []; // queue of buffers that need to be written to socket + // XXX use link list? + self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length! + self._doFlush = function () { + assert(self.sendQueueSize > 0); + if (self.flush()) { + assert(self.sendQueueSize == 0); + self.emit("drain"); + } }; - - self.writeWatcher = new process.IOWatcher(self._onWriteFlush); - - self.readable = false; + self.writeWatcher = new process.IOWatcher(self._doFlush); self.writable = false; if (peerInfo) { @@ -76,8 +82,9 @@ function Stream (peerInfo) { self.readWatcher.set(self.fd, true, false); self.readWatcher.start(); - self.writeWatcher.set(self.fd, false, true); self.readable = true; + + self.writeWatcher.set(self.fd, false, true); self.writable = true; } }; @@ -85,6 +92,13 @@ process.inherits(Stream, process.EventEmitter); exports.Stream = Stream; +exports.createConnection = function (port, host) { + var s = new Stream(); + s.connect(port, host); + return s; +}; + + Stream.prototype._allocateNewRecvBuf = function () { var self = this; @@ -122,6 +136,7 @@ Stream.prototype._allocateSendBuffer = function () { Stream.prototype._sendString = function (data, encoding) { var self = this; + if (!self.writable) throw new Error('Stream is not writable'); var buffer; if (self.sendQueue.length == 0) { buffer = self._allocateSendBuffer(); @@ -144,22 +159,26 @@ Stream.prototype._sendString = function (data, encoding) { encoding = encoding || 'ascii'; // default to ascii since it's faster var charsWritten; + var bytesWritten; if (encoding.toLowerCase() == 'utf8') { charsWritten = buffer.utf8Write(data, buffer.used, buffer.length - buffer.used); - buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten)); + bytesWritten = process.Buffer.utf8Length(data.slice(0, charsWritten)); } else { // ascii charsWritten = buffer.asciiWrite(data, buffer.used, buffer.length - buffer.used); - buffer.used += charsWritten; - debug('ascii charsWritten ' + charsWritten); - debug('ascii buffer.used ' + buffer.used); + bytesWritten = charsWritten; } + + buffer.used += bytesWritten; + self.sendQueueSize += bytesWritten; + debug('charsWritten ' + charsWritten); + debug('buffer.used ' + buffer.used); // If we didn't finish, then recurse with the rest of the string. if (charsWritten < data.length) { @@ -169,8 +188,12 @@ Stream.prototype._sendString = function (data, encoding) { }; +// Returns true if all the data was flushed to socket. Returns false if +// something was queued. If data was queued, then the "drain" event will +// signal when it has been finally flushed to socket. Stream.prototype.send = function (data, encoding) { var self = this; + if (!self.writable) throw new Error('Stream is not writable'); if (typeof(data) == 'string') { self._sendString(data, encoding); } else { @@ -189,15 +212,18 @@ Stream.prototype.send = function (data, encoding) { } if (!inserted) self.sendQueue.push(data); + + self.sendQueueSize += data.used; } - this.flush(); + return this.flush(); }; -// returns true if flushed without getting EAGAIN -// false if it got EAGAIN +// Flushes the write buffer out. Emits "drain" if the buffer is empty. Stream.prototype.flush = function () { var self = this; + if (!self.writable) throw new Error('Stream is not writable'); + var bytesWritten; while (self.sendQueue.length > 0) { var b = self.sendQueue[0]; @@ -213,13 +239,16 @@ Stream.prototype.flush = function () { b.sent, b.used - b.sent); if (bytesWritten === null) { - this.writeWatcher.start(); + // could not flush everything + self.writeWatcher.start(); + assert(self.sendQueueSize > 0); return false; } b.sent += bytesWritten; + self.sendQueueSize -= bytesWritten; debug('bytes sent: ' + b.sent); } - this.writeWatcher.stop(); + self.writeWatcher.stop(); return true; }; @@ -261,17 +290,15 @@ Stream.prototype.connect = function () { var errno = socketError(self.fd); if (errno == 0) { // connection established - self.emit('connect'); self.readWatcher.start(); self.readable = true; self.writable = true; - self.writeWatcher.callback = self._onWriteFlush; + self.writeWatcher.callback = self._doFlush; + self.emit('connect'); } else if (errno != EINPROGRESS) { var e = new Error('connection error'); e.errno = errno; - self.readWatcher.stop(); - self.writeWatcher.stop(); - close(self.fd); + self.forceClose(e); } }; }; @@ -292,17 +319,35 @@ Stream.prototype.forceClose = function (exception) { }; -Stream.prototype.close = function () { - if (this.readable && this.writable) { +Stream.prototype._shutdown = function () { + if (this.writable) { this.writable = false; shutdown(this.fd, "write"); - } else if (!this.readable && this.writable) { + } +}; + + +Stream.prototype.close = function () { + var self = this; + var closeMethod; + if (self.readable && self.writable) { + closeMethod = self._shutdown; + } else if (!self.readable && self.writable) { // already got EOF - this.forceClose(this.fd); + closeMethod = self.forceClose; } // In the case we've already shutdown write side, // but haven't got EOF: ignore. In the case we're // fully closed already: ignore. + + if (closeMethod) { + if (self.sendQueueSize == 0) { + // no queue. just shut down the socket. + closeMethod(); + } else { + self.addListener("drain", closeMethod); + } + } }; @@ -327,30 +372,43 @@ process.inherits(Server, process.EventEmitter); exports.Server = Server; +exports.createServer = function (listener) { + return new Server(listener); +}; + + Server.prototype.listen = function () { var self = this; if (self.fd) throw new Error('Server already opened'); - var backlogIndex; if (typeof(arguments[0]) == 'string' && arguments.length == 1) { // the first argument specifies a path self.fd = process.socket('UNIX'); + self.type = 'UNIX'; // TODO unlink sockfile if exists? // if (lstat(SOCKFILE, &tstat) == 0) { // assert(S_ISSOCK(tstat.st_mode)); // unlink(SOCKFILE); // } bind(self.fd, arguments[0]); - backlogIndex = 1; + } else if (arguments.length == 0) { + self.fd = process.socket('TCP'); + self.type = 'TCP'; + // Don't bind(). OS will assign a port with INADDR_ANY. The port will be + // passed to the 'listening' event. } else { // the first argument is the port, the second an IP self.fd = process.socket('TCP'); + self.type = 'TCP'; + if (needsLookup(arguments[1])) { + getaddrinfo(arguments[1], function (ip) { + }); + } // TODO dns resolution on arguments[1] bind(self.fd, arguments[0], arguments[1]); - backlogIndex = typeof(arguments[1]) == 'string' ? 2 : 1; } - listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128); + listen(self.fd, 128); self.emit("listening"); self.watcher.set(self.fd, true, false); @@ -358,10 +416,15 @@ Server.prototype.listen = function () { }; +Server.prototype.sockName = function () { + return getsockname(self.fd); +}; + + Server.prototype.close = function () { - var self = this; - if (!self.fd) throw new Error('Not running'); - self.watcher.stop(); - close(self.fd); - self.fd = null; + if (!this.fd) throw new Error('Not running'); + this.watcher.stop(); + close(this.fd); + this.fd = null; + this.emit("close"); }; diff --git a/src/node_net2.cc b/src/node_net2.cc index af96216e20..3457cdbf54 100644 --- a/src/node_net2.cc +++ b/src/node_net2.cc @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -32,6 +33,8 @@ static Persistent syscall_symbol; static Persistent fd_symbol; static Persistent remote_address_symbol; static Persistent remote_port_symbol; +static Persistent address_symbol; +static Persistent port_symbol; #define FD_ARG(a) \ if (!(a)->IsInt32()) { \ @@ -181,11 +184,11 @@ static inline Handle ParseAddressArgs(Handle first, char ipv6[255] = "::FFFF:"; if (inet_pton(AF_INET, *ip, &(in6.sin6_addr)) > 0) { - // If this is an IPv4 address then we need to change it - // to the IPv4-mapped-on-IPv6 format which looks like + // If this is an IPv4 address then we need to change it + // to the IPv4-mapped-on-IPv6 format which looks like // ::FFFF: - // For more information see "Address Format" ipv6(7) and - // "BUGS" in inet_pton(3) + // For more information see "Address Format" ipv6(7) and + // "BUGS" in inet_pton(3) strcat(ipv6, *ip); } else { strcpy(ipv6, *ip); @@ -313,6 +316,38 @@ static Handle Connect(const Arguments& args) { } +static Handle GetSockName(const Arguments& args) { + HandleScope scope; + + FD_ARG(args[0]) + + struct sockaddr_storage address_storage; + socklen_t len = sizeof(struct sockaddr_storage); + + int r = getsockname(fd, (struct sockaddr *) &address_storage, &len); + + if (r < 0) { + return ThrowException(ErrnoException(errno, "getsockname")); + } + + Local info = Object::New(); + + if (address_storage.ss_family == AF_INET6) { + struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage; + + char ip[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN); + + int port = ntohs(a->sin6_port); + + info->Set(address_symbol, String::New(ip)); + info->Set(port_symbol, Integer::New(port)); + } + + return scope.Close(info); +} + + static Handle Listen(const Arguments& args) { HandleScope scope; @@ -323,6 +358,7 @@ static Handle Listen(const Arguments& args) { return ThrowException(ErrnoException(errno, "listen")); } + return Undefined(); } @@ -499,6 +535,167 @@ static Handle ToRead(const Arguments& args) { } +// G E T A D D R I N F O + +struct resolve_request { + Persistent cb; + int ai_family; // AF_INET or AF_INET6 + char hostname[1]; +}; + + +static int AfterResolve(eio_req *req) { + ev_unref(EV_DEFAULT_UC); + + struct resolve_request * rreq = (struct resolve_request *)(req->data); + + struct addrinfo *address = NULL, + *address_list = static_cast(req->ptr2); + + HandleScope scope; + Local argv[1]; + + if (req->result != 0) { + argv[0] = ErrnoException(errno, "getaddrinfo"); + } else { + int n = 0; + for (address = address_list; address; address = address->ai_next) { n++; } + + Local results = Array::New(n); + + char ip[INET6_ADDRSTRLEN]; + + n = 0; + address = address_list; + while (address) { + HandleScope scope; + assert(address->ai_family == AF_INET || address->ai_family == AF_INET6); + assert(address->ai_socktype == SOCK_STREAM); + const char *c = inet_ntop(address->ai_family, &(address->ai_addr), ip, INET6_ADDRSTRLEN); + Local s = String::New(c); + results->Set(Integer::New(n), s); + + n++; + address = address->ai_next; + } + + argv[0] = results; + } + + TryCatch try_catch; + + rreq->cb->Call(Context::GetCurrent()->Global(), 1, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + + rreq->cb.Dispose(); // Dispose of the persistent handle + free(rreq); + freeaddrinfo(address_list); +} + +static int Resolve(eio_req *req) { + // Note: this function is executed in the thread pool! CAREFUL + struct resolve_request * rreq = (struct resolve_request *) req->data; + struct addrinfo *address_list = NULL; + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = rreq->ai_family; + hints.ai_socktype = SOCK_STREAM; + + req->result = getaddrinfo((char*)rreq->hostname, NULL, &hints, &address_list); + req->ptr2 = address_list; + return 0; +} + + +static Handle GetAddrInfo(const Arguments& args) { + HandleScope scope; + + String::Utf8Value hostname(args[0]->ToString()); + + int type = args[1]->Int32Value(); + int fam = AF_INET; + switch (type) { + case 4: + fam = AF_INET; + break; + case 6: + fam = AF_INET6; + break; + default: + return ThrowException(Exception::TypeError( + String::New("Second argument must be an integer 4 or 6"))); + } + + if (!args[2]->IsFunction()) { + return ThrowException(Exception::TypeError( + String::New("Thrid argument must be a callback"))); + } + + Local cb = Local::Cast(args[2]); + + struct resolve_request *rreq = (struct resolve_request *) + malloc(sizeof(struct resolve_request) + hostname.length()); + + if (!rreq) { + V8::LowMemoryNotification(); + return ThrowException(Exception::Error( + String::New("Could not allocate enough memory"))); + } + + strcpy(rreq->hostname, *hostname); + rreq->cb = Persistent::New(cb); + rreq->ai_family = fam; + + // For the moment I will do DNS lookups in the eio thread pool. This is + // sub-optimal and cannot handle massive numbers of requests. + // + // (One particularly annoying problem is that the pthread stack size needs + // to be increased dramatically to handle getaddrinfo() see X_STACKSIZE in + // wscript ). + // + // In the future I will move to a system using c-ares: + // http://lists.schmorp.de/pipermail/libev/2009q1/000632.html + eio_custom(Resolve, EIO_PRI_DEFAULT, AfterResolve, rreq); + + // There will not be any active watchers from this object on the event + // loop while getaddrinfo() runs. If the only thing happening in the + // script was this hostname resolution, then the event loop would drop + // out. Thus we need to add ev_ref() until AfterResolve(). + ev_ref(EV_DEFAULT_UC); + + return Undefined(); +} + + +static Handle NeedsLookup(const Arguments& args) { + HandleScope scope; + + if (args[0]->IsNull() || args[0]->IsUndefined()) return False(); + + String::Utf8Value s(args[0]->ToString()); + + // avoiding buffer overflows in the following strcat + // 2001:0db8:85a3:08d3:1319:8a2e:0370:7334 + // 39 = max ipv6 address. + if (s.length() > INET6_ADDRSTRLEN) return True(); + + struct sockaddr_in6 a; + + if (inet_pton(AF_INET, *s, &(a.sin6_addr)) > 0) return False(); + if (inet_pton(AF_INET6, *s, &(a.sin6_addr)) > 0) return False(); + + char ipv6[255] = "::FFFF:"; + strcat(ipv6, *s); + if (inet_pton(AF_INET6, ipv6, &(a.sin6_addr)) > 0) return False(); + + return True(); +} + + void InitNet2(Handle target) { HandleScope scope; @@ -517,7 +714,9 @@ void InitNet2(Handle target) { NODE_SET_METHOD(target, "accept", Accept); NODE_SET_METHOD(target, "socketError", SocketError); NODE_SET_METHOD(target, "toRead", ToRead); - + NODE_SET_METHOD(target, "getsocksame", GetSockName); + NODE_SET_METHOD(target, "getaddrinfo", GetAddrInfo); + NODE_SET_METHOD(target, "needsLookup", NeedsLookup); target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS)); target->Set(String::NewSymbol("EINTR"), Integer::New(EINTR)); @@ -525,12 +724,14 @@ void InitNet2(Handle target) { target->Set(String::NewSymbol("EPERM"), Integer::New(EPERM)); target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE)); target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED)); - + errno_symbol = NODE_PSYMBOL("errno"); syscall_symbol = NODE_PSYMBOL("syscall"); fd_symbol = NODE_PSYMBOL("fd"); remote_address_symbol = NODE_PSYMBOL("remoteAddress"); remote_port_symbol = NODE_PSYMBOL("remotePort"); + address_symbol = NODE_PSYMBOL("address"); + port_symbol = NODE_PSYMBOL("port"); } } // namespace node diff --git a/test-net-server.js b/test-net-server.js index aabd7f2b2e..6968eae066 100644 --- a/test-net-server.js +++ b/test-net-server.js @@ -19,6 +19,10 @@ var server = new net.Server(function (stream) { stream.send("pong utf8\r\n", "utf8"); }); + stream.addListener('drain', function () { + sys.puts("server-side socket drain"); + }); + stream.addListener("eof", function () { sys.puts("server peer eof"); stream.close(); @@ -28,15 +32,17 @@ server.listen(8000); sys.puts("server fd: " + server.fd); -var stream = new net.Stream(); -stream.addListener('connect', function () { +var c = net.createConnection(8000); +c.addListener('connect', function () { sys.puts("!!!client connected"); - stream.send("hello\n"); + c.send("hello\n"); }); -stream.addListener('receive', function (d) { - sys.puts("!!!client got: " + JSON.stringify(d.toString())); +c.addListener('drain', function () { + sys.puts("!!!client drain"); }); -stream.connect(8000); +c.addListener('receive', function (d) { + sys.puts("!!!client got: " + JSON.stringify(d.toString())); +}); diff --git a/wscript b/wscript index d2e13c9339..43a7a5322e 100644 --- a/wscript +++ b/wscript @@ -139,7 +139,7 @@ def configure(conf): conf.define("HAVE_CONFIG_H", 1) - conf.env.append_value("CCFLAGS", "-DX_STACKSIZE=%d" % (1024*64)) + conf.env.append_value("CCFLAGS", "-DX_STACKSIZE=%d" % (2*1024*1024)) # LFS conf.env.append_value('CCFLAGS', '-D_LARGEFILE_SOURCE')