diff --git a/lib/stream.js b/lib/stream.js index d31a9fe239..632c87d2e2 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -28,9 +28,13 @@ function Stream() { util.inherits(Stream, events.EventEmitter); exports.Stream = Stream; +var pipes = []; + Stream.prototype.pipe = function(dest, options) { var source = this; + pipes.push(dest); + function ondata(chunk) { if (dest.writable) { if (false === dest.write(chunk)) source.pause(); @@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) { if (!options || options.end !== false) { function onend() { + var index = pipes.indexOf(dest); + pipes.splice(index, 1); + + if (pipes.indexOf(dest) > -1) { + return; + } + dest.end(); } source.on('end', onend); + source.on('close', onend); } /* @@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) { source.emit('resume'); }; } - + var onpause = function() { source.pause(); } dest.on('pause', onpause); - + var onresume = function() { if (source.readable) source.resume(); }; - + dest.on('resume', onresume); - + var cleanup = function () { source.removeListener('data', ondata); dest.removeListener('drain', ondrain); source.removeListener('end', onend); - + source.removeListener('close', onend); + dest.removeListener('pause', onpause); dest.removeListener('resume', onresume); - + source.removeListener('end', cleanup); source.removeListener('close', cleanup); - + dest.removeListener('end', cleanup); dest.removeListener('close', cleanup); } - + source.on('end', cleanup); source.on('close', cleanup); diff --git a/test/simple/test-stream-pipe-cleanup.js b/test/simple/test-stream-pipe-cleanup.js index fce4ac82a7..32ecd153dc 100644 --- a/test/simple/test-stream-pipe-cleanup.js +++ b/test/simple/test-stream-pipe-cleanup.js @@ -28,10 +28,13 @@ var util = require('util'); function Writable () { this.writable = true; + this.endCalls = 0; stream.Stream.call(this); } util.inherits(Writable, stream.Stream); -Writable.prototype.end = function () {} +Writable.prototype.end = function () { + this.endCalls++; +} function Readable () { this.readable = true; @@ -56,6 +59,9 @@ for (i = 0; i < limit; i++) { r.emit('end') } assert.equal(0, r.listeners('end').length); +assert.equal(limit, w.endCalls); + +w.endCalls = 0; for (i = 0; i < limit; i++) { r = new Readable() @@ -63,6 +69,19 @@ for (i = 0; i < limit; i++) { r.emit('close') } assert.equal(0, r.listeners('close').length); +assert.equal(limit, w.endCalls); + +w.endCalls = 0; + +var r2; +r = new Readable() +r2 = new Readable(); + +r.pipe(w) +r2.pipe(w) +r.emit('close') +r2.emit('close') +assert.equal(1, w.endCalls); r = new Readable();