You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

146 lines
3.0 KiB

'use strict';
require('../common');
var assert = require('assert');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var totalChunks = 100;
var chunkSize = 99;
var expectTotalData = totalChunks * chunkSize;
var expectEndingData = expectTotalData;
var r = new Readable({ highWaterMark: 1000 });
var chunks = totalChunks;
r._read = function(n) {
if (!(chunks % 2))
setImmediate(push);
else if (!(chunks % 3))
process.nextTick(push);
else
push();
};
var totalPushed = 0;
function push() {
var chunk = chunks-- > 0 ? Buffer.alloc(chunkSize, 'x') : null;
if (chunk) {
totalPushed += chunk.length;
}
r.push(chunk);
}
read100();
// first we read 100 bytes
function read100() {
readn(100, onData);
}
function readn(n, then) {
console.error('read %d', n);
expectEndingData -= n;
(function read() {
var c = r.read(n);
if (!c)
r.once('readable', read);
else {
assert.equal(c.length, n);
assert(!r._readableState.flowing);
then();
}
})();
}
// then we listen to some data events
function onData() {
expectEndingData -= 100;
console.error('onData');
var seen = 0;
r.on('data', function od(c) {
seen += c.length;
if (seen >= 100) {
// seen enough
r.removeListener('data', od);
r.pause();
if (seen > 100) {
// oh no, seen too much!
// put the extra back.
var diff = seen - 100;
r.unshift(c.slice(c.length - diff));
console.error('seen too much', seen, diff);
}
// Nothing should be lost in between
setImmediate(pipeLittle);
}
});
}
// Just pipe 200 bytes, then unshift the extra and unpipe
function pipeLittle() {
expectEndingData -= 200;
console.error('pipe a little');
var w = new Writable();
var written = 0;
w.on('finish', function() {
assert.equal(written, 200);
setImmediate(read1234);
});
w._write = function(chunk, encoding, cb) {
written += chunk.length;
if (written >= 200) {
r.unpipe(w);
w.end();
cb();
if (written > 200) {
var diff = written - 200;
written -= diff;
r.unshift(chunk.slice(chunk.length - diff));
}
} else {
setImmediate(cb);
}
};
r.pipe(w);
}
// now read 1234 more bytes
function read1234() {
readn(1234, resumePause);
}
function resumePause() {
console.error('resumePause');
// don't read anything, just resume and re-pause a whole bunch
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
r.resume();
r.pause();
setImmediate(pipe);
}
function pipe() {
console.error('pipe the rest');
var w = new Writable();
var written = 0;
w._write = function(chunk, encoding, cb) {
written += chunk.length;
cb();
};
w.on('finish', function() {
console.error('written', written, totalPushed);
assert.equal(written, expectEndingData);
assert.equal(totalPushed, expectTotalData);
console.log('ok');
});
r.pipe(w);
}