diff --git a/lib/http.js b/lib/http.js index 6904fab22f..46c37ca390 100644 --- a/lib/http.js +++ b/lib/http.js @@ -1,13 +1,13 @@ -var debugLevel = 0; -if ("NODE_DEBUG" in process.env) debugLevel = 1; +var sys = require('sys'); -function debug (x) { - if (debugLevel > 0) { - process.binding('stdio').writeError(x + "\n"); - } +var debug; +var debugLevel = parseInt(process.env.NODE_DEBUG, 16); +if (debugLevel & 0x4) { + debug = function (x) { sys.error('HTTP: ' + x); }; +} else { + debug = function () { }; } -var sys = require('sys'); var net = require('net'); var Utf8Decoder = require('utf8decoder').Utf8Decoder; var events = require('events'); @@ -259,13 +259,13 @@ function OutgoingMessage (socket) { this.output = []; this.outputEncodings = []; - this.closeOnFinish = false; + this._last = false; this.chunkedEncoding = false; this.shouldKeepAlive = true; this.useChunkedEncodingByDefault = true; - this.flushing = false; - this.headWritten = false; + this._headerFlushed = false; + this._header = null; // to be filled by _storeHeader this._hasBody = true; @@ -274,31 +274,45 @@ function OutgoingMessage (socket) { sys.inherits(OutgoingMessage, events.EventEmitter); exports.OutgoingMessage = OutgoingMessage; +// This abstract either writing directly to the socket or buffering it. +// Rename to _writeRaw() ? OutgoingMessage.prototype._send = function (data, encoding) { - var length = this.output.length; + if (this.connection._outgoing[0] === this && + this.connection.writable && + this.output.length === 0) + { + // Directly write to socket. + return this.connection.write(data, encoding); + } else { + // Buffer + var length = this.output.length; + + if (length === 0 || typeof data != 'string') { + this.output.push(data); + encoding = encoding || "ascii"; + this.outputEncodings.push(encoding); + return false; + } + + var lastEncoding = this.outputEncodings[length-1]; + var lastData = this.output[length-1]; + + if ((lastEncoding === encoding) || + (!encoding && data.constructor === lastData.constructor)) { + this.output[length-1] = lastData + data; + return false; + } - if (length === 0 || typeof data != 'string') { this.output.push(data); encoding = encoding || "ascii"; this.outputEncodings.push(encoding); - return; - } - - var lastEncoding = this.outputEncodings[length-1]; - var lastData = this.output[length-1]; - if ((lastEncoding === encoding) || - (!encoding && data.constructor === lastData.constructor)) { - this.output[length-1] = lastData + data; - return; + return false; } - - this.output.push(data); - encoding = encoding || "ascii"; - this.outputEncodings.push(encoding); }; -OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) { + +OutgoingMessage.prototype._storeHeader = function (firstLine, headers) { var sentConnectionHeader = false; var sentContentLengthHeader = false; var sentTransferEncodingHeader = false; @@ -325,7 +339,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) { if (connectionExpression.test(field)) { sentConnectionHeader = true; - if (closeExpression.test(value)) this.closeOnFinish = true; + if (closeExpression.test(value)) this._last = true; } else if (transferEncodingExpression.test(field)) { sentTransferEncodingHeader = true; @@ -344,7 +358,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) { (sentContentLengthHeader || this.useChunkedEncodingByDefault)) { messageHeader += "Connection: keep-alive\r\n"; } else { - this.closeOnFinish = true; + this._last = true; messageHeader += "Connection: close\r\n"; } } @@ -355,7 +369,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) { messageHeader += "Transfer-Encoding: chunked\r\n"; this.chunkedEncoding = true; } else { - this.closeOnFinish = true; + this._last = true; } } else { // Make sure we don't end the 0\r\n\r\n at the end of the message. @@ -365,7 +379,7 @@ OutgoingMessage.prototype.sendHeaderLines = function (firstLine, headers) { messageHeader += CRLF; - this._send(messageHeader); + this._header = messageHeader; // wait until the first body chunk, or close(), is sent to flush. }; @@ -376,7 +390,7 @@ OutgoingMessage.prototype.sendBody = function () { OutgoingMessage.prototype.write = function (chunk, encoding) { - if ( (this instanceof ServerResponse) && !this.headWritten) { + if (!this._header) { throw new Error("writeHead() must be called before write()") } @@ -390,28 +404,35 @@ OutgoingMessage.prototype.write = function (chunk, encoding) { throw new TypeError("first argument must be a string, Array, or Buffer"); } - encoding = encoding || "ascii"; + // write the header + + if (!this._headerFlushed) { + this._send(this._header); + this._headerFlushed = true; + } + + if (chunk.length === 0) return false; + + var len, ret; if (this.chunkedEncoding) { - var chunkLength = (typeof chunk == 'string' ? process._byteLength(chunk, encoding) : chunk.length); - if (chunkLength > 0) { - this._send(chunkLength.toString(16)); - this._send(CRLF); - this._send(chunk, encoding); - this._send(CRLF); + if (typeof(chunk) === 'string') { + len = Buffer.byteLength(chunk, encoding); + var chunk = len.toString(16) + CRLF + chunk + CRLF; + debug('string chunk = ' + sys.inspect(chunk)); + ret = this._send(chunk, encoding); + } else { + // buffer + len = chunk.length; + this._send(len.toString(16) + CRLF); + this._send(chunk); + ret = this._send(CRLF); } } else { - this._send(chunk, encoding); + ret = this._send(chunk, encoding); } - if (this.flushing) { - this.flush(); - } else { - this.flushing = true; - } -}; - -OutgoingMessage.prototype.flush = function () { - this._onFlush(); + debug('write ret = ' + ret); + return ret; }; OutgoingMessage.prototype.finish = function () { @@ -429,10 +450,31 @@ OutgoingMessage.prototype.close = function (data, encoding) { }; OutgoingMessage.prototype.end = function (data, encoding) { - if (data) this.write(data, encoding); - if (this.chunkedEncoding) this._send("0\r\n\r\n"); // last chunk + var ret; + // maybe the header hasn't been sent. if not send it. + if (!this._headerFlushed) { + ret = this._send(this._header); + this._headerFlushed = true; + } + + if (data) { + ret = this.write(data, encoding); + } + this.finished = true; - this.flush(); + + if (this.chunkedEncoding) { + ret = this._send("0\r\n\r\n"); // last chunk + } + + // There is the first message on the outgoing queue, and we've sent + // everything to the socket. + if (this.output.length === 0 && this.connection._outgoing[0] === this) { + debug('outgoing message end. shifting because was flushed'); + this.connection._onOutgoingSent(); + } + + return ret; }; @@ -479,8 +521,7 @@ ServerResponse.prototype.writeHead = function (statusCode) { } - this.sendHeaderLines(statusLine, headers); - this.headWritten = true; + this._storeHeader(statusLine, headers); }; // TODO Eventually remove @@ -510,9 +551,9 @@ function ClientRequest (socket, method, url, headers) { } else { this.useChunkedEncodingByDefault = true; } - this.closeOnFinish = true; + this._last = true; - this.sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers); + this._storeHeader(method + " " + url + " HTTP/1.1\r\n", headers); } sys.inherits(ClientRequest, OutgoingMessage); exports.ClientRequest = ClientRequest; @@ -531,49 +572,82 @@ ClientRequest.prototype.close = function () { clientRequestCloseWarning = "Warning: ClientRequest.prototype.close has been renamed to end()"; sys.error(clientRequestCloseWarning); } - if (arguments.length > 0) { - throw new Error( "ClientRequest.prototype.end does not take any arguments. " - + "Add a response listener manually to the request object." + if (typeof arguments[0] == "function") { + throw new Error( "ClientRequest.prototype.end does not take a callback. " + + "Add a 'response' listener manually to the request object." ); } return this.end(); }; ClientRequest.prototype.end = function () { - if (arguments.length > 0) { - throw new Error( "ClientRequest.prototype.end does not take any arguments. " - + "Add a response listener manually to the request object." + if (typeof arguments[0] == "function") { + throw new Error( "ClientRequest.prototype.end does not take a callback. " + + "Add a 'response' listener manually to get the response." ); } - OutgoingMessage.prototype.end.call(this); + OutgoingMessage.prototype.end.apply(this, arguments); }; -/* Returns true if the message queue is finished and the socket - * should be closed. */ -function flushMessageQueue (socket, queue) { - while (queue[0]) { - var message = queue[0]; - - while (message.output.length > 0) { - if (!socket.writable) return true; - - var data = message.output.shift(); - var encoding = message.outputEncodings.shift(); +function outgoingFlush (socket) { + // This logic is probably a bit confusing. Let me explain a bit: + // + // In both HTTP servers and clients it is possible to queue up several + // outgoing messages. This is easiest to imagine in the case of a client. + // Take the following situation: + // + // req1 = client.request('GET', '/'); + // req2 = client.request('POST', '/'); + // + // The question is what happens when the user does + // + // req2.write("hello world\n"); + // + // It's possible that the first request has not been completely flushed to + // the socket yet. Thus the outgoing messages need to be prepared to queue + // up data internally before sending it on further to the socket's queue. + // + // This function, outgoingFlush(), is called by both the Server + // implementation and the Client implementation to attempt to flush any + // pending messages out to the socket. + var message = socket._outgoing[0]; + + if (!message) return; + + var ret; + + while (message.output.length) { + if (!socket.writable) return; // XXX Necessary? + + var data = message.output.shift(); + var encoding = message.outputEncodings.shift(); + + ret = socket.write(data, encoding); + } - socket.write(data, encoding); - } + if (message.finished) { + socket._onOutgoingSent(); + } else if (ret) { + message.emit('drain'); + } +} - if (!message.finished) break; - queue.shift(); +function httpSocketSetup (socket) { + // An array of outgoing messages for the socket. In pipelined connections + // we need to keep track of the order they were sent. + socket._outgoing = []; - if (message.closeOnFinish) return true; - } - return false; + // NOTE: be sure not to use ondrain elsewhere in this file! + socket.ondrain = function () { + var message = socket._outgoing[0]; + if (message) message.emit('drain'); + }; } + function Server (requestListener) { net.Server.call(this); @@ -598,9 +672,10 @@ exports.createServer = function (requestListener) { function connectionListener (socket) { var self = this; - // An array of responses for each socket. In pipelined connections - // we need to keep track of the order they were sent. - var responses = []; + + debug("new http connection"); + + httpSocketSetup(socket); socket.setTimeout(2*60*1000); // 2 minute timeout socket.addListener('timeout', function () { @@ -646,13 +721,29 @@ function connectionListener (socket) { socket.onend = function () { parser.finish(); + if (socket._outgoing.length) { + socket._outgoing[socket._outgoing.length-1]._last = true; + outgoingFlush(socket); + } else { + socket.end(); + } + }; + + socket.addListener('close', function () { // unref the parser for easy gc parsers.free(parser); + }); - if (responses.length == 0) { + // At the end of each response message, after it has been flushed to the + // socket. Here we insert logic about what to do next. + socket._onOutgoingSent = function (message) { + var message = socket._outgoing.shift(); + if (message._last) { + // No more messages to be pushed out. socket.end(); - } else { - responses[responses.length-1].closeOnFinish = true; + } else if (socket._outgoing.length) { + // Push out the next message. + outgoingFlush(socket); } }; @@ -661,14 +752,9 @@ function connectionListener (socket) { // to the user. parser.onIncoming = function (req, shouldKeepAlive) { var res = new ServerResponse(req); - + debug('server response shouldKeepAlive: ' + shouldKeepAlive); res.shouldKeepAlive = shouldKeepAlive; - res._onFlush = function () { - if (flushMessageQueue(socket, responses)) { - socket.end(); - } - }; - responses.push(res); + socket._outgoing.push(res); self.emit('request', req, res); return false; // Not a HEAD response. (Not even a response!) @@ -678,56 +764,40 @@ function connectionListener (socket) { function Client ( ) { net.Stream.call(this); - var self = this; - var requests = []; - var currentRequest; + httpSocketSetup(self); + var parser; - self._initParser = function () { + function initParser () { if (!parser) parser = parsers.alloc(); parser.reinitialize('response'); parser.socket = self; - parser.reqs = []; // list of request methods parser.onIncoming = function (res) { debug("incoming response!"); - var isHeadResponse = currentRequest.method == "HEAD"; + var req = self._outgoing[0]; + + // Responses to HEAD requests are AWFUL. Ask Ryan. + // A major oversight in HTTP. Hence this nastiness. + var isHeadResponse = req.method == "HEAD"; debug('isHeadResponse ' + isHeadResponse); res.addListener('end', function ( ) { debug("request complete disconnecting. readyState = " + self.readyState); + // For the moment we reconnect for every request. FIXME! + // All that should be required for keep-alive is to not reconnect, + // but outgoingFlush instead. self.end(); }); - currentRequest.emit("response", res); + req.emit("response", res); return isHeadResponse; }; }; - self._reconnect = function () { - if (self.readyState != "opening") { - debug("HTTP CLIENT: reconnecting readyState = " + self.readyState); - self.connect(self.port, self.host); - } - }; - - self._pushRequest = function (req) { - req._onFlush = function () { - if (self.readyState == "closed") { - debug("HTTP CLIENT request flush. reconnect. readyState = " + self.readyState); - self._reconnect(); - return; - } - - debug("self flush readyState = " + self.readyState); - if (req == currentRequest) flushMessageQueue(self, [req]); - }; - requests.push(req); - }; - self.ondata = function (d, start, end) { if (!parser) { throw new Error("parser not initialized prior to Client.ondata call"); @@ -739,29 +809,27 @@ function Client ( ) { var bytesParsed = ret; var upgradeHead = d.slice(start + bytesParsed, end - start); parser.incoming.upgradeHead = upgradeHead; - currentRequest.emit("response", parser.incoming); - parser.incoming.emit('end'); + var req = self._outgoing[0]; self.ondata = null; self.onend = null } }; self.addListener("connect", function () { + debug('client connected'); if (this.https) { this.setSecure(this.credentials); } else { - self._initParser(); - debug('requests: ' + sys.inspect(requests)); - currentRequest = requests.shift() - currentRequest.flush(); + initParser(); + debug('requests: ' + sys.inspect(self._outgoing)); + outgoingFlush(self); } }); self.addListener("secure", function () { - self._initParser(); - debug('requests: ' + sys.inspect(requests)); - currentRequest = requests.shift() - currentRequest.flush(); + initParser(); + debug('requests: ' + sys.inspect(self._outgoing)); + outgoingFlush(self); }); self.onend = function () { @@ -775,28 +843,64 @@ function Client ( ) { debug("HTTP CLIENT onClose. readyState = " + self.readyState); + // finally done with the request + self._outgoing.shift(); + // If there are more requests to handle, reconnect. - if (requests.length > 0) { + if (self._outgoing.length) { self._reconnect(); } else if (parser) { parsers.free(parser); parser = null; } }); - }; sys.inherits(Client, net.Stream); exports.Client = Client; exports.createClient = function (port, host, https, credentials) { - var c = new Client; + var c = new Client(); c.port = port; c.host = host; c.https = https; c.credentials = credentials; return c; -} +}; + + +// This is called each time a request has been pushed completely to the +// socket. The message that was sent is still sitting at client._outgoing[0] +// it is our responsibility to shift it off. +// +// We have to be careful when it we shift it because once we do any writes +// to other requests will be flushed directly to the socket. +// +// At the moment we're implement a client which connects and disconnects on +// each request/response cycle so we cannot shift off the request from +// client._outgoing until we're completely disconnected after the response +// comes back. +Client.prototype._onOutgoingSent = function (message) { + // We've just finished a message. We don't end/shutdown the connection here + // because HTTP servers typically cannot handle half-closed connections + // (Node servers can). + // + // Instead, we just check if the connection is closed, and if so + // reconnect if we have pending messages. + if (this._outgoing.length && this.readyState == "closed") { + debug("HTTP client request flush. reconnect. readyState = " + this.readyState); + this._reconnect(); + } +}; + + +Client.prototype._reconnect = function () { + if (this.readyState === "closed") { + debug("HTTP CLIENT: reconnecting readyState = " + this.readyState); + this.connect(this.port, this.host); + } +}; + Client.prototype.get = function () { throw new Error("client.get(...) is now client.request('GET', ...)"); @@ -819,13 +923,15 @@ Client.prototype.put = function () { }; Client.prototype.request = function (method, url, headers) { - if (typeof(url) != "string") { // assume method was omitted, shift arguments + if (typeof(url) != "string") { + // assume method was omitted, shift arguments headers = url; url = method; - method = null; + method = "GET"; } - var req = new ClientRequest(this, method || "GET", url, headers); - this._pushRequest(req); + var req = new ClientRequest(this, method, url, headers); + this._outgoing.push(req); + if (this.readyState === 'closed') this._reconnect(); return req; }; diff --git a/lib/module.js b/lib/module.js index 386d796b48..2c867528dc 100644 --- a/lib/module.js +++ b/lib/module.js @@ -63,9 +63,9 @@ var events = eventsModule.exports; // Modules -var debugLevel = parseInt(process.env["NODE_DEBUG"]); +var debugLevel = parseInt(process.env["NODE_DEBUG"], 16); function debug (x) { - if (debugLevel > 0) { + if (debugLevel & 1) { process.binding('stdio').writeError(x + "\n"); } } diff --git a/lib/net.js b/lib/net.js index 4c9c525966..ce0961a92e 100644 --- a/lib/net.js +++ b/lib/net.js @@ -7,9 +7,9 @@ var dns = require('dns'); var kMinPoolSpace = 128; var kPoolSize = 40*1024; -var debugLevel = process.env['NODE_DEBUG'] ? 1 : 0; +var debugLevel = parseInt(process.env.NODE_DEBUG, 16); function debug () { - if (debugLevel > 0) sys.error.apply(this, arguments); + if (debugLevel & 0x2) sys.error.apply(this, arguments); } var binding = process.binding('net'); diff --git a/test/simple/test-http-304.js b/test/simple/test-http-304.js index 92eb2a631e..896c932141 100644 --- a/test/simple/test-http-304.js +++ b/test/simple/test-http-304.js @@ -13,10 +13,11 @@ sys.puts('Server running at http://127.0.0.1:'+PORT+'/') s.addListener('listening', function () { - childProcess.exec('curl http://127.0.0.1:'+PORT+'/', function (err, stdout, stderr) { + childProcess.exec('curl -i http://127.0.0.1:'+PORT+'/', function (err, stdout, stderr) { if (err) throw err; s.close(); - sys.puts('curled response correctly'); + error('curled response correctly'); + error(sys.inspect(stdout)); }); }); diff --git a/test/simple/test-http-client-upload.js b/test/simple/test-http-client-upload.js index ce7887a7ce..bdc1f2d674 100644 --- a/test/simple/test-http-client-upload.js +++ b/test/simple/test-http-client-upload.js @@ -30,8 +30,10 @@ var req = client.request('POST', '/'); req.write('1\n'); req.write('2\n'); req.write('3\n'); +req.end(); + +error("client finished sending request"); -puts("client finished sending request"); req.addListener('response', function(res) { res.setEncoding("utf8"); res.addListener('data', function(chunk) { @@ -42,7 +44,6 @@ req.addListener('response', function(res) { server.close(); }); }); -req.end(); process.addListener("exit", function () { assert.equal("1\n2\n3\n", sent_body); diff --git a/test/simple/test-http-server.js b/test/simple/test-http-server.js index ec249570ef..97480f9395 100644 --- a/test/simple/test-http-server.js +++ b/test/simple/test-http-server.js @@ -21,18 +21,21 @@ http.createServer(function (req, res) { } if (req.id == 1) { + error("req 1"); assert.equal("POST", req.method); assert.equal("/quit", url.parse(req.url).pathname); } if (req.id == 2) { + error("req 2"); assert.equal("foo", req.headers['x-x']); } if (req.id == 3) { + error("req 3"); assert.equal("bar", req.headers['x-x']); this.close(); - //puts("server closed"); + error("server closed"); } setTimeout(function () { diff --git a/test/simple/test-http.js b/test/simple/test-http.js index f1da37c7ca..8aab058f40 100644 --- a/test/simple/test-http.js +++ b/test/simple/test-http.js @@ -2,6 +2,10 @@ require("../common"); http = require("http"); url = require("url"); +function p (x) { + error(inspect(x)); +} + var responses_sent = 0; var responses_recvd = 0; var body0 = ""; @@ -38,14 +42,14 @@ http.createServer(function (req, res) { var client = http.createClient(PORT); var req = client.request("/hello", {"Accept": "*/*", "Foo": "bar"}); +req.end(); req.addListener('response', function (res) { assert.equal(200, res.statusCode); responses_recvd += 1; - res.setBodyEncoding("ascii"); + res.setEncoding("utf8"); res.addListener('data', function (chunk) { body0 += chunk; }); debug("Got /hello response"); }); -req.end(); setTimeout(function () { req = client.request("POST", "/world");