Browse Source

stream: Correct Transform class backpressure

The refactor in b43e544140 to use
stream.push() in Transform inadvertently caused it to immediately
consume all the written data, regardless of whether or not the readable
side was being consumed.

Only pull data through the _transform() process when the readable side
is being consumed.

Fix #4667
v0.9.9-release
isaacs 12 years ago
parent
commit
e26622bd18
  1. 109
      lib/_stream_transform.js
  2. 27
      test/simple/test-stream2-transform.js

109
lib/_stream_transform.js

@ -70,28 +70,63 @@ var Duplex = require('_stream_duplex');
var util = require('util'); var util = require('util');
util.inherits(Transform, Duplex); util.inherits(Transform, Duplex);
function TransformState(stream) {
this.buffer = []; function TransformState(options, stream) {
this.transforming = false; var ts = this;
this.pendingReadCb = null;
this.output = function(chunk) { this.output = function(chunk) {
ts.needTransform = false;
stream.push(chunk); stream.push(chunk);
}; };
this.afterTransform = function(er, data) {
return afterTransform(stream, er, data);
};
this.needTransform = false;
this.transforming = false;
this.writecb = null;
this.writechunk = null;
}
function afterTransform(stream, er, data) {
var ts = stream._transformState;
ts.transforming = false;
var cb = ts.writecb;
if (!cb)
return this.emit('error', new Error('no writecb in Transform class'));
ts.writechunk = null;
ts.writecb = null;
if (data !== null && data !== undefined)
ts.output(data);
if (cb)
cb(er);
var rs = stream._readableState;
if (rs.needReadable || rs.length < rs.highWaterMark) {
stream._read();
}
} }
function Transform(options) { function Transform(options) {
if (!(this instanceof Transform)) if (!(this instanceof Transform))
return new Transform(options); return new Transform(options);
Duplex.call(this, options); Duplex.call(this, options);
// bind output so that it can be passed around as a regular function. var ts = this._transformState = new TransformState(options, this);
// when the writable side finishes, then flush out anything remaining.
var stream = this; var stream = this;
// the queue of _write chunks that are pending being transformed // start out asking for a readable event once data is transformed.
var ts = this._transformState = new TransformState(stream); this._readableState.needReadable = true;
// when the writable side finishes, then flush out anything remaining.
this.once('finish', function() { this.once('finish', function() {
if ('function' === typeof this._flush) if ('function' === typeof this._flush)
this._flush(ts.output, function(er) { this._flush(ts.output, function(er) {
@ -118,56 +153,30 @@ Transform.prototype._transform = function(chunk, output, cb) {
Transform.prototype._write = function(chunk, cb) { Transform.prototype._write = function(chunk, cb) {
var ts = this._transformState; var ts = this._transformState;
var rs = this._readableState; ts.writecb = cb;
ts.buffer.push([chunk, cb]); ts.writechunk = chunk;
// no need for auto-pull if already in the midst of one.
if (ts.transforming) if (ts.transforming)
return; return;
var rs = this._readableState;
// now we have something to transform, if we were waiting for it. if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark)
// kick off a _read to pull it in. this._read();
if (ts.pendingReadCb) {
var readcb = ts.pendingReadCb;
ts.pendingReadCb = null;
this._read(0, readcb);
}
// if we weren't waiting for it, but nothing is queued up, then
// still kick off a transform, just so it's there when the user asks.
var doRead = rs.needReadable || rs.length <= rs.highWaterMark;
if (doRead && !rs.reading) {
var ret = this.read(0);
if (ret !== null)
return cb(new Error('invalid stream transform state'));
}
}; };
Transform.prototype._read = function(n, readcb) { // Doesn't matter what the args are here.
var ws = this._writableState; // the output and callback functions passed to _transform do all the work.
var rs = this._readableState; // That we got here means that the readable side wants more data.
Transform.prototype._read = function(n, cb) {
var ts = this._transformState; var ts = this._transformState;
ts.pendingReadCb = readcb; if (ts.writechunk && ts.writecb && !ts.transforming) {
ts.transforming = true;
// if there's nothing pending, then we just wait. this._transform(ts.writechunk, ts.output, ts.afterTransform);
// if we're already transforming, then also just hold on a sec.
// we've already stashed the readcb, so we can come back later
// when we have something to transform
if (ts.buffer.length === 0 || ts.transforming)
return; return;
}
// go ahead and transform that thing, now that someone wants it // mark that we need a transform, so that any data that comes in
var req = ts.buffer.shift(); // will get processed, now that we've asked for it.
var chunk = req[0]; ts.needTransform = true;
var writecb = req[1];
ts.transforming = true;
this._transform(chunk, ts.output, function(er, data) {
ts.transforming = false;
if (data)
ts.output(data);
writecb(er);
});
}; };

27
test/simple/test-stream2-transform.js

@ -61,6 +61,33 @@ process.nextTick(run);
///// /////
test('writable side consumption', function(t) {
var tx = new Transform({
highWaterMark: 10
});
var transformed = 0;
tx._transform = function(chunk, output, cb) {
transformed += chunk.length;
output(chunk);
cb();
};
for (var i = 1; i <= 10; i++) {
tx.write(new Buffer(i));
}
tx.end();
t.equal(tx._readableState.length, 10);
t.equal(transformed, 10);
t.equal(tx._transformState.writechunk.length, 5);
t.same(tx._writableState.buffer.map(function(c) {
return c[0].length;
}), [6, 7, 8, 9, 10]);
t.end();
});
test('passthrough', function(t) { test('passthrough', function(t) {
var pt = new PassThrough(); var pt = new PassThrough();

Loading…
Cancel
Save