diff --git a/lib/http.js b/lib/http.js index 960d28aaa3..247e0dcb61 100644 --- a/lib/http.js +++ b/lib/http.js @@ -1,8 +1,10 @@ var util = require('util'); var net = require('net'); var stream = require('stream'); +var EventEmitter = require('events').EventEmitter; var FreeList = require('freelist').FreeList; var HTTPParser = process.binding('http_parser').HTTPParser; +var assert = process.assert; var debug; @@ -284,13 +286,9 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) { }; -function OutgoingMessage(socket) { +function OutgoingMessage() { stream.Stream.call(this); - // TODO Remove one of these eventually. - this.socket = socket; - this.connection = socket; - this.output = []; this.outputEncodings = []; @@ -312,6 +310,15 @@ util.inherits(OutgoingMessage, stream.Stream); exports.OutgoingMessage = OutgoingMessage; +OutgoingMessage.prototype.assignSocket = function(socket) { + assert(!socket._httpMessage); + socket._httpMessage = this; + this.socket = socket; + this.connection = socket; + this._flush(); +}; + + OutgoingMessage.prototype.destroy = function(error) { this.socket.destroy(error); }; @@ -336,7 +343,7 @@ OutgoingMessage.prototype._send = function(data, encoding) { OutgoingMessage.prototype._writeRaw = function(data, encoding) { - if (this.connection._outgoing[0] === this && this.connection.writable) { + if (this.connection._httpMessage === this && this.connection.writable) { // There might be pending data in the this.output buffer. while (this.output.length) { if (!this.connection.writable) { @@ -550,7 +557,7 @@ OutgoingMessage.prototype.end = function(data, encoding) { data.length > 0 && this.output.length === 0 && this.connection.writable && - this.connection._outgoing[0] === this; + this.connection._httpMessage === this; if (hot) { // Hot path. They're doing @@ -585,17 +592,69 @@ OutgoingMessage.prototype.end = function(data, encoding) { // There is the first message on the outgoing queue, and we've sent // everything to the socket. - if (this.output.length === 0 && this.connection._outgoing[0] === this) { - debug('outgoing message end. shifting because was flushed'); - this.connection._onOutgoingSent(); + if (this.output.length === 0 && this.connection._httpMessage === this) { + debug('outgoing message end.'); + this._finish(); } return ret; }; +OutgoingMessage.prototype._finish = function() { + this.socket._httpMessage = null; + this.socket = this.connection = null; + this.emit('finish'); +}; + + +OutgoingMessage.prototype._flush = function() { + // This logic is probably a bit confusing. Let me explain a bit: + // + // In both HTTP servers and clients it is possible to queue up several + // outgoing messages. This is easiest to imagine in the case of a client. + // Take the following situation: + // + // req1 = client.request('GET', '/'); + // req2 = client.request('POST', '/'); + // + // When the user does + // + // req2.write('hello world\n'); + // + // it's possible that the first request has not been completely flushed to + // the socket yet. Thus the outgoing messages need to be prepared to queue + // up data internally before sending it on further to the socket's queue. + // + // This function, outgoingFlush(), is called by both the Server and Client + // to attempt to flush any pending messages out to the socket. + + if (!this.socket) return; + + var ret; + + while (this.output.length) { + if (!this.socket.writable) return; // XXX Necessary? + + var data = this.output.shift(); + var encoding = this.outputEncodings.shift(); + + ret = this.socket.write(data, encoding); + } + + if (this.finished) { + // This is a queue to the server or client to bring in the next this. + this._finish(); + } else if (ret) { + this.emit('drain'); + } +}; + + + + function ServerResponse(req) { - OutgoingMessage.call(this, req.socket); + OutgoingMessage.call(this); if (req.method === 'HEAD') this._hasBody = false; @@ -666,19 +725,30 @@ ServerResponse.prototype.writeHeader = function() { }; -function ClientRequest(socket, method, url, headers) { - OutgoingMessage.call(this, socket); +function ClientRequest(options) { + OutgoingMessage.call(this); + + var method = this.method = (options.method || 'GET').toUpperCase(); + var path = options.path || '/'; + var headers = options.headers; + + // Host header set by default. + if (options.host && !(headers.host || headers.Host || headers.HOST)) { + headers.Host = options.host; + } - this.method = method = method.toUpperCase(); this.shouldKeepAlive = false; if (method === 'GET' || method === 'HEAD') { this.useChunkedEncodingByDefault = false; } else { this.useChunkedEncodingByDefault = true; } + + // By default keep-alive is off. This is the last message unless otherwise + // specified. this._last = true; - this._storeHeader(method + ' ' + url + ' HTTP/1.1\r\n', headers); + this._storeHeader(method + ' ' + path + ' HTTP/1.1\r\n', headers); } util.inherits(ClientRequest, OutgoingMessage); @@ -686,58 +756,12 @@ util.inherits(ClientRequest, OutgoingMessage); exports.ClientRequest = ClientRequest; -function outgoingFlush(socket) { - // This logic is probably a bit confusing. Let me explain a bit: - // - // In both HTTP servers and clients it is possible to queue up several - // outgoing messages. This is easiest to imagine in the case of a client. - // Take the following situation: - // - // req1 = client.request('GET', '/'); - // req2 = client.request('POST', '/'); - // - // When the user does - // - // req2.write('hello world\n'); - // - // it's possible that the first request has not been completely flushed to - // the socket yet. Thus the outgoing messages need to be prepared to queue - // up data internally before sending it on further to the socket's queue. - // - // This function, outgoingFlush(), is called by both the Server and Client - // to attempt to flush any pending messages out to the socket. - var message = socket._outgoing[0]; - - if (!message) return; - - var ret; - - while (message.output.length) { - if (!socket.writable) return; // XXX Necessary? - - var data = message.output.shift(); - var encoding = message.outputEncodings.shift(); - - ret = socket.write(data, encoding); - } - - if (message.finished) { - socket._onOutgoingSent(); - } else if (ret) { - message.emit('drain'); - } -} - - function httpSocketSetup(socket) { - // An array of outgoing messages for the socket. In pipelined connections - // we need to keep track of the order they were sent. - socket._outgoing = []; - // NOTE: be sure not to use ondrain elsewhere in this file! socket.ondrain = function() { - var message = socket._outgoing[0]; - if (message) message.emit('drain'); + if (socket._httpMessage) { + socket._httpMessage.emit('drain'); + } }; } @@ -765,6 +789,7 @@ exports.createServer = function(requestListener) { function connectionListener(socket) { var self = this; + var outgoing = []; debug('SERVER new http connection'); @@ -811,9 +836,10 @@ function connectionListener(socket) { socket.onend = function() { parser.finish(); - if (socket._outgoing.length) { - socket._outgoing[socket._outgoing.length - 1]._last = true; - outgoingFlush(socket); + if (outgoing.length) { + outgoing[outgoing.length - 1]._last = true; + } else if (socket._httpMessage) { + socket._httpMessage._last = true; } else { socket.end(); } @@ -824,21 +850,6 @@ function connectionListener(socket) { parsers.free(parser); }); - // At the end of each response message, after it has been flushed to the - // socket. Here we insert logic about what to do next. - socket._onOutgoingSent = function(message) { - var message = socket._outgoing.shift(); - if (message._last) { - // No more messages to be pushed out. - - socket.destroySoon(); - - } else if (socket._outgoing.length) { - // Push out the next message. - outgoingFlush(socket); - } - }; - // The following callback is issued after the headers have been read on a // new message. In this callback we setup the response object and pass it // to the user. @@ -846,7 +857,27 @@ function connectionListener(socket) { var res = new ServerResponse(req); debug('server response shouldKeepAlive: ' + shouldKeepAlive); res.shouldKeepAlive = shouldKeepAlive; - socket._outgoing.push(res); + + if (socket._httpMessage) { + // There are already pending outgoing res, append. + outgoing.push(res); + } else { + res.assignSocket(socket); + } + + // When we're finished writing the response, check if this is the last + // respose, if so destroy the socket. + res.on('finish', function() { + if (res._last) { + socket.destroySoon(); + } else { + // start sending the next message + var m = outgoing.shift(); + if (m) { + m.assignSocket(socket); + } + } + }); if ('expect' in req.headers && (req.httpVersionMajor == 1 && req.httpVersionMinor == 1) && @@ -867,280 +898,123 @@ function connectionListener(socket) { exports._connectionListener = connectionListener; -function Client() { - if (!(this instanceof Client)) return new Client(); - net.Stream.call(this, { allowHalfOpen: true }); - var self = this; - - // Possible states: - // - disconnected - // - connecting - // - connected - this._state = 'disconnected'; - - httpSocketSetup(self); - - function onData(d, start, end) { - if (!self.parser) { - throw new Error('parser not initialized prior to Client.ondata call'); - } - var ret = self.parser.execute(d, start, end - start); - if (ret instanceof Error) { - self.destroy(ret); - } else if (self.parser.incoming && self.parser.incoming.upgrade) { - var bytesParsed = ret; - self.ondata = null; - self.onend = null; - - var req = self.parser.incoming; +function Agent(host, port) { + this.host = host; + this.port = port; - var upgradeHead = d.slice(start + bytesParsed + 1, end); - - if (self.listeners('upgrade').length) { - self.emit('upgrade', req, self, upgradeHead); - } else { - self.destroy(); - } - } - }; + this.queue = []; + this.sockets = []; + this.maxSockets = 5; +} +util.inherits(Agent, EventEmitter); - self.addListener('connect', function() { - debug('CLIENT connected'); - self.ondata = onData; - self.onend = onEnd; +Agent.prototype.appendMessage = function(options) { + var self = this; - self._state = 'connected'; + var req = new ClientRequest(options); + this.queue.push(req); - self._initParser(); - outgoingFlush(self); + req.on('finish', function () { + self._cycle(); }); - function onEnd() { - if (self.parser) self.parser.finish(); - debug('CLIENT got end closing. state = ' + self._state); - self.end(); - }; - - self.addListener('close', function(e) { - self._state = 'disconnected'; - if (e) return; + this._cycle(); +}; - debug('CLIENT onClose. state = ' + self._state); - // finally done with the request - self._outgoing.shift(); +Agent.prototype._establishNewConnection = function(socket, message) { + var self = this; + assert(this.sockets.length < this.maxSockets); - // If there are more requests to handle, reconnect. - if (self._outgoing.length) { - self._ensureConnection(); - } else if (self.parser) { - parsers.free(self.parser); - self.parser = null; - } + // Grab a new "socket". Depending on the implementation of _getConnection + // this could either be a raw TCP socket or a TLS stream. + var socket = this._getConnection(this.host, this.port, function () { + self._cycle(); }); -} -util.inherits(Client, net.Stream); + this.sockets.push(socket); -exports.Client = Client; + // When the socket closes remove it from the list of available sockets. + socket.on('close', function() { + var i = self.sockets.indexOf(socket); + if (i >= 0) self.sockets.splice(i, 1); + }); +}; -exports.createClient = function(port, host, https, credentials) { - var c = new Client(); - c.port = port; - c.host = host; - c.https = https; - c.credentials = credentials; +// Sub-classes can overwrite this method with e.g. something that supplies +// TLS streams. +Agent.prototype._getConnection = function(host, port, cb) { + var c = net.createConnection(port, host); + c.on('connect', cb); return c; }; -Client.prototype._initParser = function() { - var self = this; - if (!self.parser) self.parser = parsers.alloc(); - self.parser.reinitialize('response'); - self.parser.socket = self; - self.parser.onIncoming = function(res) { - debug('CLIENT incoming response!'); - - var req = self._outgoing[0]; - - // Responses to HEAD requests are AWFUL. Ask Ryan. - // A major oversight in HTTP. Hence this nastiness. - var isHeadResponse = req.method == 'HEAD'; - debug('CLIENT isHeadResponse ' + isHeadResponse); - - if (res.statusCode == 100) { - // restart the parser, as this is a continue message. - req.emit('continue'); - return true; - } - - if (req.shouldKeepAlive && res.headers.connection === 'close') { - req.shouldKeepAlive = false; +// This method attempts to shuffle items along the queue into one of the +// waiting sockets. If a waiting socket cannot be found, it will +// start the process of establishing one. +Agent.prototype._cycle = function() { + var first = this.queue[0]; + if (!first) return; + + // First try to find an available socket. + for (var i = 0; i < this.sockets.length; i++) { + var socket = this.sockets[i]; + // If the socket doesn't already have a message it's sending out + // and the socket is available for writing... + if (!socket._httpMessage && (socket.writable && socket.readable)) { + // We found an available connection! + this.queue.shift(); // remove first from queue. + first.assignSocket(socket); + return; } - - res.addListener('end', function() { - debug('CLIENT request complete disconnecting. state = ' + self._state); - // For the moment we reconnect for every request. FIXME! - // All that should be required for keep-alive is to not reconnect, - // but outgoingFlush instead. - if (req.shouldKeepAlive) { - outgoingFlush(self); - self._outgoing.shift(); - outgoingFlush(self); - } else { - self.end(); - } - }); - - req.emit('response', res); - - return isHeadResponse; - }; -}; - - -// This is called each time a request has been pushed completely to the -// socket. The message that was sent is still sitting at client._outgoing[0] -// it is our responsibility to shift it off. -// -// We have to be careful when it we shift it because once we do any writes -// to other requests will be flushed directly to the socket. -// -// At the moment we're implement a client which connects and disconnects on -// each request/response cycle so we cannot shift off the request from -// client._outgoing until we're completely disconnected after the response -// comes back. -Client.prototype._onOutgoingSent = function(message) { - // We've just finished a message. We don't end/shutdown the connection here - // because HTTP servers typically cannot handle half-closed connections - // (Node servers can). - // - // Instead, we just check if the connection is closed, and if so - // reconnect if we have pending messages. - if (this._outgoing.length) { - debug('CLIENT request flush. ensure connection. state = ' + this._state); - this._ensureConnection(); } -}; - -Client.prototype._ensureConnection = function() { - if (this._state == 'disconnected') { - debug('CLIENT reconnecting state = ' + this._state); - this.connect(this.port, this.host); - this._state = 'connecting'; + // Otherwise see if we should be starting a new connection to handle + // this. + if (this.sockets.length < this.maxSockets) { + this._establishNewConnection(); } -}; - -Client.prototype.request = function(method, url, headers) { - if (typeof(url) != 'string') { - // assume method was omitted, shift arguments - headers = url; - url = method; - method = 'GET'; - } - var req = new ClientRequest(this, method, url, headers); - this._outgoing.push(req); - this._ensureConnection(); - return req; + // All sockets are filled and all sockets are busy. }; -exports.cat = function(url, encoding_, headers_) { - var encoding = 'utf8', - headers = {}, - callback = null; +// process-wide hash of agents. +// keys: "host:port" string +// values: instance of Agent +// That is, one agent remote host. +// TODO currently we never remove agents from this hash. This is a small +// memory leak. Have a 2 second timeout after a agent's sockets are to try +// to remove it? +var agents = {} - // parse the arguments for the various options... very ugly - if (typeof(arguments[1]) == 'string') { - encoding = arguments[1]; - if (typeof(arguments[2]) == 'object') { - headers = arguments[2]; - if (typeof(arguments[3]) == 'function') callback = arguments[3]; - } else { - if (typeof(arguments[2]) == 'function') callback = arguments[2]; - } - } else { - // didn't specify encoding - if (typeof(arguments[1]) == 'object') { - headers = arguments[1]; - callback = arguments[2]; - } else { - callback = arguments[1]; - } - } - var url = require('url').parse(url); +function getAgent(host, port) { + var id = host + ':' + port; + var agent = agents[id]; - var hasHost = false; - if (Array.isArray(headers)) { - for (var i = 0, l = headers.length; i < l; i++) { - if (headers[i][0].toLowerCase() === 'host') { - hasHost = true; - break; - } - } - } else if (typeof headers === 'Object') { - var keys = Object.keys(headers); - for (var i = 0, l = keys.length; i < l; i++) { - var key = keys[i]; - if (key.toLowerCase() == 'host') { - hasHost = true; - break; - } - } + if (!agent) { + agent = agents[id] = new Agent(host, port); } - if (!hasHost) headers['Host'] = url.hostname; - - var content = ''; - var client = exports.createClient(url.port || 80, url.hostname); - var req = client.request((url.pathname || '/') + - (url.search || '') + - (url.hash || ''), - headers); + return agent; +} - if (url.protocol == 'https:') { - client.https = true; - } - var callbackSent = false; +exports.request = function(options, cb) { + var agent = getAgent(options.host, options.port); + var req = agent.appendMessage(options); - req.addListener('response', function(res) { - if (res.statusCode < 200 || res.statusCode >= 300) { - if (callback && !callbackSent) { - callback(res.statusCode); - callbackSent = true; - } - client.end(); - return; - } - res.setEncoding(encoding); - res.addListener('data', function(chunk) { content += chunk; }); - res.addListener('end', function() { - if (callback && !callbackSent) { - callback(null, content); - callbackSent = true; - } + if (cb) { + req.once('response', function (res) { + cb(null, res); }); - }); - - client.addListener('error', function(err) { - if (callback && !callbackSent) { - callback(err); - callbackSent = true; - } - }); + } - client.addListener('close', function() { - if (callback && !callbackSent) { - callback(new Error('Connection closed unexpectedly')); - callbackSent = true; - } - }); - req.end(); + return req; }; + +