Browse Source

streams2: Abstract out onread function

v0.9.4-release
isaacs 12 years ago
parent
commit
286aa04910
  1. 114
      lib/_stream_readable.js

114
lib/_stream_readable.js

@ -28,7 +28,7 @@ var StringDecoder;
util.inherits(Readable, Stream);
function ReadableState(options) {
function ReadableState(options, stream) {
options = options || {};
this.bufferSize = options.bufferSize || 16 * 1024;
@ -45,6 +45,8 @@ function ReadableState(options) {
this.ended = false;
this.endEmitted = false;
this.reading = false;
this.sync = false;
this.onread = onread.bind(stream);
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
@ -144,59 +146,10 @@ Readable.prototype.read = function(n) {
if (doRead) {
var sync = true;
state.reading = true;
state.sync = true;
// call internal read method
this._read(state.bufferSize, function onread(er, chunk) {
state.reading = false;
if (er)
return this.emit('error', er);
if (!chunk || !chunk.length) {
// eof
state.ended = true;
if (state.decoder) {
chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += chunk.length;
}
}
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
if (state.length > 0)
this.emit('readable');
else
endReadable(this);
}
return;
}
if (state.decoder)
chunk = state.decoder.write(chunk);
// update the buffer info.
if (chunk) {
state.length += chunk.length;
state.buffer.push(chunk);
}
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, that'll
// probably kick off another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) {
state.reading = true;
this._read(state.bufferSize, onread.bind(this));
return;
}
if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
}
}.bind(this));
sync = false;
this._read(state.bufferSize, state.onread);
state.sync = false;
}
// If _read called its callback synchronously, then `reading`
@ -221,6 +174,61 @@ Readable.prototype.read = function(n) {
return ret;
};
function onread(er, chunk) {
var state = this._readableState;
var sync = state.sync;
state.reading = false;
if (er)
return this.emit('error', er);
if (!chunk || !chunk.length) {
// eof
state.ended = true;
if (state.decoder) {
chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += chunk.length;
}
}
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
if (state.length > 0)
this.emit('readable');
else
endReadable(this);
}
return;
}
if (state.decoder)
chunk = state.decoder.write(chunk);
// update the buffer info.
if (chunk) {
state.length += chunk.length;
state.buffer.push(chunk);
}
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, that'll
// probably kick off another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) {
state.reading = true;
this._read(state.bufferSize, state.onread);
return;
}
if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
}
}
// abstract method. to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat

Loading…
Cancel
Save