@ -1,8 +1,10 @@ |
var util = require('util'); |
var net = require('net'); |
var stream = require('stream'); |
var EventEmitter = require('events').EventEmitter; |
var FreeList = require('freelist').FreeList; |
var HTTPParser = process.binding('http_parser').HTTPParser; |
var assert = process.assert; |
var debug; |
@ -284,13 +286,9 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) { |
}; |
function OutgoingMessage(socket) { |
function OutgoingMessage() { |
stream.Stream.call(this); |
// TODO Remove one of these eventually.
this.socket = socket; |
this.connection = socket; |
this.output = []; |
this.outputEncodings = []; |
@ -312,6 +310,22 @@ util.inherits(OutgoingMessage, stream.Stream); |
exports.OutgoingMessage = OutgoingMessage; |
OutgoingMessage.prototype.assignSocket = function(socket) { |
assert(!socket._httpMessage); |
socket._httpMessage = this; |
this.socket = socket; |
this.connection = socket; |
this._flush(); |
}; |
OutgoingMessage.prototype.detachSocket = function(socket) { |
assert(socket._httpMessage == this); |
socket._httpMessage = null; |
this.socket = this.connection = null; |
}; |
OutgoingMessage.prototype.destroy = function(error) { |
this.socket.destroy(error); |
}; |
@ -336,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) { |
OutgoingMessage.prototype._writeRaw = function(data, encoding) { |
if (this.connection._outgoing[0] === this && this.connection.writable) { |
if (this.connection && |
this.connection._httpMessage === this && |
this.connection.writable) { |
// There might be pending data in the this.output buffer.
while (this.output.length) { |
if (!this.connection.writable) { |
@ -550,7 +566,7 @@ OutgoingMessage.prototype.end = function(data, encoding) { |
data.length > 0 && |
this.output.length === 0 && |
this.connection.writable && |
this.connection._outgoing[0] === this; |
this.connection._httpMessage === this; |
if (hot) { |
// Hot path. They're doing
@ -585,17 +601,67 @@ OutgoingMessage.prototype.end = function(data, encoding) { |
// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
if (this.output.length === 0 && this.connection._outgoing[0] === this) { |
debug('outgoing message end. shifting because was flushed'); |
this.connection._onOutgoingSent(); |
if (this.output.length === 0 && this.connection._httpMessage === this) { |
debug('outgoing message end.'); |
this._finish(); |
} |
return ret; |
}; |
OutgoingMessage.prototype._finish = function() { |
this.emit('finish'); |
}; |
OutgoingMessage.prototype._flush = function() { |
// This logic is probably a bit confusing. Let me explain a bit:
// In both HTTP servers and clients it is possible to queue up several
// outgoing messages. This is easiest to imagine in the case of a client.
// Take the following situation:
// req1 = client.request('GET', '/');
// req2 = client.request('POST', '/');
// When the user does
// req2.write('hello world\n');
// it's possible that the first request has not been completely flushed to
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
// This function, outgoingFlush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
if (!this.socket) return; |
var ret; |
while (this.output.length) { |
if (!this.socket.writable) return; // XXX Necessary?
var data = this.output.shift(); |
var encoding = this.outputEncodings.shift(); |
ret = this.socket.write(data, encoding); |
} |
if (this.finished) { |
// This is a queue to the server or client to bring in the next this.
this._finish(); |
} else if (ret) { |
this.emit('drain'); |
} |
}; |
function ServerResponse(req) { |
OutgoingMessage.call(this, req.socket); |
OutgoingMessage.call(this); |
if (req.method === 'HEAD') this._hasBody = false; |
@ -666,19 +732,30 @@ ServerResponse.prototype.writeHeader = function() { |
}; |
function ClientRequest(socket, method, url, headers) { |
OutgoingMessage.call(this, socket); |
function ClientRequest(options) { |
OutgoingMessage.call(this); |
var method = this.method = (options.method || 'GET').toUpperCase(); |
var path = options.path || '/'; |
var headers = options.headers || {}; |
// Host header set by default.
if (options.host && !(headers.host || headers.Host || headers.HOST)) { |
headers.Host = options.host; |
} |
this.method = method = method.toUpperCase(); |
this.shouldKeepAlive = false; |
if (method === 'GET' || method === 'HEAD') { |
this.useChunkedEncodingByDefault = false; |
} else { |
this.useChunkedEncodingByDefault = true; |
} |
// By default keep-alive is off. This is the last message unless otherwise
// specified.
this._last = true; |
this._storeHeader(method + ' ' + url + ' HTTP/1.1\r\n', headers); |
this._storeHeader(method + ' ' + path + ' HTTP/1.1\r\n', headers); |
} |
util.inherits(ClientRequest, OutgoingMessage); |
@ -686,58 +763,12 @@ util.inherits(ClientRequest, OutgoingMessage); |
exports.ClientRequest = ClientRequest; |
function outgoingFlush(socket) { |
// This logic is probably a bit confusing. Let me explain a bit:
// In both HTTP servers and clients it is possible to queue up several
// outgoing messages. This is easiest to imagine in the case of a client.
// Take the following situation:
// req1 = client.request('GET', '/');
// req2 = client.request('POST', '/');
// When the user does
// req2.write('hello world\n');
// it's possible that the first request has not been completely flushed to
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
// This function, outgoingFlush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
var message = socket._outgoing[0]; |
if (!message) return; |
var ret; |
while (message.output.length) { |
if (!socket.writable) return; // XXX Necessary?
var data = message.output.shift(); |
var encoding = message.outputEncodings.shift(); |
ret = socket.write(data, encoding); |
} |
if (message.finished) { |
socket._onOutgoingSent(); |
} else if (ret) { |
message.emit('drain'); |
} |
} |
function httpSocketSetup(socket) { |
// An array of outgoing messages for the socket. In pipelined connections
// we need to keep track of the order they were sent.
socket._outgoing = []; |
// NOTE: be sure not to use ondrain elsewhere in this file!
socket.ondrain = function() { |
var message = socket._outgoing[0]; |
if (message) message.emit('drain'); |
if (socket._httpMessage) { |
socket._httpMessage.emit('drain'); |
} |
}; |
} |
@ -765,6 +796,7 @@ exports.createServer = function(requestListener) { |
function connectionListener(socket) { |
var self = this; |
var outgoing = []; |
debug('SERVER new http connection'); |
@ -778,6 +810,7 @@ function connectionListener(socket) { |
var parser = parsers.alloc(); |
parser.reinitialize('request'); |
parser.socket = socket; |
parser.incoming = null; |
socket.addListener('error', function(e) { |
self.emit('clientError', e); |
@ -811,9 +844,10 @@ function connectionListener(socket) { |
socket.onend = function() { |
parser.finish(); |
if (socket._outgoing.length) { |
socket._outgoing[socket._outgoing.length - 1]._last = true; |
outgoingFlush(socket); |
if (outgoing.length) { |
outgoing[outgoing.length - 1]._last = true; |
} else if (socket._httpMessage) { |
socket._httpMessage._last = true; |
} else { |
socket.end(); |
} |
@ -824,21 +858,6 @@ function connectionListener(socket) { |
parsers.free(parser); |
}); |
// At the end of each response message, after it has been flushed to the
// socket. Here we insert logic about what to do next.
socket._onOutgoingSent = function(message) { |
var message = socket._outgoing.shift(); |
if (message._last) { |
// No more messages to be pushed out.
socket.destroySoon(); |
} else if (socket._outgoing.length) { |
// Push out the next message.
outgoingFlush(socket); |
} |
}; |
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
@ -846,7 +865,29 @@ function connectionListener(socket) { |
var res = new ServerResponse(req); |
debug('server response shouldKeepAlive: ' + shouldKeepAlive); |
res.shouldKeepAlive = shouldKeepAlive; |
socket._outgoing.push(res); |
if (socket._httpMessage) { |
// There are already pending outgoing res, append.
outgoing.push(res); |
} else { |
res.assignSocket(socket); |
} |
// When we're finished writing the response, check if this is the last
// respose, if so destroy the socket.
res.on('finish', function() { |
res.detachSocket(socket); |
if (res._last) { |
socket.destroySoon(); |
} else { |
// start sending the next message
var m = outgoing.shift(); |
if (m) { |
m.assignSocket(socket); |
} |
} |
}); |
if ('expect' in req.headers && |
(req.httpVersionMajor == 1 && req.httpVersionMinor == 1) && |
@ -867,109 +908,153 @@ function connectionListener(socket) { |
exports._connectionListener = connectionListener; |
function Client() { |
if (!(this instanceof Client)) return new Client(); |
net.Stream.call(this, { allowHalfOpen: true }); |
function Agent(host, port) { |
this.host = host; |
this.port = port; |
this.queue = []; |
this.sockets = []; |
this.maxSockets = 5; |
} |
util.inherits(Agent, EventEmitter); |
Agent.prototype.appendMessage = function(options) { |
var self = this; |
// Possible states:
// - disconnected
// - connecting
// - connected
this._state = 'disconnected'; |
var req = new ClientRequest(options); |
this.queue.push(req); |
httpSocketSetup(self); |
/* |
req.on('finish', function () { |
self._cycle(); |
}); |
*/ |
function onData(d, start, end) { |
if (!self.parser) { |
throw new Error('parser not initialized prior to Client.ondata call'); |
} |
var ret = self.parser.execute(d, start, end - start); |
if (ret instanceof Error) { |
self.destroy(ret); |
} else if (self.parser.incoming && self.parser.incoming.upgrade) { |
var bytesParsed = ret; |
self.ondata = null; |
self.onend = null; |
this._cycle(); |
var req = self.parser.incoming; |
return req; |
}; |
var upgradeHead = d.slice(start + bytesParsed + 1, end); |
if (self.listeners('upgrade').length) { |
self.emit('upgrade', req, self, upgradeHead); |
} else { |
self.destroy(); |
} |
} |
}; |
Agent.prototype._removeSocket = function(socket) { |
var i = this.sockets.indexOf(socket); |
if (i >= 0) this.sockets.splice(i, 1); |
} |
self.addListener('connect', function() { |
debug('CLIENT connected'); |
self.ondata = onData; |
self.onend = onEnd; |
Agent.prototype._establishNewConnection = function() { |
var self = this; |
assert(this.sockets.length < this.maxSockets); |
// Grab a new "socket". Depending on the implementation of _getConnection
// this could either be a raw TCP socket or a TLS stream.
var socket = this._getConnection(this.host, this.port, function () { |
self.emit('connect'); // mostly for the shim.
debug("Agent _getConnection callback"); |
self._cycle(); |
}); |
this.sockets.push(socket); |
self._state = 'connected'; |
// Add a parser to the socket.
var parser = parsers.alloc(); |
parser.reinitialize('response'); |
parser.socket = socket; |
parser.incoming = null; |
socket.on('error', function(err) { |
debug("AGENT SOCKET ERROR: " + err.message); |
var req; |
if (socket._httpMessage) { |
req = socket._httpMessage |
} else if (self.queue.length) { |
req = self.queue.shift(); |
} else { |
// No requests on queue? Where is the request
assert(0); |
} |
self._initParser(); |
outgoingFlush(self); |
req.emit('error', err); |
req._hadError = true; // hacky
}); |
function onEnd() { |
if (self.parser) self.parser.finish(); |
debug('CLIENT got end closing. state = ' + self._state); |
self.end(); |
}; |
socket.ondata = function(d, start, end) { |
var ret = parser.execute(d, start, end - start); |
if (ret instanceof Error) { |
debug('parse error'); |
socket.destroy(ret); |
} else if (parser.incoming && parser.incoming.upgrade) { |
var bytesParsed = ret; |
socket.ondata = null; |
socket.onend = null; |
self.addListener('close', function(e) { |
self._state = 'disconnected'; |
if (e) return; |
var res = parser.incoming; |
assert(socket._httpMessage); |
socket._httpMessage.res = res; |
debug('CLIENT onClose. state = ' + self._state); |
// This is start + byteParsed + 1 due to the error of getting \n
// in the upgradeHead from the closing lines of the headers
var upgradeHead = d.slice(start + bytesParsed + 1, end); |
// finally done with the request
self._outgoing.shift(); |
// Make sure we don't try to send HTTP requests to it.
self._removeSocket(socket); |
// If there are more requests to handle, reconnect.
if (self._outgoing.length) { |
self._ensureConnection(); |
} else if (self.parser) { |
parsers.free(self.parser); |
self.parser = null; |
} |
}); |
} |
util.inherits(Client, net.Stream); |
socket.on('end', function() { |
self.emit('end'); |
}); |
// XXX free the parser?
exports.Client = Client; |
if (self.listeners('upgrade').length) { |
// Emit 'upgrade' on the Agent.
self.emit('upgrade', res, socket, upgradeHead); |
} else { |
// Got upgrade header, but have no handler.
socket.destroy(); |
} |
} |
}; |
socket.onend = function() { |
self.emit('end'); // mostly for the shim.
parser.finish(); |
socket.destroy(); |
}; |
exports.createClient = function(port, host, https, credentials) { |
var c = new Client(); |
c.port = port; |
c.host = host; |
c.https = https; |
c.credentials = credentials; |
return c; |
}; |
// When the socket closes remove it from the list of available sockets.
socket.on('close', function() { |
// This is really hacky: What if someone issues a request, the server
// accepts, but then terminates the connection. There is no parse error,
// there is no socket-level error. How does the user get informed?
// We check to see if the socket has a request, if so if it has a
// response (meaning that it emitted a 'response' event). If the socket
// has a request but no response and it never emitted an error event:
// THEN we need to trigger it manually.
// There must be a better way to do this.
if (socket._httpMessage && |
!socket._httpMessage.res && |
!socket._httpMessage._hadError) { |
socket._httpMessage.emit('error', new Error('socket hang up')); |
} |
self._removeSocket(socket); |
// unref the parser for easy gc
parsers.free(parser); |
}); |
parser.onIncoming = function(res, shouldKeepAlive) { |
debug('AGENT incoming response!'); |
Client.prototype._initParser = function() { |
var self = this; |
if (!self.parser) self.parser = parsers.alloc(); |
self.parser.reinitialize('response'); |
self.parser.socket = self; |
self.parser.onIncoming = function(res) { |
debug('CLIENT incoming response!'); |
var req = socket._httpMessage; |
assert(req); |
var req = self._outgoing[0]; |
req.res = res; |
// Responses to HEAD requests are AWFUL. Ask Ryan.
// A major oversight in HTTP. Hence this nastiness.
var isHeadResponse = req.method == 'HEAD'; |
debug('CLIENT isHeadResponse ' + isHeadResponse); |
debug('AGENT isHeadResponse ' + isHeadResponse); |
if (res.statusCode == 100) { |
// restart the parser, as this is a continue message.
@ -982,17 +1067,14 @@ Client.prototype._initParser = function() { |
} |
res.addListener('end', function() { |
debug('CLIENT request complete disconnecting. state = ' + self._state); |
debug('AGENT request complete disconnecting.'); |
// For the moment we reconnect for every request. FIXME!
// All that should be required for keep-alive is to not reconnect,
// but outgoingFlush instead.
if (req.shouldKeepAlive) { |
outgoingFlush(self); |
self._outgoing.shift(); |
outgoingFlush(self); |
} else { |
self.end(); |
} |
if (!req.shouldKeepAlive) socket.end(); |
req.detachSocket(socket); |
self._cycle(); |
}); |
req.emit('response', res); |
@ -1002,54 +1084,170 @@ Client.prototype._initParser = function() { |
}; |
// This is called each time a request has been pushed completely to the
// socket. The message that was sent is still sitting at client._outgoing[0]
// it is our responsibility to shift it off.
// We have to be careful when it we shift it because once we do any writes
// to other requests will be flushed directly to the socket.
// At the moment we're implement a client which connects and disconnects on
// each request/response cycle so we cannot shift off the request from
// client._outgoing until we're completely disconnected after the response
// comes back.
Client.prototype._onOutgoingSent = function(message) { |
// We've just finished a message. We don't end/shutdown the connection here
// because HTTP servers typically cannot handle half-closed connections
// (Node servers can).
// Instead, we just check if the connection is closed, and if so
// reconnect if we have pending messages.
if (this._outgoing.length) { |
debug('CLIENT request flush. ensure connection. state = ' + this._state); |
this._ensureConnection(); |
// Sub-classes can overwrite this method with e.g. something that supplies
// TLS streams.
Agent.prototype._getConnection = function(host, port, cb) { |
debug("Agent connected!"); |
var c = net.createConnection(port, host); |
c.on('connect', cb); |
return c; |
}; |
// This method attempts to shuffle items along the queue into one of the
// waiting sockets. If a waiting socket cannot be found, it will
// start the process of establishing one.
Agent.prototype._cycle = function() { |
debug("Agent _cycle"); |
var first = this.queue[0]; |
if (!first) return; |
// First try to find an available socket.
for (var i = 0; i < this.sockets.length; i++) { |
var socket = this.sockets[i]; |
// If the socket doesn't already have a message it's sending out
// and the socket is available for writing...
if (!socket._httpMessage && (socket.writable && socket.readable)) { |
debug("Agent found socket, shift"); |
// We found an available connection!
this.queue.shift(); // remove first from queue.
first.assignSocket(socket); |
return; |
} |
} |
// Otherwise see if we should be starting a new connection to handle
// this.
if (this.sockets.length < this.maxSockets) { |
this._establishNewConnection(); |
} |
// All sockets are filled and all sockets are busy.
}; |
// process-wide hash of agents.
// keys: "host:port" string
// values: instance of Agent
// That is, one agent remote host.
// TODO currently we never remove agents from this hash. This is a small
// memory leak. Have a 2 second timeout after a agent's sockets are to try
// to remove it?
var agents = {} |
function getAgent(host, port) { |
var id = host + ':' + port; |
var agent = agents[id]; |
if (!agent) { |
agent = agents[id] = new Agent(host, port); |
} |
return agent; |
} |
exports.request = function(options, cb) { |
var agent = getAgent(options.host, options.port); |
var req = agent.appendMessage(options); |
if (cb) req.once('response', cb); |
return req; |
}; |
exports.get = function(options, cb) { |
options.method = 'GET'; |
var req = exports.request(options, cb); |
req.end(); |
return req; |
}; |
// Shims to old interface.
function Client(port, host) { |
var self = this; |
this.port = port; |
this.host = host; |
this.agent = getAgent(this.host, this.port); |
// proxy connect events upwards;
this.agent.on('connect', function() { |
self.emit('connect'); |
}); |
this.agent.on('end', function() { |
self.emit('end'); |
}); |
// proxy upgrade events upwards;
this.agent.on('upgrade', function (res, socket, upgradeHead) { |
if (self.listeners('upgrade').length) { |
self.emit('upgrade', res, socket, upgradeHead); |
} else { |
socket.destroy(); |
} |
}); |
} |
util.inherits(Client, EventEmitter); |
// This method is used in a few tests to force the connections closed.
// Again - just a shim so as not to break code. Not really important.
Client.prototype.end = function() { |
for (var i = 0; i < this.agent.sockets.length; i++) { |
var socket = this.agent.sockets[i]; |
if (!socket._httpMessage && socket.writable) socket.end(); |
} |
}; |
Client.prototype._ensureConnection = function() { |
if (this._state == 'disconnected') { |
debug('CLIENT reconnecting state = ' + this._state); |
this.connect(this.port, this.host); |
this._state = 'connecting'; |
Client.prototype.destroy = function(e) { |
for (var i = 0; i < this.agent.sockets.length; i++) { |
var socket = this.agent.sockets[i]; |
socket.destroy(e); |
} |
}; |
Client.prototype.request = function(method, url, headers) { |
if (typeof(url) != 'string') { |
Client.prototype.request = function(method, path, headers) { |
if (typeof(path) != 'string') { |
// assume method was omitted, shift arguments
headers = url; |
url = method; |
headers = path; |
path = method; |
method = 'GET'; |
} |
var req = new ClientRequest(this, method, url, headers); |
this._outgoing.push(req); |
this._ensureConnection(); |
var options = { |
method: method, |
path: path, |
headers: headers, |
port: this.port, |
host: this.host |
}; |
var self = this; |
var req = exports.request(options); |
// proxy error events from req to Client
req.on('error', function(err) { |
self.emit('error', err); |
}); |
return req; |
}; |
exports.createClient = function(port, host) { |
return new Client(port, host); |
}; |
exports.cat = function(url, encoding_, headers_) { |
var encoding = 'utf8', |
headers = {}, |
@ -1076,49 +1274,26 @@ exports.cat = function(url, encoding_, headers_) { |
var url = require('url').parse(url); |
var hasHost = false; |
if (Array.isArray(headers)) { |
for (var i = 0, l = headers.length; i < l; i++) { |
if (headers[i][0].toLowerCase() === 'host') { |
hasHost = true; |
break; |
} |
} |
} else if (typeof headers === 'Object') { |
var keys = Object.keys(headers); |
for (var i = 0, l = keys.length; i < l; i++) { |
var key = keys[i]; |
if (key.toLowerCase() == 'host') { |
hasHost = true; |
break; |
} |
} |
} |
if (!hasHost) headers['Host'] = url.hostname; |
var options = { |
method: 'GET', |
port: url.port || 80, |
host: url.hostname, |
headers: headers, |
path: (url.pathname || '/') + (url.search || '') + (url.hash || '') |
}; |
var content = ''; |
var client = exports.createClient(url.port || 80, url.hostname); |
var req = client.request((url.pathname || '/') + |
(url.search || '') + |
(url.hash || ''), |
headers); |
if (url.protocol == 'https:') { |
client.https = true; |
} |
var callbackSent = false; |
req.addListener('response', function(res) { |
var req = exports.request(options, function(res) { |
if (res.statusCode < 200 || res.statusCode >= 300) { |
if (callback && !callbackSent) { |
callback(res.statusCode); |
callbackSent = true; |
} |
client.end(); |
return; |
} |
res.setEncoding(encoding); |
res.addListener('data', function(chunk) { content += chunk; }); |
res.addListener('end', function() { |
@ -1128,19 +1303,13 @@ exports.cat = function(url, encoding_, headers_) { |
} |
}); |
}); |
req.end(); |
client.addListener('error', function(err) { |
if (callback && !callbackSent) { |
callback(err); |
callbackSent = true; |
} |
}); |
client.addListener('close', function() { |
req.on('error', function(err) { |
if (callback && !callbackSent) { |
callback(new Error('Connection closed unexpectedly')); |
callback(err); |
callbackSent = true; |
} |
}); |
req.end(); |
}; |