Browse Source

streams2: Handle pipeChunkSize properly

v0.9.4-release
isaacs 12 years ago
parent
commit
ac5a185edf
  1. 45
      lib/_stream_readable.js

45
lib/_stream_readable.js

@ -84,7 +84,7 @@ function ReadableState(options, stream) {
// the number of writers that are awaiting a drain event in .pipe()s
this.awaitDrain = 0;
this.flowChunkSize = null;
this.pipeChunkSize = null;
this.decoder = null;
if (options.encoding) {
@ -118,7 +118,7 @@ function howMuchToRead(n, state) {
if (state.length === 0 && state.ended)
return 0;
if (isNaN(n))
if (isNaN(n) || n === null)
return state.length;
if (n <= 0)
@ -266,8 +266,8 @@ function onread(stream, er, chunk) {
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, that'll
// probably kick off another stream.read(), which can trigger
// that it's time to read more data. Otherwise, emitting 'readable'
// probably will trigger another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) {
state.reading = true;
@ -322,34 +322,41 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
if (pipeOpts && pipeOpts.chunkSize)
state.flowChunkSize = pipeOpts.chunkSize;
state.pipeChunkSize = pipeOpts.chunkSize;
function onend() {
dest.end();
}
// when the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
dest.on('unpipe', function(readable) {
if (readable === src)
dest.removeListener('drain', ondrain);
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (!dest._writableState || dest._writableState.needDrain)
ondrain();
});
// tell the dest that it's being piped to
dest.emit('pipe', src);
// start the flow.
// start the flow if it hasn't been started already.
if (!state.flowing) {
// the handler that waits for readable events after all
// the data gets sucked out in flow.
// This would be easier to follow with a .once() handler
// in flow(), but that is too slow.
this.on('readable', pipeOnReadable);
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
dest.on('unpipe', function(readable) {
if (readable === src)
dest.removeListener('drain', ondrain);
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
if (dest._writableState.needDrain)
ondrain();
});
state.flowing = true;
process.nextTick(function() {

Loading…
Cancel
Save