diff --git a/lib/http.js b/lib/http.js
index ec68f54668..32e2ef3072 100644
--- a/lib/http.js
+++ b/lib/http.js
@@ -37,7 +37,8 @@ if (process.env.NODE_DEBUG && /http/.test(process.env.NODE_DEBUG)) {
 }
 
 function readStart(socket) {
-  if (!socket || !socket._handle || !socket._handle.readStart) return;
+  if (!socket || !socket._handle || !socket._handle.readStart || socket._paused)
+    return;
   socket._handle.readStart();
 }
 
@@ -172,10 +173,8 @@ function parserOnMessageComplete() {
     stream.push(null);
   }
 
-  if (parser.socket.readable) {
-    // force to read the next incoming message
-    readStart(parser.socket);
-  }
+  // force to read the next incoming message
+  readStart(parser.socket);
 }
 
 
@@ -1963,6 +1962,7 @@ function connectionListener(socket) {
   });
 
   socket.ondata = function(d, start, end) {
+    assert(!socket._paused);
     var ret = parser.execute(d, start, end - start);
     if (ret instanceof Error) {
       debug('parse error');
@@ -1989,6 +1989,12 @@ function connectionListener(socket) {
         socket.destroy();
       }
     }
+
+    if (socket._paused) {
+      // onIncoming paused the socket, we should pause the parser as well
+      debug('pause parser');
+      socket.parser.pause();
+    }
   };
 
   socket.onend = function() {
@@ -2017,9 +2023,35 @@ function connectionListener(socket) {
   // The following callback is issued after the headers have been read on a
   // new message. In this callback we setup the response object and pass it
   // to the user.
+
+  socket._paused = false;
+  function socketOnDrain() {
+    // If we previously paused, then start reading again.
+    if (socket._paused) {
+      socket._paused = false;
+      socket.parser.resume();
+      readStart(socket);
+    }
+  }
+  socket.on('drain', socketOnDrain);
+
   parser.onIncoming = function(req, shouldKeepAlive) {
     incoming.push(req);
 
+    // If the writable end isn't consuming, then stop reading
+    // so that we don't become overwhelmed by a flood of
+    // pipelined requests that may never be resolved.
+    if (!socket._paused) {
+      var needPause = socket._writableState.needDrain;
+      if (needPause) {
+        socket._paused = true;
+        // We also need to pause the parser, but don't do that until after
+        // the call to execute, because we may still be processing the last
+        // chunk.
+        readStop(socket);
+      }
+    }
+
     var res = new ServerResponse(req);
     res.shouldKeepAlive = shouldKeepAlive;
 
diff --git a/test/simple/test-http-pipeline-flood.js b/test/simple/test-http-pipeline-flood.js
new file mode 100644
index 0000000000..30925eef38
--- /dev/null
+++ b/test/simple/test-http-pipeline-flood.js
@@ -0,0 +1,114 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+
+switch (process.argv[2]) {
+  case undefined:
+    return parent();
+  case 'child':
+    return child();
+  default:
+    throw new Error('wtf');
+}
+
+function parent() {
+  var http = require('http');
+  var bigResponse = new Buffer(10240)
+  bigResponse.fill('x');
+  var gotTimeout = false;
+  var childClosed = false;
+  var requests = 0;
+  var connections = 0;
+
+  var server = http.createServer(function(req, res) {
+    requests++;
+    res.setHeader('content-length', bigResponse.length);
+    res.end(bigResponse);
+  });
+
+  server.on('connection', function(conn) {
+    connections++;
+  });
+
+  // kill the connection after a bit, verifying that the
+  // flood of requests was eventually halted.
+  server.setTimeout(200, function(conn) {
+    gotTimeout = true;
+    conn.destroy();
+  });
+
+  server.listen(common.PORT, function() {
+    var spawn = require('child_process').spawn;
+    var args = [__filename, 'child'];
+    var child = spawn(process.execPath, args, { stdio: 'inherit' });
+    child.on('close', function(code) {
+      assert(!code);
+      childClosed = true;
+      server.close();
+    });
+  });
+
+  process.on('exit', function() {
+    assert(gotTimeout);
+    assert(childClosed);
+    assert.equal(connections, 1);
+    // 1213 works out to be the number of requests we end up processing
+    // before the outgoing connection backs up and requires a drain.
+    // however, to avoid being unnecessarily tied to a specific magic number,
+    // and making the test brittle, just assert that it's "a lot", which we
+    // can safely assume is more than 500.
+    assert(requests >= 500);
+    console.log('ok');
+  });
+}
+
+function child() {
+  var net = require('net');
+
+  var gotEpipe = false;
+  var conn = net.connect({ port: common.PORT });
+
+  var req = 'GET / HTTP/1.1\r\nHost: localhost:' +
+            common.PORT + '\r\nAccept: */*\r\n\r\n';
+
+  req = new Array(10241).join(req);
+
+  conn.on('connect', function() {
+    write();
+  });
+
+  conn.on('drain', write);
+
+  conn.on('error', function(er) {
+    gotEpipe = true;
+  });
+
+  process.on('exit', function() {
+    assert(gotEpipe);
+    console.log('ok - child');
+  });
+
+  function write() {
+    while (false !== conn.write(req, 'ascii'));
+  }
+}
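
For readers skimming the patch, the change amounts to a small backpressure handshake: when parser.onIncoming sees that the socket's writable side reports needDrain, it sets socket._paused and calls readStop(); ondata then pauses the parser once execute() returns, and the new 'drain' listener clears the flag, resumes the parser, and calls readStart() again (which the patched guard now skips while _paused is set). Below is a minimal standalone sketch of that handshake, not the actual lib/http.js code; the bare EventEmitter standing in for the socket and the simulated flood are illustrative assumptions only.

// Minimal sketch of the pause/resume handshake introduced by the patch.
// Illustrative only: a bare EventEmitter stands in for the real socket,
// and readStart/readStop just log instead of touching a TCP handle.
var EventEmitter = require('events').EventEmitter;

var socket = new EventEmitter();
socket._paused = false;
socket._writableState = { needDrain: false };

function readStart(s) {
  if (s._paused) return;                 // mirrors the new guard in readStart()
  console.log('readStart: resume reading from the connection');
}

function readStop(s) {
  console.log('readStop: stop reading pipelined requests');
}

// What onIncoming now does after queueing a request:
function onIncoming() {
  if (!socket._paused && socket._writableState.needDrain) {
    socket._paused = true;               // remember that we backed off
    readStop(socket);                    // parser.pause() would follow after execute()
  }
}

// What the new socketOnDrain() does:
socket.on('drain', function() {
  if (socket._paused) {
    socket._paused = false;              // writable side caught up
    readStart(socket);                   // socket.parser.resume() would go here too
  }
});

// Simulate a flood: the writable side backs up, then drains.
socket._writableState.needDrain = true;
onIncoming();                            // -> readStop
socket._writableState.needDrain = false;
socket.emit('drain');                    // -> readStart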