Browse Source

streams2: Get rid of .once() usage in Readable.pipe

Significant performance impact
v0.9.4-release
isaacs 12 years ago
parent
commit
38e2b0053a
  1. 39
      lib/_stream_readable.js

39
lib/_stream_readable.js

@ -78,6 +78,11 @@ function ReadableState(options, stream) {
this.needReadable = false; this.needReadable = false;
this.emittedReadable = false; this.emittedReadable = false;
// when piping, we only care about 'readable' events that happen
// after read()ing all the bytes and not getting any pushback.
this.ranOut = false;
this.flowChunkSize = null;
this.decoder = null; this.decoder = null;
if (options.encoding) { if (options.encoding) {
if (!StringDecoder) if (!StringDecoder)
@ -285,8 +290,6 @@ Readable.prototype._read = function(n, cb) {
Readable.prototype.pipe = function(dest, pipeOpts) { Readable.prototype.pipe = function(dest, pipeOpts) {
var src = this; var src = this;
var state = this._readableState; var state = this._readableState;
if (!pipeOpts)
pipeOpts = {};
switch (state.pipesCount) { switch (state.pipesCount) {
case 0: case 0:
@ -311,6 +314,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}); });
} }
if (pipeOpts && pipeOpts.chunkSize)
state.flowChunkSize = pipeOpts.chunkSize;
function onend() { function onend() {
dest.end(); dest.end();
} }
@ -319,16 +325,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// start the flow. // start the flow.
if (!state.flowing) { if (!state.flowing) {
// the handler that waits for readable events after all
// the data gets sucked out in flow.
// This would be easier to follow with a .once() handler
// in flow(), but that is too slow.
this.on('readable', pipeOnReadable);
state.flowing = true; state.flowing = true;
process.nextTick(function() { process.nextTick(function() {
flow(src, pipeOpts); flow(src);
}); });
} }
return dest; return dest;
}; };
function flow(src, pipeOpts) { function flow(src) {
var state = src._readableState; var state = src._readableState;
var chunk; var chunk;
var needDrain = 0; var needDrain = 0;
@ -336,7 +348,7 @@ function flow(src, pipeOpts) {
function ondrain() { function ondrain() {
needDrain--; needDrain--;
if (needDrain === 0) if (needDrain === 0)
flow(src, pipeOpts); flow(src);
} }
function write(dest, i, list) { function write(dest, i, list) {
@ -348,7 +360,7 @@ function flow(src, pipeOpts) {
} }
while (state.pipesCount && while (state.pipesCount &&
null !== (chunk = src.read(pipeOpts.chunkSize))) { null !== (chunk = src.read(state.pipeChunkSize))) {
if (state.pipesCount === 1) if (state.pipesCount === 1)
write(state.pipes, 0, null); write(state.pipes, 0, null);
@ -377,11 +389,17 @@ function flow(src, pipeOpts) {
// at this point, no one needed a drain, so we just ran out of data // at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again. // on the next readable event, start it over again.
src.once('readable', function() { state.ranOut = true;
flow(src, pipeOpts);
});
} }
function pipeOnReadable() {
if (this._readableState.ranOut) {
this._readableState.ranOut = false;
flow(this);
}
}
Readable.prototype.unpipe = function(dest) { Readable.prototype.unpipe = function(dest) {
var state = this._readableState; var state = this._readableState;
@ -401,6 +419,7 @@ Readable.prototype.unpipe = function(dest) {
// got a match. // got a match.
state.pipes = null; state.pipes = null;
state.pipesCount = 0; state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
if (dest) if (dest)
dest.emit('unpipe', this); dest.emit('unpipe', this);
return this; return this;
@ -414,10 +433,10 @@ Readable.prototype.unpipe = function(dest) {
var len = state.pipesCount; var len = state.pipesCount;
state.pipes = null; state.pipes = null;
state.pipesCount = 0; state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
for (var i = 0; i < len; i++) for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this); dests[i].emit('unpipe', this);
return this; return this;
} }

Loading…
Cancel
Save