|
|
@ -942,7 +942,7 @@ Agent.prototype.appendMessage = function(options) { |
|
|
|
Agent.prototype._removeSocket = function(socket) { |
|
|
|
var i = this.sockets.indexOf(socket); |
|
|
|
if (i >= 0) this.sockets.splice(i, 1); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Agent.prototype._establishNewConnection = function() { |
|
|
@ -951,10 +951,10 @@ Agent.prototype._establishNewConnection = function() { |
|
|
|
|
|
|
|
// Grab a new "socket". Depending on the implementation of _getConnection
|
|
|
|
// this could either be a raw TCP socket or a TLS stream.
|
|
|
|
var socket = this._getConnection(this.host, this.port, function () { |
|
|
|
var socket = this._getConnection(this.host, this.port, function() { |
|
|
|
socket._httpConnecting = false; |
|
|
|
self.emit('connect'); // mostly for the shim.
|
|
|
|
debug("Agent _getConnection callback"); |
|
|
|
debug('Agent _getConnection callback'); |
|
|
|
self._cycle(); |
|
|
|
}); |
|
|
|
|
|
|
@ -972,10 +972,10 @@ Agent.prototype._establishNewConnection = function() { |
|
|
|
parser.incoming = null; |
|
|
|
|
|
|
|
socket.on('error', function(err) { |
|
|
|
debug("AGENT SOCKET ERROR: " + err.message); |
|
|
|
var req; |
|
|
|
debug('AGENT SOCKET ERROR: ' + err.message); |
|
|
|
var req; |
|
|
|
if (socket._httpMessage) { |
|
|
|
req = socket._httpMessage |
|
|
|
req = socket._httpMessage; |
|
|
|
} else if (self.queue.length) { |
|
|
|
req = self.queue.shift(); |
|
|
|
} else { |
|
|
@ -1105,8 +1105,8 @@ Agent.prototype._establishNewConnection = function() { |
|
|
|
// Sub-classes can overwrite this method with e.g. something that supplies
|
|
|
|
// TLS streams.
|
|
|
|
Agent.prototype._getConnection = function(host, port, cb) { |
|
|
|
debug("Agent connected!"); |
|
|
|
var c = net.createConnection(port, host); |
|
|
|
debug('Agent connected!'); |
|
|
|
var c = net.createConnection(port, host); |
|
|
|
c.on('connect', cb); |
|
|
|
return c; |
|
|
|
}; |
|
|
@ -1116,7 +1116,8 @@ Agent.prototype._getConnection = function(host, port, cb) { |
|
|
|
// waiting sockets. If a waiting socket cannot be found, it will
|
|
|
|
// start the process of establishing one.
|
|
|
|
Agent.prototype._cycle = function() { |
|
|
|
debug("Agent _cycle sockets=" + this.sockets.length + " queue=" + this.queue.length); |
|
|
|
debug('Agent _cycle sockets=' + this.sockets.length + |
|
|
|
' queue=' + this.queue.length); |
|
|
|
var self = this; |
|
|
|
|
|
|
|
var first = this.queue[0]; |
|
|
@ -1131,7 +1132,7 @@ Agent.prototype._cycle = function() { |
|
|
|
// If the socket doesn't already have a message it's sending out
|
|
|
|
// and the socket is available for writing...
|
|
|
|
if (!socket._httpMessage && (socket.writable && socket.readable)) { |
|
|
|
debug("Agent found socket, shift"); |
|
|
|
debug('Agent found socket, shift'); |
|
|
|
// We found an available connection!
|
|
|
|
this.queue.shift(); // remove first from queue.
|
|
|
|
first.assignSocket(socket); |
|
|
@ -1159,7 +1160,7 @@ Agent.prototype._cycle = function() { |
|
|
|
// TODO currently we never remove agents from this hash. This is a small
|
|
|
|
// memory leak. Have a 2 second timeout after a agent's sockets are to try
|
|
|
|
// to remove it?
|
|
|
|
var agents = {} |
|
|
|
var agents = {}; |
|
|
|
|
|
|
|
|
|
|
|
function getAgent(host, port) { |
|
|
@ -1220,7 +1221,7 @@ function Client(port, host) { |
|
|
|
}); |
|
|
|
|
|
|
|
// proxy upgrade events upwards;
|
|
|
|
this.agent.on('upgrade', function (res, socket, upgradeHead) { |
|
|
|
this.agent.on('upgrade', function(res, socket, upgradeHead) { |
|
|
|
if (self.listeners('upgrade').length) { |
|
|
|
self.emit('upgrade', res, socket, upgradeHead); |
|
|
|
} else { |
|
|
|