From cd2b9f542c34ce59d2df7820a6237f7911c702f3 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sat, 9 Mar 2013 10:56:17 -0800 Subject: [PATCH] stream: Avoid nextTick warning filling read buffer In the function that pre-emptively fills the Readable queue, it relies on a recursion through: stream.push(chunk) -> maybeReadMore(stream, state) -> if (not reading more and < hwm) stream.read(0) -> stream._read() -> stream.push(chunk) -> repeat. Since this was only calling read() a single time, and then relying on a future nextTick to collect more data, it ends up causing a nextTick recursion error (and potentially a RangeError, even) if you have a very high highWaterMark, and are getting very small chunks pushed synchronously in _read (as happens with TLS, or many simple test streams). This change implements a new approach, so that read(0) is called repeatedly as long as it is effective (that is, the length keeps increasing), and thus quickly fills up the buffer for streams such as these, without any stacks overflowing. --- lib/_stream_readable.js | 12 +++++++++--- test/simple/test-stream2-unpipe-leak.js | 15 +++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index d6ef327a8a..41cf8e89cc 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -386,17 +386,23 @@ function maybeReadMore(stream, state) { if (!state.readingMore) { state.readingMore = true; process.nextTick(function() { - state.readingMore = false; maybeReadMore_(stream, state); }); } } function maybeReadMore_(stream, state) { - if (!state.reading && !state.flowing && !state.ended && - state.length < state.highWaterMark) { + var len = state.length; + while (!state.reading && !state.flowing && !state.ended && + state.length < state.highWaterMark) { stream.read(0); + if (len === state.length) + // didn't get any data, stop spinning. + break; + else + len = state.length; } + state.readingMore = false; } // abstract method. to be overridden in specific implementation classes. diff --git a/test/simple/test-stream2-unpipe-leak.js b/test/simple/test-stream2-unpipe-leak.js index c2cf9077f3..b9e8a960d8 100644 --- a/test/simple/test-stream2-unpipe-leak.js +++ b/test/simple/test-stream2-unpipe-leak.js @@ -24,6 +24,8 @@ var common = require('../common.js'); var assert = require('assert'); var stream = require('stream'); +var chunk = new Buffer('hallo'); + var util = require('util'); function TestWriter() { @@ -37,13 +39,15 @@ TestWriter.prototype._write = function(buffer, encoding, callback) { var dest = new TestWriter(); +// Set this high so that we'd trigger a nextTick warning +// and/or RangeError if we do maybeReadMore wrong. function TestReader() { - stream.Readable.call(this); + stream.Readable.call(this, { highWaterMark: 0x10000 }); } util.inherits(TestReader, stream.Readable); TestReader.prototype._read = function(size) { - this.push(new Buffer('hallo')); + this.push(chunk); }; var src = new TestReader(); @@ -61,3 +65,10 @@ assert.equal(dest.listeners('drain').length, 0); assert.equal(dest.listeners('error').length, 0); assert.equal(dest.listeners('close').length, 0); assert.equal(dest.listeners('finish').length, 0); + +console.error(src._readableState); +process.on('exit', function() { + assert(src._readableState.length >= src._readableState.highWaterMark); + src._readableState.buffer.length = 0; + console.error(src._readableState); +});