From 2b929c7f1944bc9ac33fb21927f2cc96c111371b Mon Sep 17 00:00:00 2001 From: Mikeal Rogers Date: Tue, 26 Jul 2011 00:15:15 +0200 Subject: [PATCH] http: http2 implementation --- lib/http.js | 1028 ++++++++++++++++++++------------------------------ lib/https.js | 42 +-- 2 files changed, 406 insertions(+), 664 deletions(-) diff --git a/lib/http.js b/lib/http.js index e7e28e816c..55b43282e9 100644 --- a/lib/http.js +++ b/lib/http.js @@ -485,9 +485,9 @@ OutgoingMessage.prototype._storeHeader = function(firstLine, headers) { } // keep-alive logic - if (sentConnectionHeader == false) { + if (sentConnectionHeader === false) { if (this.shouldKeepAlive && - (sentContentLengthHeader || this.useChunkedEncodingByDefault)) { + (sentContentLengthHeader || this.useChunkedEncodingByDefault || this.agent)) { messageHeader += 'Connection: keep-alive\r\n'; } else { this._last = true; @@ -702,8 +702,8 @@ OutgoingMessage.prototype.end = function(data, encoding) { // There is the first message on the outgoing queue, and we've sent // everything to the socket. + debug('outgoing message end.'); if (this.output.length === 0 && this.connection._httpMessage === this) { - debug('outgoing message end.'); this._finish(); } @@ -747,8 +747,8 @@ OutgoingMessage.prototype._flush = function() { if (!this.socket) return; var ret; - while (this.output.length) { + if (!this.socket.writable) return; // XXX Necessary? var data = this.output.shift(); @@ -866,84 +866,405 @@ ServerResponse.prototype.writeHead = function(statusCode) { this._storeHeader(statusLine, headers); }; - ServerResponse.prototype.writeHeader = function() { this.writeHead.apply(this, arguments); }; -function ClientRequest(options, defaultPort) { - OutgoingMessage.call(this); +// New Agent code. - if (!defaultPort) defaultPort = 80; +// The largest departure from the previous implementation is that +// an Agent instance holds connections for a variable number of host:ports. +// Surprisingly, this is still API compatible as far as third parties are +// concerned. The only code that really notices the difference is the +// request object. - var method = this.method = (options.method || 'GET').toUpperCase(); - this.path = options.path || '/'; +// Another departure is that all code related to HTTP parsing is in +// ClientRequest.onSocket(). The Agent is now *strictly* +// concerned with managing a connection pool. - if (!Array.isArray(headers)) { +function Agent(options) { + var self = this; + self.options = options || {}; + self.requests = {}; + self.sockets = {}; + self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets; + self.on('free', function(socket, host, port) { + var name = host + ':' + port; + if (self.requests[name] && self.requests[name].length) { + self.requests[name].shift().onSocket(socket); + } else { + // If there are no pending requests just destroy the + // socket and it will get removed from the pool. This + // gets us out of timeout issues and allows us to + // default to Connection:keep-alive. + socket.destroy(); + } + }); + self.createConnection = net.createConnection; +} +util.inherits(Agent, EventEmitter); +exports.Agent = Agent; + +Agent.defaultMaxSockets = 5; + +Agent.prototype.defaultPort = 80; +Agent.prototype.addRequest = function(req, host, port) { + var name = host + ':' + port; + if (!this.sockets[name]) { + this.sockets[name] = []; + } + if (this.sockets[name].length < this.maxSockets) { + // If we are under maxSockets create a new one. + req.onSocket(this.createSocket(name, host, port)); + } else { + // We are over limit so we'll add it to the queue. + if (!this.requests[name]) { + this.requests[name] = []; + } + this.requests[name].push(req); + } +}; +Agent.prototype.createSocket = function(name, host, port) { + var self = this; + var s = self.createConnection(port, host); + if (!self.sockets[name]) { + self.sockets[name] = []; + } + this.sockets[name].push(s); + var onFree = function() { + self.emit('free', s, host, port); + } + s.on('free', onFree); + var onClose = function(err) { + // This is the only place where sockets get removed from the Agent. + // If you want to remove a socket from the pool, just close it. + // All socket errors end in a close event anyway. + self.removeSocket(s, name, host, port); + } + s.on('close', onClose); + var onRemove = function() { + // We need this function for cases like HTTP "upgrade" + // (defined by WebSockets) where we need to remove a socket from the pool + // because it'll be locked up indefinitely + self.removeSocket(s, name, host, port); + s.removeListener('close', onClose); + s.removeListener('free', onFree); + s.removeListener('agentRemove', onRemove); + } + s.on('agentRemove', onRemove); + return s; +}; +Agent.prototype.removeSocket = function(s, name, host, port) { + if (this.sockets[name] && this.sockets[name].indexOf(s) !== -1) { + this.sockets[name].shift(this.sockets[name].indexOf(s)); + } else if (this.sockets[name] && this.sockets[name].length === 0) { + // don't leak + delete this.sockets[name]; + delete this.requests[name]; + } + if (this.requests[name] && this.requests[name].length) { + // If we have pending requests and a socket gets closed a new one + // needs to be created to take over in the pool for the one that closed. + this.createSocket(name, host, port).emit('free'); + } +}; + +var globalAgent = new Agent(); +exports.globalAgent = globalAgent; + +function ClientRequest(options, cb) { + var self = this; + OutgoingMessage.call(self); + self.agent = options.agent; + options.defaultPort = options.defaultPort || 80; + + options.port = options.port || options.defaultPort; + options.host = options.host || 'localhost'; + + if (options.setHost === undefined) { + options.setHost = true; + } + + self.socketPath = options.socketPath; + + var method = self.method = (options.method || 'GET').toUpperCase(); + self.path = options.path || '/'; + if (cb) { + self.on('response', cb); + } + + if (!Array.isArray(options.headers)) { if (options.headers) { - var headers = options.headers; - var keys = Object.keys(headers); + var keys = Object.keys(options.headers); for (var i = 0, l = keys.length; i < l; i++) { var key = keys[i]; - this.setHeader(key, headers[key]); + self.setHeader(key, options.headers[key]); } } - // Host header set by default. - if (options.host && !this.getHeader('host')) { + if (options.host && !this.getHeader('host') && options.setHost) { var hostHeader = options.host; - if (options.port && +options.port !== defaultPort) { + if (options.port && +options.port !== options.defaultPort) { hostHeader += ':' + options.port; } this.setHeader("Host", hostHeader); } } - this.shouldKeepAlive = false; if (method === 'GET' || method === 'HEAD') { - this.useChunkedEncodingByDefault = false; + self.useChunkedEncodingByDefault = false; } else { - this.useChunkedEncodingByDefault = true; + self.useChunkedEncodingByDefault = true; } - // By default keep-alive is off. This is the last message unless otherwise - // specified. - this._last = true; - - if (Array.isArray(headers)) { - this._storeHeader(this.method + ' ' + this.path + ' HTTP/1.1\r\n', headers); - } else if (this.getHeader('expect')) { - this._storeHeader(this.method + ' ' + this.path + ' HTTP/1.1\r\n', this._renderHeaders()); + if (Array.isArray(options.headers)) { + self._storeHeader(self.method + ' ' + self.path + ' HTTP/1.1\r\n', options.headers); + } else if (self.getHeader('expect')) { + self._storeHeader(self.method + ' ' + self.path + ' HTTP/1.1\r\n', self._renderHeaders()); + } + if (self.socketPath) { + self._last = true; + self.shouldKeepAlive = false; + self.onSocket(net.createConnection(self.socketPath)); + } else if (self.agent) { + // If there is an agent we should default to Connection:keep-alive. + self._last = false; + self.shouldKeepAlive = true; + self.agent.addRequest(self, options.host, options.port); + } else { + // No agent should default to Connection:close. + self._last = true; + self.shouldKeepAlive = false; + self.onSocket(net.createConnection(options.port, options.host)); } + self._deferToConnect(null, null, function () { + self._flush(); + }) + } util.inherits(ClientRequest, OutgoingMessage); - exports.ClientRequest = ClientRequest; ClientRequest.prototype._implicitHeader = function() { this._storeHeader(this.method + ' ' + this.path + ' HTTP/1.1\r\n', this._renderHeaders()); -} +}; ClientRequest.prototype.abort = function() { - if (this._queue) { - // queued for dispatch - assert(!this.connection); - var i = this._queue.indexOf(this); - this._queue.splice(i, 1); - - } else if (this.connection) { + if (this.socket) { // in-progress - var c = this.connection; - this.detachSocket(c); - c.destroy(); + this.socket.destroy(); + } else { + // haven't been assigned a socket yet. + // this could be more efficient, it could + // remove itself from the pending requests + this._deferToConnect('destroy', []); + } +}; + +ClientRequest.prototype.onSocket = function(socket) { + var parser = parsers.alloc(); + var req = this; + + req.socket = socket; + req.connection = socket; + parser.reinitialize('response'); + parser.socket = socket; + parser.incoming = null; + req.parser = parser; + + socket._httpMessage = req; + // Setup "drain" propogation. + httpSocketSetup(socket); + + var errorListener = function(err) { + debug('HTTP SOCKET ERROR: ' + err.message + '\n' + err.stack); + req.emit('error', err); + // For Safety. Some additional errors might fire later on + // and we need to make sure we don't double-fire the error event. + req._hadError = true; + parser.finish(); + } + socket.on('error', errorListener); + + socket.ondata = function(d, start, end) { + var ret = parser.execute(d, start, end - start); + if (ret instanceof Error) { + debug('parse error'); + socket.destroy(ret); + } else if (parser.incoming && parser.incoming.upgrade) { + var bytesParsed = ret; + socket.ondata = null; + socket.onend = null; + + var res = parser.incoming; + req.res = res; + + // This is start + byteParsed + 1 due to the error of getting \n + // in the upgradeHead from the closing lines of the headers + var upgradeHead = d.slice(start + bytesParsed + 1, end); + if (req.listeners('upgrade').length) { + // Emit 'upgrade' on the Agent. + req.upgraded = true; + req.emit('upgrade', res, socket, upgradeHead); + socket.emit('agentRemove'); + } else { + // Got upgrade header, but have no handler. + socket.destroy(); + } + } + }; + + socket.onend = function() { + if (!req.res) { + // If we don't have a response then we know that the socket + // ended prematurely and we need to emit an error on the request. + req.emit('error', new Error("Request ended prematurely.")); + req._hadError = true; + } + parser.finish(); + parsers.free(parser); // I don't know if this is necessary --Mikeal + socket.destroy(); + }; + + var closeListener = function() { + debug('HTTP socket close'); + req.emit('close'); + if (req.res && req.res.readable) { + // Socket closed before we emitted "end" below. + req.res.emit('aborted'); + req.res.emit('end'); + req.res.emit('close'); + } else if (!req.res && !req._hadError) { + // This socket error fired before we started to + // receive a response. The error needs to + // fire on the request. + req.emit('error', new Error('socket hang up')); + } + } + socket.on('close', closeListener); + + parser.onIncoming = function(res, shouldKeepAlive) { + debug('AGENT incoming response!'); + + if (req.res) { + // We already have a response object, this means the server + // sent a double response. + socket.destroy(); + return; + } + req.res = res; + + // Responses to HEAD requests are crazy. + // HEAD responses aren't allowed to have an entity-body + // but *can* have a content-length which actually corresponds + // to the content-length of the entity-body had the request + // been a GET. + var isHeadResponse = req.method == 'HEAD'; + debug('AGENT isHeadResponse ' + isHeadResponse); + + if (res.statusCode == 100) { + // restart the parser, as this is a continue message. + delete req.res; // Clear res so that we don't hit double-responses. + req.emit('continue'); + return true; + } + if (req.shouldKeepAlive && res.headers.connection !== 'keep-alive' && !req.upgraded) { + // Server MUST respond with Connection:keep-alive for us to enable it. + // If we've been upgraded (via WebSockets) we also shouldn't try to + // keep the connection open. + req.shouldKeepAlive = false; + } + + res.addListener('end', function() { + if (!req.shouldKeepAlive) { + if (socket.writable) { + debug('AGENT socket.destroySoon()'); + socket.destroySoon(); + } + assert(!socket.writable); + } else { + debug('AGENT socket keep-alive'); + } + }); + + DTRACE_HTTP_CLIENT_RESPONSE(socket, req); + req.emit('response', res); + + res.on('end', function() { + if (req.shouldKeepAlive) { + socket.removeListener('close', closeListener); + socket.removeListener('error', errorListener); + socket.emit('free'); + } + }); + + return isHeadResponse; + }; + process.nextTick(function() { + req.emit('socket', socket); + }); +}; +ClientRequest.prototype._deferToConnect = function(method, arguments, cb) { + // This function is for calls that need to happen once the socket is + // connected and writable. It's an important promisy thing for all the socket + // calls that happen either now (when a socket is assigned) or + // in the future (when a socket gets assigned out of the pool and is + // eventually writable). + var self = this; + var onSocket = function() { + if (self.socket.writable) { + if (method) { + self.socket[method].apply(self.socket, arguments); + } + if (cb) { cb(); } + } else { + self.socket.on('connect', function() { + if (method) { + self.socket[method].apply(self.socket, arguments); + } + if (cb) { cb(); } + }); + } + } + if (!self.socket) { + self.once('socket', onSocket); } else { - // already complete. + onSocket(); + } +}; +ClientRequest.prototype.setTimeout = function() { + this._deferToConnect('setTimeout', arguments); +}; +ClientRequest.prototype.setNoDelay = function() { + this._deferToConnect('setNoDelay', arguments); +}; +ClientRequest.prototype.setSocketKeepAlive = function() { + this._deferToConnect('setKeepAlive', arguments); +}; +ClientRequest.prototype.pause = function() { + var self = this; + self._deferToConnect(null, null, function() { + OutgoingMessage.prototype.pause.apply(self, []); + }); +}; + + +exports.request = function(options, cb) { + if (options.agent === undefined) { + options.agent = globalAgent; } + return new ClientRequest(options, cb); }; +exports.get = function(options, cb) { + options.method = 'GET'; + var req = exports.request(options, cb); + req.end(); + return req; +}; function httpSocketSetup(socket) { // NOTE: be sure not to use ondrain elsewhere in this file! @@ -1126,588 +1447,52 @@ function connectionListener(socket) { } exports._connectionListener = connectionListener; +// Legacy Interface -function Agent(options) { - this.options = options; - this.host = options.host; - this.port = options.port || this.defaultPort; - this.socketPath = options.socketPath; - - this.queue = []; - this.sockets = []; - this.maxSockets = Agent.defaultMaxSockets; - +function Client(port, host) { + host = host || 'localhost'; + port = port || 80; + this.host = host; + this.port = port; } -util.inherits(Agent, EventEmitter); -exports.Agent = Agent; - - -Agent.defaultMaxSockets = 5; - -Agent.prototype.defaultPort = 80; -Agent.prototype.appendMessage = function(options) { - var self = this; - - var req = new ClientRequest(options, this.defaultPort); - this.queue.push(req); - req._queue = this.queue; - - this._cycle(); - - return req; -}; - - -Agent.prototype._removeSocket = function(socket) { - var i = this.sockets.indexOf(socket); - if (i >= 0) this.sockets.splice(i, 1); -}; - - -Agent.prototype._establishNewConnection = function() { +util.inherits(Client, EventEmitter); +Client.prototype.request = function(method, path, headers) { var self = this; - assert(this.sockets.length < this.maxSockets); - - // Grab a new "socket". Depending on the implementation of _getConnection - // this could either be a raw TCP socket or a TLS stream. - var socket = this._getConnection(self, function() { - socket._httpConnecting = false; - self.emit('connect'); // mostly for the shim. - debug('Agent _getConnection callback'); - self._cycle(); - }); - - // Use this special mark so that we know if the socket is connecting. - // TODO: come up with a standard way of specifying that a stream is being - // connected across tls and net. - socket._httpConnecting = true; - - this.sockets.push(socket); - - // Cycle so the request can be assigned to this new socket. - self._cycle(); - assert(socket._httpMessage); - - // Add a parser to the socket. - var parser = parsers.alloc(); - parser.reinitialize('response'); - parser.socket = socket; - parser.incoming = null; - - socket.on('error', function(err) { - debug('AGENT SOCKET ERROR: ' + err.message + '\n' + err.stack); - var req; - if (socket._httpMessage) { - req = socket._httpMessage; - } else if (self.queue.length) { - req = self.queue.shift(); - assert(req._queue === self.queue); - req._queue = null; - } - - if (req) { - req.emit('error', err); - req._hadError = true; // hacky - } - - // clean up so that agent can handle new requests - parser.finish(); - socket.destroy(); - self._removeSocket(socket); - self._cycle(); - }); - - socket.ondata = function(d, start, end) { - var ret = parser.execute(d, start, end - start); - if (ret instanceof Error) { - debug('parse error'); - socket.destroy(ret); - } else if (parser.incoming && parser.incoming.upgrade) { - var bytesParsed = ret; - socket.ondata = null; - socket.onend = null; - - var res = parser.incoming; - assert(socket._httpMessage); - socket._httpMessage.res = res; - - // This is start + byteParsed + 1 due to the error of getting \n - // in the upgradeHead from the closing lines of the headers - var upgradeHead = d.slice(start + bytesParsed + 1, end); - - // Make sure we don't try to send HTTP requests to it. - self._removeSocket(socket); - - socket.on('end', function() { - self.emit('end'); - }); - - // XXX free the parser? - - if (self.listeners('upgrade').length) { - // Emit 'upgrade' on the Agent. - self.emit('upgrade', res, socket, upgradeHead); - } else { - // Got upgrade header, but have no handler. - socket.destroy(); - } - } - }; - - socket.onend = function() { - self.emit('end'); // mostly for the shim. - parser.finish(); - socket.destroy(); - }; - - // When the socket closes remove it from the list of available sockets. - socket.on('close', function() { - debug('AGENT socket close'); - // This is really hacky: What if someone issues a request, the server - // accepts, but then terminates the connection. There is no parse error, - // there is no socket-level error. How does the user get informed? - // We check to see if the socket has a request, if so if it has a - // response (meaning that it emitted a 'response' event). If the socket - // has a request but no response and it never emitted an error event: - // THEN we need to trigger it manually. - // There must be a better way to do this. - if (socket._httpMessage) { - if (socket._httpMessage.res) { - socket._httpMessage.res.emit('aborted'); - socket._httpMessage.res.emit('close'); - } else { - if (!socket._httpMessage._hadError) { - socket._httpMessage.emit('error', new Error('socket hang up')); - } - } - } - - self._removeSocket(socket); - // unref the parser for easy gc - parsers.free(parser); - - self._cycle(); + var options = {}; + options.host = self.host; + options.port = self.port; + if (method[0] === '/') { + headers = path; + path = method; + method = 'GET'; + } + options.method = method; + options.path = path; + options.headers = headers; + var c = new ClientRequest(options); + c.on('error', function(e) { + self.emit('error', e); }); - - parser.onIncoming = function(res, shouldKeepAlive) { - debug('AGENT incoming response!'); - - // If we're receiving a message but we don't have a corresponding - // request - then somehow the server is seriously messed up and sending - // multiple responses at us. In this case we'll just bail. - if (!socket._httpMessage) { - socket.destroy(); - return; - } - - var req = socket._httpMessage; - req.res = res; - - // Responses to HEAD requests are AWFUL. Ask Ryan. - // A major oversight in HTTP. Hence this nastiness. - var isHeadResponse = req.method == 'HEAD'; - debug('AGENT isHeadResponse ' + isHeadResponse); - - if (res.statusCode == 100) { - // restart the parser, as this is a continue message. - req.emit('continue'); - return true; - } - - if (req.shouldKeepAlive && res.headers.connection === 'close') { - req.shouldKeepAlive = false; - } - - res.addListener('end', function() { - debug('AGENT request complete'); - // For the moment we reconnect for every request. FIXME! - // All that should be required for keep-alive is to not reconnect, - // but outgoingFlush instead. - if (!req.shouldKeepAlive) { - if (socket.writable) { - debug('AGENT socket.destroySoon()'); - socket.destroySoon(); - } - assert(!socket.writable); - } else { - debug('AGENT socket keep-alive'); - } - - // The socket may already be detached and destroyed by an abort call - if (socket._httpMessage) { - req.detachSocket(socket); - } - - assert(!socket._httpMessage); - - self._cycle(); + // The old Client interface emitted "end" on socket end. + // This doesn't map to how we want things to operate in the future + // but it will get removed when we remove this legacy interface. + c.on('socket', function(s) { + s.on('end', function() { + self.emit('end'); }); - - DTRACE_HTTP_CLIENT_RESPONSE(socket, self); - req.emit('response', res); - - return isHeadResponse; - }; -}; - - -// Sub-classes can overwrite this method with e.g. something that supplies -// TLS streams. -Agent.prototype._getConnection = function(options, cb) { - debug('Agent connected!'); - - var c; - - if (options.host) { - c = net.createConnection(options.port, options.host); - } else if (options.socketPath) { - c = net.createConnection(options.socketPath); - } else { - c = net.createConnection(options.port); - } - c.on('connect', cb); + }); return c; }; - -// This method attempts to shuffle items along the queue into one of the -// waiting sockets. If a waiting socket cannot be found, it will -// start the process of establishing one. -Agent.prototype._cycle = function() { - debug('Agent _cycle sockets=' + this.sockets.length + ' queue=' + this.queue.length); - var self = this; - - var first = this.queue[0]; - if (!first) return; - - // First try to find an available socket. - for (var i = 0; i < this.sockets.length; i++) { - var socket = this.sockets[i]; - // If the socket doesn't already have a message it's sending out - // and the socket is available for writing or it's connecting. - // In particular this rules out sockets that are closing. - if (!socket._httpMessage && - ((socket.writable && socket.readable) || socket._httpConnecting)) { - debug('Agent found socket, shift'); - // We found an available connection! - this.queue.shift(); // remove first from queue. - assert(first._queue === this.queue); - first._queue = null; - - first.assignSocket(socket); - httpSocketSetup(socket); - self._cycle(); // try to dispatch another - return; - } - } - - // If no sockets are connecting, and we have space for another we should - // be starting a new connection to handle this request. - if (this.sockets.length < this.maxSockets) { - this._establishNewConnection(); - } - - // All sockets are filled and all sockets are busy. -}; - - -// process-wide hash of agents. -// keys: "host:port" string -// values: instance of Agent -// That is, one agent remote host. -// TODO currently we never remove agents from this hash. This is a small -// memory leak. Have a 2 second timeout after a agent's sockets are to try -// to remove it? -var agents = {}; - -// Backwards compatible with legacy getAgent(host, port); -function getAgent(options) { - var agent; - var host; - var id; - var port; - - var _opts = {}; - - if (typeof options === 'string') { - port = arguments[1] || 80; - id = options + ':' + port; - _opts.host = options; - _opts.port = port; - } else if (options && typeof options === 'object') { - if (options.port || options.host) { - host = options.host || 'localhost'; - port = options.port || 80; - id = host + ':' + port; - _opts.host = host; - _opts.port = port; - } else if (options.socketPath) { - id = options.socketPath; - _opts.socketPath = options.socketPath; - } else { - throw new TypeError('Invalid options specification to getAgent'); - } - } else { - throw new TypeError('Invalid argument to getAgent'); - } - - agent = agents[id]; - - if (!agent) { - agent = agents[id] = new Agent(_opts); - } - - return agent; -} -exports.getAgent = getAgent; - - -exports._requestFromAgent = function(options, cb) { - var req = options.agent.appendMessage(options); - req.agent = options.agent; - if (cb) req.once('response', cb); - return req; -}; - - -exports.request = function(options, cb) { - if (options.agent === undefined) { - options.agent = getAgent(options); - } else if (options.agent === false) { - options.agent = new Agent(options); - } - return exports._requestFromAgent(options, cb); -}; - - -exports.get = function(options, cb) { - options.method = 'GET'; - var req = exports.request(options, cb); - req.end(); - return req; -}; - - - -// Legacy Interface: - - - -function Client() { - if (!(this instanceof Client)) return new Client(); - net.Stream.call(this, { allowHalfOpen: true }); - var self = this; - - // Possible states: - // - disconnected - // - connecting - // - connected - this._state = 'disconnected'; - - httpSocketSetup(self); - this._outgoing = []; - - function onData(d, start, end) { - if (!self.parser) { - throw new Error('parser not initialized prior to Client.ondata call'); - } - var ret = self.parser.execute(d, start, end - start); - if (ret instanceof Error) { - self.destroy(ret); - } else if (self.parser.incoming && self.parser.incoming.upgrade) { - var bytesParsed = ret; - self.ondata = null; - self.onend = null; - self._state = 'upgraded'; - - var res = self.parser.incoming; - - if (self._httpMessage) { - self._httpMessage.detachSocket(self); - } - - var upgradeHead = d.slice(start + bytesParsed + 1, end); - - if (self.listeners('upgrade').length) { - self.emit('upgrade', res, self, upgradeHead); - } else { - self.destroy(); - } - } - }; - - self.addListener('connect', function() { - debug('CLIENT connected'); - - self.ondata = onData; - self.onend = onEnd; - - self._state = 'connected'; - - self._initParser(); - - self._cycle(); - }); - - function onEnd() { - if (self.parser) self.parser.finish(); - debug('CLIENT got end closing. state = ' + self._state); - self.end(); - }; - - self.addListener('close', function(e) { - self._state = 'disconnected'; - self._upgraded = false; - - // Free the parser. - if (self.parser) { - parsers.free(self.parser); - self.parser = null; - } - - if (e) return; - - // If we have an http message, then drop it - var req = self._httpMessage; - if (req && !req.res) { - req.detachSocket(self); - self.emit('error', new Error('socket hang up')); - } - - debug('CLIENT onClose. state = ' + self._state); - self._cycle(); - }); -} -util.inherits(Client, net.Stream); - - exports.Client = Client; - - exports.createClient = function(port, host) { - var c = new Client(); - c.port = port; - c.host = host; - return c; + return new Client(port, host); }; - -Client.prototype._initParser = function() { - var self = this; - if (!self.parser) self.parser = parsers.alloc(); - self.parser.reinitialize('response'); - self.parser.socket = self; - self.parser.onIncoming = function(res) { - debug('CLIENT incoming response!'); - - assert(self._httpMessage); - var req = self._httpMessage; - - req.res = res; - - // Responses to HEAD requests are AWFUL. Ask Ryan. - // A major oversight in HTTP. Hence this nastiness. - var isHeadResponse = req.method == 'HEAD'; - debug('CLIENT isHeadResponse ' + isHeadResponse); - - if (res.statusCode == 100) { - // restart the parser, as this is a continue message. - req.emit('continue'); - return true; - } - - if (req.shouldKeepAlive && res.headers.connection === 'close') { - req.shouldKeepAlive = false; - } - - res.addListener('end', function() { - debug('CLIENT response complete disconnecting. state = ' + self._state); - - if (!req.shouldKeepAlive) { - self.end(); - } - - req.detachSocket(self); - assert(!self._httpMessage); - self._cycle(); - }); - - DTRACE_HTTP_CLIENT_RESPONSE(self, self); - req.emit('response', res); - - return isHeadResponse; - }; -}; - - -Client.prototype._cycle = function() { - debug("Client _cycle"); - if (this._upgraded) return; - - switch (this._state) { - case 'connecting': - break; - - case 'connected': - if (this.writable && this.readable) { - debug("Client _cycle shift()"); - if (this._httpMessage) { - this._httpMessage._flush(); - } else { - var req = this._outgoing.shift(); - if (req) { - req.assignSocket(this); - } - } - } - break; - - case 'disconnected': - if (this._httpMessage || this._outgoing.length) { - this._ensureConnection(); - } - break; - } -}; - - -Client.prototype._ensureConnection = function() { - if (this._state == 'disconnected') { - debug('CLIENT reconnecting state = ' + this._state); - this.connect(this.port, this.host); - this._state = 'connecting'; - } -}; - - -Client.prototype.request = function(method, url, headers) { - if (typeof(url) != 'string') { - // assume method was omitted, shift arguments - headers = url; - url = method; - method = 'GET'; - } - - var self = this; - - var options = { - method: method || 'GET', - path: url || '/', - headers: headers - }; - - var req = new ClientRequest(options); - this._outgoing.push(req); - this._cycle(); - - return req; -}; - - exports.cat = function(url, encoding_, headers_) { - var encoding = 'utf8', - headers = {}, - callback = null; + var encoding = 'utf8'; + var headers = {}; + var callback = null; console.error("http.cat will be removed in the near future. use http.get"); @@ -1754,19 +1539,9 @@ exports.cat = function(url, encoding_, headers_) { var content = ''; - var client = exports.createClient(url.port || 80, url.hostname); - var req = client.request((url.pathname || '/') + - (url.search || '') + - (url.hash || ''), - headers); - - if (url.protocol == 'https:') { - client.https = true; - } - + var path = (url.pathname || '/') + (url.search || '') + (url.hash || ''); var callbackSent = false; - - req.addListener('response', function(res) { + var req = exports.request({port: url.port || 80, host: url.hostname, path: path}, function(res) { if (res.statusCode < 200 || res.statusCode >= 300) { if (callback && !callbackSent) { callback(res.statusCode); @@ -1785,18 +1560,13 @@ exports.cat = function(url, encoding_, headers_) { }); }); - client.addListener('error', function(err) { + + req.addListener('error', function(err) { if (callback && !callbackSent) { callback(err); callbackSent = true; } }); - client.addListener('close', function() { - if (callback && !callbackSent) { - callback(new Error('Connection closed unexpectedly')); - callbackSent = true; - } - }); req.end(); }; diff --git a/lib/https.js b/lib/https.js index 1036ea3d78..b095ae7cad 100644 --- a/lib/https.js +++ b/lib/https.js @@ -52,56 +52,28 @@ exports.createServer = function(opts, requestListener) { // HTTPS agents. -var agents = {}; function Agent(options) { http.Agent.call(this, options); + this.createConnection = function(port, host) { + return tls.connect(port, host, options); + }; } inherits(Agent, http.Agent); - - Agent.prototype.defaultPort = 443; +var globalAgent = new Agent(); -Agent.prototype._getConnection = function(options, cb) { - if (NPN_ENABLED && !this.options.NPNProtocols) { - this.options.NPNProtocols = ['http/1.1', 'http/1.0']; - } - - var s = tls.connect(options.port, options.host, this.options, function() { - // do other checks here? - if (cb) cb(); - }); - - return s; -}; - - -function getAgent(options) { - if (!options.port) options.port = 443; - - var id = options.host + ':' + options.port; - var agent = agents[id]; - - if (!agent) { - agent = agents[id] = new Agent(options); - } - - return agent; -} -exports.getAgent = getAgent; +exports.globalAgent = globalAgent; exports.Agent = Agent; exports.request = function(options, cb) { if (options.agent === undefined) { - options.agent = getAgent(options); - } else if (options.agent === false) { - options.agent = new Agent(options); + options.agent = globalAgent; } - return http._requestFromAgent(options, cb); + return http.request(options, cb); }; - exports.get = function(options, cb) { options.method = 'GET'; var req = exports.request(options, cb);