diff --git a/lib/net.js b/lib/net.js index 81d02a5a64..d0a2c5a627 100644 --- a/lib/net.js +++ b/lib/net.js @@ -20,7 +20,7 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. var events = require('events'); -var Stream = require('stream'); +var stream = require('stream'); var timers = require('timers'); var util = require('util'); var assert = require('assert'); @@ -42,16 +42,16 @@ function createTCP() { } -/* Bit flags for socket._flags */ -var FLAG_GOT_EOF = 1 << 0; -var FLAG_SHUTDOWN = 1 << 1; -var FLAG_DESTROY_SOON = 1 << 2; -var FLAG_SHUTDOWN_QUEUED = 1 << 3; - - var debug; if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) { - debug = function(x) { console.error('NET:', x); }; + var pid = process.pid; + debug = function(x) { + // if console is not set up yet, then skip this. + if (!console.error) + return; + console.error('NET: %d', pid, + util.format.apply(util, arguments).slice(0, 500)); + }; } else { debug = function() { }; } @@ -110,12 +110,8 @@ function normalizeConnectArgs(args) { exports._normalizeConnectArgs = normalizeConnectArgs; -/* called when creating new Socket, or when re-using a closed Socket */ +// called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { - self._pendingWriteReqs = 0; - - self._flags = 0; - self._connectQueueSize = 0; self.destroyed = false; self.errorEmitted = false; self.bytesRead = 0; @@ -131,8 +127,6 @@ function initSocketHandle(self) { function Socket(options) { if (!(this instanceof Socket)) return new Socket(options); - Stream.call(this); - switch (typeof options) { case 'number': options = { fd: options }; // Legacy interface. @@ -142,7 +136,10 @@ function Socket(options) { break; } - if (typeof options.fd === 'undefined') { + this.readable = this.writable = false; + if (options.handle) { + this._handle = options.handle; // private + } else if (typeof options.fd === 'undefined') { this._handle = options && options.handle; // private } else { this._handle = createPipe(); @@ -150,17 +147,105 @@ function Socket(options) { this.readable = this.writable = true; } + this.onend = null; + + // shut down the socket when we're finished with it. + this.on('finish', onSocketFinish); + this.on('_socketEnd', onSocketEnd); + initSocketHandle(this); - this.allowHalfOpen = options && options.allowHalfOpen; + + this._pendingWrite = null; + + stream.Duplex.call(this, options); + + // handle strings directly + this._writableState.decodeStrings = false; + + // default to *not* allowing half open sockets + this.allowHalfOpen = options && options.allowHalfOpen || false; + + // if we have a handle, then start the flow of data into the + // buffer. if not, then this will happen when we connect + if (this._handle && (!options || options.readable !== false)) + this.read(0); +} +util.inherits(Socket, stream.Duplex); + +// the user has called .end(), and all the bytes have been +// sent out to the other side. +// If allowHalfOpen is false, or if the readable side has +// ended already, then destroy. +// If allowHalfOpen is true, then we need to do a shutdown, +// so that only the writable side will be cleaned up. +function onSocketFinish() { + debug('onSocketFinish'); + if (this._readableState.ended) { + debug('oSF: ended, destroy', this._readableState); + return this.destroy(); + } + + debug('oSF: not ended, call shutdown()'); + + // otherwise, just shutdown, or destroy() if not possible + if (!this._handle.shutdown) + return this.destroy(); + + var shutdownReq = this._handle.shutdown(); + + if (!shutdownReq) + return this._destroy(errnoException(errno, 'shutdown')); + + shutdownReq.oncomplete = afterShutdown; +} + + +function afterShutdown(status, handle, req) { + var self = handle.owner; + + debug('afterShutdown destroyed=%j', self.destroyed, + self._readableState); + + // callback may come after call to destroy. + if (self.destroyed) + return; + + if (self._readableState.ended) { + debug('readableState ended, destroying'); + self.destroy(); + } else { + self.once('_socketEnd', self.destroy); + } } -util.inherits(Socket, Stream); +// the EOF has been received, and no more bytes are coming. +// if the writable side has ended already, then clean everything +// up. +function onSocketEnd() { + // XXX Should not have to do as much crap in this function. + // ended should already be true, since this is called *after* + // the EOF errno and onread has returned null to the _read cb. + debug('onSocketEnd', this._readableState); + this._readableState.ended = true; + if (this._readableState.endEmitted) { + this.readable = false; + } else { + this.once('end', function() { + this.readable = false; + }); + this.read(0); + } + + if (!this.allowHalfOpen) + this.destroySoon(); +} exports.Socket = Socket; exports.Stream = Socket; // Legacy naming. Socket.prototype.listen = function() { + debug('socket.listen'); var self = this; self.on('connection', arguments[0]); listen(self, null, null, null); @@ -230,96 +315,62 @@ Object.defineProperty(Socket.prototype, 'readyState', { Object.defineProperty(Socket.prototype, 'bufferSize', { get: function() { if (this._handle) { - return this._handle.writeQueueSize + this._connectQueueSize; + return this._handle.writeQueueSize; } } }); -Socket.prototype.pause = function() { - this._paused = true; - if (this._handle && !this._connecting) { - this._handle.readStop(); +// Just call handle.readStart until we have enough in the buffer +Socket.prototype._read = function(n, callback) { + debug('_read'); + if (this._connecting || !this._handle) { + debug('_read wait for connection'); + this.once('connect', this._read.bind(this, n, callback)); + return; } -}; + assert(callback === this._readableState.onread); + assert(this._readableState.reading = true); -Socket.prototype.resume = function() { - this._paused = false; - if (this._handle && !this._connecting) { - this._handle.readStart(); + if (!this._handle.reading) { + debug('Socket._read readStart'); + this._handle.reading = true; + var r = this._handle.readStart(); + if (r) + this._destroy(errnoException(errno, 'read')); + } else { + debug('readStart already has been called.'); } }; Socket.prototype.end = function(data, encoding) { - if (this._connecting && ((this._flags & FLAG_SHUTDOWN_QUEUED) == 0)) { - // still connecting, add data to buffer - if (data) this.write(data, encoding); - this.writable = false; - this._flags |= FLAG_SHUTDOWN_QUEUED; - } - - if (!this.writable) return; + stream.Duplex.prototype.end.call(this, data, encoding); this.writable = false; - - if (data) this.write(data, encoding); DTRACE_NET_STREAM_END(this); - if (!this.readable) { - this.destroySoon(); - } else { - this._flags |= FLAG_SHUTDOWN; - var shutdownReq = this._handle.shutdown(); - - if (!shutdownReq) { - this._destroy(errnoException(errno, 'shutdown')); - return false; - } - - shutdownReq.oncomplete = afterShutdown; - } - - return true; + // just in case we're waiting for an EOF. + if (!this._readableState.endEmitted) + this.read(0); + return; }; -function afterShutdown(status, handle, req) { - var self = handle.owner; - - assert.ok(self._flags & FLAG_SHUTDOWN); - assert.ok(!self.writable); - - // callback may come after call to destroy. - if (self.destroyed) { - return; - } - - if (self._flags & FLAG_GOT_EOF || !self.readable) { - self._destroy(); - } else { - } -} - - Socket.prototype.destroySoon = function() { - this.writable = false; - this._flags |= FLAG_DESTROY_SOON; - - if (this._pendingWriteReqs == 0) { - this._destroy(); - } -}; - + if (this.writable) + this.end(); -Socket.prototype._connectQueueCleanUp = function(exception) { - this._connecting = false; - this._connectQueueSize = 0; - this._connectQueue = null; + if (this._writableState.finishing || this._writableState.finished) + this.destroy(); + else + this.once('finish', this.destroy); }; Socket.prototype._destroy = function(exception, cb) { + debug('destroy'); + var self = this; function fireErrorCallbacks() { @@ -333,13 +384,12 @@ Socket.prototype._destroy = function(exception, cb) { }; if (this.destroyed) { + debug('already destroyed, fire error callbacks'); fireErrorCallbacks(); return; } - self._connectQueueCleanUp(); - - debug('destroy'); + self._connecting = false; this.readable = this.writable = false; @@ -347,6 +397,8 @@ Socket.prototype._destroy = function(exception, cb) { debug('close'); if (this._handle) { + if (this !== process.stderr) + debug('close handle'); this._handle.close(); this._handle.onread = noop; this._handle = null; @@ -355,6 +407,7 @@ Socket.prototype._destroy = function(exception, cb) { fireErrorCallbacks(); process.nextTick(function() { + debug('emit close'); self.emit('close', exception ? true : false); }); @@ -362,6 +415,7 @@ Socket.prototype._destroy = function(exception, cb) { if (this.server) { COUNTER_NET_SERVER_CONNECTION_CLOSE(this); + debug('has server'); this.server._connections--; if (this.server._emitCloseIfDrained) { this.server._emitCloseIfDrained(); @@ -371,10 +425,13 @@ Socket.prototype._destroy = function(exception, cb) { Socket.prototype.destroy = function(exception) { + debug('destroy', exception); this._destroy(exception); }; +// This function is called whenever the handle gets a +// buffer, or when there's an error reading. function onread(buffer, offset, length) { var handle = this; var self = handle.owner; @@ -383,47 +440,56 @@ function onread(buffer, offset, length) { timers.active(self); var end = offset + length; + debug('onread', global.errno, offset, length, end); if (buffer) { - // Emit 'data' event. + debug('got data'); - if (self._decoder) { - // Emit a string. - var string = self._decoder.write(buffer.slice(offset, end)); - if (string.length) self.emit('data', string); - } else { - // Emit a slice. Attempt to avoid slicing the buffer if no one is - // listening for 'data'. - if (self._events && self._events['data']) { - self.emit('data', buffer.slice(offset, end)); - } + // read success. + // In theory (and in practice) calling readStop right now + // will prevent this from being called again until _read() gets + // called again. + + // if we didn't get any bytes, that doesn't necessarily mean EOF. + // wait for the next one. + if (offset === end) { + debug('not any data, keep waiting'); + return; } + // if it's not enough data, we'll just call handle.readStart() + // again right away. self.bytesRead += length; + self._readableState.onread(null, buffer.slice(offset, end)); + + if (handle.reading && !self._readableState.reading) { + handle.reading = false; + debug('readStop'); + var r = handle.readStop(); + if (r) + self._destroy(errnoException(errno, 'read')); + } // Optimization: emit the original buffer with end points if (self.ondata) self.ondata(buffer, offset, end); } else if (errno == 'EOF') { - // EOF - self.readable = false; + debug('EOF'); - assert.ok(!(self._flags & FLAG_GOT_EOF)); - self._flags |= FLAG_GOT_EOF; + if (self._readableState.length === 0) + self.readable = false; - // We call destroy() before end(). 'close' not emitted until nextTick so - // the 'end' event will come first as required. - if (!self.writable) self._destroy(); + if (self.onend) self.once('end', self.onend); - if (!self.allowHalfOpen) self.end(); - if (self._decoder) { - var ret = self._decoder.end(); - if (ret) - self.emit('data', ret); - } - if (self._events && self._events['end']) self.emit('end'); - if (self.onend) self.onend(); + // send a null to the _read cb to signal the end of data. + self._readableState.onread(null, null); + + // internal end event so that we know that the actual socket + // is no longer readable, and we can start the shutdown + // procedure. No need to wait for all the data to be consumed. + self.emit('_socketEnd'); } else { + debug('error', errno); // Error if (errno == 'ECONNRESET') { self._destroy(); @@ -434,12 +500,6 @@ function onread(buffer, offset, length) { } -Socket.prototype.setEncoding = function(encoding) { - var StringDecoder = require('string_decoder').StringDecoder; // lazy load - this._decoder = new StringDecoder(encoding); -}; - - Socket.prototype._getpeername = function() { if (!this._handle || !this._handle.getpeername) { return {}; @@ -465,63 +525,39 @@ Socket.prototype.__defineGetter__('remotePort', function() { }); -/* - * Arguments data, [encoding], [cb] - */ -Socket.prototype.write = function(data, arg1, arg2) { - var encoding, cb; +Socket.prototype.write = function(chunk, encoding, cb) { + if (typeof chunk !== 'string' && !Buffer.isBuffer(chunk)) + throw new TypeError('invalid data'); + return stream.Duplex.prototype.write.apply(this, arguments); +}; - // parse arguments - if (arg1) { - if (typeof arg1 === 'string') { - encoding = arg1; - cb = arg2; - } else if (typeof arg1 === 'function') { - cb = arg1; - } else { - throw new Error('bad arg'); - } - } - if (typeof data === 'string') { - encoding = (encoding || 'utf8').toLowerCase(); - switch (encoding) { - case 'utf8': - case 'utf-8': - case 'ascii': - case 'ucs2': - case 'ucs-2': - case 'utf16le': - case 'utf-16le': - // This encoding can be handled in the binding layer. - break; +Socket.prototype._write = function(dataEncoding, cb) { + assert(Array.isArray(dataEncoding)); + var data = dataEncoding[0]; + var encoding = dataEncoding[1] || 'utf8'; - default: - data = new Buffer(data, encoding); - } - } else if (!Buffer.isBuffer(data)) { - throw new TypeError('First argument must be a buffer or a string.'); - } + if (this !== process.stderr && this !== process.stdout) + debug('Socket._write'); // If we are still connecting, then buffer this for later. + // The Writable logic will buffer up any more writes while + // waiting for this one to be done. if (this._connecting) { - this._connectQueueSize += data.length; - if (this._connectQueue) { - this._connectQueue.push([data, encoding, cb]); - } else { - this._connectQueue = [[data, encoding, cb]]; - } - return false; + debug('_write: waiting for connection'); + this._pendingWrite = dataEncoding; + this.once('connect', function() { + debug('_write: connected now, try again'); + this._write(dataEncoding, cb); + }); + return; } + this._pendingWrite = null; - return this._write(data, encoding, cb); -}; - - -Socket.prototype._write = function(data, encoding, cb) { timers.active(this); if (!this._handle) { + debug('already destroyed'); this._destroy(new Error('This socket is closed.'), cb); return false; } @@ -550,39 +586,32 @@ Socket.prototype._write = function(data, encoding, cb) { break; default: - assert(0); + writeReq = this._handle.writeBuffer(new Buffer(data, encoding)); + break; } } - if (!writeReq || typeof writeReq !== 'object') { - this._destroy(errnoException(errno, 'write'), cb); - return false; - } + if (!writeReq || typeof writeReq !== 'object') + return this._destroy(errnoException(errno, 'write'), cb); writeReq.oncomplete = afterWrite; writeReq.cb = cb; - this._pendingWriteReqs++; this._bytesDispatched += writeReq.bytes; - - return this._handle.writeQueueSize == 0; }; Socket.prototype.__defineGetter__('bytesWritten', function() { var bytes = this._bytesDispatched, - connectQueue = this._connectQueue; + state = this._writableState, + pending = this._pendingWrite; - if (connectQueue) { - connectQueue.forEach(function(el) { - var data = el[0]; - if (Buffer.isBuffer(data)) { - bytes += data.length; - } else { - bytes += Buffer.byteLength(data, el[1]); - } - }, this); - } + state.buffer.forEach(function(el) { + bytes += Buffer.byteLength(el[0], el[1]); + }); + + if (pending) + bytes += Buffer.byteLength(pending[0], pending[1]); return bytes; }); @@ -590,30 +619,28 @@ Socket.prototype.__defineGetter__('bytesWritten', function() { function afterWrite(status, handle, req) { var self = handle.owner; + var state = self._writableState; + if (self !== process.stderr && self !== process.stdout) + debug('afterWrite', status, req); // callback may come after call to destroy. if (self.destroyed) { + debug('afterWrite destroyed'); return; } if (status) { + debug('write failure', errnoException(errno, 'write')); self._destroy(errnoException(errno, 'write'), req.cb); return; } timers.active(self); - self._pendingWriteReqs--; - - if (self._pendingWriteReqs == 0) { - self.emit('drain'); - } + if (self !== process.stderr && self !== process.stdout) + debug('afterWrite call cb'); - if (req.cb) req.cb(); - - if (self._pendingWriteReqs == 0 && self._flags & FLAG_DESTROY_SOON) { - self._destroy(); - } + req.cb.call(self); } @@ -663,10 +690,21 @@ Socket.prototype.connect = function(options, cb) { return Socket.prototype.connect.apply(this, args); } + if (this.destroyed) { + this._readableState.reading = false; + this._readableState.ended = false; + this._writableState.ended = false; + this._writableState.ending = false; + this._writableState.finished = false; + this._writableState.finishing = false; + this.destroyed = false; + this._handle = null; + } + var self = this; var pipe = !!options.path; - if (this.destroyed || !this._handle) { + if (!this._handle) { this._handle = pipe ? createPipe() : createTCP(); initSocketHandle(this); } @@ -755,28 +793,15 @@ function afterConnect(status, handle, req, readable, writable) { self.writable = writable; timers.active(self); - if (self.readable && !self._paused) { - handle.readStart(); - } - - if (self._connectQueue) { - debug('Drain the connect queue'); - var connectQueue = self._connectQueue; - for (var i = 0; i < connectQueue.length; i++) { - self._write.apply(self, connectQueue[i]); - } - self._connectQueueCleanUp(); - } - self.emit('connect'); - if (self._flags & FLAG_SHUTDOWN_QUEUED) { - // end called before connected - call end now with no data - self._flags &= ~FLAG_SHUTDOWN_QUEUED; - self.end(); - } + // start the first read, or get an immediate EOF. + // this doesn't actually consume any bytes, because len=0. + if (readable) + self.read(0); + } else { - self._connectQueueCleanUp(); + self._connecting = false; self._destroy(errnoException(errno, 'connect')); } } @@ -831,9 +856,9 @@ function Server(/* [ options, ] listener */) { configurable: true, enumerable: true }); - this.allowHalfOpen = options.allowHalfOpen || false; - this._handle = null; + + this.allowHalfOpen = options.allowHalfOpen || false; } util.inherits(Server, events.EventEmitter); exports.Server = Server; @@ -901,12 +926,14 @@ var createServerHandle = exports._createServerHandle = Server.prototype._listen2 = function(address, port, addressType, backlog, fd) { + debug('listen2', address, port, addressType, backlog); var self = this; var r = 0; // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (!self._handle) { + debug('_listen2: create a handle'); self._handle = createServerHandle(address, port, addressType, fd); if (!self._handle) { var error = errnoException(errno, 'listen'); @@ -915,6 +942,8 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) { }); return; } + } else { + debug('_listen2: have a handle already'); } self._handle.onconnection = onconnection; @@ -1049,7 +1078,6 @@ function onconnection(clientHandle) { }); socket.readable = socket.writable = true; - clientHandle.readStart(); self._connections++; socket.server = self; @@ -1086,11 +1114,17 @@ Server.prototype.close = function(cb) { }; Server.prototype._emitCloseIfDrained = function() { + debug('SERVER _emitCloseIfDrained'); var self = this; - if (self._handle || self._connections) return; + if (self._handle || self._connections) { + debug('SERVER handle? %j connections? %d', + !!self._handle, self._connections); + return; + } process.nextTick(function() { + debug('SERVER: emit close'); self.emit('close'); }); };