Browse Source

stream: Always defer preemptive reading to improve latency

v0.10.0-release
Gil Pedersen 12 years ago
committed by isaacs
parent
commit
77a776da90
  1. 16
      lib/_stream_readable.js

16
lib/_stream_readable.js

@ -72,6 +72,9 @@ function ReadableState(options, stream) {
// the number of writers that are awaiting a drain event in .pipe()s // the number of writers that are awaiting a drain event in .pipe()s
this.awaitDrain = 0; this.awaitDrain = 0;
// if true, a maybeReadMore has been scheduled
this.readingMore = false;
this.decoder = null; this.decoder = null;
if (options.encoding) { if (options.encoding) {
if (!StringDecoder) if (!StringDecoder)
@ -378,18 +381,19 @@ function emitReadable_(stream) {
// in turn another _read(n) call, in which case reading = true if // in turn another _read(n) call, in which case reading = true if
// it's in progress. // it's in progress.
// However, if we're not ended, or reading, and the length < hwm, // However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively. // then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) { function maybeReadMore(stream, state) {
if (state.sync) if (!state.readingMore) {
state.readingMore = true;
process.nextTick(function() { process.nextTick(function() {
state.readingMore = false;
maybeReadMore_(stream, state); maybeReadMore_(stream, state);
}); });
else }
maybeReadMore_(stream, state);
} }
function maybeReadMore_(stream, state) { function maybeReadMore_(stream, state) {
if (!state.reading && !state.ended && if (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) { state.length < state.highWaterMark) {
stream.read(0); stream.read(0);
} }
@ -636,7 +640,7 @@ Readable.prototype.on = function(ev, fn) {
if (ev === 'data' && !this._readableState.flowing) if (ev === 'data' && !this._readableState.flowing)
emitDataEvents(this); emitDataEvents(this);
if (ev === 'readable') if (ev === 'readable' && !this._readableState.reading)
this.read(0); this.read(0);
return res; return res;

Loading…
Cancel
Save