Browse Source

stream: Avoid nextTick warning filling read buffer

In the function that pre-emptively fills the Readable queue, it relies
on a recursion through:

stream.push(chunk) ->
maybeReadMore(stream, state) ->
  if (not reading more and < hwm) stream.read(0) ->
stream._read() ->
stream.push(chunk) -> repeat.

Since this was only calling read() a single time, and then relying on a
future nextTick to collect more data, it ends up causing a nextTick
recursion error (and potentially a RangeError, even) if you have a very
high highWaterMark, and are getting very small chunks pushed
synchronously in _read (as happens with TLS, or many simple test
streams).

This change implements a new approach, so that read(0) is called
repeatedly as long as it is effective (that is, the length keeps
increasing), and thus quickly fills up the buffer for streams such as
these, without any stacks overflowing.
v0.10.0-release
isaacs 12 years ago
parent
commit
cd2b9f542c
  1. 10
      lib/_stream_readable.js
  2. 15
      test/simple/test-stream2-unpipe-leak.js

10
lib/_stream_readable.js

@ -386,17 +386,23 @@ function maybeReadMore(stream, state) {
if (!state.readingMore) { if (!state.readingMore) {
state.readingMore = true; state.readingMore = true;
process.nextTick(function() { process.nextTick(function() {
state.readingMore = false;
maybeReadMore_(stream, state); maybeReadMore_(stream, state);
}); });
} }
} }
function maybeReadMore_(stream, state) { function maybeReadMore_(stream, state) {
if (!state.reading && !state.flowing && !state.ended && var len = state.length;
while (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) { state.length < state.highWaterMark) {
stream.read(0); stream.read(0);
if (len === state.length)
// didn't get any data, stop spinning.
break;
else
len = state.length;
} }
state.readingMore = false;
} }
// abstract method. to be overridden in specific implementation classes. // abstract method. to be overridden in specific implementation classes.

15
test/simple/test-stream2-unpipe-leak.js

@ -24,6 +24,8 @@ var common = require('../common.js');
var assert = require('assert'); var assert = require('assert');
var stream = require('stream'); var stream = require('stream');
var chunk = new Buffer('hallo');
var util = require('util'); var util = require('util');
function TestWriter() { function TestWriter() {
@ -37,13 +39,15 @@ TestWriter.prototype._write = function(buffer, encoding, callback) {
var dest = new TestWriter(); var dest = new TestWriter();
// Set this high so that we'd trigger a nextTick warning
// and/or RangeError if we do maybeReadMore wrong.
function TestReader() { function TestReader() {
stream.Readable.call(this); stream.Readable.call(this, { highWaterMark: 0x10000 });
} }
util.inherits(TestReader, stream.Readable); util.inherits(TestReader, stream.Readable);
TestReader.prototype._read = function(size) { TestReader.prototype._read = function(size) {
this.push(new Buffer('hallo')); this.push(chunk);
}; };
var src = new TestReader(); var src = new TestReader();
@ -61,3 +65,10 @@ assert.equal(dest.listeners('drain').length, 0);
assert.equal(dest.listeners('error').length, 0); assert.equal(dest.listeners('error').length, 0);
assert.equal(dest.listeners('close').length, 0); assert.equal(dest.listeners('close').length, 0);
assert.equal(dest.listeners('finish').length, 0); assert.equal(dest.listeners('finish').length, 0);
console.error(src._readableState);
process.on('exit', function() {
assert(src._readableState.length >= src._readableState.highWaterMark);
src._readableState.buffer.length = 0;
console.error(src._readableState);
});

Loading…
Cancel
Save