// Copyright Joyent, Inc. and other Node contributors. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the // "Software"), to deal in the Software without restriction, including // without limitation the rights to use, copy, modify, merge, publish, // distribute, sublicense, and/or sell copies of the Software, and to permit // persons to whom the Software is furnished to do so, subject to the // following conditions: // // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. var net = require('net'); var url = require('url'); var util = require('util'); var EventEmitter = require('events').EventEmitter; var ClientRequest = require('_http_client').ClientRequest; var debug = util.debuglog('http'); // New Agent code. // The largest departure from the previous implementation is that // an Agent instance holds connections for a variable number of host:ports. // Surprisingly, this is still API compatible as far as third parties are // concerned. The only code that really notices the difference is the // request object. // Another departure is that all code related to HTTP parsing is in // ClientRequest.onSocket(). The Agent is now *strictly* // concerned with managing a connection pool. function Agent(options) { if (!(this instanceof Agent)) return new Agent(options); EventEmitter.call(this); var self = this; self.defaultPort = 80; self.protocol = 'http:'; self.options = util._extend({}, options); // don't confuse net and make it think that we're connecting to a pipe self.options.path = null; self.requests = {}; self.sockets = {}; self.freeSockets = {}; self.keepAliveMsecs = self.options.keepAliveMsecs || 1000; self.keepAlive = self.options.keepAlive || false; self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets; self.on('free', function(socket, options) { var name = self.getName(options); debug('agent.on(free)', name); if (!socket.destroyed && self.requests[name] && self.requests[name].length) { self.requests[name].shift().onSocket(socket); if (self.requests[name].length === 0) { // don't leak delete self.requests[name]; } } else { // If there are no pending requests, then put it in // the freeSockets pool, but only if we're allowed to do so. var req = socket._httpMessage; if (req && req.shouldKeepAlive && !socket.destroyed && self.options.keepAlive) { var freeSockets = self.freeSockets[name]; var count = freeSockets ? freeSockets.length : 0; if (self.sockets[name]) count += self.sockets[name].length; if (count > self.maxSockets) { socket.destroy(); } else { freeSockets = freeSockets || []; self.freeSockets[name] = freeSockets; socket.setKeepAlive(true, self.keepAliveMsecs); socket.unref(); socket._httpMessage = null; freeSockets.push(socket); } } else { socket.destroy(); } } }); } util.inherits(Agent, EventEmitter); exports.Agent = Agent; Agent.defaultMaxSockets = Infinity; Agent.prototype.createConnection = net.createConnection; // Get the key for a given set of request options Agent.prototype.getName = function(options) { var name = ''; if (options.host) name += options.host; else name += 'localhost'; name += ':'; if (options.port) name += options.port; name += ':'; if (options.localAddress) name += options.localAddress; name += ':'; return name; }; Agent.prototype.addRequest = function(req, options) { var host = options.host; var port = options.port; var localAddress = options.localAddress; var name = this.getName(options); if (!this.sockets[name]) { this.sockets[name] = []; } if (this.freeSockets[name] && this.freeSockets[name].length) { debug('have free socket'); // we have a free socket, so use that. var socket = this.freeSockets[name].shift(); // don't leak if (!this.freeSockets[name].length) delete this.freeSockets[name]; socket.ref(); req.onSocket(socket); } else if (this.sockets[name].length < this.maxSockets) { debug('call onSocket'); // If we are under maxSockets create a new one. req.onSocket(this.createSocket(req, options)); } else { debug('wait for socket'); // We are over limit so we'll add it to the queue. if (!this.requests[name]) { this.requests[name] = []; } this.requests[name].push(req); } }; Agent.prototype.createSocket = function(req, options) { var self = this; options = util._extend({}, options); options = util._extend(options, self.options); options.servername = options.host; if (req) { var hostHeader = req.getHeader('host'); if (hostHeader) { options.servername = hostHeader.replace(/:.*$/, ''); } } var name = self.getName(options); debug('createConnection', name, options); var s = self.createConnection(options); if (!self.sockets[name]) { self.sockets[name] = []; } this.sockets[name].push(s); debug('sockets', name, this.sockets[name].length); function onFree() { self.emit('free', s, options); } s.on('free', onFree); function onClose(err) { debug('CLIENT socket onClose'); // This is the only place where sockets get removed from the Agent. // If you want to remove a socket from the pool, just close it. // All socket errors end in a close event anyway. self.removeSocket(s, options); } s.on('close', onClose); function onRemove() { // We need this function for cases like HTTP 'upgrade' // (defined by WebSockets) where we need to remove a socket from the // pool because it'll be locked up indefinitely debug('CLIENT socket onRemove'); self.removeSocket(s, options); s.removeListener('close', onClose); s.removeListener('free', onFree); s.removeListener('agentRemove', onRemove); } s.on('agentRemove', onRemove); return s; }; Agent.prototype.removeSocket = function(s, options) { var name = this.getName(options); debug('removeSocket', name); if (this.sockets[name]) { var index = this.sockets[name].indexOf(s); if (index !== -1) { this.sockets[name].splice(index, 1); if (this.sockets[name].length === 0) { // don't leak delete this.sockets[name]; } } } if (this.requests[name] && this.requests[name].length) { debug('removeSocket, have a request, make a socket'); var req = this.requests[name][0]; // If we have pending requests and a socket gets closed make a new one this.createSocket(req, options).emit('free'); } }; Agent.prototype.destroy = function() { var sets = [this.freeSockets, this.sockets]; sets.forEach(function(set) { Object.keys(set).forEach(function(name) { set[name].forEach(function(socket) { socket.destroy(); }); }); }); }; Agent.prototype.request = function(options, cb) { if (util.isString(options)) { options = url.parse(options); } // don't try to do dns lookups of foo.com:8080, just foo.com if (options.hostname) { options.host = options.hostname; } if (options && options.path && / /.test(options.path)) { // The actual regex is more like /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/ // with an additional rule for ignoring percentage-escaped characters // but that's a) hard to capture in a regular expression that performs // well, and b) possibly too restrictive for real-world usage. That's // why it only scans for spaces because those are guaranteed to create // an invalid request. throw new TypeError('Request path contains unescaped characters.'); } else if (options.protocol && options.protocol !== this.protocol) { throw new Error('Protocol:' + options.protocol + ' not supported.'); } options = util._extend({ agent: this, keepAlive: this.keepAlive }, options); // if it's false, then make a new one, just like this one. if (options.agent === false) options.agent = new this.constructor(options); debug('agent.request', options); return new ClientRequest(options, cb); }; Agent.prototype.get = function(options, cb) { var req = this.request(options, cb); req.end(); return req; };