Browse Source

stream: Clean up more effectively in pipe()

v0.9.6-release
isaacs 12 years ago
parent
commit
3e6f737eaf
  1. 57
      lib/_stream_readable.js

57
lib/_stream_readable.js

@ -305,12 +305,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stdout && dest !== process.stdout &&
dest !== process.stderr) { dest !== process.stderr) {
src.once('end', onend); src.once('end', onend);
dest.on('unpipe', function unpipe(readable) { } else {
if (readable === src) { src.once('end', cleanup);
src.removeListener('end', onend); }
dest.removeListener('unpipe', unpipe);
} dest.on('unpipe', onunpipe);
}); function onunpipe(readable) {
if (readable !== src) return;
cleanup();
} }
if (pipeOpts && pipeOpts.chunkSize) if (pipeOpts && pipeOpts.chunkSize)
@ -326,19 +328,25 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// too slow. // too slow.
var ondrain = pipeOnDrain(src); var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain); dest.on('drain', ondrain);
dest.on('unpipe', function cleanup(readable) {
if (readable === src) { function cleanup() {
dest.removeListener('unpipe', cleanup); // cleanup event handlers once the pipe is broken
dest.removeListener('close', unpipe);
// if the reader is waiting for a drain event from this dest.removeListener('finish', onfinish);
// specific writer, then it would cause it to never start dest.removeListener('drain', ondrain);
// flowing again. dest.removeListener('error', onerror);
// So, if this is awaiting a drain, then we just call it now. dest.removeListener('unpipe', onunpipe);
// If we don't know, then assume that we are waiting for one. src.removeListener('end', onend);
if (!dest._writableState || dest._writableState.needDrain) src.removeListener('end', cleanup);
ondrain();
} // if the reader is waiting for a drain event from this
}); // specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (!dest._writableState || dest._writableState.needDrain)
ondrain();
}
// if the dest has an error, then stop piping into it. // if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this. // however, don't suppress the throwing behavior for this.
@ -361,17 +369,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
src.unpipe(dest); src.unpipe(dest);
} }
// cleanup event handlers once the pipe is broken
dest.once('unpipe', function cleanup(readable) {
if (readable !== src) return;
dest.removeListener('close', unpipe);
dest.removeListener('finish', onfinish);
dest.removeListener('drain', ondrain);
dest.removeListener('error', onerror);
dest.removeListener('unpipe', cleanup);
});
// tell the dest that it's being piped to // tell the dest that it's being piped to
dest.emit('pipe', src); dest.emit('pipe', src);

Loading…
Cancel
Save