diff --git a/lib/net.js b/lib/net.js index a7f0f35c32..e33378c49d 100644 --- a/lib/net.js +++ b/lib/net.js @@ -34,6 +34,167 @@ var EINPROGRESS = process.EINPROGRESS; var ENOENT = process.ENOENT; var END_OF_FILE = 0; + +// IDLE TIMEOUTS +// +// Because often many sockets will have the same idle timeout we will not +// use one timeout watcher per socket. It is too much overhead. Instead +// we'll use a single watcher for all sockets with the same timeout value +// and a linked list. This technique is described in the libev manual: +// http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts + + +var timeout = new (function () { + // Object containing all lists, timers + // key = time in milliseconds + // value = list + var lists = {}; + + // show the most idle socket + function peek (list) { + if (list._idlePrev == list) return null; + return list._idlePrev; + } + + + // remove the most idle socket from the list + function shift (list) { + var first = list._idlePrev; + remove(first); + return first; + } + + + // remove a socket from its list + function remove (socket) { + socket._idleNext._idlePrev = socket._idlePrev; + socket._idlePrev._idleNext = socket._idleNext; + } + + + // remove a socket from its list and place at the end. + function append (list, socket) { + remove(socket); + socket._idleNext = list._idleNext; + socket._idleNext._idlePrev = socket; + socket._idlePrev = list + list._idleNext = socket; + } + + + function normalize (msecs) { + if (!msecs || msecs <= 0) return 0; + // round up to one sec + if (msecs < 1000) return 1000; + // round down to nearest second. + return msecs - (msecs % 1000); + } + + // the main function - creates lists on demand and the watchers associated + // with them. + function insert (socket, msecs) { + socket._idleStart = process.now; + socket._idleTimeout = msecs; + + if (!msecs) return; + + var list; + + if (lists[msecs]) { + list = lists[msecs]; + } else { + list = new process.Timer(); + list._idleNext = list; + list._idlePrev = list; + + lists[msecs] = list; + + list.callback = function () { + sys.puts('timeout callback ' + msecs); + // TODO - don't stop and start the watcher all the time. + // just set its repeat + var now = process.now; + var first; + while (first = peek(list)) { + var diff = now - first._idleStart; + if (diff < msecs) { + list.again(msecs - diff); + sys.puts(msecs + ' list wait'); + return; + } else { + remove(first); + assert(first != peek(list)); + first.emit('timeout'); + first.forceClose(new Error('idle timeout')); + } + } + sys.puts(msecs + ' list empty'); + assert(list._idleNext == list); // list is empty + list.stop(); + }; + } + + if (list._idleNext == list) { + // if empty (re)start the timer + list.again(msecs); + } + + append(list, socket); + assert(list._idleNext != list); // list is not empty + } + + + var unenroll = this.unenroll = function (socket) { + socket._idleNext._idlePrev = socket._idlePrev; + socket._idlePrev._idleNext = socket._idleNext; + + var list = lists[socket._idleTimeout]; + // if empty then stop the watcher + //sys.puts('unenroll'); + if (list && list._idlePrev == list) { + //sys.puts('unenroll: list empty'); + list.stop(); + } + }; + + + // Does not start the time, just sets up the members needed. + this.enroll = function (socket, msecs) { + // if this socket was already in a list somewhere + // then we should unenroll it from that + if (socket._idleNext) unenroll(socket); + + socket._idleTimeout = msecs; + socket._idleNext = socket; + socket._idlePrev = socket; + }; + + // call this whenever the socket is active (not idle) + // it will reset its timeout. + this.active = function (socket) { + var msecs = socket._idleTimeout; + if (msecs) { + var list = lists[msecs]; + if (socket._idleNext == socket) { + insert(socket, msecs); + } else { + // inline append + socket._idleStart = process.now; + socket._idleNext._idlePrev = socket._idlePrev; + socket._idlePrev._idleNext = socket._idleNext; + socket._idleNext = list._idleNext; + socket._idleNext._idlePrev = socket; + socket._idlePrev = list + list._idleNext = socket; + } + } + }; +})(); + + + + + // This is a free list to avoid creating so many of the same object. function FreeList (name, max, constructor) { @@ -43,29 +204,33 @@ function FreeList (name, max, constructor) { this.list = []; } + FreeList.prototype.alloc = function () { //debug("alloc " + this.name + " " + this.list.length); return this.list.length ? this.list.shift() : this.constructor.apply(this, arguments); -} +}; + FreeList.prototype.free = function (obj) { //debug("free " + this.name + " " + this.list.length); if (this.list.length < this.max) { this.list.push(obj); } -} +}; var ioWatchers = new FreeList("iowatcher", 100, function () { return new IOWatcher(); }); + var nb = 0; var buffers = new FreeList("buffer", 100, function (l) { - return new Buffer(500); + return new Buffer(l); }); + // Allocated on demand. var recvBuffer = null; function allocRecvBuffer () { @@ -73,6 +238,7 @@ function allocRecvBuffer () { recvBuffer.used = 0; } + function _doFlush () { var socket = this.socket; // Socket becomes writeable on connect() but don't flush if there's @@ -90,6 +256,8 @@ function _doFlush () { } function initSocket (self) { + timeout.enroll(self, 60*1000); // default timeout, 60 seconds + self._readWatcher = ioWatchers.alloc(); self._readWatcher.callback = function () { // If this is the first recv (recvBuffer doesn't exist) or we've used up @@ -140,6 +308,9 @@ function initSocket (self) { if (!self.writable) self.forceClose(); } else if (bytesRead > 0) { + + timeout.active(self); + var start = recvBuffer.used; var end = recvBuffer.used + bytesRead; @@ -215,6 +386,7 @@ exports.createConnection = function (port, host) { Object.defineProperty(Socket.prototype, 'readyState', { get: function () { + sys.error('readyState is depricated. Use stream.readable or stream.writable'); if (this.readable && this.writable) { return 'open'; } else if (this.readable && !this.writable){ @@ -396,6 +568,9 @@ Socket.prototype.flush = function () { return false; } + timeout.active(self); + + if (bytesWritten === null) { // EAGAIN debug('write EAGAIN'); @@ -459,6 +634,8 @@ Socket.prototype.connect = function () { var self = this; if (self.fd) throw new Error('Socket already opened'); + + timeout.active(socket); if (typeof(arguments[0]) == 'string') { self.fd = socket('unix'); @@ -477,28 +654,34 @@ Socket.prototype.connect = function () { } }; -var xxx = 0; - Socket.prototype.address = function () { return getsockname(this.fd); }; + Socket.prototype.setNoDelay = function (v) { if (this.type == 'tcp') setNoDelay(this.fd, v); }; +Socket.prototype.setTimeout = function (msecs) { + timeout.enroll(this, msecs); +}; + + Socket.prototype.pause = function () { this._readWatcher.stop(); }; + Socket.prototype.resume = function () { if (!this.fd) throw new Error('Cannot resume() closed Socket.'); this._readWatcher.set(this.fd, true, false); this._readWatcher.start(); }; + Socket.prototype.forceClose = function (exception) { // recvBuffer is shared between sockets, so don't need to free it here. var self = this; @@ -521,6 +704,8 @@ Socket.prototype.forceClose = function (exception) { this._readWatcher = null; } + timeout.unenroll(this); + if (this.fd) { close(this.fd); debug('close ' + this.fd); @@ -580,6 +765,7 @@ function Server (listener) { // The 'connect' event probably should be removed for server-side // sockets. It's redundent. peer.emit('connect'); + timeout.active(peer); } }; }