Browse Source

first pass at http refactor for TLS

v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
60aea96f84
  1. 508
      lib/http.js

508
lib/http.js

@ -1,8 +1,10 @@
var util = require('util');
var net = require('net');
var stream = require('stream');
var EventEmitter = require('events').EventEmitter;
var FreeList = require('freelist').FreeList;
var HTTPParser = process.binding('http_parser').HTTPParser;
var assert = process.assert;
var debug;
@ -284,13 +286,9 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) {
};
function OutgoingMessage(socket) {
function OutgoingMessage() {
stream.Stream.call(this);
// TODO Remove one of these eventually.
this.socket = socket;
this.connection = socket;
this.output = [];
this.outputEncodings = [];
@ -312,6 +310,15 @@ util.inherits(OutgoingMessage, stream.Stream);
exports.OutgoingMessage = OutgoingMessage;
OutgoingMessage.prototype.assignSocket = function(socket) {
assert(!socket._httpMessage);
socket._httpMessage = this;
this.socket = socket;
this.connection = socket;
this._flush();
};
OutgoingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
};
@ -336,7 +343,7 @@ OutgoingMessage.prototype._send = function(data, encoding) {
OutgoingMessage.prototype._writeRaw = function(data, encoding) {
if (this.connection._outgoing[0] === this && this.connection.writable) {
if (this.connection._httpMessage === this && this.connection.writable) {
// There might be pending data in the this.output buffer.
while (this.output.length) {
if (!this.connection.writable) {
@ -550,7 +557,7 @@ OutgoingMessage.prototype.end = function(data, encoding) {
data.length > 0 &&
this.output.length === 0 &&
this.connection.writable &&
this.connection._outgoing[0] === this;
this.connection._httpMessage === this;
if (hot) {
// Hot path. They're doing
@ -585,17 +592,69 @@ OutgoingMessage.prototype.end = function(data, encoding) {
// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
if (this.output.length === 0 && this.connection._outgoing[0] === this) {
debug('outgoing message end. shifting because was flushed');
this.connection._onOutgoingSent();
if (this.output.length === 0 && this.connection._httpMessage === this) {
debug('outgoing message end.');
this._finish();
}
return ret;
};
OutgoingMessage.prototype._finish = function() {
this.socket._httpMessage = null;
this.socket = this.connection = null;
this.emit('finish');
};
OutgoingMessage.prototype._flush = function() {
// This logic is probably a bit confusing. Let me explain a bit:
//
// In both HTTP servers and clients it is possible to queue up several
// outgoing messages. This is easiest to imagine in the case of a client.
// Take the following situation:
//
// req1 = client.request('GET', '/');
// req2 = client.request('POST', '/');
//
// When the user does
//
// req2.write('hello world\n');
//
// it's possible that the first request has not been completely flushed to
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
//
// This function, outgoingFlush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
if (!this.socket) return;
var ret;
while (this.output.length) {
if (!this.socket.writable) return; // XXX Necessary?
var data = this.output.shift();
var encoding = this.outputEncodings.shift();
ret = this.socket.write(data, encoding);
}
if (this.finished) {
// This is a queue to the server or client to bring in the next this.
this._finish();
} else if (ret) {
this.emit('drain');
}
};
function ServerResponse(req) {
OutgoingMessage.call(this, req.socket);
OutgoingMessage.call(this);
if (req.method === 'HEAD') this._hasBody = false;
@ -666,19 +725,30 @@ ServerResponse.prototype.writeHeader = function() {
};
function ClientRequest(socket, method, url, headers) {
OutgoingMessage.call(this, socket);
function ClientRequest(options) {
OutgoingMessage.call(this);
var method = this.method = (options.method || 'GET').toUpperCase();
var path = options.path || '/';
var headers = options.headers;
// Host header set by default.
if (options.host && !(headers.host || headers.Host || headers.HOST)) {
headers.Host = options.host;
}
this.method = method = method.toUpperCase();
this.shouldKeepAlive = false;
if (method === 'GET' || method === 'HEAD') {
this.useChunkedEncodingByDefault = false;
} else {
this.useChunkedEncodingByDefault = true;
}
// By default keep-alive is off. This is the last message unless otherwise
// specified.
this._last = true;
this._storeHeader(method + ' ' + url + ' HTTP/1.1\r\n', headers);
this._storeHeader(method + ' ' + path + ' HTTP/1.1\r\n', headers);
}
util.inherits(ClientRequest, OutgoingMessage);
@ -686,58 +756,12 @@ util.inherits(ClientRequest, OutgoingMessage);
exports.ClientRequest = ClientRequest;
function outgoingFlush(socket) {
// This logic is probably a bit confusing. Let me explain a bit:
//
// In both HTTP servers and clients it is possible to queue up several
// outgoing messages. This is easiest to imagine in the case of a client.
// Take the following situation:
//
// req1 = client.request('GET', '/');
// req2 = client.request('POST', '/');
//
// When the user does
//
// req2.write('hello world\n');
//
// it's possible that the first request has not been completely flushed to
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
//
// This function, outgoingFlush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
var message = socket._outgoing[0];
if (!message) return;
var ret;
while (message.output.length) {
if (!socket.writable) return; // XXX Necessary?
var data = message.output.shift();
var encoding = message.outputEncodings.shift();
ret = socket.write(data, encoding);
}
if (message.finished) {
socket._onOutgoingSent();
} else if (ret) {
message.emit('drain');
}
}
function httpSocketSetup(socket) {
// An array of outgoing messages for the socket. In pipelined connections
// we need to keep track of the order they were sent.
socket._outgoing = [];
// NOTE: be sure not to use ondrain elsewhere in this file!
socket.ondrain = function() {
var message = socket._outgoing[0];
if (message) message.emit('drain');
if (socket._httpMessage) {
socket._httpMessage.emit('drain');
}
};
}
@ -765,6 +789,7 @@ exports.createServer = function(requestListener) {
function connectionListener(socket) {
var self = this;
var outgoing = [];
debug('SERVER new http connection');
@ -811,9 +836,10 @@ function connectionListener(socket) {
socket.onend = function() {
parser.finish();
if (socket._outgoing.length) {
socket._outgoing[socket._outgoing.length - 1]._last = true;
outgoingFlush(socket);
if (outgoing.length) {
outgoing[outgoing.length - 1]._last = true;
} else if (socket._httpMessage) {
socket._httpMessage._last = true;
} else {
socket.end();
}
@ -824,21 +850,6 @@ function connectionListener(socket) {
parsers.free(parser);
});
// At the end of each response message, after it has been flushed to the
// socket. Here we insert logic about what to do next.
socket._onOutgoingSent = function(message) {
var message = socket._outgoing.shift();
if (message._last) {
// No more messages to be pushed out.
socket.destroySoon();
} else if (socket._outgoing.length) {
// Push out the next message.
outgoingFlush(socket);
}
};
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
@ -846,7 +857,27 @@ function connectionListener(socket) {
var res = new ServerResponse(req);
debug('server response shouldKeepAlive: ' + shouldKeepAlive);
res.shouldKeepAlive = shouldKeepAlive;
socket._outgoing.push(res);
if (socket._httpMessage) {
// There are already pending outgoing res, append.
outgoing.push(res);
} else {
res.assignSocket(socket);
}
// When we're finished writing the response, check if this is the last
// respose, if so destroy the socket.
res.on('finish', function() {
if (res._last) {
socket.destroySoon();
} else {
// start sending the next message
var m = outgoing.shift();
if (m) {
m.assignSocket(socket);
}
}
});
if ('expect' in req.headers &&
(req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
@ -867,280 +898,123 @@ function connectionListener(socket) {
exports._connectionListener = connectionListener;
function Client() {
if (!(this instanceof Client)) return new Client();
net.Stream.call(this, { allowHalfOpen: true });
var self = this;
// Possible states:
// - disconnected
// - connecting
// - connected
this._state = 'disconnected';
httpSocketSetup(self);
function Agent(host, port) {
this.host = host;
this.port = port;
function onData(d, start, end) {
if (!self.parser) {
throw new Error('parser not initialized prior to Client.ondata call');
this.queue = [];
this.sockets = [];
this.maxSockets = 5;
}
var ret = self.parser.execute(d, start, end - start);
if (ret instanceof Error) {
self.destroy(ret);
} else if (self.parser.incoming && self.parser.incoming.upgrade) {
var bytesParsed = ret;
self.ondata = null;
self.onend = null;
var req = self.parser.incoming;
util.inherits(Agent, EventEmitter);
var upgradeHead = d.slice(start + bytesParsed + 1, end);
if (self.listeners('upgrade').length) {
self.emit('upgrade', req, self, upgradeHead);
} else {
self.destroy();
}
}
};
self.addListener('connect', function() {
debug('CLIENT connected');
self.ondata = onData;
self.onend = onEnd;
Agent.prototype.appendMessage = function(options) {
var self = this;
self._state = 'connected';
var req = new ClientRequest(options);
this.queue.push(req);
self._initParser();
outgoingFlush(self);
req.on('finish', function () {
self._cycle();
});
function onEnd() {
if (self.parser) self.parser.finish();
debug('CLIENT got end closing. state = ' + self._state);
self.end();
this._cycle();
};
self.addListener('close', function(e) {
self._state = 'disconnected';
if (e) return;
debug('CLIENT onClose. state = ' + self._state);
// finally done with the request
self._outgoing.shift();
Agent.prototype._establishNewConnection = function(socket, message) {
var self = this;
assert(this.sockets.length < this.maxSockets);
// If there are more requests to handle, reconnect.
if (self._outgoing.length) {
self._ensureConnection();
} else if (self.parser) {
parsers.free(self.parser);
self.parser = null;
}
// Grab a new "socket". Depending on the implementation of _getConnection
// this could either be a raw TCP socket or a TLS stream.
var socket = this._getConnection(this.host, this.port, function () {
self._cycle();
});
}
util.inherits(Client, net.Stream);
this.sockets.push(socket);
exports.Client = Client;
// When the socket closes remove it from the list of available sockets.
socket.on('close', function() {
var i = self.sockets.indexOf(socket);
if (i >= 0) self.sockets.splice(i, 1);
});
};
exports.createClient = function(port, host, https, credentials) {
var c = new Client();
c.port = port;
c.host = host;
c.https = https;
c.credentials = credentials;
// Sub-classes can overwrite this method with e.g. something that supplies
// TLS streams.
Agent.prototype._getConnection = function(host, port, cb) {
var c = net.createConnection(port, host);
c.on('connect', cb);
return c;
};
Client.prototype._initParser = function() {
var self = this;
if (!self.parser) self.parser = parsers.alloc();
self.parser.reinitialize('response');
self.parser.socket = self;
self.parser.onIncoming = function(res) {
debug('CLIENT incoming response!');
var req = self._outgoing[0];
// Responses to HEAD requests are AWFUL. Ask Ryan.
// A major oversight in HTTP. Hence this nastiness.
var isHeadResponse = req.method == 'HEAD';
debug('CLIENT isHeadResponse ' + isHeadResponse);
if (res.statusCode == 100) {
// restart the parser, as this is a continue message.
req.emit('continue');
return true;
}
// This method attempts to shuffle items along the queue into one of the
// waiting sockets. If a waiting socket cannot be found, it will
// start the process of establishing one.
Agent.prototype._cycle = function() {
var first = this.queue[0];
if (!first) return;
if (req.shouldKeepAlive && res.headers.connection === 'close') {
req.shouldKeepAlive = false;
// First try to find an available socket.
for (var i = 0; i < this.sockets.length; i++) {
var socket = this.sockets[i];
// If the socket doesn't already have a message it's sending out
// and the socket is available for writing...
if (!socket._httpMessage && (socket.writable && socket.readable)) {
// We found an available connection!
this.queue.shift(); // remove first from queue.
first.assignSocket(socket);
return;
}
res.addListener('end', function() {
debug('CLIENT request complete disconnecting. state = ' + self._state);
// For the moment we reconnect for every request. FIXME!
// All that should be required for keep-alive is to not reconnect,
// but outgoingFlush instead.
if (req.shouldKeepAlive) {
outgoingFlush(self);
self._outgoing.shift();
outgoingFlush(self);
} else {
self.end();
}
});
req.emit('response', res);
return isHeadResponse;
};
};
// This is called each time a request has been pushed completely to the
// socket. The message that was sent is still sitting at client._outgoing[0]
// it is our responsibility to shift it off.
//
// We have to be careful when it we shift it because once we do any writes
// to other requests will be flushed directly to the socket.
//
// At the moment we're implement a client which connects and disconnects on
// each request/response cycle so we cannot shift off the request from
// client._outgoing until we're completely disconnected after the response
// comes back.
Client.prototype._onOutgoingSent = function(message) {
// We've just finished a message. We don't end/shutdown the connection here
// because HTTP servers typically cannot handle half-closed connections
// (Node servers can).
//
// Instead, we just check if the connection is closed, and if so
// reconnect if we have pending messages.
if (this._outgoing.length) {
debug('CLIENT request flush. ensure connection. state = ' + this._state);
this._ensureConnection();
// Otherwise see if we should be starting a new connection to handle
// this.
if (this.sockets.length < this.maxSockets) {
this._establishNewConnection();
}
};
Client.prototype._ensureConnection = function() {
if (this._state == 'disconnected') {
debug('CLIENT reconnecting state = ' + this._state);
this.connect(this.port, this.host);
this._state = 'connecting';
}
// All sockets are filled and all sockets are busy.
};
Client.prototype.request = function(method, url, headers) {
if (typeof(url) != 'string') {
// assume method was omitted, shift arguments
headers = url;
url = method;
method = 'GET';
}
var req = new ClientRequest(this, method, url, headers);
this._outgoing.push(req);
this._ensureConnection();
return req;
};
// process-wide hash of agents.
// keys: "host:port" string
// values: instance of Agent
// That is, one agent remote host.
// TODO currently we never remove agents from this hash. This is a small
// memory leak. Have a 2 second timeout after a agent's sockets are to try
// to remove it?
var agents = {}
exports.cat = function(url, encoding_, headers_) {
var encoding = 'utf8',
headers = {},
callback = null;
function getAgent(host, port) {
var id = host + ':' + port;
var agent = agents[id];
// parse the arguments for the various options... very ugly
if (typeof(arguments[1]) == 'string') {
encoding = arguments[1];
if (typeof(arguments[2]) == 'object') {
headers = arguments[2];
if (typeof(arguments[3]) == 'function') callback = arguments[3];
} else {
if (typeof(arguments[2]) == 'function') callback = arguments[2];
}
} else {
// didn't specify encoding
if (typeof(arguments[1]) == 'object') {
headers = arguments[1];
callback = arguments[2];
} else {
callback = arguments[1];
if (!agent) {
agent = agents[id] = new Agent(host, port);
}
}
var url = require('url').parse(url);
var hasHost = false;
if (Array.isArray(headers)) {
for (var i = 0, l = headers.length; i < l; i++) {
if (headers[i][0].toLowerCase() === 'host') {
hasHost = true;
break;
return agent;
}
}
} else if (typeof headers === 'Object') {
var keys = Object.keys(headers);
for (var i = 0, l = keys.length; i < l; i++) {
var key = keys[i];
if (key.toLowerCase() == 'host') {
hasHost = true;
break;
}
}
}
if (!hasHost) headers['Host'] = url.hostname;
var content = '';
var client = exports.createClient(url.port || 80, url.hostname);
var req = client.request((url.pathname || '/') +
(url.search || '') +
(url.hash || ''),
headers);
exports.request = function(options, cb) {
var agent = getAgent(options.host, options.port);
var req = agent.appendMessage(options);
if (url.protocol == 'https:') {
client.https = true;
}
var callbackSent = false;
req.addListener('response', function(res) {
if (res.statusCode < 200 || res.statusCode >= 300) {
if (callback && !callbackSent) {
callback(res.statusCode);
callbackSent = true;
}
client.end();
return;
}
res.setEncoding(encoding);
res.addListener('data', function(chunk) { content += chunk; });
res.addListener('end', function() {
if (callback && !callbackSent) {
callback(null, content);
callbackSent = true;
}
});
if (cb) {
req.once('response', function (res) {
cb(null, res);
});
client.addListener('error', function(err) {
if (callback && !callbackSent) {
callback(err);
callbackSent = true;
}
});
client.addListener('close', function() {
if (callback && !callbackSent) {
callback(new Error('Connection closed unexpectedly'));
callbackSent = true;
}
});
req.end();
return req;
};

Loading…
Cancel
Save