Browse Source

http: Refactor for streams2

Because of some of the peculiarities of http, this has a bit of special
magic to handle cases where the IncomingMessage would wait forever in a
paused state.

In the server, if you do not begin consuming the request body by the
time the response emits 'finish', then it will be flushed out.

In the client, if you do not add a 'response' handler onto the request,
then the response stream will be flushed out.
v0.9.4-release
isaacs 12 years ago
parent
commit
1d369317ea
  1. 134
      lib/http.js

134
lib/http.js

@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
return skipBody; return skipBody;
} }
// XXX This is a mess.
// TODO: http.Parser should be a Writable emits request/response events.
function parserOnBody(b, start, len) { function parserOnBody(b, start, len) {
var parser = this; var parser = this;
var slice = b.slice(start, start + len); var stream = parser.incoming;
if (parser.incoming._paused || parser.incoming._pendings.length) { var rs = stream._readableState;
parser.incoming._pendings.push(slice); var socket = stream.socket;
} else {
parser.incoming._emitData(slice); // pretend this was the result of a stream._read call.
if (len > 0) {
var slice = b.slice(start, start + len);
rs.onread(null, slice);
} }
if (rs.length >= rs.highWaterMark)
socket.pause();
} }
function parserOnMessageComplete() { function parserOnMessageComplete() {
var parser = this; var parser = this;
parser.incoming.complete = true; var stream = parser.incoming;
var socket = stream.socket;
stream.complete = true;
// Emit any trailing headers. // Emit any trailing headers.
var headers = parser._headers; var headers = parser._headers;
@ -140,19 +151,13 @@ function parserOnMessageComplete() {
parser._url = ''; parser._url = '';
} }
if (!parser.incoming.upgrade) { if (!stream.upgrade)
// For upgraded connections, also emit this after parser.execute // For upgraded connections, also emit this after parser.execute
if (parser.incoming._paused || parser.incoming._pendings.length) { stream._readableState.onread(null, null);
parser.incoming._pendings.push(END_OF_FILE);
} else {
parser.incoming.readable = false;
parser.incoming._emitEnd();
}
}
if (parser.socket.readable) { if (parser.socket.readable) {
// force to read the next incoming message // force to read the next incoming message
parser.socket.resume(); socket.resume();
} }
} }
@ -263,9 +268,13 @@ function utcDate() {
/* Abstract base class for ServerRequest and ClientResponse. */ /* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage(socket) { function IncomingMessage(socket) {
Stream.call(this); Stream.Readable.call(this);
// XXX This implementation is kind of all over the place
// When the parser emits body chunks, they go in this list.
// _read() pulls them out, and when it finds EOF, it ends.
this._pendings = [];
// TODO Remove one of these eventually.
this.socket = socket; this.socket = socket;
this.connection = socket; this.connection = socket;
@ -276,77 +285,49 @@ function IncomingMessage(socket) {
this.readable = true; this.readable = true;
this._paused = false;
this._pendings = []; this._pendings = [];
this._pendingIndex = 0;
this._endEmitted = false;
// request (server) only // request (server) only
this.url = ''; this.url = '';
this.method = null; this.method = null;
// response (client) only // response (client) only
this.statusCode = null; this.statusCode = null;
this.client = this.socket; this.client = this.socket;
// flag for backwards compatibility grossness.
this._consuming = false;
} }
util.inherits(IncomingMessage, Stream); util.inherits(IncomingMessage, Stream.Readable);
exports.IncomingMessage = IncomingMessage; exports.IncomingMessage = IncomingMessage;
IncomingMessage.prototype.destroy = function(error) { IncomingMessage.prototype.read = function(n) {
this.socket.destroy(error); this._consuming = true;
return Stream.Readable.prototype.read.call(this, n);
}; };
IncomingMessage.prototype.setEncoding = function(encoding) { IncomingMessage.prototype._read = function(n, callback) {
var StringDecoder = require('string_decoder').StringDecoder; // lazy load // We actually do almost nothing here, because the parserOnBody
this._decoder = new StringDecoder(encoding); // function fills up our internal buffer directly. However, we
}; // do need to unpause the underlying socket so that it flows.
if (!this.socket.readable)
return callback(null, null);
IncomingMessage.prototype.pause = function() { else
this._paused = true; this.socket.resume();
this.socket.pause();
}; };
IncomingMessage.prototype.resume = function() { IncomingMessage.prototype.destroy = function(error) {
this._paused = false; this.socket.destroy(error);
if (this.socket) {
this.socket.resume();
}
this._emitPending();
}; };
IncomingMessage.prototype._emitPending = function(callback) {
if (this._pendings.length) {
var self = this;
process.nextTick(function() {
while (!self._paused && self._pendings.length) {
var chunk = self._pendings.shift();
if (chunk !== END_OF_FILE) {
assert(Buffer.isBuffer(chunk));
self._emitData(chunk);
} else {
assert(self._pendings.length === 0);
self.readable = false;
self._emitEnd();
}
}
if (callback) {
callback();
}
});
} else if (callback) {
callback();
}
};
IncomingMessage.prototype._emitData = function(d) { IncomingMessage.prototype._emitData = function(d) {
@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {
// don't keep alive connections where the client expects 100 Continue // don't keep alive connections where the client expects 100 Continue
// but we sent a final status; they may put extra bytes on the wire. // but we sent a final status; they may put extra bytes on the wire.
if (this._expect_continue && ! this._sent100) { if (this._expect_continue && !this._sent100) {
this.shouldKeepAlive = false; this.shouldKeepAlive = false;
} }
@ -1321,11 +1302,10 @@ function socketCloseListener() {
// Socket closed before we emitted 'end' below. // Socket closed before we emitted 'end' below.
req.res.emit('aborted'); req.res.emit('aborted');
var res = req.res; var res = req.res;
req.res._emitPending(function() { res.on('end', function() {
res._emitEnd();
res.emit('close'); res.emit('close');
res = null;
}); });
res._readableState.onread(null, null);
} else if (!req.res && !req._hadError) { } else if (!req.res && !req._hadError) {
// This socket error fired before we started to // This socket error fired before we started to
// receive a response. The error needs to // receive a response. The error needs to
@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
} }
// client
function parserOnIncomingClient(res, shouldKeepAlive) { function parserOnIncomingClient(res, shouldKeepAlive) {
var parser = this; var parser = this;
var socket = this.socket; var socket = this.socket;
var req = socket._httpMessage; var req = socket._httpMessage;
// propogate "domain" setting... // propogate "domain" setting...
if (req.domain && !res.domain) { if (req.domain && !res.domain) {
debug('setting "res.domain"'); debug('setting "res.domain"');
@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req); DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
COUNTER_HTTP_CLIENT_RESPONSE(); COUNTER_HTTP_CLIENT_RESPONSE();
req.emit('response', res);
req.res = res; req.res = res;
res.req = req; res.req = req;
var handled = req.emit('response', res);
res.on('end', responseOnEnd); res.on('end', responseOnEnd);
// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we .resume() it into the void
// so that the socket doesn't hang there in a paused state.
if (!handled)
res.resume();
return isHeadResponse; return isHeadResponse;
} }
// client
function responseOnEnd() { function responseOnEnd() {
var res = this; var res = this;
var req = res.req; var req = res.req;
@ -1784,7 +1772,7 @@ function connectionListener(socket) {
incoming.push(req); incoming.push(req);
var res = new ServerResponse(req); var res = new ServerResponse(req);
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
res.shouldKeepAlive = shouldKeepAlive; res.shouldKeepAlive = shouldKeepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket); DTRACE_HTTP_SERVER_REQUEST(req, socket);
COUNTER_HTTP_SERVER_REQUEST(); COUNTER_HTTP_SERVER_REQUEST();
@ -1806,6 +1794,12 @@ function connectionListener(socket) {
incoming.shift(); incoming.shift();
// if the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req.resume() so that the
// bytes will be pulled off the wire.
if (!req._consuming)
req.resume();
res.detachSocket(socket); res.detachSocket(socket);
if (res._last) { if (res._last) {

Loading…
Cancel
Save