Browse Source

streams2: Handle immediate synthetic transforms properly

v0.9.4-release
isaacs 12 years ago
parent
commit
8acb416ad0
  1. 3
      lib/_stream_passthrough.js
  2. 134
      lib/_stream_readable.js
  3. 7
      lib/_stream_transform.js

3
lib/_stream_passthrough.js

@ -34,6 +34,5 @@ function PassThrough(options) {
} }
PassThrough.prototype._transform = function(chunk, output, cb) { PassThrough.prototype._transform = function(chunk, output, cb) {
output(chunk); cb(null, chunk);
cb();
}; };

134
lib/_stream_readable.js

@ -28,7 +28,7 @@ var StringDecoder;
util.inherits(Readable, Stream); util.inherits(Readable, Stream);
function ReadableState(options, stream) { function ReadableState(options) {
options = options || {}; options = options || {};
this.bufferSize = options.bufferSize || 16 * 1024; this.bufferSize = options.bufferSize || 16 * 1024;
@ -44,7 +44,6 @@ function ReadableState(options, stream) {
this.flowing = false; this.flowing = false;
this.ended = false; this.ended = false;
this.endEmitted = false; this.endEmitted = false;
this.stream = stream;
this.reading = false; this.reading = false;
// whenever we return null, then we set a flag to say // whenever we return null, then we set a flag to say
@ -71,52 +70,76 @@ Readable.prototype.setEncoding = function(enc) {
this._readableState.decoder = new StringDecoder(enc); this._readableState.decoder = new StringDecoder(enc);
}; };
// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
var state = this._readableState;
if (state.length === 0 && state.ended) { function howMuchToRead(n, state) {
endReadable(this); if (state.length === 0 && state.ended)
return null; return 0;
}
if (isNaN(n))
return state.length;
if (isNaN(n) || n <= 0) if (n <= 0)
n = state.length return 0;
// XXX: controversial.
// don't have that much. return null, unless we've ended. // don't have that much. return null, unless we've ended.
// However, if the low water mark is lower than the number of bytes,
// then we still need to return what we have, or else it won't kick
// off another _read() call. For example,
// lwm=5
// len=9
// read(10)
// We don't have that many bytes, so it'd be tempting to return null,
// but then it won't ever cause _read to be called, so in that case,
// we just return what we have, and let the programmer deal with it.
if (n > state.length) { if (n > state.length) {
if (!state.ended && state.length <= state.lowWaterMark) { if (!state.ended) {
state.needReadable = true; state.needReadable = true;
n = 0; return 0;
} else } else
n = state.length; return state.length;
} }
return n;
}
var ret; // you can override either this method, or _read(n, cb) below.
if (n > 0) Readable.prototype.read = function(n) {
ret = fromList(n, state.buffer, state.length, !!state.decoder); var state = this._readableState;
else var nOrig = n;
ret = null;
if (ret === null || ret.length === 0) n = howMuchToRead(n, state);
state.needReadable = true;
state.length -= n; // if we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
endReadable(this);
return null;
}
if (!state.ended && // All the actual chunk generation logic needs to be
state.length <= state.lowWaterMark && // *below* the call to _read. The reason is that in certain
!state.reading) { // synthetic stream cases, such as passthrough streams, _read
// may be a completely synchronous operation which may change
// the state of the read buffer, providing enough data when
// before there was *not* enough.
//
// So, the steps are:
// 1. Figure out what the state of things will be after we do
// a read from the buffer.
//
// 2. If that resulting state will trigger a _read, then call _read.
// Note that this may be asynchronous, or synchronous. Yes, it is
// deeply ugly to write APIs this way, but that still doesn't mean
// that the Readable class should behave improperly, as streams are
// designed to be sync/async agnostic.
// Take note if the _read call is sync or async (ie, if the read call
// has returned yet), so that we know whether or not it's safe to emit
// 'readable' etc.
//
// 3. Actually pull the requested chunks out of the buffer and return.
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
// if we currently have less than the lowWaterMark, then also read some
if (state.length - n <= state.lowWaterMark)
doRead = true;
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading)
doRead = false;
if (doRead) {
var sync = true;
state.reading = true; state.reading = true;
// call internal read method // call internal read method
this._read(state.bufferSize, function onread(er, chunk) { this._read(state.bufferSize, function onread(er, chunk) {
@ -125,21 +148,27 @@ Readable.prototype.read = function(n) {
return this.emit('error', er); return this.emit('error', er);
if (!chunk || !chunk.length) { if (!chunk || !chunk.length) {
// eof
state.ended = true; state.ended = true;
// if we've ended and we have some data left, then emit // if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up. // 'readable' now to make sure it gets picked up.
if (state.length > 0) if (!sync) {
this.emit('readable'); if (state.length > 0)
else this.emit('readable');
endReadable(this); else
endReadable(this);
}
return; return;
} }
if (state.decoder) if (state.decoder)
chunk = state.decoder.write(chunk); chunk = state.decoder.write(chunk);
state.length += chunk.length; // update the buffer info.
state.buffer.push(chunk); if (chunk) {
state.length += chunk.length;
state.buffer.push(chunk);
}
// if we haven't gotten enough to pass the lowWaterMark, // if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user // and we haven't ended, then don't bother telling the user
@ -152,14 +181,33 @@ Readable.prototype.read = function(n) {
return; return;
} }
// now we have something to call this.read() to get. if (state.needReadable && !sync) {
if (state.needReadable) {
state.needReadable = false; state.needReadable = false;
this.emit('readable'); this.emit('readable');
} }
}.bind(this)); }.bind(this));
sync = false;
} }
// If _read called its callback synchronously, then `reading`
// will be false, and we need to re-evaluate how much data we
// can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
else
ret = null;
if (ret === null || ret.length === 0) {
state.needReadable = true;
n = 0;
}
state.length -= n;
return ret; return ret;
}; };

7
lib/_stream_transform.js

@ -122,13 +122,14 @@ Transform.prototype._write = function(chunk, cb) {
if (ts.pendingReadCb) { if (ts.pendingReadCb) {
var readcb = ts.pendingReadCb; var readcb = ts.pendingReadCb;
ts.pendingReadCb = null; ts.pendingReadCb = null;
this._read(-1, readcb); this._read(0, readcb);
} }
// if we weren't waiting for it, but nothing is queued up, then // if we weren't waiting for it, but nothing is queued up, then
// still kick off a transform, just so it's there when the user asks. // still kick off a transform, just so it's there when the user asks.
if (rs.length === 0) { var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
var ret = this.read(); if (doRead && !rs.reading) {
var ret = this.read(0);
if (ret !== null) if (ret !== null)
return cb(new Error('invalid stream transform state')); return cb(new Error('invalid stream transform state'));
} }

Loading…
Cancel
Save