diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 41cf8e89cc..b09694c071 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -49,6 +49,12 @@ function ReadableState(options, stream) { this.endEmitted = false; this.reading = false; + // In streams that never have any data, and do push(null) right away, + // the consumer can miss the 'end' event if they do some I/O before + // consuming the stream. So, we don't emit('end') until some reading + // happens. + this.calledRead = false; + // a flag to be able to tell if the onwrite cb is called immediately, // or on a later tick. We set this to true at first, becuase any // actions that shouldn't happen until "later" should generally also @@ -218,6 +224,7 @@ function howMuchToRead(n, state) { // you can override either this method, or the async _read(n) below. Readable.prototype.read = function(n) { var state = this._readableState; + state.calledRead = true; var nOrig = n; if (typeof n !== 'number' || n > 0) @@ -430,13 +437,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } state.pipesCount += 1; - if ((!pipeOpts || pipeOpts.end !== false) && - dest !== process.stdout && - dest !== process.stderr) { - src.once('end', onend); - } else { - src.once('end', cleanup); - } + var doEnd = (!pipeOpts || pipeOpts.end !== false) && + dest !== process.stdout && + dest !== process.stderr; + + var endFn = doEnd ? onend : cleanup; + if (state.endEmitted) + process.nextTick(endFn); + else + src.once('end', endFn); dest.on('unpipe', onunpipe); function onunpipe(readable) { @@ -853,12 +862,12 @@ function endReadable(stream) { if (state.length > 0) throw new Error('endReadable called on non-empty stream'); - if (state.endEmitted) - return; - state.ended = true; - state.endEmitted = true; - process.nextTick(function() { - stream.readable = false; - stream.emit('end'); - }); + if (!state.endEmitted && state.calledRead) { + state.ended = true; + state.endEmitted = true; + process.nextTick(function() { + stream.readable = false; + stream.emit('end'); + }); + } } diff --git a/lib/http.js b/lib/http.js index 0f7ea853c0..2bceaa0152 100644 --- a/lib/http.js +++ b/lib/http.js @@ -416,13 +416,13 @@ IncomingMessage.prototype._addHeaderLine = function(field, value) { // Call this instead of resume() if we want to just // dump all the data to /dev/null IncomingMessage.prototype._dump = function() { - if (this._dumped) - return; - - this._dumped = true; - if (this.socket.parser) this.socket.parser.incoming = null; - this.push(null); - readStart(this.socket); + if (!this._dumped) { + this._dumped = true; + if (this.socket.parser) this.socket.parser.incoming = null; + this.push(null); + readStart(this.socket); + this.read(); + } }; diff --git a/test/simple/test-http-allow-req-after-204-res.js b/test/simple/test-http-allow-req-after-204-res.js index 8f89043094..225ff299e9 100644 --- a/test/simple/test-http-allow-req-after-204-res.js +++ b/test/simple/test-http-allow-req-after-204-res.js @@ -58,6 +58,7 @@ function nextRequest() { //process.nextTick(nextRequest); } }); + response.resume(); }); request.end(); } diff --git a/test/simple/test-http-client-response-domain.js b/test/simple/test-http-client-response-domain.js index 7a3c41bda4..e5a39913f4 100644 --- a/test/simple/test-http-client-response-domain.js +++ b/test/simple/test-http-client-response-domain.js @@ -60,6 +60,7 @@ function test() { res.on('end', function() { res.emit('error', new Error('should be caught by domain')); }); + res.resume(); }); req.end(); } diff --git a/test/simple/test-http-contentLength0.js b/test/simple/test-http-contentLength0.js index 4d6e616d53..c7f7b6b669 100644 --- a/test/simple/test-http-contentLength0.js +++ b/test/simple/test-http-contentLength0.js @@ -36,6 +36,7 @@ s.listen(common.PORT, function() { var request = http.request({ port: common.PORT }, function(response) { console.log('STATUS: ' + response.statusCode); s.close(); + response.resume(); }); request.end(); diff --git a/test/simple/test-http-head-request.js b/test/simple/test-http-head-request.js index 6b6b876aa6..60982abcc0 100644 --- a/test/simple/test-http-head-request.js +++ b/test/simple/test-http-head-request.js @@ -47,6 +47,7 @@ server.listen(common.PORT, function() { common.error('response end'); gotEnd = true; }); + response.resume(); }); request.end(); }); diff --git a/test/simple/test-http-head-response-has-no-body-end.js b/test/simple/test-http-head-response-has-no-body-end.js index 8ad3bcaf34..cde777a58d 100644 --- a/test/simple/test-http-head-response-has-no-body-end.js +++ b/test/simple/test-http-head-response-has-no-body-end.js @@ -51,6 +51,7 @@ server.on('listening', function() { server.close(); responseComplete = true; }); + res.resume(); }); common.error('req'); req.end(); diff --git a/test/simple/test-http-head-response-has-no-body.js b/test/simple/test-http-head-response-has-no-body.js index 620ad75877..ab6bd5b51f 100644 --- a/test/simple/test-http-head-response-has-no-body.js +++ b/test/simple/test-http-head-response-has-no-body.js @@ -48,6 +48,7 @@ server.on('listening', function() { server.close(); responseComplete = true; }); + res.resume(); }); common.error('req'); req.end(); diff --git a/test/simple/test-http-legacy.js b/test/simple/test-http-legacy.js index dd3eb6f7a3..c6605b0e4e 100644 --- a/test/simple/test-http-legacy.js +++ b/test/simple/test-http-legacy.js @@ -61,6 +61,7 @@ var server = http.createServer(function(req, res) { res.end(); responses_sent += 1; }); + req.resume(); //assert.equal('127.0.0.1', res.connection.remoteAddress); }); diff --git a/test/simple/test-http-max-headers-count.js b/test/simple/test-http-max-headers-count.js index 97fce2c169..4595b11e7c 100644 --- a/test/simple/test-http-max-headers-count.js +++ b/test/simple/test-http-max-headers-count.js @@ -75,6 +75,7 @@ server.listen(common.PORT, function() { server.close(); } }); + res.resume(); }); req.maxHeadersCount = max; req.end(); diff --git a/test/simple/test-http-pipe-fs.js b/test/simple/test-http-pipe-fs.js index bc033fe59f..57ebb11dca 100644 --- a/test/simple/test-http-pipe-fs.js +++ b/test/simple/test-http-pipe-fs.js @@ -54,6 +54,7 @@ var server = http.createServer(function(req, res) { server.close(); } }); + res.resume(); }); req.on('socket', function(s) { common.debug('req' + i + ' start'); diff --git a/test/simple/test-http-should-keep-alive.js b/test/simple/test-http-should-keep-alive.js index fc1f5c6c26..e2303be7bf 100644 --- a/test/simple/test-http-should-keep-alive.js +++ b/test/simple/test-http-should-keep-alive.js @@ -59,6 +59,7 @@ var server = net.createServer(function(socket) { } else { server.close(); } + res.resume(); }); } diff --git a/test/simple/test-http.js b/test/simple/test-http.js index 1915e24630..fa388356d8 100644 --- a/test/simple/test-http.js +++ b/test/simple/test-http.js @@ -58,6 +58,7 @@ var server = http.Server(function(req, res) { res.end(); responses_sent += 1; }); + req.resume(); //assert.equal('127.0.0.1', res.connection.remoteAddress); }); diff --git a/test/simple/test-https-client-reject.js b/test/simple/test-https-client-reject.js index 45788a8c89..bf191da1d9 100644 --- a/test/simple/test-https-client-reject.js +++ b/test/simple/test-https-client-reject.js @@ -41,6 +41,7 @@ var server = https.createServer(options, function(req, res) { ++reqCount; res.writeHead(200); res.end(); + req.resume(); }).listen(common.PORT, function() { unauthorized(); }); @@ -51,6 +52,7 @@ function unauthorized() { rejectUnauthorized: false }, function(res) { assert(!req.socket.authorized); + res.resume(); rejectUnauthorized(); }); req.on('error', function(err) { @@ -80,6 +82,7 @@ function authorized() { }; options.agent = new https.Agent(options); var req = https.request(options, function(res) { + res.resume(); assert(req.socket.authorized); server.close(); }); diff --git a/test/simple/test-stream-pipe-after-end.js b/test/simple/test-stream-pipe-after-end.js new file mode 100644 index 0000000000..b46ee90ad7 --- /dev/null +++ b/test/simple/test-stream-pipe-after-end.js @@ -0,0 +1,86 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +var Readable = require('_stream_readable'); +var Writable = require('_stream_writable'); +var util = require('util'); + +util.inherits(TestReadable, Readable); +function TestReadable(opt) { + if (!(this instanceof TestReadable)) + return new TestReadable(opt); + Readable.call(this, opt); + this._ended = false; +} + +TestReadable.prototype._read = function(n) { + if (this._ended) + this.emit('error', new Error('_read called twice')); + this._ended = true; + this.push(null); +}; + +util.inherits(TestWritable, Writable); +function TestWritable(opt) { + if (!(this instanceof TestWritable)) + return new TestWritable(opt); + Writable.call(this, opt); + this._written = []; +} + +TestWritable.prototype._write = function(chunk, encoding, cb) { + this._written.push(chunk); + cb(); +}; + +// this one should not emit 'end' until we read() from it later. +var ender = new TestReadable(); +var enderEnded = false; + +// what happens when you pipe() a Readable that's already ended? +var piper = new TestReadable(); +// pushes EOF null, and length=0, so this will trigger 'end' +piper.read(); + +setTimeout(function() { + ender.on('end', function() { + enderEnded = true; + }); + assert(!enderEnded); + var c = ender.read(); + assert.equal(c, null); + + var w = new TestWritable(); + var writableFinished = false; + w.on('finish', function() { + writableFinished = true; + }); + piper.pipe(w); + + process.on('exit', function() { + assert(enderEnded); + assert(writableFinished); + console.log('ok'); + }); +});