Browse Source

net: Refactor to use streams2

This is a combination of 6 commits.

* XXX net fixup lcase stream

* net: Refactor to use streams2

    Use 'socket.resume()' in many tests to trigger old-mode behavior.

* net: Call destroy() if shutdown() is not provided

    This is important for TTY wrap streams

* net: Call .end() in socket.destroySoon if necessary

    This makes the http 1.0 keepAlive test pass, also.

* net wtf-ish stuff kinda busted

* net fixup
v0.9.4-release
isaacs 12 years ago
parent
commit
8a3befa0c6
  1. 470
      lib/net.js

470
lib/net.js

@ -20,7 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE. // USE OR OTHER DEALINGS IN THE SOFTWARE.
var events = require('events'); var events = require('events');
var Stream = require('stream'); var stream = require('stream');
var timers = require('timers'); var timers = require('timers');
var util = require('util'); var util = require('util');
var assert = require('assert'); var assert = require('assert');
@ -42,16 +42,16 @@ function createTCP() {
} }
/* Bit flags for socket._flags */
var FLAG_GOT_EOF = 1 << 0;
var FLAG_SHUTDOWN = 1 << 1;
var FLAG_DESTROY_SOON = 1 << 2;
var FLAG_SHUTDOWN_QUEUED = 1 << 3;
var debug; var debug;
if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) { if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) {
debug = function(x) { console.error('NET:', x); }; var pid = process.pid;
debug = function(x) {
// if console is not set up yet, then skip this.
if (!console.error)
return;
console.error('NET: %d', pid,
util.format.apply(util, arguments).slice(0, 500));
};
} else { } else {
debug = function() { }; debug = function() { };
} }
@ -110,12 +110,8 @@ function normalizeConnectArgs(args) {
exports._normalizeConnectArgs = normalizeConnectArgs; exports._normalizeConnectArgs = normalizeConnectArgs;
/* called when creating new Socket, or when re-using a closed Socket */ // called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) { function initSocketHandle(self) {
self._pendingWriteReqs = 0;
self._flags = 0;
self._connectQueueSize = 0;
self.destroyed = false; self.destroyed = false;
self.errorEmitted = false; self.errorEmitted = false;
self.bytesRead = 0; self.bytesRead = 0;
@ -131,8 +127,6 @@ function initSocketHandle(self) {
function Socket(options) { function Socket(options) {
if (!(this instanceof Socket)) return new Socket(options); if (!(this instanceof Socket)) return new Socket(options);
Stream.call(this);
switch (typeof options) { switch (typeof options) {
case 'number': case 'number':
options = { fd: options }; // Legacy interface. options = { fd: options }; // Legacy interface.
@ -142,7 +136,10 @@ function Socket(options) {
break; break;
} }
if (typeof options.fd === 'undefined') { this.readable = this.writable = false;
if (options.handle) {
this._handle = options.handle; // private
} else if (typeof options.fd === 'undefined') {
this._handle = options && options.handle; // private this._handle = options && options.handle; // private
} else { } else {
this._handle = createPipe(); this._handle = createPipe();
@ -150,17 +147,105 @@ function Socket(options) {
this.readable = this.writable = true; this.readable = this.writable = true;
} }
this.onend = null;
// shut down the socket when we're finished with it.
this.on('finish', onSocketFinish);
this.on('_socketEnd', onSocketEnd);
initSocketHandle(this); initSocketHandle(this);
this.allowHalfOpen = options && options.allowHalfOpen;
this._pendingWrite = null;
stream.Duplex.call(this, options);
// handle strings directly
this._writableState.decodeStrings = false;
// default to *not* allowing half open sockets
this.allowHalfOpen = options && options.allowHalfOpen || false;
// if we have a handle, then start the flow of data into the
// buffer. if not, then this will happen when we connect
if (this._handle && (!options || options.readable !== false))
this.read(0);
}
util.inherits(Socket, stream.Duplex);
// the user has called .end(), and all the bytes have been
// sent out to the other side.
// If allowHalfOpen is false, or if the readable side has
// ended already, then destroy.
// If allowHalfOpen is true, then we need to do a shutdown,
// so that only the writable side will be cleaned up.
function onSocketFinish() {
debug('onSocketFinish');
if (this._readableState.ended) {
debug('oSF: ended, destroy', this._readableState);
return this.destroy();
}
debug('oSF: not ended, call shutdown()');
// otherwise, just shutdown, or destroy() if not possible
if (!this._handle.shutdown)
return this.destroy();
var shutdownReq = this._handle.shutdown();
if (!shutdownReq)
return this._destroy(errnoException(errno, 'shutdown'));
shutdownReq.oncomplete = afterShutdown;
}
function afterShutdown(status, handle, req) {
var self = handle.owner;
debug('afterShutdown destroyed=%j', self.destroyed,
self._readableState);
// callback may come after call to destroy.
if (self.destroyed)
return;
if (self._readableState.ended) {
debug('readableState ended, destroying');
self.destroy();
} else {
self.once('_socketEnd', self.destroy);
}
} }
util.inherits(Socket, Stream);
// the EOF has been received, and no more bytes are coming.
// if the writable side has ended already, then clean everything
// up.
function onSocketEnd() {
// XXX Should not have to do as much crap in this function.
// ended should already be true, since this is called *after*
// the EOF errno and onread has returned null to the _read cb.
debug('onSocketEnd', this._readableState);
this._readableState.ended = true;
if (this._readableState.endEmitted) {
this.readable = false;
} else {
this.once('end', function() {
this.readable = false;
});
this.read(0);
}
if (!this.allowHalfOpen)
this.destroySoon();
}
exports.Socket = Socket; exports.Socket = Socket;
exports.Stream = Socket; // Legacy naming. exports.Stream = Socket; // Legacy naming.
Socket.prototype.listen = function() { Socket.prototype.listen = function() {
debug('socket.listen');
var self = this; var self = this;
self.on('connection', arguments[0]); self.on('connection', arguments[0]);
listen(self, null, null, null); listen(self, null, null, null);
@ -230,96 +315,62 @@ Object.defineProperty(Socket.prototype, 'readyState', {
Object.defineProperty(Socket.prototype, 'bufferSize', { Object.defineProperty(Socket.prototype, 'bufferSize', {
get: function() { get: function() {
if (this._handle) { if (this._handle) {
return this._handle.writeQueueSize + this._connectQueueSize; return this._handle.writeQueueSize;
} }
} }
}); });
Socket.prototype.pause = function() { // Just call handle.readStart until we have enough in the buffer
this._paused = true; Socket.prototype._read = function(n, callback) {
if (this._handle && !this._connecting) { debug('_read');
this._handle.readStop(); if (this._connecting || !this._handle) {
debug('_read wait for connection');
this.once('connect', this._read.bind(this, n, callback));
return;
} }
};
assert(callback === this._readableState.onread);
assert(this._readableState.reading = true);
Socket.prototype.resume = function() { if (!this._handle.reading) {
this._paused = false; debug('Socket._read readStart');
if (this._handle && !this._connecting) { this._handle.reading = true;
this._handle.readStart(); var r = this._handle.readStart();
if (r)
this._destroy(errnoException(errno, 'read'));
} else {
debug('readStart already has been called.');
} }
}; };
Socket.prototype.end = function(data, encoding) { Socket.prototype.end = function(data, encoding) {
if (this._connecting && ((this._flags & FLAG_SHUTDOWN_QUEUED) == 0)) { stream.Duplex.prototype.end.call(this, data, encoding);
// still connecting, add data to buffer
if (data) this.write(data, encoding);
this.writable = false;
this._flags |= FLAG_SHUTDOWN_QUEUED;
}
if (!this.writable) return;
this.writable = false; this.writable = false;
if (data) this.write(data, encoding);
DTRACE_NET_STREAM_END(this); DTRACE_NET_STREAM_END(this);
if (!this.readable) { // just in case we're waiting for an EOF.
this.destroySoon(); if (!this._readableState.endEmitted)
} else { this.read(0);
this._flags |= FLAG_SHUTDOWN; return;
var shutdownReq = this._handle.shutdown();
if (!shutdownReq) {
this._destroy(errnoException(errno, 'shutdown'));
return false;
}
shutdownReq.oncomplete = afterShutdown;
}
return true;
}; };
function afterShutdown(status, handle, req) {
var self = handle.owner;
assert.ok(self._flags & FLAG_SHUTDOWN);
assert.ok(!self.writable);
// callback may come after call to destroy.
if (self.destroyed) {
return;
}
if (self._flags & FLAG_GOT_EOF || !self.readable) {
self._destroy();
} else {
}
}
Socket.prototype.destroySoon = function() { Socket.prototype.destroySoon = function() {
this.writable = false; if (this.writable)
this._flags |= FLAG_DESTROY_SOON; this.end();
if (this._pendingWriteReqs == 0) {
this._destroy();
}
};
Socket.prototype._connectQueueCleanUp = function(exception) { if (this._writableState.finishing || this._writableState.finished)
this._connecting = false; this.destroy();
this._connectQueueSize = 0; else
this._connectQueue = null; this.once('finish', this.destroy);
}; };
Socket.prototype._destroy = function(exception, cb) { Socket.prototype._destroy = function(exception, cb) {
debug('destroy');
var self = this; var self = this;
function fireErrorCallbacks() { function fireErrorCallbacks() {
@ -333,13 +384,12 @@ Socket.prototype._destroy = function(exception, cb) {
}; };
if (this.destroyed) { if (this.destroyed) {
debug('already destroyed, fire error callbacks');
fireErrorCallbacks(); fireErrorCallbacks();
return; return;
} }
self._connectQueueCleanUp(); self._connecting = false;
debug('destroy');
this.readable = this.writable = false; this.readable = this.writable = false;
@ -347,6 +397,8 @@ Socket.prototype._destroy = function(exception, cb) {
debug('close'); debug('close');
if (this._handle) { if (this._handle) {
if (this !== process.stderr)
debug('close handle');
this._handle.close(); this._handle.close();
this._handle.onread = noop; this._handle.onread = noop;
this._handle = null; this._handle = null;
@ -355,6 +407,7 @@ Socket.prototype._destroy = function(exception, cb) {
fireErrorCallbacks(); fireErrorCallbacks();
process.nextTick(function() { process.nextTick(function() {
debug('emit close');
self.emit('close', exception ? true : false); self.emit('close', exception ? true : false);
}); });
@ -362,6 +415,7 @@ Socket.prototype._destroy = function(exception, cb) {
if (this.server) { if (this.server) {
COUNTER_NET_SERVER_CONNECTION_CLOSE(this); COUNTER_NET_SERVER_CONNECTION_CLOSE(this);
debug('has server');
this.server._connections--; this.server._connections--;
if (this.server._emitCloseIfDrained) { if (this.server._emitCloseIfDrained) {
this.server._emitCloseIfDrained(); this.server._emitCloseIfDrained();
@ -371,10 +425,13 @@ Socket.prototype._destroy = function(exception, cb) {
Socket.prototype.destroy = function(exception) { Socket.prototype.destroy = function(exception) {
debug('destroy', exception);
this._destroy(exception); this._destroy(exception);
}; };
// This function is called whenever the handle gets a
// buffer, or when there's an error reading.
function onread(buffer, offset, length) { function onread(buffer, offset, length) {
var handle = this; var handle = this;
var self = handle.owner; var self = handle.owner;
@ -383,47 +440,56 @@ function onread(buffer, offset, length) {
timers.active(self); timers.active(self);
var end = offset + length; var end = offset + length;
debug('onread', global.errno, offset, length, end);
if (buffer) { if (buffer) {
// Emit 'data' event. debug('got data');
if (self._decoder) { // read success.
// Emit a string. // In theory (and in practice) calling readStop right now
var string = self._decoder.write(buffer.slice(offset, end)); // will prevent this from being called again until _read() gets
if (string.length) self.emit('data', string); // called again.
} else {
// Emit a slice. Attempt to avoid slicing the buffer if no one is // if we didn't get any bytes, that doesn't necessarily mean EOF.
// listening for 'data'. // wait for the next one.
if (self._events && self._events['data']) { if (offset === end) {
self.emit('data', buffer.slice(offset, end)); debug('not any data, keep waiting');
} return;
} }
// if it's not enough data, we'll just call handle.readStart()
// again right away.
self.bytesRead += length; self.bytesRead += length;
self._readableState.onread(null, buffer.slice(offset, end));
if (handle.reading && !self._readableState.reading) {
handle.reading = false;
debug('readStop');
var r = handle.readStop();
if (r)
self._destroy(errnoException(errno, 'read'));
}
// Optimization: emit the original buffer with end points // Optimization: emit the original buffer with end points
if (self.ondata) self.ondata(buffer, offset, end); if (self.ondata) self.ondata(buffer, offset, end);
} else if (errno == 'EOF') { } else if (errno == 'EOF') {
// EOF debug('EOF');
self.readable = false;
assert.ok(!(self._flags & FLAG_GOT_EOF)); if (self._readableState.length === 0)
self._flags |= FLAG_GOT_EOF; self.readable = false;
// We call destroy() before end(). 'close' not emitted until nextTick so if (self.onend) self.once('end', self.onend);
// the 'end' event will come first as required.
if (!self.writable) self._destroy();
if (!self.allowHalfOpen) self.end(); // send a null to the _read cb to signal the end of data.
if (self._decoder) { self._readableState.onread(null, null);
var ret = self._decoder.end();
if (ret) // internal end event so that we know that the actual socket
self.emit('data', ret); // is no longer readable, and we can start the shutdown
} // procedure. No need to wait for all the data to be consumed.
if (self._events && self._events['end']) self.emit('end'); self.emit('_socketEnd');
if (self.onend) self.onend();
} else { } else {
debug('error', errno);
// Error // Error
if (errno == 'ECONNRESET') { if (errno == 'ECONNRESET') {
self._destroy(); self._destroy();
@ -434,12 +500,6 @@ function onread(buffer, offset, length) {
} }
Socket.prototype.setEncoding = function(encoding) {
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
this._decoder = new StringDecoder(encoding);
};
Socket.prototype._getpeername = function() { Socket.prototype._getpeername = function() {
if (!this._handle || !this._handle.getpeername) { if (!this._handle || !this._handle.getpeername) {
return {}; return {};
@ -465,63 +525,39 @@ Socket.prototype.__defineGetter__('remotePort', function() {
}); });
/* Socket.prototype.write = function(chunk, encoding, cb) {
* Arguments data, [encoding], [cb] if (typeof chunk !== 'string' && !Buffer.isBuffer(chunk))
*/ throw new TypeError('invalid data');
Socket.prototype.write = function(data, arg1, arg2) { return stream.Duplex.prototype.write.apply(this, arguments);
var encoding, cb; };
// parse arguments
if (arg1) {
if (typeof arg1 === 'string') {
encoding = arg1;
cb = arg2;
} else if (typeof arg1 === 'function') {
cb = arg1;
} else {
throw new Error('bad arg');
}
}
if (typeof data === 'string') { Socket.prototype._write = function(dataEncoding, cb) {
encoding = (encoding || 'utf8').toLowerCase(); assert(Array.isArray(dataEncoding));
switch (encoding) { var data = dataEncoding[0];
case 'utf8': var encoding = dataEncoding[1] || 'utf8';
case 'utf-8':
case 'ascii':
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
// This encoding can be handled in the binding layer.
break;
default: if (this !== process.stderr && this !== process.stdout)
data = new Buffer(data, encoding); debug('Socket._write');
}
} else if (!Buffer.isBuffer(data)) {
throw new TypeError('First argument must be a buffer or a string.');
}
// If we are still connecting, then buffer this for later. // If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
// waiting for this one to be done.
if (this._connecting) { if (this._connecting) {
this._connectQueueSize += data.length; debug('_write: waiting for connection');
if (this._connectQueue) { this._pendingWrite = dataEncoding;
this._connectQueue.push([data, encoding, cb]); this.once('connect', function() {
} else { debug('_write: connected now, try again');
this._connectQueue = [[data, encoding, cb]]; this._write(dataEncoding, cb);
} });
return false; return;
} }
this._pendingWrite = null;
return this._write(data, encoding, cb);
};
Socket.prototype._write = function(data, encoding, cb) {
timers.active(this); timers.active(this);
if (!this._handle) { if (!this._handle) {
debug('already destroyed');
this._destroy(new Error('This socket is closed.'), cb); this._destroy(new Error('This socket is closed.'), cb);
return false; return false;
} }
@ -550,39 +586,32 @@ Socket.prototype._write = function(data, encoding, cb) {
break; break;
default: default:
assert(0); writeReq = this._handle.writeBuffer(new Buffer(data, encoding));
break;
} }
} }
if (!writeReq || typeof writeReq !== 'object') { if (!writeReq || typeof writeReq !== 'object')
this._destroy(errnoException(errno, 'write'), cb); return this._destroy(errnoException(errno, 'write'), cb);
return false;
}
writeReq.oncomplete = afterWrite; writeReq.oncomplete = afterWrite;
writeReq.cb = cb; writeReq.cb = cb;
this._pendingWriteReqs++;
this._bytesDispatched += writeReq.bytes; this._bytesDispatched += writeReq.bytes;
return this._handle.writeQueueSize == 0;
}; };
Socket.prototype.__defineGetter__('bytesWritten', function() { Socket.prototype.__defineGetter__('bytesWritten', function() {
var bytes = this._bytesDispatched, var bytes = this._bytesDispatched,
connectQueue = this._connectQueue; state = this._writableState,
pending = this._pendingWrite;
if (connectQueue) { state.buffer.forEach(function(el) {
connectQueue.forEach(function(el) { bytes += Buffer.byteLength(el[0], el[1]);
var data = el[0]; });
if (Buffer.isBuffer(data)) {
bytes += data.length; if (pending)
} else { bytes += Buffer.byteLength(pending[0], pending[1]);
bytes += Buffer.byteLength(data, el[1]);
}
}, this);
}
return bytes; return bytes;
}); });
@ -590,30 +619,28 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
function afterWrite(status, handle, req) { function afterWrite(status, handle, req) {
var self = handle.owner; var self = handle.owner;
var state = self._writableState;
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status, req);
// callback may come after call to destroy. // callback may come after call to destroy.
if (self.destroyed) { if (self.destroyed) {
debug('afterWrite destroyed');
return; return;
} }
if (status) { if (status) {
debug('write failure', errnoException(errno, 'write'));
self._destroy(errnoException(errno, 'write'), req.cb); self._destroy(errnoException(errno, 'write'), req.cb);
return; return;
} }
timers.active(self); timers.active(self);
self._pendingWriteReqs--; if (self !== process.stderr && self !== process.stdout)
debug('afterWrite call cb');
if (self._pendingWriteReqs == 0) {
self.emit('drain');
}
if (req.cb) req.cb(); req.cb.call(self);
if (self._pendingWriteReqs == 0 && self._flags & FLAG_DESTROY_SOON) {
self._destroy();
}
} }
@ -663,10 +690,21 @@ Socket.prototype.connect = function(options, cb) {
return Socket.prototype.connect.apply(this, args); return Socket.prototype.connect.apply(this, args);
} }
if (this.destroyed) {
this._readableState.reading = false;
this._readableState.ended = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.finished = false;
this._writableState.finishing = false;
this.destroyed = false;
this._handle = null;
}
var self = this; var self = this;
var pipe = !!options.path; var pipe = !!options.path;
if (this.destroyed || !this._handle) { if (!this._handle) {
this._handle = pipe ? createPipe() : createTCP(); this._handle = pipe ? createPipe() : createTCP();
initSocketHandle(this); initSocketHandle(this);
} }
@ -755,28 +793,15 @@ function afterConnect(status, handle, req, readable, writable) {
self.writable = writable; self.writable = writable;
timers.active(self); timers.active(self);
if (self.readable && !self._paused) {
handle.readStart();
}
if (self._connectQueue) {
debug('Drain the connect queue');
var connectQueue = self._connectQueue;
for (var i = 0; i < connectQueue.length; i++) {
self._write.apply(self, connectQueue[i]);
}
self._connectQueueCleanUp();
}
self.emit('connect'); self.emit('connect');
if (self._flags & FLAG_SHUTDOWN_QUEUED) { // start the first read, or get an immediate EOF.
// end called before connected - call end now with no data // this doesn't actually consume any bytes, because len=0.
self._flags &= ~FLAG_SHUTDOWN_QUEUED; if (readable)
self.end(); self.read(0);
}
} else { } else {
self._connectQueueCleanUp(); self._connecting = false;
self._destroy(errnoException(errno, 'connect')); self._destroy(errnoException(errno, 'connect'));
} }
} }
@ -831,9 +856,9 @@ function Server(/* [ options, ] listener */) {
configurable: true, enumerable: true configurable: true, enumerable: true
}); });
this.allowHalfOpen = options.allowHalfOpen || false;
this._handle = null; this._handle = null;
this.allowHalfOpen = options.allowHalfOpen || false;
} }
util.inherits(Server, events.EventEmitter); util.inherits(Server, events.EventEmitter);
exports.Server = Server; exports.Server = Server;
@ -901,12 +926,14 @@ var createServerHandle = exports._createServerHandle =
Server.prototype._listen2 = function(address, port, addressType, backlog, fd) { Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
debug('listen2', address, port, addressType, backlog);
var self = this; var self = this;
var r = 0; var r = 0;
// If there is not yet a handle, we need to create one and bind. // If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this. // In the case of a server sent via IPC, we don't need to do this.
if (!self._handle) { if (!self._handle) {
debug('_listen2: create a handle');
self._handle = createServerHandle(address, port, addressType, fd); self._handle = createServerHandle(address, port, addressType, fd);
if (!self._handle) { if (!self._handle) {
var error = errnoException(errno, 'listen'); var error = errnoException(errno, 'listen');
@ -915,6 +942,8 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
}); });
return; return;
} }
} else {
debug('_listen2: have a handle already');
} }
self._handle.onconnection = onconnection; self._handle.onconnection = onconnection;
@ -1049,7 +1078,6 @@ function onconnection(clientHandle) {
}); });
socket.readable = socket.writable = true; socket.readable = socket.writable = true;
clientHandle.readStart();
self._connections++; self._connections++;
socket.server = self; socket.server = self;
@ -1086,11 +1114,17 @@ Server.prototype.close = function(cb) {
}; };
Server.prototype._emitCloseIfDrained = function() { Server.prototype._emitCloseIfDrained = function() {
debug('SERVER _emitCloseIfDrained');
var self = this; var self = this;
if (self._handle || self._connections) return; if (self._handle || self._connections) {
debug('SERVER handle? %j connections? %d',
!!self._handle, self._connections);
return;
}
process.nextTick(function() { process.nextTick(function() {
debug('SERVER: emit close');
self.emit('close'); self.emit('close');
}); });
}; };

Loading…
Cancel
Save