Browse Source

stream: Don't require read(0) to emit 'readable' event

When a readable listener is added, call read(0) so that data will flow in, up to
the high water mark.

Otherwise, it's somewhat confusing that you have to listen for readable,
and ALSO call read() (when it will certainly return null) just to get some
data out of the stream.

See: #4720
v0.9.12-release
isaacs 12 years ago
parent
commit
119cbf4854
  1. 10
      lib/_stream_readable.js
  2. 33
      test/simple/test-stream2-basic.js
  3. 2
      test/simple/test-stream2-unpipe-leak.js

10
lib/_stream_readable.js

@ -613,17 +613,17 @@ Readable.prototype.unpipe = function(dest) {
return this; return this;
}; };
// kludge for on('data', fn) consumers. Sad. // set up data events if they are asked for
// This is *not* part of the new readable stream interface. // Ensure readable listeners eventually get something
// It is an ugly unfortunate mess of history.
Readable.prototype.on = function(ev, fn) { Readable.prototype.on = function(ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn); var res = Stream.prototype.on.call(this, ev, fn);
// https://github.com/isaacs/readable-stream/issues/16
// if we're already flowing, then no need to set up data events.
if (ev === 'data' && !this._readableState.flowing) if (ev === 'data' && !this._readableState.flowing)
emitDataEvents(this); emitDataEvents(this);
if (ev === 'readable')
this.read(0);
return res; return res;
}; };
Readable.prototype.addListener = Readable.prototype.on; Readable.prototype.addListener = Readable.prototype.on;

33
test/simple/test-stream2-basic.js

@ -38,6 +38,7 @@ function TestReader(n) {
util.inherits(TestReader, R); util.inherits(TestReader, R);
TestReader.prototype.read = function(n) { TestReader.prototype.read = function(n) {
if (n === 0) return null;
var max = this._buffer.length - this._pos; var max = this._buffer.length - this._pos;
n = n || max; n = n || max;
n = Math.max(n, 0); n = Math.max(n, 0);
@ -80,11 +81,6 @@ TestWriter.prototype.write = function(c) {
this.received.push(c.toString()); this.received.push(c.toString());
this.emit('write', c); this.emit('write', c);
return true; return true;
// flip back and forth between immediate acceptance and not.
this.flush = !this.flush;
if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10);
return this.flush;
}; };
TestWriter.prototype.end = function(c) { TestWriter.prototype.end = function(c) {
@ -113,6 +109,7 @@ function run() {
console.log('# %s', name); console.log('# %s', name);
fn({ fn({
same: assert.deepEqual, same: assert.deepEqual,
ok: assert,
equal: assert.equal, equal: assert.equal,
end: function () { end: function () {
count--; count--;
@ -187,6 +184,7 @@ test('pipe', function(t) {
var w = new TestWriter; var w = new TestWriter;
var flush = true; var flush = true;
w.on('end', function(received) { w.on('end', function(received) {
t.same(received, expect); t.same(received, expect);
t.end(); t.end();
@ -450,3 +448,28 @@ test('sync _read ending', function (t) {
t.end(); t.end();
}) })
}); });
test('adding readable triggers data flow', function(t) {
var r = new R({ highWaterMark: 5 });
var onReadable = false;
var readCalled = 0;
r._read = function(n) {
if (readCalled++ === 2)
r.push(null);
else
r.push(new Buffer('asdf'));
};
var called = false;
r.on('readable', function() {
onReadable = true;
r.read();
});
r.on('end', function() {
t.equal(readCalled, 3);
t.ok(onReadable);
t.end();
});
});

2
test/simple/test-stream2-unpipe-leak.js

@ -43,7 +43,7 @@ function TestReader() {
util.inherits(TestReader, stream.Readable); util.inherits(TestReader, stream.Readable);
TestReader.prototype._read = function(size) { TestReader.prototype._read = function(size) {
stream.push(new Buffer('hallo')); this.push(new Buffer('hallo'));
}; };
var src = new TestReader(); var src = new TestReader();

Loading…
Cancel
Save