Browse Source

streams: fix pipe is destructed by 'end' from destination

v0.9.3-release
koichik 13 years ago
committed by isaacs
parent
commit
016afe21ae
  1. 1
      lib/stream.js
  2. 48
      test/simple/test-stream-pipe-cleanup.js

1
lib/stream.js

@ -106,7 +106,6 @@ Stream.prototype.pipe = function(dest, options) {
source.on('end', cleanup); source.on('end', cleanup);
source.on('close', cleanup); source.on('close', cleanup);
dest.on('end', cleanup);
dest.on('close', cleanup); dest.on('close', cleanup);
dest.emit('pipe', source); dest.emit('pipe', source);

48
test/simple/test-stream-pipe-cleanup.js

@ -47,15 +47,17 @@ function Readable() {
} }
util.inherits(Readable, stream.Stream); util.inherits(Readable, stream.Stream);
function Duplex() {
this.readable = true;
Writable.call(this);
}
util.inherits(Duplex, Writable);
var i = 0; var i = 0;
var limit = 100; var limit = 100;
var w = new Writable(); var w = new Writable();
console.error = function(text) {
throw new Error(text);
};
var r; var r;
for (i = 0; i < limit; i++) { for (i = 0; i < limit; i++) {
@ -80,13 +82,6 @@ w.endCalls = 0;
r = new Readable(); r = new Readable();
for (i = 0; i < limit; i++) {
w = new Writable();
r.pipe(w);
w.emit('end');
}
assert.equal(0, w.listeners('end').length);
for (i = 0; i < limit; i++) { for (i = 0; i < limit; i++) {
w = new Writable(); w = new Writable();
r.pipe(w); r.pipe(w);
@ -94,3 +89,34 @@ for (i = 0; i < limit; i++) {
} }
assert.equal(0, w.listeners('close').length); assert.equal(0, w.listeners('close').length);
r = new Readable();
w = new Writable();
var d = new Duplex();
r.pipe(d); // pipeline A
d.pipe(w); // pipeline B
assert.equal(r.listeners('end').length, 2); // A.onend, A.cleanup
assert.equal(r.listeners('close').length, 2); // A.onclose, A.cleanup
assert.equal(d.listeners('end').length, 2); // B.onend, B.cleanup
assert.equal(d.listeners('close').length, 3); // A.cleanup, B.onclose, B.cleanup
assert.equal(w.listeners('end').length, 0);
assert.equal(w.listeners('close').length, 1); // B.cleanup
r.emit('end');
assert.equal(d.endCalls, 1);
assert.equal(w.endCalls, 0);
assert.equal(r.listeners('end').length, 0);
assert.equal(r.listeners('close').length, 0);
assert.equal(d.listeners('end').length, 2); // B.onend, B.cleanup
assert.equal(d.listeners('close').length, 2); // B.onclose, B.cleanup
assert.equal(w.listeners('end').length, 0);
assert.equal(w.listeners('close').length, 1); // B.cleanup
d.emit('end');
assert.equal(d.endCalls, 1);
assert.equal(w.endCalls, 1);
assert.equal(r.listeners('end').length, 0);
assert.equal(r.listeners('close').length, 0);
assert.equal(d.listeners('end').length, 0);
assert.equal(d.listeners('close').length, 0);
assert.equal(w.listeners('end').length, 0);
assert.equal(w.listeners('close').length, 0);

Loading…
Cancel
Save