diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 2a2abd4715..a31e5d0a66 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -305,12 +305,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr) { src.once('end', onend); - dest.on('unpipe', function unpipe(readable) { - if (readable === src) { - src.removeListener('end', onend); - dest.removeListener('unpipe', unpipe); - } - }); + } else { + src.once('end', cleanup); + } + + dest.on('unpipe', onunpipe); + function onunpipe(readable) { + if (readable !== src) return; + cleanup(); } if (pipeOpts && pipeOpts.chunkSize) @@ -326,19 +328,25 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // too slow. var ondrain = pipeOnDrain(src); dest.on('drain', ondrain); - dest.on('unpipe', function cleanup(readable) { - if (readable === src) { - dest.removeListener('unpipe', cleanup); - - // if the reader is waiting for a drain event from this - // specific writer, then it would cause it to never start - // flowing again. - // So, if this is awaiting a drain, then we just call it now. - // If we don't know, then assume that we are waiting for one. - if (!dest._writableState || dest._writableState.needDrain) - ondrain(); - } - }); + + function cleanup() { + // cleanup event handlers once the pipe is broken + dest.removeListener('close', unpipe); + dest.removeListener('finish', onfinish); + dest.removeListener('drain', ondrain); + dest.removeListener('error', onerror); + dest.removeListener('unpipe', onunpipe); + src.removeListener('end', onend); + src.removeListener('end', cleanup); + + // if the reader is waiting for a drain event from this + // specific writer, then it would cause it to never start + // flowing again. + // So, if this is awaiting a drain, then we just call it now. + // If we don't know, then assume that we are waiting for one. + if (!dest._writableState || dest._writableState.needDrain) + ondrain(); + } // if the dest has an error, then stop piping into it. // however, don't suppress the throwing behavior for this. @@ -361,17 +369,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.unpipe(dest); } - // cleanup event handlers once the pipe is broken - dest.once('unpipe', function cleanup(readable) { - if (readable !== src) return; - - dest.removeListener('close', unpipe); - dest.removeListener('finish', onfinish); - dest.removeListener('drain', ondrain); - dest.removeListener('error', onerror); - dest.removeListener('unpipe', cleanup); - }); - // tell the dest that it's being piped to dest.emit('pipe', src);