From 3e6f737eaf7042d5ffc1f0d560a94ac669975898 Mon Sep 17 00:00:00 2001 From: isaacs Date: Mon, 7 Jan 2013 06:10:35 -0800 Subject: [PATCH] stream: Clean up more effectively in pipe() --- lib/_stream_readable.js | 57 +++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 2a2abd4715..a31e5d0a66 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -305,12 +305,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr) { src.once('end', onend); - dest.on('unpipe', function unpipe(readable) { - if (readable === src) { - src.removeListener('end', onend); - dest.removeListener('unpipe', unpipe); - } - }); + } else { + src.once('end', cleanup); + } + + dest.on('unpipe', onunpipe); + function onunpipe(readable) { + if (readable !== src) return; + cleanup(); } if (pipeOpts && pipeOpts.chunkSize) @@ -326,19 +328,25 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // too slow. var ondrain = pipeOnDrain(src); dest.on('drain', ondrain); - dest.on('unpipe', function cleanup(readable) { - if (readable === src) { - dest.removeListener('unpipe', cleanup); - - // if the reader is waiting for a drain event from this - // specific writer, then it would cause it to never start - // flowing again. - // So, if this is awaiting a drain, then we just call it now. - // If we don't know, then assume that we are waiting for one. - if (!dest._writableState || dest._writableState.needDrain) - ondrain(); - } - }); + + function cleanup() { + // cleanup event handlers once the pipe is broken + dest.removeListener('close', unpipe); + dest.removeListener('finish', onfinish); + dest.removeListener('drain', ondrain); + dest.removeListener('error', onerror); + dest.removeListener('unpipe', onunpipe); + src.removeListener('end', onend); + src.removeListener('end', cleanup); + + // if the reader is waiting for a drain event from this + // specific writer, then it would cause it to never start + // flowing again. + // So, if this is awaiting a drain, then we just call it now. + // If we don't know, then assume that we are waiting for one. + if (!dest._writableState || dest._writableState.needDrain) + ondrain(); + } // if the dest has an error, then stop piping into it. // however, don't suppress the throwing behavior for this. @@ -361,17 +369,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.unpipe(dest); } - // cleanup event handlers once the pipe is broken - dest.once('unpipe', function cleanup(readable) { - if (readable !== src) return; - - dest.removeListener('close', unpipe); - dest.removeListener('finish', onfinish); - dest.removeListener('drain', ondrain); - dest.removeListener('error', onerror); - dest.removeListener('unpipe', cleanup); - }); - // tell the dest that it's being piped to dest.emit('pipe', src);