|
|
@ -395,9 +395,7 @@ function emitReadable(stream) { |
|
|
|
debug('emitReadable', state.flowing); |
|
|
|
state.emittedReadable = true; |
|
|
|
if (state.sync) |
|
|
|
process.nextTick(function() { |
|
|
|
emitReadable_(stream); |
|
|
|
}); |
|
|
|
process.nextTick(emitReadable_, stream); |
|
|
|
else |
|
|
|
emitReadable_(stream); |
|
|
|
} |
|
|
@ -419,9 +417,7 @@ function emitReadable_(stream) { |
|
|
|
function maybeReadMore(stream, state) { |
|
|
|
if (!state.readingMore) { |
|
|
|
state.readingMore = true; |
|
|
|
process.nextTick(function() { |
|
|
|
maybeReadMore_(stream, state); |
|
|
|
}); |
|
|
|
process.nextTick(maybeReadMore_, stream, state); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -667,11 +663,7 @@ Readable.prototype.on = function(ev, fn) { |
|
|
|
state.emittedReadable = false; |
|
|
|
state.needReadable = true; |
|
|
|
if (!state.reading) { |
|
|
|
var self = this; |
|
|
|
process.nextTick(function() { |
|
|
|
debug('readable nexttick read 0'); |
|
|
|
self.read(0); |
|
|
|
}); |
|
|
|
process.nextTick(nReadingNextTick, this); |
|
|
|
} else if (state.length) { |
|
|
|
emitReadable(this, state); |
|
|
|
} |
|
|
@ -682,6 +674,11 @@ Readable.prototype.on = function(ev, fn) { |
|
|
|
}; |
|
|
|
Readable.prototype.addListener = Readable.prototype.on; |
|
|
|
|
|
|
|
function nReadingNextTick(self) { |
|
|
|
debug('readable nexttick read 0'); |
|
|
|
self.read(0); |
|
|
|
} |
|
|
|
|
|
|
|
// pause() and resume() are remnants of the legacy readable stream API
|
|
|
|
// If the user uses them, then switch into old mode.
|
|
|
|
Readable.prototype.resume = function() { |
|
|
@ -697,9 +694,7 @@ Readable.prototype.resume = function() { |
|
|
|
function resume(stream, state) { |
|
|
|
if (!state.resumeScheduled) { |
|
|
|
state.resumeScheduled = true; |
|
|
|
process.nextTick(function() { |
|
|
|
resume_(stream, state); |
|
|
|
}); |
|
|
|
process.nextTick(resume_, stream, state); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -883,13 +878,15 @@ function endReadable(stream) { |
|
|
|
|
|
|
|
if (!state.endEmitted) { |
|
|
|
state.ended = true; |
|
|
|
process.nextTick(function() { |
|
|
|
process.nextTick(endReadableNT, state, stream); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
function endReadableNT(state, stream) { |
|
|
|
// Check that we didn't get one last unshift.
|
|
|
|
if (!state.endEmitted && state.length === 0) { |
|
|
|
state.endEmitted = true; |
|
|
|
stream.readable = false; |
|
|
|
stream.emit('end'); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|