From 545807918ee72f55dfefc6a4c557e3d4e2018ee1 Mon Sep 17 00:00:00 2001 From: isaacs Date: Mon, 19 Aug 2013 07:59:39 -0700 Subject: [PATCH] stream: Throw on 'error' if listeners removed In this situation: writable.on('error', handler); readable.pipe(writable); writable.removeListener('error', handler); writable.emit('error', new Error('boom')); there is actually no error handler, but it doesn't throw, because of the fix for stream.once('error', handler), in 23d92ec. Note that simply reverting that change is not valid either, because otherwise this will emit twice, being handled the first time, and then throwing the second: writable.once('error', handler); readable.pipe(writable); writable.emit('error', new Error('boom')); Fix this with a horrible hack to make the stream pipe onerror handler added before any other userland handlers, so that our handler is not affected by adding or removing any userland handlers. Closes #6007. --- lib/_stream_readable.js | 16 +++- .../simple/test-stream-pipe-error-handling.js | 73 +++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index f35599f455..368c923b63 100755 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -511,14 +511,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // if the dest has an error, then stop piping into it. // however, don't suppress the throwing behavior for this. - // check for listeners before emit removes one-time listeners. - var errListeners = EE.listenerCount(dest, 'error'); function onerror(er) { unpipe(); - if (errListeners === 0 && EE.listenerCount(dest, 'error') === 0) + dest.removeListener('error', onerror); + if (EE.listenerCount(dest, 'error') === 0) dest.emit('error', er); } - dest.once('error', onerror); + // This is a brutally ugly hack to make sure that our error handler + // is attached before any userland ones. NEVER DO THIS. + if (!dest._events.error) + dest.on('error', onerror); + else if (Array.isArray(dest._events.error)) + dest._events.error.unshift(onerror); + else + dest._events.error = [onerror, dest._events.error]; + + // Both close and finish should trigger unpipe, but only once. function onclose() { diff --git a/test/simple/test-stream-pipe-error-handling.js b/test/simple/test-stream-pipe-error-handling.js index df81573daa..c5d724b782 100644 --- a/test/simple/test-stream-pipe-error-handling.js +++ b/test/simple/test-stream-pipe-error-handling.js @@ -56,3 +56,76 @@ var Stream = require('stream').Stream; assert.strictEqual(gotErr, err); })(); + +(function testErrorWithRemovedListenerThrows() { + var EE = require('events').EventEmitter; + var R = Stream.Readable; + var W = Stream.Writable; + + var r = new R; + var w = new W; + var removed = false; + var didTest = false; + + process.on('exit', function() { + assert(didTest); + console.log('ok'); + }); + + r._read = function() { + setTimeout(function() { + assert(removed); + assert.throws(function() { + w.emit('error', new Error('fail')); + }); + didTest = true; + }); + }; + + w.on('error', myOnError); + r.pipe(w); + w.removeListener('error', myOnError); + removed = true; + + function myOnError(er) { + throw new Error('this should not happen'); + } +})(); + +(function testErrorWithRemovedListenerThrows() { + var EE = require('events').EventEmitter; + var R = Stream.Readable; + var W = Stream.Writable; + + var r = new R; + var w = new W; + var removed = false; + var didTest = false; + var caught = false; + + process.on('exit', function() { + assert(didTest); + console.log('ok'); + }); + + r._read = function() { + setTimeout(function() { + assert(removed); + w.emit('error', new Error('fail')); + didTest = true; + }); + }; + + w.on('error', myOnError); + w._write = function() {}; + + r.pipe(w); + // Removing some OTHER random listener should not do anything + w.removeListener('error', function() {}); + removed = true; + + function myOnError(er) { + assert(!caught); + caught = true; + } +})();