diff --git a/lib/http.js b/lib/http.js index 62b2ffe052..0555e699e2 100644 --- a/lib/http.js +++ b/lib/http.js @@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) { return skipBody; } +// XXX This is a mess. +// TODO: http.Parser should be a Writable emits request/response events. function parserOnBody(b, start, len) { var parser = this; - var slice = b.slice(start, start + len); - if (parser.incoming._paused || parser.incoming._pendings.length) { - parser.incoming._pendings.push(slice); - } else { - parser.incoming._emitData(slice); + var stream = parser.incoming; + var rs = stream._readableState; + var socket = stream.socket; + + // pretend this was the result of a stream._read call. + if (len > 0) { + var slice = b.slice(start, start + len); + rs.onread(null, slice); } + + if (rs.length >= rs.highWaterMark) + socket.pause(); } function parserOnMessageComplete() { var parser = this; - parser.incoming.complete = true; + var stream = parser.incoming; + var socket = stream.socket; + + stream.complete = true; // Emit any trailing headers. var headers = parser._headers; @@ -140,19 +151,13 @@ function parserOnMessageComplete() { parser._url = ''; } - if (!parser.incoming.upgrade) { + if (!stream.upgrade) // For upgraded connections, also emit this after parser.execute - if (parser.incoming._paused || parser.incoming._pendings.length) { - parser.incoming._pendings.push(END_OF_FILE); - } else { - parser.incoming.readable = false; - parser.incoming._emitEnd(); - } - } + stream._readableState.onread(null, null); if (parser.socket.readable) { // force to read the next incoming message - parser.socket.resume(); + socket.resume(); } } @@ -263,9 +268,13 @@ function utcDate() { /* Abstract base class for ServerRequest and ClientResponse. */ function IncomingMessage(socket) { - Stream.call(this); + Stream.Readable.call(this); + + // XXX This implementation is kind of all over the place + // When the parser emits body chunks, they go in this list. + // _read() pulls them out, and when it finds EOF, it ends. + this._pendings = []; - // TODO Remove one of these eventually. this.socket = socket; this.connection = socket; @@ -276,77 +285,49 @@ function IncomingMessage(socket) { this.readable = true; - this._paused = false; this._pendings = []; - - this._endEmitted = false; + this._pendingIndex = 0; // request (server) only this.url = ''; - this.method = null; // response (client) only this.statusCode = null; this.client = this.socket; + + // flag for backwards compatibility grossness. + this._consuming = false; } -util.inherits(IncomingMessage, Stream); +util.inherits(IncomingMessage, Stream.Readable); exports.IncomingMessage = IncomingMessage; -IncomingMessage.prototype.destroy = function(error) { - this.socket.destroy(error); +IncomingMessage.prototype.read = function(n) { + this._consuming = true; + return Stream.Readable.prototype.read.call(this, n); }; -IncomingMessage.prototype.setEncoding = function(encoding) { - var StringDecoder = require('string_decoder').StringDecoder; // lazy load - this._decoder = new StringDecoder(encoding); -}; - - -IncomingMessage.prototype.pause = function() { - this._paused = true; - this.socket.pause(); +IncomingMessage.prototype._read = function(n, callback) { + // We actually do almost nothing here, because the parserOnBody + // function fills up our internal buffer directly. However, we + // do need to unpause the underlying socket so that it flows. + if (!this.socket.readable) + return callback(null, null); + else + this.socket.resume(); }; -IncomingMessage.prototype.resume = function() { - this._paused = false; - if (this.socket) { - this.socket.resume(); - } - - this._emitPending(); +IncomingMessage.prototype.destroy = function(error) { + this.socket.destroy(error); }; -IncomingMessage.prototype._emitPending = function(callback) { - if (this._pendings.length) { - var self = this; - process.nextTick(function() { - while (!self._paused && self._pendings.length) { - var chunk = self._pendings.shift(); - if (chunk !== END_OF_FILE) { - assert(Buffer.isBuffer(chunk)); - self._emitData(chunk); - } else { - assert(self._pendings.length === 0); - self.readable = false; - self._emitEnd(); - } - } - if (callback) { - callback(); - } - }); - } else if (callback) { - callback(); - } -}; IncomingMessage.prototype._emitData = function(d) { @@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) { // don't keep alive connections where the client expects 100 Continue // but we sent a final status; they may put extra bytes on the wire. - if (this._expect_continue && ! this._sent100) { + if (this._expect_continue && !this._sent100) { this.shouldKeepAlive = false; } @@ -1321,11 +1302,10 @@ function socketCloseListener() { // Socket closed before we emitted 'end' below. req.res.emit('aborted'); var res = req.res; - req.res._emitPending(function() { - res._emitEnd(); + res.on('end', function() { res.emit('close'); - res = null; }); + res._readableState.onread(null, null); } else if (!req.res && !req._hadError) { // This socket error fired before we started to // receive a response. The error needs to @@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) { } +// client function parserOnIncomingClient(res, shouldKeepAlive) { var parser = this; var socket = this.socket; var req = socket._httpMessage; + // propogate "domain" setting... if (req.domain && !res.domain) { debug('setting "res.domain"'); @@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) { DTRACE_HTTP_CLIENT_RESPONSE(socket, req); COUNTER_HTTP_CLIENT_RESPONSE(); - req.emit('response', res); req.res = res; res.req = req; - + var handled = req.emit('response', res); res.on('end', responseOnEnd); + // If the user did not listen for the 'response' event, then they + // can't possibly read the data, so we .resume() it into the void + // so that the socket doesn't hang there in a paused state. + if (!handled) + res.resume(); + return isHeadResponse; } +// client function responseOnEnd() { var res = this; var req = res.req; @@ -1784,7 +1772,7 @@ function connectionListener(socket) { incoming.push(req); var res = new ServerResponse(req); - debug('server response shouldKeepAlive: ' + shouldKeepAlive); + res.shouldKeepAlive = shouldKeepAlive; DTRACE_HTTP_SERVER_REQUEST(req, socket); COUNTER_HTTP_SERVER_REQUEST(); @@ -1806,6 +1794,12 @@ function connectionListener(socket) { incoming.shift(); + // if the user never called req.read(), and didn't pipe() or + // .resume() or .on('data'), then we call req.resume() so that the + // bytes will be pulled off the wire. + if (!req._consuming) + req.resume(); + res.detachSocket(socket); if (res._last) {