diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index f35599f455..368c923b63 100755 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -511,14 +511,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // if the dest has an error, then stop piping into it. // however, don't suppress the throwing behavior for this. - // check for listeners before emit removes one-time listeners. - var errListeners = EE.listenerCount(dest, 'error'); function onerror(er) { unpipe(); - if (errListeners === 0 && EE.listenerCount(dest, 'error') === 0) + dest.removeListener('error', onerror); + if (EE.listenerCount(dest, 'error') === 0) dest.emit('error', er); } - dest.once('error', onerror); + // This is a brutally ugly hack to make sure that our error handler + // is attached before any userland ones. NEVER DO THIS. + if (!dest._events.error) + dest.on('error', onerror); + else if (Array.isArray(dest._events.error)) + dest._events.error.unshift(onerror); + else + dest._events.error = [onerror, dest._events.error]; + + // Both close and finish should trigger unpipe, but only once. function onclose() { diff --git a/test/simple/test-stream-pipe-error-handling.js b/test/simple/test-stream-pipe-error-handling.js index df81573daa..c5d724b782 100644 --- a/test/simple/test-stream-pipe-error-handling.js +++ b/test/simple/test-stream-pipe-error-handling.js @@ -56,3 +56,76 @@ var Stream = require('stream').Stream; assert.strictEqual(gotErr, err); })(); + +(function testErrorWithRemovedListenerThrows() { + var EE = require('events').EventEmitter; + var R = Stream.Readable; + var W = Stream.Writable; + + var r = new R; + var w = new W; + var removed = false; + var didTest = false; + + process.on('exit', function() { + assert(didTest); + console.log('ok'); + }); + + r._read = function() { + setTimeout(function() { + assert(removed); + assert.throws(function() { + w.emit('error', new Error('fail')); + }); + didTest = true; + }); + }; + + w.on('error', myOnError); + r.pipe(w); + w.removeListener('error', myOnError); + removed = true; + + function myOnError(er) { + throw new Error('this should not happen'); + } +})(); + +(function testErrorWithRemovedListenerThrows() { + var EE = require('events').EventEmitter; + var R = Stream.Readable; + var W = Stream.Writable; + + var r = new R; + var w = new W; + var removed = false; + var didTest = false; + var caught = false; + + process.on('exit', function() { + assert(didTest); + console.log('ok'); + }); + + r._read = function() { + setTimeout(function() { + assert(removed); + w.emit('error', new Error('fail')); + didTest = true; + }); + }; + + w.on('error', myOnError); + w._write = function() {}; + + r.pipe(w); + // Removing some OTHER random listener should not do anything + w.removeListener('error', function() {}); + removed = true; + + function myOnError(er) { + assert(!caught); + caught = true; + } +})();