Browse Source

streams2: Do multipipe without always using forEach

The Array.forEach call is too expensive.
v0.9.4-release
isaacs 12 years ago
parent
commit
2ff499c022
  1. 88
      lib/_stream_readable.js
  2. 8
      test/simple/test-stream2-basic.js

88
lib/_stream_readable.js

@ -62,7 +62,8 @@ function ReadableState(options, stream) {
this.buffer = [];
this.length = 0;
this.pipes = [];
this.pipes = null;
this.pipesCount = 0;
this.flowing = false;
this.ended = false;
this.endEmitted = false;
@ -282,7 +283,19 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
var state = this._readableState;
if (!pipeOpts)
pipeOpts = {};
switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [ state.pipes, dest ];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
if ((!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
@ -320,15 +333,22 @@ function flow(src, pipeOpts) {
flow(src, pipeOpts);
}
while (state.pipes.length &&
null !== (chunk = src.read(pipeOpts.chunkSize))) {
state.pipes.forEach(function(dest, i, list) {
function write(dest, i, list) {
var written = dest.write(chunk);
if (false === written) {
needDrain++;
dest.once('drain', ondrain);
}
});
}
while (state.pipesCount &&
null !== (chunk = src.read(pipeOpts.chunkSize))) {
if (state.pipesCount === 1)
write(state.pipes, 0, null);
else
state.pipes.forEach(write);
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
@ -340,7 +360,7 @@ function flow(src, pipeOpts) {
// function, or in the while loop, then stop flowing.
//
// NB: This is a pretty rare edge case.
if (state.pipes.length === 0) {
if (state.pipesCount === 0) {
state.flowing = false;
// if there were data event listeners added, then switch to old mode.
@ -356,19 +376,55 @@ function flow(src, pipeOpts) {
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
if (!dest) {
// remove all of them.
state.pipes.forEach(function(dest, i, list) {
dest.emit('unpipe', this);
}, this);
state.pipes.length = 0;
} else {
var i = state.pipes.indexOf(dest);
if (i !== -1) {
// if we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
return this;
// just one destination. most common case.
if (state.pipesCount === 1) {
// passed in one, but it's not the right one.
if (dest && dest !== state.pipes)
return this;
if (!dest)
dest = state.pipes;
// got a match.
state.pipes = null;
state.pipesCount = 0;
if (dest)
dest.emit('unpipe', this);
state.pipes.splice(i, 1);
return this;
}
// slow case. multiple pipe destinations.
if (!dest) {
// remove all.
var dests = state.pipes;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
return this;
}
// try to find the right one.
var i = state.pipes.indexOf(dest);
if (i === -1)
return this;
state.pipes.splice(i, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
dest.emit('unpipe', this);
return this;
};

8
test/simple/test-stream2-basic.js

@ -209,19 +209,27 @@ test('pipe', function(t) {
w[0].on('write', function() {
if (--writes === 0) {
r.unpipe();
t.equal(r._readableState.pipes, null);
w[0].end();
r.pipe(w[1]);
t.equal(r._readableState.pipes, w[1]);
}
});
var ended = 0;
var ended0 = false;
var ended1 = false;
w[0].on('end', function(results) {
t.equal(ended0, false);
ended0 = true;
ended++;
t.same(results, expect[0]);
});
w[1].on('end', function(results) {
t.equal(ended1, false);
ended1 = true;
ended++;
t.equal(ended, 2);
t.same(results, expect[1]);

Loading…
Cancel
Save