Browse Source

Fix: Multiple pipes to the same stream were broken

When creating multiple .pipe()s to the same destination stream, the
first source to end would close the destination, breaking all remaining
pipes. This patch fixes the problem by keeping track of all open
pipes, so that we only call end on destinations that have no more
sources piping to them.

closes #929
v0.7.4-release
Felix Geisendörfer 14 years ago
committed by Ryan Dahl
parent
commit
6c5b31bd80
  1. 29
      lib/stream.js
  2. 21
      test/simple/test-stream-pipe-cleanup.js

29
lib/stream.js

@ -28,9 +28,13 @@ function Stream() {
util.inherits(Stream, events.EventEmitter); util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream; exports.Stream = Stream;
var pipes = [];
Stream.prototype.pipe = function(dest, options) { Stream.prototype.pipe = function(dest, options) {
var source = this; var source = this;
pipes.push(dest);
function ondata(chunk) { function ondata(chunk) {
if (dest.writable) { if (dest.writable) {
if (false === dest.write(chunk)) source.pause(); if (false === dest.write(chunk)) source.pause();
@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) {
if (!options || options.end !== false) { if (!options || options.end !== false) {
function onend() { function onend() {
var index = pipes.indexOf(dest);
pipes.splice(index, 1);
if (pipes.indexOf(dest) > -1) {
return;
}
dest.end(); dest.end();
} }
source.on('end', onend); source.on('end', onend);
source.on('close', onend);
} }
/* /*
@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) {
source.emit('resume'); source.emit('resume');
}; };
} }
var onpause = function() { var onpause = function() {
source.pause(); source.pause();
} }
dest.on('pause', onpause); dest.on('pause', onpause);
var onresume = function() { var onresume = function() {
if (source.readable) source.resume(); if (source.readable) source.resume();
}; };
dest.on('resume', onresume); dest.on('resume', onresume);
var cleanup = function () { var cleanup = function () {
source.removeListener('data', ondata); source.removeListener('data', ondata);
dest.removeListener('drain', ondrain); dest.removeListener('drain', ondrain);
source.removeListener('end', onend); source.removeListener('end', onend);
source.removeListener('close', onend);
dest.removeListener('pause', onpause); dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume); dest.removeListener('resume', onresume);
source.removeListener('end', cleanup); source.removeListener('end', cleanup);
source.removeListener('close', cleanup); source.removeListener('close', cleanup);
dest.removeListener('end', cleanup); dest.removeListener('end', cleanup);
dest.removeListener('close', cleanup); dest.removeListener('close', cleanup);
} }
source.on('end', cleanup); source.on('end', cleanup);
source.on('close', cleanup); source.on('close', cleanup);

21
test/simple/test-stream-pipe-cleanup.js

@ -28,10 +28,13 @@ var util = require('util');
function Writable () { function Writable () {
this.writable = true; this.writable = true;
this.endCalls = 0;
stream.Stream.call(this); stream.Stream.call(this);
} }
util.inherits(Writable, stream.Stream); util.inherits(Writable, stream.Stream);
Writable.prototype.end = function () {} Writable.prototype.end = function () {
this.endCalls++;
}
function Readable () { function Readable () {
this.readable = true; this.readable = true;
@ -56,6 +59,9 @@ for (i = 0; i < limit; i++) {
r.emit('end') r.emit('end')
} }
assert.equal(0, r.listeners('end').length); assert.equal(0, r.listeners('end').length);
assert.equal(limit, w.endCalls);
w.endCalls = 0;
for (i = 0; i < limit; i++) { for (i = 0; i < limit; i++) {
r = new Readable() r = new Readable()
@ -63,6 +69,19 @@ for (i = 0; i < limit; i++) {
r.emit('close') r.emit('close')
} }
assert.equal(0, r.listeners('close').length); assert.equal(0, r.listeners('close').length);
assert.equal(limit, w.endCalls);
w.endCalls = 0;
var r2;
r = new Readable()
r2 = new Readable();
r.pipe(w)
r2.pipe(w)
r.emit('close')
r2.emit('close')
assert.equal(1, w.endCalls);
r = new Readable(); r = new Readable();

Loading…
Cancel
Save