From 60aea96f84c850615bac7a6e00ef28fab5ae18ae Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 02:41:16 -0800 Subject: [PATCH 01/10] first pass at http refactor for TLS --- lib/http.js | 518 ++++++++++++++++++++-------------------------------- 1 file changed, 196 insertions(+), 322 deletions(-) diff --git a/lib/http.js b/lib/http.js index 960d28aaa3..247e0dcb61 100644 --- a/lib/http.js +++ b/lib/http.js @@ -1,8 +1,10 @@ var util = require('util'); var net = require('net'); var stream = require('stream'); +var EventEmitter = require('events').EventEmitter; var FreeList = require('freelist').FreeList; var HTTPParser = process.binding('http_parser').HTTPParser; +var assert = process.assert; var debug; @@ -284,13 +286,9 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) { }; -function OutgoingMessage(socket) { +function OutgoingMessage() { stream.Stream.call(this); - // TODO Remove one of these eventually. - this.socket = socket; - this.connection = socket; - this.output = []; this.outputEncodings = []; @@ -312,6 +310,15 @@ util.inherits(OutgoingMessage, stream.Stream); exports.OutgoingMessage = OutgoingMessage; +OutgoingMessage.prototype.assignSocket = function(socket) { + assert(!socket._httpMessage); + socket._httpMessage = this; + this.socket = socket; + this.connection = socket; + this._flush(); +}; + + OutgoingMessage.prototype.destroy = function(error) { this.socket.destroy(error); }; @@ -336,7 +343,7 @@ OutgoingMessage.prototype._send = function(data, encoding) { OutgoingMessage.prototype._writeRaw = function(data, encoding) { - if (this.connection._outgoing[0] === this && this.connection.writable) { + if (this.connection._httpMessage === this && this.connection.writable) { // There might be pending data in the this.output buffer. 
while (this.output.length) { if (!this.connection.writable) { @@ -550,7 +557,7 @@ OutgoingMessage.prototype.end = function(data, encoding) { data.length > 0 && this.output.length === 0 && this.connection.writable && - this.connection._outgoing[0] === this; + this.connection._httpMessage === this; if (hot) { // Hot path. They're doing @@ -585,17 +592,69 @@ OutgoingMessage.prototype.end = function(data, encoding) { // There is the first message on the outgoing queue, and we've sent // everything to the socket. - if (this.output.length === 0 && this.connection._outgoing[0] === this) { - debug('outgoing message end. shifting because was flushed'); - this.connection._onOutgoingSent(); + if (this.output.length === 0 && this.connection._httpMessage === this) { + debug('outgoing message end.'); + this._finish(); } return ret; }; +OutgoingMessage.prototype._finish = function() { + this.socket._httpMessage = null; + this.socket = this.connection = null; + this.emit('finish'); +}; + + +OutgoingMessage.prototype._flush = function() { + // This logic is probably a bit confusing. Let me explain a bit: + // + // In both HTTP servers and clients it is possible to queue up several + // outgoing messages. This is easiest to imagine in the case of a client. + // Take the following situation: + // + // req1 = client.request('GET', '/'); + // req2 = client.request('POST', '/'); + // + // When the user does + // + // req2.write('hello world\n'); + // + // it's possible that the first request has not been completely flushed to + // the socket yet. Thus the outgoing messages need to be prepared to queue + // up data internally before sending it on further to the socket's queue. + // + // This function, outgoingFlush(), is called by both the Server and Client + // to attempt to flush any pending messages out to the socket. + + if (!this.socket) return; + + var ret; + + while (this.output.length) { + if (!this.socket.writable) return; // XXX Necessary? 
+ + var data = this.output.shift(); + var encoding = this.outputEncodings.shift(); + + ret = this.socket.write(data, encoding); + } + + if (this.finished) { + // This is a queue to the server or client to bring in the next this. + this._finish(); + } else if (ret) { + this.emit('drain'); + } +}; + + + + function ServerResponse(req) { - OutgoingMessage.call(this, req.socket); + OutgoingMessage.call(this); if (req.method === 'HEAD') this._hasBody = false; @@ -666,19 +725,30 @@ ServerResponse.prototype.writeHeader = function() { }; -function ClientRequest(socket, method, url, headers) { - OutgoingMessage.call(this, socket); +function ClientRequest(options) { + OutgoingMessage.call(this); + + var method = this.method = (options.method || 'GET').toUpperCase(); + var path = options.path || '/'; + var headers = options.headers; + + // Host header set by default. + if (options.host && !(headers.host || headers.Host || headers.HOST)) { + headers.Host = options.host; + } - this.method = method = method.toUpperCase(); this.shouldKeepAlive = false; if (method === 'GET' || method === 'HEAD') { this.useChunkedEncodingByDefault = false; } else { this.useChunkedEncodingByDefault = true; } + + // By default keep-alive is off. This is the last message unless otherwise + // specified. this._last = true; - this._storeHeader(method + ' ' + url + ' HTTP/1.1\r\n', headers); + this._storeHeader(method + ' ' + path + ' HTTP/1.1\r\n', headers); } util.inherits(ClientRequest, OutgoingMessage); @@ -686,58 +756,12 @@ util.inherits(ClientRequest, OutgoingMessage); exports.ClientRequest = ClientRequest; -function outgoingFlush(socket) { - // This logic is probably a bit confusing. Let me explain a bit: - // - // In both HTTP servers and clients it is possible to queue up several - // outgoing messages. This is easiest to imagine in the case of a client. 
- // Take the following situation: - // - // req1 = client.request('GET', '/'); - // req2 = client.request('POST', '/'); - // - // When the user does - // - // req2.write('hello world\n'); - // - // it's possible that the first request has not been completely flushed to - // the socket yet. Thus the outgoing messages need to be prepared to queue - // up data internally before sending it on further to the socket's queue. - // - // This function, outgoingFlush(), is called by both the Server and Client - // to attempt to flush any pending messages out to the socket. - var message = socket._outgoing[0]; - - if (!message) return; - - var ret; - - while (message.output.length) { - if (!socket.writable) return; // XXX Necessary? - - var data = message.output.shift(); - var encoding = message.outputEncodings.shift(); - - ret = socket.write(data, encoding); - } - - if (message.finished) { - socket._onOutgoingSent(); - } else if (ret) { - message.emit('drain'); - } -} - - function httpSocketSetup(socket) { - // An array of outgoing messages for the socket. In pipelined connections - // we need to keep track of the order they were sent. - socket._outgoing = []; - // NOTE: be sure not to use ondrain elsewhere in this file! 
socket.ondrain = function() { - var message = socket._outgoing[0]; - if (message) message.emit('drain'); + if (socket._httpMessage) { + socket._httpMessage.emit('drain'); + } }; } @@ -765,6 +789,7 @@ exports.createServer = function(requestListener) { function connectionListener(socket) { var self = this; + var outgoing = []; debug('SERVER new http connection'); @@ -811,9 +836,10 @@ function connectionListener(socket) { socket.onend = function() { parser.finish(); - if (socket._outgoing.length) { - socket._outgoing[socket._outgoing.length - 1]._last = true; - outgoingFlush(socket); + if (outgoing.length) { + outgoing[outgoing.length - 1]._last = true; + } else if (socket._httpMessage) { + socket._httpMessage._last = true; } else { socket.end(); } @@ -824,21 +850,6 @@ function connectionListener(socket) { parsers.free(parser); }); - // At the end of each response message, after it has been flushed to the - // socket. Here we insert logic about what to do next. - socket._onOutgoingSent = function(message) { - var message = socket._outgoing.shift(); - if (message._last) { - // No more messages to be pushed out. - - socket.destroySoon(); - - } else if (socket._outgoing.length) { - // Push out the next message. - outgoingFlush(socket); - } - }; - // The following callback is issued after the headers have been read on a // new message. In this callback we setup the response object and pass it // to the user. @@ -846,7 +857,27 @@ function connectionListener(socket) { var res = new ServerResponse(req); debug('server response shouldKeepAlive: ' + shouldKeepAlive); res.shouldKeepAlive = shouldKeepAlive; - socket._outgoing.push(res); + + if (socket._httpMessage) { + // There are already pending outgoing res, append. + outgoing.push(res); + } else { + res.assignSocket(socket); + } + + // When we're finished writing the response, check if this is the last + // respose, if so destroy the socket. 
+ res.on('finish', function() { + if (res._last) { + socket.destroySoon(); + } else { + // start sending the next message + var m = outgoing.shift(); + if (m) { + m.assignSocket(socket); + } + } + }); if ('expect' in req.headers && (req.httpVersionMajor == 1 && req.httpVersionMinor == 1) && @@ -867,280 +898,123 @@ function connectionListener(socket) { exports._connectionListener = connectionListener; -function Client() { - if (!(this instanceof Client)) return new Client(); - net.Stream.call(this, { allowHalfOpen: true }); - var self = this; - - // Possible states: - // - disconnected - // - connecting - // - connected - this._state = 'disconnected'; - - httpSocketSetup(self); - - function onData(d, start, end) { - if (!self.parser) { - throw new Error('parser not initialized prior to Client.ondata call'); - } - var ret = self.parser.execute(d, start, end - start); - if (ret instanceof Error) { - self.destroy(ret); - } else if (self.parser.incoming && self.parser.incoming.upgrade) { - var bytesParsed = ret; - self.ondata = null; - self.onend = null; - - var req = self.parser.incoming; +function Agent(host, port) { + this.host = host; + this.port = port; - var upgradeHead = d.slice(start + bytesParsed + 1, end); - - if (self.listeners('upgrade').length) { - self.emit('upgrade', req, self, upgradeHead); - } else { - self.destroy(); - } - } - }; + this.queue = []; + this.sockets = []; + this.maxSockets = 5; +} +util.inherits(Agent, EventEmitter); - self.addListener('connect', function() { - debug('CLIENT connected'); - self.ondata = onData; - self.onend = onEnd; +Agent.prototype.appendMessage = function(options) { + var self = this; - self._state = 'connected'; + var req = new ClientRequest(options); + this.queue.push(req); - self._initParser(); - outgoingFlush(self); + req.on('finish', function () { + self._cycle(); }); - function onEnd() { - if (self.parser) self.parser.finish(); - debug('CLIENT got end closing. 
state = ' + self._state); - self.end(); - }; - - self.addListener('close', function(e) { - self._state = 'disconnected'; - if (e) return; + this._cycle(); +}; - debug('CLIENT onClose. state = ' + self._state); - // finally done with the request - self._outgoing.shift(); +Agent.prototype._establishNewConnection = function(socket, message) { + var self = this; + assert(this.sockets.length < this.maxSockets); - // If there are more requests to handle, reconnect. - if (self._outgoing.length) { - self._ensureConnection(); - } else if (self.parser) { - parsers.free(self.parser); - self.parser = null; - } + // Grab a new "socket". Depending on the implementation of _getConnection + // this could either be a raw TCP socket or a TLS stream. + var socket = this._getConnection(this.host, this.port, function () { + self._cycle(); }); -} -util.inherits(Client, net.Stream); + this.sockets.push(socket); -exports.Client = Client; + // When the socket closes remove it from the list of available sockets. + socket.on('close', function() { + var i = self.sockets.indexOf(socket); + if (i >= 0) self.sockets.splice(i, 1); + }); +}; -exports.createClient = function(port, host, https, credentials) { - var c = new Client(); - c.port = port; - c.host = host; - c.https = https; - c.credentials = credentials; +// Sub-classes can overwrite this method with e.g. something that supplies +// TLS streams. +Agent.prototype._getConnection = function(host, port, cb) { + var c = net.createConnection(port, host); + c.on('connect', cb); return c; }; -Client.prototype._initParser = function() { - var self = this; - if (!self.parser) self.parser = parsers.alloc(); - self.parser.reinitialize('response'); - self.parser.socket = self; - self.parser.onIncoming = function(res) { - debug('CLIENT incoming response!'); - - var req = self._outgoing[0]; - - // Responses to HEAD requests are AWFUL. Ask Ryan. - // A major oversight in HTTP. Hence this nastiness. 
- var isHeadResponse = req.method == 'HEAD'; - debug('CLIENT isHeadResponse ' + isHeadResponse); - - if (res.statusCode == 100) { - // restart the parser, as this is a continue message. - req.emit('continue'); - return true; - } - - if (req.shouldKeepAlive && res.headers.connection === 'close') { - req.shouldKeepAlive = false; +// This method attempts to shuffle items along the queue into one of the +// waiting sockets. If a waiting socket cannot be found, it will +// start the process of establishing one. +Agent.prototype._cycle = function() { + var first = this.queue[0]; + if (!first) return; + + // First try to find an available socket. + for (var i = 0; i < this.sockets.length; i++) { + var socket = this.sockets[i]; + // If the socket doesn't already have a message it's sending out + // and the socket is available for writing... + if (!socket._httpMessage && (socket.writable && socket.readable)) { + // We found an available connection! + this.queue.shift(); // remove first from queue. + first.assignSocket(socket); + return; } - - res.addListener('end', function() { - debug('CLIENT request complete disconnecting. state = ' + self._state); - // For the moment we reconnect for every request. FIXME! - // All that should be required for keep-alive is to not reconnect, - // but outgoingFlush instead. - if (req.shouldKeepAlive) { - outgoingFlush(self); - self._outgoing.shift(); - outgoingFlush(self); - } else { - self.end(); - } - }); - - req.emit('response', res); - - return isHeadResponse; - }; -}; - - -// This is called each time a request has been pushed completely to the -// socket. The message that was sent is still sitting at client._outgoing[0] -// it is our responsibility to shift it off. -// -// We have to be careful when it we shift it because once we do any writes -// to other requests will be flushed directly to the socket. 
-// -// At the moment we're implement a client which connects and disconnects on -// each request/response cycle so we cannot shift off the request from -// client._outgoing until we're completely disconnected after the response -// comes back. -Client.prototype._onOutgoingSent = function(message) { - // We've just finished a message. We don't end/shutdown the connection here - // because HTTP servers typically cannot handle half-closed connections - // (Node servers can). - // - // Instead, we just check if the connection is closed, and if so - // reconnect if we have pending messages. - if (this._outgoing.length) { - debug('CLIENT request flush. ensure connection. state = ' + this._state); - this._ensureConnection(); } -}; - -Client.prototype._ensureConnection = function() { - if (this._state == 'disconnected') { - debug('CLIENT reconnecting state = ' + this._state); - this.connect(this.port, this.host); - this._state = 'connecting'; + // Otherwise see if we should be starting a new connection to handle + // this. + if (this.sockets.length < this.maxSockets) { + this._establishNewConnection(); } -}; - -Client.prototype.request = function(method, url, headers) { - if (typeof(url) != 'string') { - // assume method was omitted, shift arguments - headers = url; - url = method; - method = 'GET'; - } - var req = new ClientRequest(this, method, url, headers); - this._outgoing.push(req); - this._ensureConnection(); - return req; + // All sockets are filled and all sockets are busy. }; -exports.cat = function(url, encoding_, headers_) { - var encoding = 'utf8', - headers = {}, - callback = null; +// process-wide hash of agents. +// keys: "host:port" string +// values: instance of Agent +// That is, one agent remote host. +// TODO currently we never remove agents from this hash. This is a small +// memory leak. Have a 2 second timeout after a agent's sockets are to try +// to remove it? +var agents = {} - // parse the arguments for the various options... 
very ugly - if (typeof(arguments[1]) == 'string') { - encoding = arguments[1]; - if (typeof(arguments[2]) == 'object') { - headers = arguments[2]; - if (typeof(arguments[3]) == 'function') callback = arguments[3]; - } else { - if (typeof(arguments[2]) == 'function') callback = arguments[2]; - } - } else { - // didn't specify encoding - if (typeof(arguments[1]) == 'object') { - headers = arguments[1]; - callback = arguments[2]; - } else { - callback = arguments[1]; - } - } - var url = require('url').parse(url); +function getAgent(host, port) { + var id = host + ':' + port; + var agent = agents[id]; - var hasHost = false; - if (Array.isArray(headers)) { - for (var i = 0, l = headers.length; i < l; i++) { - if (headers[i][0].toLowerCase() === 'host') { - hasHost = true; - break; - } - } - } else if (typeof headers === 'Object') { - var keys = Object.keys(headers); - for (var i = 0, l = keys.length; i < l; i++) { - var key = keys[i]; - if (key.toLowerCase() == 'host') { - hasHost = true; - break; - } - } + if (!agent) { + agent = agents[id] = new Agent(host, port); } - if (!hasHost) headers['Host'] = url.hostname; - - var content = ''; - var client = exports.createClient(url.port || 80, url.hostname); - var req = client.request((url.pathname || '/') + - (url.search || '') + - (url.hash || ''), - headers); + return agent; +} - if (url.protocol == 'https:') { - client.https = true; - } - var callbackSent = false; +exports.request = function(options, cb) { + var agent = getAgent(options.host, options.port); + var req = agent.appendMessage(options); - req.addListener('response', function(res) { - if (res.statusCode < 200 || res.statusCode >= 300) { - if (callback && !callbackSent) { - callback(res.statusCode); - callbackSent = true; - } - client.end(); - return; - } - res.setEncoding(encoding); - res.addListener('data', function(chunk) { content += chunk; }); - res.addListener('end', function() { - if (callback && !callbackSent) { - callback(null, content); - callbackSent 
= true; - } + if (cb) { + req.once('response', function (res) { + cb(null, res); }); - }); - - client.addListener('error', function(err) { - if (callback && !callbackSent) { - callback(err); - callbackSent = true; - } - }); + } - client.addListener('close', function() { - if (callback && !callbackSent) { - callback(new Error('Connection closed unexpectedly')); - callbackSent = true; - } - }); - req.end(); + return req; }; + + From e576d4ec79ad983853d989819c78328aba0bb6f9 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 10:29:54 -0800 Subject: [PATCH 02/10] Add parser to agent --- lib/http.js | 100 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/lib/http.js b/lib/http.js index 247e0dcb61..25fc8143d3 100644 --- a/lib/http.js +++ b/lib/http.js @@ -319,6 +319,13 @@ OutgoingMessage.prototype.assignSocket = function(socket) { }; +OutgoingMessage.prototype.detachSocket = function(socket) { + assert(socket._httpMessage == this); + socket._httpMessage = null; + this.socket = this.connection = null; +}; + + OutgoingMessage.prototype.destroy = function(error) { this.socket.destroy(error); }; @@ -343,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) { OutgoingMessage.prototype._writeRaw = function(data, encoding) { - if (this.connection._httpMessage === this && this.connection.writable) { + if (this.connection && + this.connection._httpMessage === this && + this.connection.writable) { // There might be pending data in the this.output buffer. 
while (this.output.length) { if (!this.connection.writable) { @@ -602,8 +611,6 @@ OutgoingMessage.prototype.end = function(data, encoding) { OutgoingMessage.prototype._finish = function() { - this.socket._httpMessage = null; - this.socket = this.connection = null; this.emit('finish'); }; @@ -868,6 +875,8 @@ function connectionListener(socket) { // When we're finished writing the response, check if this is the last // respose, if so destroy the socket. res.on('finish', function() { + res.detachSocket(socket); + if (res._last) { socket.destroySoon(); } else { @@ -915,37 +924,117 @@ Agent.prototype.appendMessage = function(options) { var req = new ClientRequest(options); this.queue.push(req); + /* req.on('finish', function () { self._cycle(); }); + */ this._cycle(); + + return req; }; -Agent.prototype._establishNewConnection = function(socket, message) { +Agent.prototype._establishNewConnection = function() { var self = this; assert(this.sockets.length < this.maxSockets); // Grab a new "socket". Depending on the implementation of _getConnection // this could either be a raw TCP socket or a TLS stream. var socket = this._getConnection(this.host, this.port, function () { + debug("Agent _getConnection callback"); self._cycle(); }); this.sockets.push(socket); + // Add a parser to the socket. 
+ var parser = parsers.alloc(); + parser.reinitialize('response'); + parser.socket = socket; + + socket.ondata = function(d, start, end) { + var ret = parser.execute(d, start, end - start); + if (ret instanceof Error) { + debug('parse error'); + socket.destroy(ret); + } else if (parser.incoming && parser.incoming.upgrade) { + var bytesParsed = ret; + socket.ondata = null; + socket.onend = null; + + var res = parser.incoming; + + // This is start + byteParsed + 1 due to the error of getting \n + // in the upgradeHead from the closing lines of the headers + var upgradeHead = d.slice(start + bytesParsed + 1, end); + + if (self.listeners('upgrade').length) { + self.emit('upgrade', res, res.socket, upgradeHead); + } else { + // Got upgrade header, but have no handler. + socket.destroy(); + } + } + }; + + socket.onend = function() { + parser.finish(); + socket.destroy(); + }; + // When the socket closes remove it from the list of available sockets. socket.on('close', function() { var i = self.sockets.indexOf(socket); if (i >= 0) self.sockets.splice(i, 1); + // unref the parser for easy gc + parsers.free(parser); }); + + parser.onIncoming = function(res, shouldKeepAlive) { + debug('AGENT incoming response!'); + + var req = socket._httpMessage; + assert(req); + + // Responses to HEAD requests are AWFUL. Ask Ryan. + // A major oversight in HTTP. Hence this nastiness. + var isHeadResponse = req.method == 'HEAD'; + debug('AGENT isHeadResponse ' + isHeadResponse); + + if (res.statusCode == 100) { + // restart the parser, as this is a continue message. + req.emit('continue'); + return true; + } + + if (req.shouldKeepAlive && res.headers.connection === 'close') { + req.shouldKeepAlive = false; + } + + res.addListener('end', function() { + debug('AGENT request complete disconnecting.'); + // For the moment we reconnect for every request. FIXME! + // All that should be required for keep-alive is to not reconnect, + // but outgoingFlush instead. 
+ if (!req.shouldKeepAlive) socket.end(); + + req.detachSocket(socket); + self._cycle(); + }); + + req.emit('response', res); + + return isHeadResponse; + }; }; // Sub-classes can overwrite this method with e.g. something that supplies // TLS streams. Agent.prototype._getConnection = function(host, port, cb) { + debug("Agent connected!"); var c = net.createConnection(port, host); c.on('connect', cb); return c; @@ -956,6 +1045,8 @@ Agent.prototype._getConnection = function(host, port, cb) { // waiting sockets. If a waiting socket cannot be found, it will // start the process of establishing one. Agent.prototype._cycle = function() { + debug("Agent _cycle"); + var first = this.queue[0]; if (!first) return; @@ -965,6 +1056,7 @@ Agent.prototype._cycle = function() { // If the socket doesn't already have a message it's sending out // and the socket is available for writing... if (!socket._httpMessage && (socket.writable && socket.readable)) { + debug("Agent found socket, shift"); // We found an available connection! this.queue.shift(); // remove first from queue. first.assignSocket(socket); From 105c35b9fd66745d9b8abae77b93538873cf12ba Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 11:21:42 -0800 Subject: [PATCH 03/10] http.Client shims --- lib/http.js | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/lib/http.js b/lib/http.js index 25fc8143d3..1768e7648e 100644 --- a/lib/http.js +++ b/lib/http.js @@ -737,7 +737,7 @@ function ClientRequest(options) { var method = this.method = (options.method || 'GET').toUpperCase(); var path = options.path || '/'; - var headers = options.headers; + var headers = options.headers || {}; // Host header set by default. 
if (options.host && !(headers.host || headers.Host || headers.HOST)) { @@ -1110,3 +1110,137 @@ exports.request = function(options, cb) { }; +exports.get = function(options, cb) { + options.method = 'GET'; + var req = exports.request(options, cb); + req.end(); +}; + + +// Shims to old interface. + +function Client(port, host) { + this.port = port; + this.host = host; +} + + +Client.prototype.request = function(method, path, headers) { + if (typeof(path) != 'string') { + // assume method was omitted, shift arguments + headers = path; + path = method; + method = 'GET'; + } + + var options = { + method: method, + path: path, + headers: headers, + port: this.port, + host: this.host + }; + + return exports.request(options); +}; + + +exports.createClient = function(port, host) { + return new Client(port, host); +}; + + +exports.cat = function(url, encoding_, headers_) { + var encoding = 'utf8', + headers = {}, + callback = null; + + // parse the arguments for the various options... very ugly + if (typeof(arguments[1]) == 'string') { + encoding = arguments[1]; + if (typeof(arguments[2]) == 'object') { + headers = arguments[2]; + if (typeof(arguments[3]) == 'function') callback = arguments[3]; + } else { + if (typeof(arguments[2]) == 'function') callback = arguments[2]; + } + } else { + // didn't specify encoding + if (typeof(arguments[1]) == 'object') { + headers = arguments[1]; + callback = arguments[2]; + } else { + callback = arguments[1]; + } + } + + var url = require('url').parse(url); + + var hasHost = false; + if (Array.isArray(headers)) { + for (var i = 0, l = headers.length; i < l; i++) { + if (headers[i][0].toLowerCase() === 'host') { + hasHost = true; + break; + } + } + } else if (typeof headers === 'Object') { + var keys = Object.keys(headers); + for (var i = 0, l = keys.length; i < l; i++) { + var key = keys[i]; + if (key.toLowerCase() == 'host') { + hasHost = true; + break; + } + } + } + if (!hasHost) headers['Host'] = url.hostname; + + var content = ''; + + 
var client = exports.createClient(url.port || 80, url.hostname); + var req = client.request((url.pathname || '/') + + (url.search || '') + + (url.hash || ''), + headers); + + if (url.protocol == 'https:') { + client.https = true; + } + + var callbackSent = false; + + req.addListener('response', function(res) { + if (res.statusCode < 200 || res.statusCode >= 300) { + if (callback && !callbackSent) { + callback(res.statusCode); + callbackSent = true; + } + client.end(); + return; + } + res.setEncoding(encoding); + res.addListener('data', function(chunk) { content += chunk; }); + res.addListener('end', function() { + if (callback && !callbackSent) { + callback(null, content); + callbackSent = true; + } + }); + }); + + client.addListener('error', function(err) { + if (callback && !callbackSent) { + callback(err); + callbackSent = true; + } + }); + + client.addListener('close', function() { + if (callback && !callbackSent) { + callback(new Error('Connection closed unexpectedly')); + callbackSent = true; + } + }); + req.end(); +}; From 4125822bedfb84b052ead699946296ce3cfa691c Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 15:55:02 -0800 Subject: [PATCH 04/10] all errors go to req object --- lib/http.js | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/lib/http.js b/lib/http.js index 1768e7648e..c60703b3f5 100644 --- a/lib/http.js +++ b/lib/http.js @@ -810,6 +810,7 @@ function connectionListener(socket) { var parser = parsers.alloc(); parser.reinitialize('request'); parser.socket = socket; + parser.incoming = null; socket.addListener('error', function(e) { self.emit('clientError', e); @@ -953,6 +954,20 @@ Agent.prototype._establishNewConnection = function() { var parser = parsers.alloc(); parser.reinitialize('response'); parser.socket = socket; + parser.incoming = null; + + socket.on('error', function(err) { + debug("AGENT SOCKET ERROR: " + err.message); + if (socket._httpMessage) { + 
socket._httpMessage.emit('error', err); + } else if (self.queue.length) { + var req = self.queue.shift(); + if (req) req.emit('error', err); + } else { + // No requests on queue? Where is the request + assert(0); + } + }); socket.ondata = function(d, start, end) { var ret = parser.execute(d, start, end - start); @@ -998,6 +1013,8 @@ Agent.prototype._establishNewConnection = function() { var req = socket._httpMessage; assert(req); + req.res = res; + // Responses to HEAD requests are AWFUL. Ask Ryan. // A major oversight in HTTP. Hence this nastiness. var isHeadResponse = req.method == 'HEAD'; @@ -1100,11 +1117,7 @@ exports.request = function(options, cb) { var agent = getAgent(options.host, options.port); var req = agent.appendMessage(options); - if (cb) { - req.once('response', function (res) { - cb(null, res); - }); - } + if (cb) req.once('response', cb); return req; }; @@ -1123,6 +1136,7 @@ function Client(port, host) { this.port = port; this.host = host; } +util.inherits(Client, EventEmitter); Client.prototype.request = function(method, path, headers) { @@ -1141,7 +1155,15 @@ Client.prototype.request = function(method, path, headers) { host: this.host }; - return exports.request(options); + var self = this; + var req = exports.request(options); + + // proxy error events from req to Client + req.on('error', function(err) { + self.emit('error', err); + }); + + return req; }; From d89454e5d443ed39422fbf1c4280ba3ead842e18 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 16:25:24 -0800 Subject: [PATCH 05/10] Backport client 'upgrade' events --- lib/http.js | 62 ++++++++++++++++++++++-- test/simple/test-http-upgrade-client2.js | 3 ++ 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/lib/http.js b/lib/http.js index c60703b3f5..5e7e6d36c9 100644 --- a/lib/http.js +++ b/lib/http.js @@ -937,6 +937,12 @@ Agent.prototype.appendMessage = function(options) { }; +Agent.prototype._removeSocket = function(socket) { + var i = 
this.sockets.indexOf(socket); + if (i >= 0) this.sockets.splice(i, 1); +} + + Agent.prototype._establishNewConnection = function() { var self = this; assert(this.sockets.length < this.maxSockets); @@ -944,6 +950,7 @@ Agent.prototype._establishNewConnection = function() { // Grab a new "socket". Depending on the implementation of _getConnection // this could either be a raw TCP socket or a TLS stream. var socket = this._getConnection(this.host, this.port, function () { + self.emit('connect'); // mostly for the shim. debug("Agent _getConnection callback"); self._cycle(); }); @@ -985,8 +992,18 @@ Agent.prototype._establishNewConnection = function() { // in the upgradeHead from the closing lines of the headers var upgradeHead = d.slice(start + bytesParsed + 1, end); + // Make sure we don't try to send HTTP requests to it. + self._removeSocket(socket); + + socket.on('end', function() { + self.emit('end'); + }); + + // XXX free the parser? + if (self.listeners('upgrade').length) { - self.emit('upgrade', res, res.socket, upgradeHead); + // Emit 'upgrade' on the Agent. + self.emit('upgrade', res, socket, upgradeHead); } else { // Got upgrade header, but have no handler. socket.destroy(); @@ -995,14 +1012,14 @@ Agent.prototype._establishNewConnection = function() { }; socket.onend = function() { + self.emit('end'); // mostly for the shim. parser.finish(); socket.destroy(); }; // When the socket closes remove it from the list of available sockets. socket.on('close', function() { - var i = self.sockets.indexOf(socket); - if (i >= 0) self.sockets.splice(i, 1); + self._removeSocket(socket); // unref the parser for easy gc parsers.free(parser); }); @@ -1133,12 +1150,51 @@ exports.get = function(options, cb) { // Shims to old interface. 
function Client(port, host) { + var self = this; + this.port = port; this.host = host; + this.agent = getAgent(this.host, this.port); + + // proxy connect events upwards; + this.agent.on('connect', function() { + self.emit('connect'); + }); + + this.agent.on('end', function() { + self.emit('end'); + }); + + // proxy upgrade events upwards; + this.agent.on('upgrade', function (res, socket, upgradeHead) { + if (self.listeners('upgrade').length) { + self.emit('upgrade', res, socket, upgradeHead); + } else { + socket.destroy(); + } + }); } util.inherits(Client, EventEmitter); +// This method is used in a few tests to force the connections closed. +// Again - just a shim so as not to break code. Not really important. +Client.prototype.end = function() { + for (var i = 0; i < this.agent.sockets.length; i++) { + var socket = this.agent.sockets[i]; + if (!socket._httpMessage && socket.writable) socket.end(); + } +}; + + +Client.prototype.destroy = function(e) { + for (var i = 0; i < this.agent.sockets.length; i++) { + var socket = this.agent.sockets[i]; + socket.destroy(e); + } +}; + + Client.prototype.request = function(method, path, headers) { if (typeof(path) != 'string') { // assume method was omitted, shift arguments diff --git a/test/simple/test-http-upgrade-client2.js b/test/simple/test-http-upgrade-client2.js index 14a2fa1798..bdd585a13c 100644 --- a/test/simple/test-http-upgrade-client2.js +++ b/test/simple/test-http-upgrade-client2.js @@ -21,11 +21,13 @@ server.listen(common.PORT, function() { var client = http.createClient(common.PORT); function upgradeRequest(fn) { + console.log("req"); var header = { 'Connection': 'Upgrade', 'Upgrade': 'Test' }; var request = client.request('GET', '/', header); var wasUpgrade = false; function onUpgrade(res, socket, head) { + console.log("client upgraded"); wasUpgrade = true; client.removeListener('upgrade', onUpgrade); @@ -34,6 +36,7 @@ server.listen(common.PORT, function() { client.on('upgrade', onUpgrade); function onEnd() 
{ + console.log("client end"); client.removeListener('end', onEnd); if (!wasUpgrade) { throw new Error('hasn\'t received upgrade event'); From f46594951676de7ad8f481e85efb8bf43f8614a5 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 17:07:44 -0800 Subject: [PATCH 06/10] hacky work around socket hangups on http requests --- lib/http.js | 78 ++++++++++++---------------- test/simple/test-http-set-timeout.js | 12 +++-- 2 files changed, 41 insertions(+), 49 deletions(-) diff --git a/lib/http.js b/lib/http.js index 5e7e6d36c9..889fa25b7e 100644 --- a/lib/http.js +++ b/lib/http.js @@ -965,15 +965,18 @@ Agent.prototype._establishNewConnection = function() { socket.on('error', function(err) { debug("AGENT SOCKET ERROR: " + err.message); + var req; if (socket._httpMessage) { - socket._httpMessage.emit('error', err); + req = socket._httpMessage } else if (self.queue.length) { - var req = self.queue.shift(); - if (req) req.emit('error', err); + req = self.queue.shift(); } else { // No requests on queue? Where is the request assert(0); } + + req.emit('error', err); + req._hadError = true; // hacky }); socket.ondata = function(d, start, end) { @@ -987,6 +990,8 @@ Agent.prototype._establishNewConnection = function() { socket.onend = null; var res = parser.incoming; + assert(socket._httpMessage); + socket._httpMessage.res = res; // This is start + byteParsed + 1 due to the error of getting \n // in the upgradeHead from the closing lines of the headers @@ -1019,6 +1024,20 @@ Agent.prototype._establishNewConnection = function() { // When the socket closes remove it from the list of available sockets. socket.on('close', function() { + // This is really hacky: What if someone issues a request, the server + // accepts, but then terminates the connection. There is no parse error, + // there is no socket-level error. How does the user get informed? 
+ // We check to see if the socket has a request, if so if it has a + // response (meaning that it emitted a 'response' event). If the socket + // has a request but no response and it never emitted an error event: + // THEN we need to trigger it manually. + // There must be a better way to do this. + if (socket._httpMessage && + !socket._httpMessage.res && + !socket._httpMessage._hadError) { + socket._httpMessage.emit('error', new Error('socket hang up')); + } + self._removeSocket(socket); // unref the parser for easy gc parsers.free(parser); @@ -1254,49 +1273,26 @@ exports.cat = function(url, encoding_, headers_) { var url = require('url').parse(url); - var hasHost = false; - if (Array.isArray(headers)) { - for (var i = 0, l = headers.length; i < l; i++) { - if (headers[i][0].toLowerCase() === 'host') { - hasHost = true; - break; - } - } - } else if (typeof headers === 'Object') { - var keys = Object.keys(headers); - for (var i = 0, l = keys.length; i < l; i++) { - var key = keys[i]; - if (key.toLowerCase() == 'host') { - hasHost = true; - break; - } - } - } - if (!hasHost) headers['Host'] = url.hostname; + var options = { + method: 'GET', + port: url.port || 80, + host: url.hostname, + headers: headers, + path: (url.pathname || '/') + (url.search || '') + (url.hash || '') + }; var content = ''; - - var client = exports.createClient(url.port || 80, url.hostname); - var req = client.request((url.pathname || '/') + - (url.search || '') + - (url.hash || ''), - headers); - - if (url.protocol == 'https:') { - client.https = true; - } - var callbackSent = false; - req.addListener('response', function(res) { + var req = exports.request(options, function(res) { if (res.statusCode < 200 || res.statusCode >= 300) { if (callback && !callbackSent) { callback(res.statusCode); callbackSent = true; } - client.end(); return; } + res.setEncoding(encoding); res.addListener('data', function(chunk) { content += chunk; }); res.addListener('end', function() { @@ -1306,19 +1302,13 @@ 
exports.cat = function(url, encoding_, headers_) { } }); }); + req.end(); - client.addListener('error', function(err) { - if (callback && !callbackSent) { - callback(err); - callbackSent = true; - } - }); - client.addListener('close', function() { + req.on('error', function(err) { if (callback && !callbackSent) { - callback(new Error('Connection closed unexpectedly')); + callback(err); callbackSent = true; } }); - req.end(); }; diff --git a/test/simple/test-http-set-timeout.js b/test/simple/test-http-set-timeout.js index 1ae3229cae..d9b54c2f94 100644 --- a/test/simple/test-http-set-timeout.js +++ b/test/simple/test-http-set-timeout.js @@ -7,6 +7,7 @@ var server = http.createServer(function(req, res) { req.connection.setTimeout(500); req.connection.addListener('timeout', function() { + req.connection.destroy(); common.debug('TIMEOUT'); server.close(); }); @@ -19,9 +20,10 @@ server.listen(common.PORT, function() { throw new Error('Timeout was not sucessful'); }, 2000); - http.cat('http://localhost:' + common.PORT + '/', 'utf8', - function(err, content) { - clearTimeout(errorTimer); - console.log('HTTP REQUEST COMPLETE (this is good)'); - }); + var url = 'http://localhost:' + common.PORT + '/'; + + http.cat(url, 'utf8', function(err, content) { + clearTimeout(errorTimer); + console.log('HTTP REQUEST COMPLETE (this is good)'); + }); }); From 032f80efeaa4d4bab1d49c29f7bf15e7a6c9e20a Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 17:54:59 -0800 Subject: [PATCH 07/10] Docs for new http API --- doc/api/http.markdown | 149 ++++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 70 deletions(-) diff --git a/doc/api/http.markdown b/doc/api/http.markdown index c61c544773..e818582a9f 100644 --- a/doc/api/http.markdown +++ b/doc/api/http.markdown @@ -22,7 +22,6 @@ HTTP API is very low-level. It deals with stream handling and message parsing only. It parses a message into headers and body but it does not parse the actual headers or the body. 
-HTTPS is supported if OpenSSL is available on the underlying platform. ## http.Server @@ -311,34 +310,57 @@ If `data` is specified, it is equivalent to calling `response.write(data, encodi followed by `response.end()`. -## http.Client +## http.request(options, callback) -An HTTP client is constructed with a server address as its -argument, the returned handle is then used to issue one or more -requests. Depending on the server connected to, the client might -pipeline the requests or reestablish the stream after each -stream. _Currently the implementation does not pipeline requests._ +Node maintains several connections per server to make HTTP requests. +This function allows one to transparently issue requests. -Example of connecting to `google.com`: +Options: - var http = require('http'); - var google = http.createClient(80, 'www.google.com'); - var request = google.request('GET', '/', - {'host': 'www.google.com'}); - request.end(); - request.on('response', function (response) { - console.log('STATUS: ' + response.statusCode); - console.log('HEADERS: ' + JSON.stringify(response.headers)); - response.setEncoding('utf8'); - response.on('data', function (chunk) { +- `host`: A domain name or IP address of the server to issue the request to. +- `port`: Port of remote server. +- `method`: A string specifying the HTTP request method. Possible values: + `'GET'` (default), `'POST'`, `'PUT'`, and `'DELETE'`. +- `path`: Request path. Should include query string and fragments if any. + E.G. `'/index.html?page=12'` +- `headers`: An object containing request headers. + +`http.request()` returns an instance of the `http.ClientRequest` +class. The `ClientRequest` instance is a writable stream. If one needs to +upload a file with a POST request, then write to the `ClientRequest` object. 
+ +Example: + + var options = { + host: 'www.google.com', + port: 80, + path: '/upload', + method: 'POST' + }; + + var req = http.request(options, function(res) { + console.log('STATUS: ' + res.statusCode); + console.log('HEADERS: ' + JSON.stringify(res.headers)); + res.setEncoding('utf8'); + res.on('data', function (chunk) { console.log('BODY: ' + chunk); }); }); -There are a few special headers that should be noted. + // write data to request body + req.write('data\n'); + req.write('data\n'); + req.end(); + +Note that in the example `req.end()` was called. With `http.request()` one +must always call `req.end()` to signify that you're done with the request - +even if there is no data being written to the request body. -* The 'Host' header is not added by Node, and is usually required by - website. +If any error is encountered during the request (be that with DNS resolution, +TCP level errors, or actual HTTP parse errors) an `'error'` event is emitted +on the returned request object. + +There are a few special headers that should be noted. * Sending a 'Connection: keep-alive' will notify Node that the connection to the server should be persisted until the next request. @@ -350,6 +372,33 @@ There are a few special headers that should be noted. and listen for the `continue` event. See RFC2616 Section 8.2.3 for more information. +## http.get(options, callback) + +Since most requests are GET requests without bodies, Node provides this +convenience method. The only difference between this method and `http.request()` is +that it sets the method to GET and calls `req.end()` automatically. + +Example: + + var options = { + host: 'www.google.com', + port: 80, + path: '/index.html' + }; + + http.get(options, function(res) { + console.log("Got response: " + res.statusCode); + }).on('error', function(e) { + console.log("Got error: " + e.message); + }); + + +## http.Agent + +`http.request()` uses a special `Agent` for managing multiple connections to +an HTTP server. 
Normally `Agent` instances should not be exposed to user +code, however in certain situations it's useful to check the status of the +agent. ### Event: 'upgrade' @@ -369,56 +418,24 @@ Emitted when the server sends a '100 Continue' HTTP response, usually because the request contained 'Expect: 100-continue'. This is an instruction that the client should send the request body. +### agent.maxSockets -### http.createClient(port, host='localhost', secure=false, [credentials]) - -Constructs a new HTTP client. `port` and -`host` refer to the server to be connected to. A -stream is not established until a request is issued. - -`secure` is an optional boolean flag to enable https support and `credentials` is an optional -credentials object from the crypto module, which may hold the client's private key, -certificate, and a list of trusted CA certificates. - -If the connection is secure, but no explicit CA certificates are passed -in the credentials, then node.js will default to the publicly trusted list -of CA certificates, as given in . +By default set to 5. Determines how many concurrent sockets the agent can have open. -### client.request(method='GET', path, [request_headers]) +### agent.sockets -Issues a request; if necessary establishes stream. Returns a `http.ClientRequest` instance. +An array of sockets currently in use by the Agent. Do not modify. -`method` is optional and defaults to 'GET' if omitted. +### agent.queue -`request_headers` is optional. -Additional request headers might be added internally -by Node. Returns a `ClientRequest` object. +A queue of requests waiting to be sent to sockets. -Do remember to include the `Content-Length` header if you -plan on sending a body. If you plan on streaming the body, perhaps -set `Transfer-Encoding: chunked`. - -*NOTE*: the request is not complete. This method only sends the header of -the request. One needs to call `request.end()` to finalize the request and -retrieve the response. 
(This sounds convoluted but it provides a chance for -the user to stream a body to the server with `request.write()`.) - -### client.verifyPeer() - -Returns true or false depending on the validity of the server's certificate -in the context of the defined or default list of trusted CA certificates. - -### client.getPeerCertificate() - -Returns a JSON structure detailing the server's certificate, containing a dictionary -with keys for the certificate `'subject'`, `'issuer'`, `'valid_from'` and `'valid_to'`. ## http.ClientRequest -This object is created internally and returned from the `request()` method -of a `http.Client`. It represents an _in-progress_ request whose header has -already been sent. +This object is created internally and returned from `http.request()`. It +represents an _in-progress_ request whose header has already been sent. To get the response, add a listener for `'response'` to the request object. `'response'` will be emitted from the request object when the response @@ -488,7 +505,7 @@ followed by `request.end()`. ## http.ClientResponse -This object is created when making a request with `http.Client`. It is +This object is created when making a request with `http.request()`. It is passed to the `'response'` event of the request object. The response implements the `Readable Stream` interface. @@ -499,10 +516,6 @@ The response implements the `Readable Stream` interface. Emitted when a piece of the message body is received. - Example: A chunk of the body is given as the single - argument. The transfer-encoding has been decoded. The - body chunk a String. The body encoding is set with - `response.setBodyEncoding()`. ### Event: 'end' @@ -542,7 +555,3 @@ Pauses response from emitting events. Useful to throttle back a download. ### response.resume() Resumes a paused response. - -### response.client - -A reference to the `http.Client` that this response belongs to. 
From 0866ecaf3fcda4603d8a0732e457d74248950b6b Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 17:55:13 -0800 Subject: [PATCH 08/10] ... --- lib/http.js | 1 + src/node_http_parser.cc | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/http.js b/lib/http.js index 889fa25b7e..d9633024d7 100644 --- a/lib/http.js +++ b/lib/http.js @@ -1163,6 +1163,7 @@ exports.get = function(options, cb) { options.method = 'GET'; var req = exports.request(options, cb); req.end(); + return req; }; diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 31e0ae5b60..7390a71127 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -229,7 +229,6 @@ class Parser : public ObjectWrap { } parser->Wrap(args.This()); - assert(!current_buffer); return args.This(); } From 7a16e2a2a06ba88985ec62b78bd05f5257f882f3 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 18:19:30 -0800 Subject: [PATCH 09/10] test-http-client-reconnect-bug: hang up should be error. --- test/pummel/test-http-client-reconnect-bug.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/pummel/test-http-client-reconnect-bug.js b/test/pummel/test-http-client-reconnect-bug.js index 3b102fc0f6..b104f4dd46 100644 --- a/test/pummel/test-http-client-reconnect-bug.js +++ b/test/pummel/test-http-client-reconnect-bug.js @@ -11,11 +11,12 @@ var eofCount = 0; var server = net.createServer(function(socket) { socket.end(); }); + server.on('listening', function() { var client = http.createClient(common.PORT); client.addListener('error', function(err) { - console.log('ERROR! ' + (err.stack || err)); + console.log('ERROR! 
' + err.message); errorCount++; }); @@ -30,6 +31,7 @@ server.on('listening', function() { console.log('STATUS: ' + response.statusCode); }); }); + server.listen(common.PORT); setTimeout(function() { @@ -38,6 +40,6 @@ setTimeout(function() { process.addListener('exit', function() { - assert.equal(0, errorCount); + assert.equal(1, errorCount); assert.equal(1, eofCount); }); From a86747603c3ca0713a8170157afe47a41d83bbd7 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 20 Jan 2011 18:23:50 -0800 Subject: [PATCH 10/10] Fix test-http-upload-timeout Cannot just close the connection or client will error. --- test/pummel/test-http-upload-timeout.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/pummel/test-http-upload-timeout.js b/test/pummel/test-http-upload-timeout.js index 4fb4980019..06b8e74496 100644 --- a/test/pummel/test-http-upload-timeout.js +++ b/test/pummel/test-http-upload-timeout.js @@ -14,18 +14,20 @@ server.on('request', function(req, res) { }); req.on('end', function() { connections--; - req.socket.end(); + res.writeHead(200); + res.end("done\n"); if (connections == 0) { server.close(); } }); }); + server.listen(common.PORT, '127.0.0.1', function() { for (var i = 0; i < 10; i++) { connections++; setTimeout(function() { - var client = http.createClient(common.PORT, '127.0.0.1'), + var client = http.createClient(common.PORT), request = client.request('POST', '/'); function ping() {