Browse Source

stream: Break up the onread function

A primary motivation of this is to make the onread function more
inline-friendly, but also to make it more easy to explore not having
onread at all, in favor of always using push() to signal the end of
reading.
v0.9.11-release
isaacs 12 years ago
parent
commit
7764b84297
  1. 164
      lib/_stream_readable.js

164
lib/_stream_readable.js

@ -105,21 +105,51 @@ function Readable(options) {
// similar to how Writable.write() returns true if you should // similar to how Writable.write() returns true if you should
// write() some more. // write() some more.
Readable.prototype.push = function(chunk) { Readable.prototype.push = function(chunk) {
var rs = this._readableState; var state = this._readableState;
rs.onread(null, chunk); return readableAddChunk(this, state, chunk);
// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
// such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
return rs.needReadable ||
rs.length < rs.highWaterMark ||
rs.length === 0;
}; };
function readableAddChunk(stream, state, chunk) {
state.reading = false;
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (chunk === null || chunk === undefined) {
onreadEof(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (state.decoder)
chunk = state.decoder.write(chunk);
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);
if (state.needReadable)
emitReadable(stream);
maybeReadMore(stream, state);
}
return needMoreData(state);
}
// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
// such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
function needMoreData(state) {
return !state.ended &&
(state.needReadable ||
state.length < state.highWaterMark ||
state.length === 0);
}
// backwards compatibility. // backwards compatibility.
Readable.prototype.setEncoding = function(enc) { Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder) if (!StringDecoder)
@ -263,15 +293,20 @@ Readable.prototype.read = function(n) {
return ret; return ret;
}; };
// This is the function passed to _read(n,cb) as the callback.
// It should be called exactly once for every _read() call.
function onread(stream, er, chunk) { function onread(stream, er, chunk) {
var state = stream._readableState; var state = stream._readableState;
var sync = state.sync; var sync = state.sync;
// If we get something that is not a buffer, string, null, or undefined, if (er)
// and we're not in objectMode, then that's an error. stream.emit('error', er);
// Otherwise stream chunks are all considered to be of length=1, and the else
// watermarks determine how many objects to keep in the buffer, rather than stream.push(chunk);
// how many bytes or characters. }
function chunkInvalid(state, chunk) {
var er = null;
if (!Buffer.isBuffer(chunk) && if (!Buffer.isBuffer(chunk) &&
'string' !== typeof chunk && 'string' !== typeof chunk &&
chunk !== null && chunk !== null &&
@ -280,68 +315,26 @@ function onread(stream, er, chunk) {
!er) { !er) {
er = new TypeError('Invalid non-string/buffer chunk'); er = new TypeError('Invalid non-string/buffer chunk');
} }
return er;
}
state.reading = false;
if (er)
return stream.emit('error', er);
if (chunk === null || chunk === undefined) { function onreadEof(stream, state) {
// eof state.ended = true;
state.ended = true; if (state.decoder) {
if (state.decoder) { var chunk = state.decoder.end();
chunk = state.decoder.end(); if (chunk && chunk.length) {
if (chunk && chunk.length) { state.buffer.push(chunk);
state.buffer.push(chunk); state.length += state.objectMode ? 1 : chunk.length;
state.length += state.objectMode ? 1 : chunk.length;
}
} }
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
emitReadable(stream);
else
endReadable(stream);
return;
}
// at this point, if we got a zero-length buffer or string,
// and we're not in object-mode, then there's really no point
// continuing. it means that there is nothing to read right
// now, but as we have not received the EOF-signaling null,
// we're not ended. we've already unset the reading flag,
// so just get out of here.
if (!state.objectMode &&
(chunk || typeof chunk === 'string') &&
0 === chunk.length)
return;
if (state.decoder)
chunk = state.decoder.write(chunk);
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);
// if we haven't gotten any data,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, emitting 'readable'
// probably will trigger another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length === 0) {
state.reading = true;
stream._read(state.bufferSize, state.onread);
return;
} }
if (state.needReadable) // if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
emitReadable(stream); emitReadable(stream);
else if (state.sync)
process.nextTick(function() {
maybeReadMore(stream, state);
});
else else
maybeReadMore(stream, state); endReadable(stream);
} }
// Don't emit readable right away in sync mode, because this can trigger // Don't emit readable right away in sync mode, because this can trigger
@ -365,17 +358,26 @@ function emitReadable(stream) {
function emitReadable_(stream) { function emitReadable_(stream) {
var state = stream._readableState; var state = stream._readableState;
stream.emit('readable'); stream.emit('readable');
maybeReadMore(stream, state);
} }
// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively.
function maybeReadMore(stream, state) { function maybeReadMore(stream, state) {
// at this point, the user has presumably seen the 'readable' event, if (state.sync)
// and called read() to consume some data. that may have triggered process.nextTick(function() {
// in turn another _read(n,cb) call, in which case reading = true if maybeReadMore_(stream, state);
// it's in progress. });
// However, if we're not ended, or reading, and the length < hwm, else
// then go ahead and try to read some more right now preemptively. maybeReadMore_(stream, state);
if (!state.reading && !state.ending && !state.ended && }
function maybeReadMore_(stream, state) {
if (!state.reading && !state.ended &&
state.length < state.highWaterMark) { state.length < state.highWaterMark) {
stream.read(0); stream.read(0);
} }

Loading…
Cancel
Save