Browse Source

http: refactor server connection handling

PR-URL: https://github.com/nodejs/node/pull/6533
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Fedor Indutny <fedor.indutny@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
v6
Brian White 8 years ago
parent
commit
832271c88f
No known key found for this signature in database GPG Key ID: 606D7358F94DA209
  1. 440
      lib/_http_server.js

440
lib/_http_server.js

@ -264,40 +264,6 @@ exports.Server = Server;
function connectionListener(socket) {
var self = this;
var outgoing = [];
var incoming = [];
var outgoingData = 0;
function updateOutgoingData(delta) {
// `outgoingData` is an approximate amount of bytes queued through all
// inactive responses. If more data than the high watermark is queued - we
// need to pause TCP socket/HTTP parser, and wait until the data will be
// sent to the client.
outgoingData += delta;
if (socket._paused && outgoingData < socket._writableState.highWaterMark)
return socketOnDrain();
}
function abortIncoming() {
while (incoming.length) {
var req = incoming.shift();
req.emit('aborted');
req.emit('close');
}
// abort socket._httpMessage ?
}
function serverSocketCloseListener() {
debug('server socket close');
// mark this parser as reusable
if (this.parser) {
freeParser(this.parser, null, this);
}
abortIncoming();
}
debug('SERVER new http connection');
httpSocketSetup(socket);
@ -305,18 +271,9 @@ function connectionListener(socket) {
// If the user has added a listener to the server,
// request, or response, then it's their responsibility.
// otherwise, destroy on timeout by default
if (self.timeout)
socket.setTimeout(self.timeout);
socket.on('timeout', function() {
var req = socket.parser && socket.parser.incoming;
var reqTimeout = req && !req.complete && req.emit('timeout', socket);
var res = socket._httpMessage;
var resTimeout = res && res.emit('timeout', socket);
var serverTimeout = self.emit('timeout', socket);
if (!reqTimeout && !resTimeout && !serverTimeout)
socket.destroy();
});
if (this.timeout)
socket.setTimeout(this.timeout);
socket.on('timeout', socketOnTimeout.bind(undefined, this, socket));
var parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST);
@ -332,17 +289,34 @@ function connectionListener(socket) {
parser.maxHeaderPairs = 2000;
}
socket.addListener('error', socketOnError);
socket.addListener('close', serverSocketCloseListener);
parser.onIncoming = parserOnIncoming;
socket.on('end', socketOnEnd);
socket.on('data', socketOnData);
var state = {
onData: null,
onError: null,
onEnd: null,
onClose: null,
outgoing: [],
incoming: [],
// `outgoingData` is an approximate amount of bytes queued through all
// inactive responses. If more data than the high watermark is queued - we
// need to pause TCP socket/HTTP parser, and wait until the data will be
// sent to the client.
outgoingData: 0
};
state.onData = socketOnData.bind(undefined, this, socket, parser, state);
state.onError = socketOnError.bind(undefined, this, socket, state);
state.onEnd = socketOnEnd.bind(undefined, this, socket, parser, state);
state.onClose = socketOnClose.bind(undefined, socket, state);
socket.on('data', state.onData);
socket.on('error', state.onError);
socket.on('end', state.onEnd);
socket.on('close', state.onClose);
parser.onIncoming = parserOnIncoming.bind(undefined, this, socket, state);
// We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause);
socket.on('drain', socketOnDrain);
socket.on('drain', socketOnDrain.bind(undefined, socket, state));
// Override on to unconsume on `data`, `readable` listeners
socket.on = socketOnWrap;
@ -352,205 +326,243 @@ function connectionListener(socket) {
parser._consumed = true;
parser.consume(external);
}
external = null;
parser[kOnExecute] = onParserExecute;
parser[kOnExecute] =
onParserExecute.bind(undefined, this, socket, parser, state);
// TODO(isaacs): Move all these functions out of here
function socketOnError(e) {
// Ignore further errors
this.removeListener('error', socketOnError);
this.on('error', () => {});
socket._paused = false;
}
exports._connectionListener = connectionListener;
if (!self.emit('clientError', e, this))
this.destroy(e);
function updateOutgoingData(socket, state, delta) {
state.outgoingData += delta;
if (socket._paused &&
state.outgoingData < socket._writableState.highWaterMark) {
return socketOnDrain(socket, state);
}
}
function socketOnData(d) {
assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);
function socketOnDrain(socket, state) {
var needPause = state.outgoingData > socket._writableState.highWaterMark;
onParserExecuteCommon(ret, d);
// If we previously paused, then start reading again.
if (socket._paused && !needPause) {
socket._paused = false;
if (socket.parser)
socket.parser.resume();
socket.resume();
}
}
function socketOnTimeout(server, socket) {
var req = socket.parser && socket.parser.incoming;
var reqTimeout = req && !req.complete && req.emit('timeout', socket);
var res = socket._httpMessage;
var resTimeout = res && res.emit('timeout', socket);
var serverTimeout = server.emit('timeout', socket);
if (!reqTimeout && !resTimeout && !serverTimeout)
socket.destroy();
}
function onParserExecute(ret, d) {
socket._unrefTimer();
debug('SERVER socketOnParserExecute %d', ret);
onParserExecuteCommon(ret, undefined);
function socketOnClose(socket, state) {
debug('server socket close');
// mark this parser as reusable
if (socket.parser) {
freeParser(socket.parser, null, socket);
}
function onParserExecuteCommon(ret, d) {
if (ret instanceof Error) {
debug('parse error');
socketOnError.call(socket, ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade or CONNECT
var bytesParsed = ret;
var req = parser.incoming;
debug('SERVER upgrade or connect', req.method);
if (!d)
d = parser.getCurrentBuffer();
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
unconsume(parser, socket);
parser.finish();
freeParser(parser, req, null);
parser = null;
var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
if (self.listenerCount(eventName) > 0) {
debug('SERVER have listener for %s', eventName);
var bodyHead = d.slice(bytesParsed, d.length);
// TODO(isaacs): Need a way to reset a stream to fresh state
// IE, not flowing, and not explicitly paused.
socket._readableState.flowing = null;
self.emit(eventName, req, socket, bodyHead);
} else {
// Got upgrade header or CONNECT method, but have no handler.
socket.destroy();
}
}
abortIncoming(state.incoming);
}
if (socket._paused && socket.parser) {
// onIncoming paused the socket, we should pause the parser as well
debug('pause parser');
socket.parser.pause();
}
function abortIncoming(incoming) {
while (incoming.length) {
var req = incoming.shift();
req.emit('aborted');
req.emit('close');
}
// abort socket._httpMessage ?
}
function socketOnEnd() {
var socket = this;
var ret = parser.finish();
function socketOnEnd(server, socket, parser, state) {
var ret = parser.finish();
if (ret instanceof Error) {
debug('parse error');
socketOnError.call(socket, ret);
return;
}
if (ret instanceof Error) {
debug('parse error');
state.onError(ret);
return;
}
if (!self.httpAllowHalfOpen) {
abortIncoming();
if (socket.writable) socket.end();
} else if (outgoing.length) {
outgoing[outgoing.length - 1]._last = true;
} else if (socket._httpMessage) {
socket._httpMessage._last = true;
} else {
if (socket.writable) socket.end();
}
if (!server.httpAllowHalfOpen) {
abortIncoming(state.incoming);
if (socket.writable) socket.end();
} else if (state.outgoing.length) {
state.outgoing[state.outgoing.length - 1]._last = true;
} else if (socket._httpMessage) {
socket._httpMessage._last = true;
} else {
if (socket.writable) socket.end();
}
}
function socketOnData(server, socket, parser, state, d) {
assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
onParserExecuteCommon(server, socket, parser, state, ret, d);
}
socket._paused = false;
function socketOnDrain() {
var needPause = outgoingData > socket._writableState.highWaterMark;
// If we previously paused, then start reading again.
if (socket._paused && !needPause) {
socket._paused = false;
if (socket.parser)
socket.parser.resume();
socket.resume();
function onParserExecute(server, socket, parser, state, ret, d) {
socket._unrefTimer();
debug('SERVER socketOnParserExecute %d', ret);
onParserExecuteCommon(server, socket, parser, state, ret, undefined);
}
function socketOnError(server, socket, state, e) {
// Ignore further errors
socket.removeListener('error', state.onError);
socket.on('error', () => {});
if (!server.emit('clientError', e, socket))
socket.destroy(e);
}
function onParserExecuteCommon(server, socket, parser, state, ret, d) {
if (ret instanceof Error) {
debug('parse error');
state.onError(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade or CONNECT
var bytesParsed = ret;
var req = parser.incoming;
debug('SERVER upgrade or connect', req.method);
if (!d)
d = parser.getCurrentBuffer();
socket.removeListener('data', state.onData);
socket.removeListener('end', state.onEnd);
socket.removeListener('close', state.onClose);
unconsume(parser, socket);
parser.finish();
freeParser(parser, req, null);
parser = null;
var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
if (server.listenerCount(eventName) > 0) {
debug('SERVER have listener for %s', eventName);
var bodyHead = d.slice(bytesParsed, d.length);
// TODO(isaacs): Need a way to reset a stream to fresh state
// IE, not flowing, and not explicitly paused.
socket._readableState.flowing = null;
server.emit(eventName, req, socket, bodyHead);
} else {
// Got upgrade header or CONNECT method, but have no handler.
socket.destroy();
}
}
function parserOnIncoming(req, shouldKeepAlive) {
incoming.push(req);
// If the writable end isn't consuming, then stop reading
// so that we don't become overwhelmed by a flood of
// pipelined requests that may never be resolved.
if (!socket._paused) {
var needPause = socket._writableState.needDrain ||
outgoingData >= socket._writableState.highWaterMark;
if (needPause) {
socket._paused = true;
// We also need to pause the parser, but don't do that until after
// the call to execute, because we may still be processing the last
// chunk.
socket.pause();
}
}
if (socket._paused && socket.parser) {
// onIncoming paused the socket, we should pause the parser as well
debug('pause parser');
socket.parser.pause();
}
}
var res = new ServerResponse(req);
res._onPendingData = updateOutgoingData;
function resOnFinish(req, res, socket, state) {
// Usually the first incoming element should be our request. it may
// be that in the case abortIncoming() was called that the incoming
// array will be empty.
assert(state.incoming.length === 0 || state.incoming[0] === req);
res.shouldKeepAlive = shouldKeepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);
LTTNG_HTTP_SERVER_REQUEST(req, socket);
COUNTER_HTTP_SERVER_REQUEST();
state.incoming.shift();
if (socket._httpMessage) {
// There are already pending outgoing res, append.
outgoing.push(res);
} else {
res.assignSocket(socket);
// if the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req._dump() so that the
// bytes will be pulled off the wire.
if (!req._consuming && !req._readableState.resumeScheduled)
req._dump();
res.detachSocket(socket);
if (res._last) {
socket.destroySoon();
} else {
// start sending the next message
var m = state.outgoing.shift();
if (m) {
m.assignSocket(socket);
}
}
}
// When we're finished writing the response, check if this is the last
// response, if so destroy the socket.
res.on('finish', resOnFinish);
function resOnFinish() {
// Usually the first incoming element should be our request. it may
// be that in the case abortIncoming() was called that the incoming
// array will be empty.
assert(incoming.length === 0 || incoming[0] === req);
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
function parserOnIncoming(server, socket, state, req, keepAlive) {
state.incoming.push(req);
// If the writable end isn't consuming, then stop reading
// so that we don't become overwhelmed by a flood of
// pipelined requests that may never be resolved.
if (!socket._paused) {
var needPause = socket._writableState.needDrain ||
state.outgoingData >= socket._writableState.highWaterMark;
if (needPause) {
socket._paused = true;
// We also need to pause the parser, but don't do that until after
// the call to execute, because we may still be processing the last
// chunk.
socket.pause();
}
}
incoming.shift();
var res = new ServerResponse(req);
res._onPendingData = updateOutgoingData.bind(undefined, socket, state);
// if the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req._dump() so that the
// bytes will be pulled off the wire.
if (!req._consuming && !req._readableState.resumeScheduled)
req._dump();
res.shouldKeepAlive = keepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);
LTTNG_HTTP_SERVER_REQUEST(req, socket);
COUNTER_HTTP_SERVER_REQUEST();
res.detachSocket(socket);
if (socket._httpMessage) {
// There are already pending outgoing res, append.
state.outgoing.push(res);
} else {
res.assignSocket(socket);
}
if (res._last) {
socket.destroySoon();
} else {
// start sending the next message
var m = outgoing.shift();
if (m) {
m.assignSocket(socket);
}
}
}
// When we're finished writing the response, check if this is the last
// response, if so destroy the socket.
var finish =
resOnFinish.bind(undefined, req, res, socket, state);
res.on('finish', finish);
if (req.headers.expect !== undefined &&
(req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
if (continueExpression.test(req.headers.expect)) {
res._expect_continue = true;
if (req.headers.expect !== undefined &&
(req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
if (continueExpression.test(req.headers.expect)) {
res._expect_continue = true;
if (self.listenerCount('checkContinue') > 0) {
self.emit('checkContinue', req, res);
} else {
res.writeContinue();
self.emit('request', req, res);
}
if (server.listenerCount('checkContinue') > 0) {
server.emit('checkContinue', req, res);
} else {
if (self.listenerCount('checkExpectation') > 0) {
self.emit('checkExpectation', req, res);
} else {
res.writeHead(417);
res.end();
}
res.writeContinue();
server.emit('request', req, res);
}
} else {
self.emit('request', req, res);
if (server.listenerCount('checkExpectation') > 0) {
server.emit('checkExpectation', req, res);
} else {
res.writeHead(417);
res.end();
}
}
return false; // Not a HEAD response. (Not even a response!)
} else {
server.emit('request', req, res);
}
return false; // Not a HEAD response. (Not even a response!)
}
exports._connectionListener = connectionListener;
function onSocketResume() {
// It may seem that the socket is resumed, but this is an enemy's trick to

Loading…
Cancel
Save