You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
2.7 KiB

stream: Fix unshift() race conditions Fix #5272 The consumption of a readable stream is a dance with 3 partners. 1. The specific stream Author (A) 2. The Stream Base class (B), and 3. The Consumer of the stream (C) When B calls the _read() method that A implements, it sets a 'reading' flag, so that parallel calls to _read() can be avoided. When A calls stream.push(), B knows that it's safe to start calling _read() again. If the consumer C is some kind of parser that wants in some cases to pass the source stream off to some other party, but not before "putting back" some bit of previously consumed data (as in the case of Node's websocket http upgrade implementation). So, stream.unshift() will generally *never* be called by A, but *only* called by C. Prior to this patch, stream.unshift() *also* unset the state.reading flag, meaning that C could indicate the end of a read, and B would dutifully fire off another _read() call to A. This is inappropriate. In the case of fs streams, and other variably-laggy streams that don't tolerate overlapped _read() calls, this causes big problems. Also, calling stream.shift() after the 'end' event did not raise any kind of error, but would cause very strange behavior indeed. Calling it after the EOF chunk was seen, but before the 'end' event was fired would also cause weird behavior, and could lead to data being lost, since it would not emit another 'readable' event. This change makes it so that: 1. stream.unshift() does *not* set state.reading = false 2. stream.unshift() is allowed up until the 'end' event. 3. unshifting onto a EOF-encountered and zero-length (but not yet end-emitted) stream will defer the 'end' event until the new data is consumed. 4. pushing onto a EOF-encountered stream is now an error. So, if you read(), you have that single tick to safely unshift() data back into the stream, even if the null chunk was pushed, and the length was 0.
12 years ago
var common = require('../common');
var assert = require('assert');
// This test verifies that:
// 1. unshift() does not cause colliding _read() calls.
// 2. unshift() after the 'end' event is an error, but after the EOF
// signalling null, it is ok, and just creates a new readable chunk.
// 3. push() after the EOF signaling null is an error.
// 4. _read() is not called after pushing the EOF null chunk.
var stream = require('stream');
var hwm = 10;
var r = stream.Readable({ highWaterMark: hwm });
var chunks = 10;
var t = (chunks * 5);
var data = new Buffer(chunks * hwm + Math.ceil(hwm / 2));
for (var i = 0; i < data.length; i++) {
var c = 'asdf'.charCodeAt(i % 4);
data[i] = c;
}
var pos = 0;
var pushedNull = false;
r._read = function(n) {
assert(!pushedNull, '_read after null push');
// every third chunk is fast
push(!(chunks % 3));
function push(fast) {
assert(!pushedNull, 'push() after null push');
var c = pos >= data.length ? null : data.slice(pos, pos + n);
pushedNull = c === null;
if (fast) {
pos += n;
r.push(c);
if (c === null) pushError();
} else {
setTimeout(function() {
pos += n;
r.push(c);
if (c === null) pushError();
});
}
}
};
function pushError() {
assert.throws(function() {
r.push(new Buffer(1));
});
}
var w = stream.Writable();
var written = [];
w._write = function(chunk, encoding, cb) {
written.push(chunk.toString());
cb();
};
var ended = false;
r.on('end', function() {
assert(!ended, 'end emitted more than once');
assert.throws(function() {
r.unshift(new Buffer(1));
});
ended = true;
w.end();
});
r.on('readable', function() {
var chunk;
while (null !== (chunk = r.read(10))) {
w.write(chunk);
if (chunk.length > 4)
r.unshift(new Buffer('1234'));
}
});
var finished = false;
w.on('finish', function() {
finished = true;
// each chunk should start with 1234, and then be asfdasdfasdf...
// The first got pulled out before the first unshift('1234'), so it's
// lacking that piece.
assert.equal(written[0], 'asdfasdfas');
var asdf = 'd';
console.error('0: %s', written[0]);
for (var i = 1; i < written.length; i++) {
console.error('%s: %s', i.toString(32), written[i]);
assert.equal(written[i].slice(0, 4), '1234');
for (var j = 4; j < written[i].length; j++) {
var c = written[i].charAt(j);
assert.equal(c, asdf);
switch (asdf) {
case 'a': asdf = 's'; break;
case 's': asdf = 'd'; break;
case 'd': asdf = 'f'; break;
case 'f': asdf = 'a'; break;
}
}
}
});
process.on('exit', function() {
assert.equal(written.length, 18);
assert(ended, 'stream ended');
assert(finished, 'stream finished');
console.log('ok');
});