Browse Source

http: Refactor for streams2

Because of some of the peculiarities of http, this has a bit of special
magic to handle cases where the IncomingMessage would wait forever in a
paused state.

In the server, if you do not begin consuming the request body by the
time the response emits 'finish', then it will be flushed out.

In the client, if you do not add a 'response' handler onto the request,
then the response stream will be flushed out.
v0.9.4-release
isaacs 12 years ago
parent
commit
1d369317ea
  1. 134
      lib/http.js

134
lib/http.js

@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
return skipBody;
}
// XXX This is a mess.
// TODO: http.Parser should be a Writable emits request/response events.
function parserOnBody(b, start, len) {
var parser = this;
var slice = b.slice(start, start + len);
if (parser.incoming._paused || parser.incoming._pendings.length) {
parser.incoming._pendings.push(slice);
} else {
parser.incoming._emitData(slice);
var stream = parser.incoming;
var rs = stream._readableState;
var socket = stream.socket;
// pretend this was the result of a stream._read call.
if (len > 0) {
var slice = b.slice(start, start + len);
rs.onread(null, slice);
}
if (rs.length >= rs.highWaterMark)
socket.pause();
}
function parserOnMessageComplete() {
var parser = this;
parser.incoming.complete = true;
var stream = parser.incoming;
var socket = stream.socket;
stream.complete = true;
// Emit any trailing headers.
var headers = parser._headers;
@ -140,19 +151,13 @@ function parserOnMessageComplete() {
parser._url = '';
}
if (!parser.incoming.upgrade) {
if (!stream.upgrade)
// For upgraded connections, also emit this after parser.execute
if (parser.incoming._paused || parser.incoming._pendings.length) {
parser.incoming._pendings.push(END_OF_FILE);
} else {
parser.incoming.readable = false;
parser.incoming._emitEnd();
}
}
stream._readableState.onread(null, null);
if (parser.socket.readable) {
// force to read the next incoming message
parser.socket.resume();
socket.resume();
}
}
@ -263,9 +268,13 @@ function utcDate() {
/* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage(socket) {
Stream.call(this);
Stream.Readable.call(this);
// XXX This implementation is kind of all over the place
// When the parser emits body chunks, they go in this list.
// _read() pulls them out, and when it finds EOF, it ends.
this._pendings = [];
// TODO Remove one of these eventually.
this.socket = socket;
this.connection = socket;
@ -276,77 +285,49 @@ function IncomingMessage(socket) {
this.readable = true;
this._paused = false;
this._pendings = [];
this._endEmitted = false;
this._pendingIndex = 0;
// request (server) only
this.url = '';
this.method = null;
// response (client) only
this.statusCode = null;
this.client = this.socket;
// flag for backwards compatibility grossness.
this._consuming = false;
}
util.inherits(IncomingMessage, Stream);
util.inherits(IncomingMessage, Stream.Readable);
exports.IncomingMessage = IncomingMessage;
IncomingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
IncomingMessage.prototype.read = function(n) {
this._consuming = true;
return Stream.Readable.prototype.read.call(this, n);
};
IncomingMessage.prototype.setEncoding = function(encoding) {
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
this._decoder = new StringDecoder(encoding);
};
IncomingMessage.prototype.pause = function() {
this._paused = true;
this.socket.pause();
IncomingMessage.prototype._read = function(n, callback) {
// We actually do almost nothing here, because the parserOnBody
// function fills up our internal buffer directly. However, we
// do need to unpause the underlying socket so that it flows.
if (!this.socket.readable)
return callback(null, null);
else
this.socket.resume();
};
IncomingMessage.prototype.resume = function() {
this._paused = false;
if (this.socket) {
this.socket.resume();
}
this._emitPending();
IncomingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
};
IncomingMessage.prototype._emitPending = function(callback) {
if (this._pendings.length) {
var self = this;
process.nextTick(function() {
while (!self._paused && self._pendings.length) {
var chunk = self._pendings.shift();
if (chunk !== END_OF_FILE) {
assert(Buffer.isBuffer(chunk));
self._emitData(chunk);
} else {
assert(self._pendings.length === 0);
self.readable = false;
self._emitEnd();
}
}
if (callback) {
callback();
}
});
} else if (callback) {
callback();
}
};
IncomingMessage.prototype._emitData = function(d) {
@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {
// don't keep alive connections where the client expects 100 Continue
// but we sent a final status; they may put extra bytes on the wire.
if (this._expect_continue && ! this._sent100) {
if (this._expect_continue && !this._sent100) {
this.shouldKeepAlive = false;
}
@ -1321,11 +1302,10 @@ function socketCloseListener() {
// Socket closed before we emitted 'end' below.
req.res.emit('aborted');
var res = req.res;
req.res._emitPending(function() {
res._emitEnd();
res.on('end', function() {
res.emit('close');
res = null;
});
res._readableState.onread(null, null);
} else if (!req.res && !req._hadError) {
// This socket error fired before we started to
// receive a response. The error needs to
@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
}
// client
function parserOnIncomingClient(res, shouldKeepAlive) {
var parser = this;
var socket = this.socket;
var req = socket._httpMessage;
// propogate "domain" setting...
if (req.domain && !res.domain) {
debug('setting "res.domain"');
@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
COUNTER_HTTP_CLIENT_RESPONSE();
req.emit('response', res);
req.res = res;
res.req = req;
var handled = req.emit('response', res);
res.on('end', responseOnEnd);
// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we .resume() it into the void
// so that the socket doesn't hang there in a paused state.
if (!handled)
res.resume();
return isHeadResponse;
}
// client
function responseOnEnd() {
var res = this;
var req = res.req;
@ -1784,7 +1772,7 @@ function connectionListener(socket) {
incoming.push(req);
var res = new ServerResponse(req);
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
res.shouldKeepAlive = shouldKeepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);
COUNTER_HTTP_SERVER_REQUEST();
@ -1806,6 +1794,12 @@ function connectionListener(socket) {
incoming.shift();
// if the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req.resume() so that the
// bytes will be pulled off the wire.
if (!req._consuming)
req.resume();
res.detachSocket(socket);
if (res._last) {

Loading…
Cancel
Save