Browse Source

stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x)

Fix the uncommon situation when a readable stream is piped twice into
the same destination stream, and then unpiped once.

Previously, the `unpipe` event handlers weren’t able to tell whether
they were corresponding to the “right” conceptual pipe that was being
removed; this fixes this by adding a counter to the `unpipe` event
handler and only removing a single piping destination at most.

Fixes: https://github.com/nodejs/node/issues/12718
PR-URL: https://github.com/nodejs/node/pull/12746
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
v6
Anna Henningsen 8 years ago
parent
commit
6993eb0897
No known key found for this signature in database GPG Key ID: D8B9F5AEAE84E4CF
  1. 14
      lib/_stream_readable.js
  2. 78
      test/parallel/test-stream-pipe-same-destination-twice.js

14
lib/_stream_readable.js

@ -518,10 +518,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
src.once('end', endFn);
dest.on('unpipe', onunpipe);
function onunpipe(readable) {
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
cleanup();
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
cleanup();
}
}
}
@ -647,6 +650,7 @@ function pipeOnDrain(src) {
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var unpipeInfo = { hasUnpiped: false };
// if we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
@ -666,7 +670,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
dest.emit('unpipe', this, unpipeInfo);
return this;
}
@ -681,7 +685,7 @@ Readable.prototype.unpipe = function(dest) {
state.flowing = false;
for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
dests[i].emit('unpipe', this, unpipeInfo);
return this;
}
@ -695,7 +699,7 @@ Readable.prototype.unpipe = function(dest) {
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
dest.emit('unpipe', this);
dest.emit('unpipe', this, unpipeInfo);
return this;
};

78
test/parallel/test-stream-pipe-same-destination-twice.js

@ -0,0 +1,78 @@
'use strict';
const common = require('../common');
// Regression test for https://github.com/nodejs/node/issues/12718.
// Tests that piping a source stream twice to the same destination stream
// works, and that a subsequent unpipe() call only removes the pipe *once*.
const assert = require('assert');
const { PassThrough, Writable } = require('stream');
{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(`${chunk}`, 'foobar');
cb();
})
});
passThrough.pipe(dest);
passThrough.pipe(dest);
assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);
passThrough.unpipe(dest);
assert.strictEqual(passThrough._events.data.length, 1);
assert.strictEqual(passThrough._readableState.pipesCount, 1);
assert.strictEqual(passThrough._readableState.pipes, dest);
passThrough.write('foobar');
passThrough.pipe(dest);
}
{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(`${chunk}`, 'foobar');
cb();
}, 2)
});
passThrough.pipe(dest);
passThrough.pipe(dest);
assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);
passThrough.write('foobar');
}
{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustNotCall()
});
passThrough.pipe(dest);
passThrough.pipe(dest);
assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);
passThrough.unpipe(dest);
passThrough.unpipe(dest);
assert.strictEqual(passThrough._events.data, undefined);
assert.strictEqual(passThrough._readableState.pipesCount, 0);
passThrough.write('foobar');
}
Loading…
Cancel
Save