From e26622bd18fc86033cea393125cad49c577b524b Mon Sep 17 00:00:00 2001 From: isaacs Date: Sun, 27 Jan 2013 11:56:36 -0800 Subject: [PATCH] stream: Correct Transform class backpressure The refactor in b43e544140ccf68580c02e71c56d19b82e1e1d32 to use stream.push() in Transform inadvertently caused it to immediately consume all the written data, regardless of whether or not the readable side was being consumed. Only pull data through the _transform() process when the readable side is being consumed. Fix #4667 --- lib/_stream_transform.js | 109 ++++++++++++++------------ test/simple/test-stream2-transform.js | 27 +++++++ 2 files changed, 86 insertions(+), 50 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 020dc8d76a..8b75a52b62 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -70,28 +70,63 @@ var Duplex = require('_stream_duplex'); var util = require('util'); util.inherits(Transform, Duplex); -function TransformState(stream) { - this.buffer = []; - this.transforming = false; - this.pendingReadCb = null; + +function TransformState(options, stream) { + var ts = this; this.output = function(chunk) { + ts.needTransform = false; stream.push(chunk); }; + + this.afterTransform = function(er, data) { + return afterTransform(stream, er, data); + }; + + this.needTransform = false; + this.transforming = false; + this.writecb = null; + this.writechunk = null; +} + +function afterTransform(stream, er, data) { + var ts = stream._transformState; + ts.transforming = false; + + var cb = ts.writecb; + + if (!cb) + return this.emit('error', new Error('no writecb in Transform class')); + + ts.writechunk = null; + ts.writecb = null; + + if (data !== null && data !== undefined) + ts.output(data); + + if (cb) + cb(er); + + var rs = stream._readableState; + if (rs.needReadable || rs.length < rs.highWaterMark) { + stream._read(); + } } + function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); Duplex.call(this, options); - // bind output so that it can be passed around as a regular function. + var ts = this._transformState = new TransformState(options, this); + + // when the writable side finishes, then flush out anything remaining. var stream = this; - // the queue of _write chunks that are pending being transformed - var ts = this._transformState = new TransformState(stream); + // start out asking for a readable event once data is transformed. + this._readableState.needReadable = true; - // when the writable side finishes, then flush out anything remaining. this.once('finish', function() { if ('function' === typeof this._flush) this._flush(ts.output, function(er) { @@ -118,56 +153,30 @@ Transform.prototype._transform = function(chunk, output, cb) { Transform.prototype._write = function(chunk, cb) { var ts = this._transformState; - var rs = this._readableState; - ts.buffer.push([chunk, cb]); - - // no need for auto-pull if already in the midst of one. + ts.writecb = cb; + ts.writechunk = chunk; if (ts.transforming) return; - - // now we have something to transform, if we were waiting for it. - // kick off a _read to pull it in. - if (ts.pendingReadCb) { - var readcb = ts.pendingReadCb; - ts.pendingReadCb = null; - this._read(0, readcb); - } - - // if we weren't waiting for it, but nothing is queued up, then - // still kick off a transform, just so it's there when the user asks. - var doRead = rs.needReadable || rs.length <= rs.highWaterMark; - if (doRead && !rs.reading) { - var ret = this.read(0); - if (ret !== null) - return cb(new Error('invalid stream transform state')); - } + var rs = this._readableState; + if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) + this._read(); }; -Transform.prototype._read = function(n, readcb) { - var ws = this._writableState; - var rs = this._readableState; +// Doesn't matter what the args are here. +// the output and callback functions passed to _transform do all the work. +// That we got here means that the readable side wants more data. +Transform.prototype._read = function(n, cb) { var ts = this._transformState; - ts.pendingReadCb = readcb; - - // if there's nothing pending, then we just wait. - // if we're already transforming, then also just hold on a sec. - // we've already stashed the readcb, so we can come back later - // when we have something to transform - if (ts.buffer.length === 0 || ts.transforming) + if (ts.writechunk && ts.writecb && !ts.transforming) { + ts.transforming = true; + this._transform(ts.writechunk, ts.output, ts.afterTransform); return; + } - // go ahead and transform that thing, now that someone wants it - var req = ts.buffer.shift(); - var chunk = req[0]; - var writecb = req[1]; - ts.transforming = true; - this._transform(chunk, ts.output, function(er, data) { - ts.transforming = false; - if (data) - ts.output(data); - writecb(er); - }); + // mark that we need a transform, so that any data that comes in + // will get processed, now that we've asked for it. + ts.needTransform = true; }; diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 1c154c6897..92f5784bd4 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -61,6 +61,33 @@ process.nextTick(run); ///// +test('writable side consumption', function(t) { + var tx = new Transform({ + highWaterMark: 10 + }); + + var transformed = 0; + tx._transform = function(chunk, output, cb) { + transformed += chunk.length; + output(chunk); + cb(); + }; + + for (var i = 1; i <= 10; i++) { + tx.write(new Buffer(i)); + } + tx.end(); + + t.equal(tx._readableState.length, 10); + t.equal(transformed, 10); + t.equal(tx._transformState.writechunk.length, 5); + t.same(tx._writableState.buffer.map(function(c) { + return c[0].length; + }), [6, 7, 8, 9, 10]); + + t.end(); +}); + test('passthrough', function(t) { var pt = new PassThrough();