|
|
|
'use strict';
|
stream: Fix unshift() race conditions
Fix #5272
The consumption of a readable stream is a dance with 3 partners.
1. The specific stream Author (A)
2. The Stream Base class (B), and
3. The Consumer of the stream (C)
When B calls the _read() method that A implements, it sets a 'reading'
flag, so that parallel calls to _read() can be avoided. When A calls
stream.push(), B knows that it's safe to start calling _read() again.
The consumer C may be some kind of parser that wants in some cases to
pass the source stream off to some other party, but not before "putting
back" some bit of previously consumed data (as in the case of Node's
websocket http upgrade implementation). So, stream.unshift() will
generally *never* be called by A, but *only* called by C.
Prior to this patch, stream.unshift() *also* unset the state.reading
flag, meaning that C could indicate the end of a read, and B would
dutifully fire off another _read() call to A. This is inappropriate.
In the case of fs streams, and other variably-laggy streams that don't
tolerate overlapped _read() calls, this causes big problems.
Also, calling stream.unshift() after the 'end' event did not raise any
kind of error, but would cause very strange behavior indeed. Calling it
after the EOF chunk was seen, but before the 'end' event was fired would
also cause weird behavior, and could lead to data being lost, since it
would not emit another 'readable' event.
This change makes it so that:
1. stream.unshift() does *not* set state.reading = false
2. stream.unshift() is allowed up until the 'end' event.
3. unshifting onto an EOF-encountered and zero-length (but not yet
end-emitted) stream will defer the 'end' event until the new data is
consumed.
4. pushing onto a EOF-encountered stream is now an error.
So, if you read(), you have that single tick to safely unshift() data
back into the stream, even if the null chunk was pushed, and the length
was 0.
12 years ago
|
|
|
var common = require('../common');
|
|
|
|
var assert = require('assert');
|
|
|
|
|
|
|
|
// This test verifies that:
|
|
|
|
// 1. unshift() does not cause colliding _read() calls.
|
|
|
|
// 2. unshift() after the 'end' event is an error, but after the EOF
|
|
|
|
// signalling null, it is ok, and just creates a new readable chunk.
|
|
|
|
// 3. push() after the EOF signaling null is an error.
|
|
|
|
// 4. _read() is not called after pushing the EOF null chunk.
|
|
|
|
|
|
|
|
var stream = require('stream');
|
|
|
|
// Fixture shared by the rest of the test: the readable source, its payload,
// and the bookkeeping that _read() uses to track progress.

// highWaterMark handed to the readable source.
var hwm = 10;
var r = new stream.Readable({ highWaterMark: hwm });

// Number of hwm-sized chunks in the payload.
var chunks = 10;
// NOTE(review): `t` is not referenced anywhere else in the visible source.
var t = (chunks * 5);

// Payload: `chunks` full chunks plus half a chunk, filled with the repeating
// pattern 'asdf' (byte i is 'asdf'.charCodeAt(i % 4)). Buffer.alloc() with a
// fill string replaces the deprecated `new Buffer(size)` constructor and the
// manual fill loop.
var data = Buffer.alloc(chunks * hwm + Math.ceil(hwm / 2), 'asdf');

// Offset into `data`; advanced by _read() each time a chunk is pushed.
var pos = 0;
// Set by _read() once it has selected the EOF (null) chunk.
var pushedNull = false;
|
|
|
|
// Source implementation: hands out `data` in slices of the requested size
// and signals EOF with a null chunk once `pos` has reached the end of `data`.
// Asserts that it is never invoked again after the null chunk was selected.
r._read = function(n) {
  assert(!pushedNull, '_read after null push');

  // every third chunk is fast
  // NOTE(review): `chunks` is never reassigned in the visible source, so this
  // flag has the same value on every call -- confirm that is intended.
  var deliverSync = !(chunks % 3);
  deliver(deliverSync);

  function deliver(sync) {
    assert(!pushedNull, 'push() after null push');

    // The chunk is chosen now; `pos` only advances when it is really pushed.
    var exhausted = pos >= data.length;
    var chunk = exhausted ? null : data.slice(pos, pos + n);
    pushedNull = chunk === null;

    var emit = function() {
      pos += n;
      r.push(chunk);
      // Once EOF has been pushed, check that a further push() throws.
      if (chunk === null) pushError();
    };

    if (sync) {
      emit();
    } else {
      // Slow path: push from a timer callback instead of inside _read().
      setTimeout(emit);
    }
  }
};
|
|
|
|
|
|
|
|
/**
 * Asserts that pushing another chunk into `r` throws. _read() calls this
 * right after it has pushed the EOF (null) chunk.
 */
function pushError() {
  assert.throws(function() {
    // Buffer.alloc() replaces the deprecated `new Buffer(size)` constructor.
    r.push(Buffer.alloc(1));
  });
}
|
|
|
|
|
|
|
|
|
|
|
|
// Sink for the test: every chunk it receives is recorded, converted to a
// string, in arrival order, and acknowledged immediately.
var written = [];
var w = stream.Writable();
w._write = function(buf, enc, done) {
  var text = buf.toString();
  written[written.length] = text;
  done();
};
|
|
|
|
|
|
|
|
// Set once the source has emitted 'end'; checked again in the exit handler.
var ended = false;
r.on('end', function() {
  // 'end' must fire exactly once.
  assert(!ended, 'end emitted more than once');

  // Check that unshift() throws once 'end' has been emitted.
  assert.throws(function() {
    // Buffer.alloc() replaces the deprecated `new Buffer(size)` constructor.
    r.unshift(Buffer.alloc(1));
  });

  ended = true;
  // No more data will arrive, so close the sink.
  w.end();
});
|
|
|
|
|
|
|
|
// Consumer: drains the source 10 bytes at a time into the sink. After every
// chunk longer than 4 bytes it puts '1234' back at the front of the source,
// so the following read starts with that marker.
r.on('readable', function() {
  var chunk;
  while ((chunk = r.read(10)) !== null) {
    w.write(chunk);
    if (chunk.length > 4) {
      // Buffer.from() replaces the deprecated `new Buffer(string)` constructor.
      r.unshift(Buffer.from('1234'));
    }
  }
});
|
|
|
|
|
|
|
|
// Set once the sink has emitted 'finish'; checked again in the exit handler.
var finished = false;
w.on('finish', function() {
  finished = true;

  // Each chunk should start with 1234 and then continue the repeating
  // asdfasdfasdf... sequence. The first chunk was pulled out before the
  // first unshift('1234'), so it is lacking that prefix.
  assert.equal(written[0], 'asdfasdfas');
  console.error('0: %s', written[0]);

  // The first chunk ends on 's', so the sequence resumes at 'd'.
  var following = { a: 's', s: 'd', d: 'f', f: 'a' };
  var expected = 'd';

  written.forEach(function(entry, index) {
    if (index === 0) return;

    console.error('%s: %s', index.toString(32), entry);
    assert.equal(entry.slice(0, 4), '1234');

    // Everything after the marker must continue the sequence without gaps,
    // carrying over from one written chunk to the next.
    var offset = 4;
    while (offset < entry.length) {
      var actual = entry.charAt(offset);
      assert.equal(actual, expected);
      expected = following[expected];
      offset += 1;
    }
  });
});
|
|
|
|
|
|
|
|
// Final checks at process exit: the expected number of chunks reached the
// sink, and both the source 'end' and the sink 'finish' events were seen.
function verifyOnExit() {
  var writtenCount = written.length;
  assert.equal(writtenCount, 18);
  assert(ended, 'stream ended');
  assert(finished, 'stream finished');
  console.log('ok');
}

process.on('exit', verifyOnExit);
|