diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 020dc8d76a..8b75a52b62 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -70,28 +70,63 @@ var Duplex = require('_stream_duplex'); var util = require('util'); util.inherits(Transform, Duplex); -function TransformState(stream) { - this.buffer = []; - this.transforming = false; - this.pendingReadCb = null; + +function TransformState(options, stream) { + var ts = this; this.output = function(chunk) { + ts.needTransform = false; stream.push(chunk); }; + + this.afterTransform = function(er, data) { + return afterTransform(stream, er, data); + }; + + this.needTransform = false; + this.transforming = false; + this.writecb = null; + this.writechunk = null; +} + +function afterTransform(stream, er, data) { + var ts = stream._transformState; + ts.transforming = false; + + var cb = ts.writecb; + + if (!cb) + return this.emit('error', new Error('no writecb in Transform class')); + + ts.writechunk = null; + ts.writecb = null; + + if (data !== null && data !== undefined) + ts.output(data); + + if (cb) + cb(er); + + var rs = stream._readableState; + if (rs.needReadable || rs.length < rs.highWaterMark) { + stream._read(); + } } + function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); Duplex.call(this, options); - // bind output so that it can be passed around as a regular function. + var ts = this._transformState = new TransformState(options, this); + + // when the writable side finishes, then flush out anything remaining. var stream = this; - // the queue of _write chunks that are pending being transformed - var ts = this._transformState = new TransformState(stream); + // start out asking for a readable event once data is transformed. + this._readableState.needReadable = true; - // when the writable side finishes, then flush out anything remaining. this.once('finish', function() { if ('function' === typeof this._flush) this._flush(ts.output, function(er) { @@ -118,56 +153,30 @@ Transform.prototype._transform = function(chunk, output, cb) { Transform.prototype._write = function(chunk, cb) { var ts = this._transformState; - var rs = this._readableState; - ts.buffer.push([chunk, cb]); - - // no need for auto-pull if already in the midst of one. + ts.writecb = cb; + ts.writechunk = chunk; if (ts.transforming) return; - - // now we have something to transform, if we were waiting for it. - // kick off a _read to pull it in. - if (ts.pendingReadCb) { - var readcb = ts.pendingReadCb; - ts.pendingReadCb = null; - this._read(0, readcb); - } - - // if we weren't waiting for it, but nothing is queued up, then - // still kick off a transform, just so it's there when the user asks. - var doRead = rs.needReadable || rs.length <= rs.highWaterMark; - if (doRead && !rs.reading) { - var ret = this.read(0); - if (ret !== null) - return cb(new Error('invalid stream transform state')); - } + var rs = this._readableState; + if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) + this._read(); }; -Transform.prototype._read = function(n, readcb) { - var ws = this._writableState; - var rs = this._readableState; +// Doesn't matter what the args are here. +// the output and callback functions passed to _transform do all the work. +// That we got here means that the readable side wants more data. +Transform.prototype._read = function(n, cb) { var ts = this._transformState; - ts.pendingReadCb = readcb; - - // if there's nothing pending, then we just wait. - // if we're already transforming, then also just hold on a sec. - // we've already stashed the readcb, so we can come back later - // when we have something to transform - if (ts.buffer.length === 0 || ts.transforming) + if (ts.writechunk && ts.writecb && !ts.transforming) { + ts.transforming = true; + this._transform(ts.writechunk, ts.output, ts.afterTransform); return; + } - // go ahead and transform that thing, now that someone wants it - var req = ts.buffer.shift(); - var chunk = req[0]; - var writecb = req[1]; - ts.transforming = true; - this._transform(chunk, ts.output, function(er, data) { - ts.transforming = false; - if (data) - ts.output(data); - writecb(er); - }); + // mark that we need a transform, so that any data that comes in + // will get processed, now that we've asked for it. + ts.needTransform = true; }; diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 1c154c6897..92f5784bd4 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -61,6 +61,33 @@ process.nextTick(run); ///// +test('writable side consumption', function(t) { + var tx = new Transform({ + highWaterMark: 10 + }); + + var transformed = 0; + tx._transform = function(chunk, output, cb) { + transformed += chunk.length; + output(chunk); + cb(); + }; + + for (var i = 1; i <= 10; i++) { + tx.write(new Buffer(i)); + } + tx.end(); + + t.equal(tx._readableState.length, 10); + t.equal(transformed, 10); + t.equal(tx._transformState.writechunk.length, 5); + t.same(tx._writableState.buffer.map(function(c) { + return c[0].length; + }), [6, 7, 8, 9, 10]); + + t.end(); +}); + test('passthrough', function(t) { var pt = new PassThrough();