Browse Source

stream: remove lowWaterMark feature

It seems like a good idea on the face of it, but lowWaterMarks are
actually not useful, and in practice should always be set to zero.

It would be worthwhile for writers if we actually did some kind of
writev() type of thing, but actually this just delays calling write()
and the overhead of doing a bunch of Buffer copies is not worth the
slight benefit of calling write() fewer times.
v0.9.11-release
isaacs 12 years ago
parent
commit
3b2e9d2648
  1. 14
      doc/api/stream.markdown
  2. 79
      doc/blog/feature/streams2.md
  3. 20
      lib/_stream_readable.js
  4. 4
      lib/_stream_transform.js
  5. 10
      lib/_stream_writable.js
  6. 1
      lib/fs.js
  7. 1
      lib/tty.js
  8. 19
      lib/zlib.js
  9. 1
      src/node.js
  10. 1
      test/simple/test-file-write-stream.js
  11. 1
      test/simple/test-file-write-stream2.js
  12. 3
      test/simple/test-fs-read-stream-err.js
  13. 1
      test/simple/test-fs-write-stream-err.js
  14. 3
      test/simple/test-net-binary.js
  15. 15
      test/simple/test-stream2-basic.js
  16. 82
      test/simple/test-stream2-objects.js
  17. 1
      test/simple/test-stream2-push.js
  18. 6
      test/simple/test-stream2-transform.js
  19. 6
      test/simple/test-stream2-writable.js

14
doc/api/stream.markdown

@ -92,8 +92,6 @@ method. (See below.)
* `options` {Object}
* `bufferSize` {Number} The size of the chunks to consume from the
underlying resource. Default=16kb
* `lowWaterMark` {Number} The minimum number of bytes to store in
the internal buffer before emitting `readable`. Default=0
* `highWaterMark` {Number} The maximum number of bytes to store in
the internal buffer before ceasing to read from the underlying
resource. Default=16kb
@ -193,9 +191,7 @@ myReader.on('readable', function() {
### Event: 'readable'
When there is data ready to be consumed, this event will fire. The
number of bytes that are required to be considered "readable" depends
on the `lowWaterMark` option set in the constructor.
When there is data ready to be consumed, this event will fire.
When this event emits, call the `read()` method to consume the data.
@ -322,8 +318,6 @@ method. (See below.)
* `options` {Object}
* `highWaterMark` {Number} Buffer level when `write()` starts
returning false. Default=16kb
* `lowWaterMark` {Number} The buffer level when `'drain'` is
emitted. Default=0
* `decodeStrings` {Boolean} Whether or not to decode strings into
Buffers before passing them to `_write()`. Default=true
@ -371,10 +365,8 @@ flushed to the underlying resource. Returns `false` to indicate that
the buffer is full, and the data will be sent out in the future. The
`'drain'` event will indicate when the buffer is empty again.
The specifics of when `write()` will return false, and when a
subsequent `'drain'` event will be emitted, are determined by the
`highWaterMark` and `lowWaterMark` options provided to the
constructor.
The specifics of when `write()` will return false, is determined by
the `highWaterMark` option provided to the constructor.
### writable.end([chunk], [encoding], [callback])

79
doc/blog/feature/streams2.md

@ -111,7 +111,7 @@ feedback.
A stream is an abstract interface implemented by various objects in
Node. For example a request to an HTTP server is a stream, as is
stdout. Streams are readable, writable, or both. All streams are
instances of EventEmitter.
instances of [EventEmitter][]
You can load the Stream base classes by doing `require('stream')`.
There are base classes provided for Readable streams, Writable
@ -198,13 +198,14 @@ method. (See below.)
* `options` {Object}
* `bufferSize` {Number} The size of the chunks to consume from the
underlying resource. Default=16kb
* `lowWaterMark` {Number} The minimum number of bytes to store in
the internal buffer before emitting `readable`. Default=0
* `highWaterMark` {Number} The maximum number of bytes to store in
the internal buffer before ceasing to read from the underlying
resource. Default=16kb
* `encoding` {String} If specified, then buffers will be decoded to
strings using the specified encoding. Default=null
* `objectMode` {Boolean} Whether this stream should behave
as a stream of objects. Meaning that stream.read(n) returns
a single value instead of a Buffer of size n
In classes that extend the Readable class, make sure to call the
constructor so that the buffering settings can be properly
@ -218,7 +219,7 @@ initialized.
All Readable stream implementations must provide a `_read` method
to fetch data from the underlying resource.
**This function MUST NOT be called directly.** It should be
Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Readable
class methods only.
@ -231,6 +232,46 @@ the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### readable.push(chunk)
* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
* return {Boolean} Whether or not more pushes should be performed
The `Readable` class works by putting data into a read queue to be
pulled out later by calling the `read()` method when the `'readable'`
event fires.
The `push()` method will explicitly insert some data into the read
queue. If it is called with `null` then it will signal the end of the
data.
In some cases, you may be wrapping a lower-level source which has some
sort of pause/resume mechanism, and a data callback. In those cases,
you could wrap the low-level source object by doing something like
this:
```javascript
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
var stream = new Readable();
source.ondata = function(chunk) {
// if push() returns false, then we need to stop reading from source
if (!stream.push(chunk))
source.readStop();
};
source.onend = function() {
stream.push(null);
};
// _read will be called when the stream wants to pull more data in
stream._read = function(size, cb) {
source.readStart();
};
```
### readable.wrap(stream)
@ -256,9 +297,7 @@ myReader.on('readable', function() {
### Event: 'readable'
When there is data ready to be consumed, this event will fire. The
number of bytes that are required to be considered "readable" depends
on the `lowWaterMark` option set in the constructor.
When there is data ready to be consumed, this event will fire.
When this event emits, call the `read()` method to consume the data.
@ -385,8 +424,6 @@ method. (See below.)
* `options` {Object}
* `highWaterMark` {Number} Buffer level when `write()` starts
returning false. Default=16kb
* `lowWaterMark` {Number} The buffer level when `'drain'` is
emitted. Default=0
* `decodeStrings` {Boolean} Whether or not to decode strings into
Buffers before passing them to `_write()`. Default=true
@ -402,7 +439,7 @@ initialized.
All Writable stream implementations must provide a `_write` method to
send data to the underlying resource.
**This function MUST NOT be called directly.** It should be
Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Writable
class methods only.
@ -434,16 +471,16 @@ flushed to the underlying resource. Returns `false` to indicate that
the buffer is full, and the data will be sent out in the future. The
`'drain'` event will indicate when the buffer is empty again.
The specifics of when `write()` will return false, and when a
subsequent `'drain'` event will be emitted, are determined by the
`highWaterMark` and `lowWaterMark` options provided to the
constructor.
The specifics of when `write()` will return false, is determined by
the `highWaterMark` option provided to the constructor.
### writable.end([chunk], [encoding])
### writable.end([chunk], [encoding], [callback])
* `chunk` {Buffer | String} Optional final data to be written
* `encoding` {String} Optional. If `chunk` is a string, then encoding
defaults to `'utf8'`
* `callback` {Function} Optional. Called when the final chunk is
successfully written.
Call this method to signal the end of the data being written to the
stream.
@ -459,6 +496,11 @@ without buffering again. Listen for it when `stream.write()` returns
Emitted when the underlying resource (for example, the backing file
descriptor) has been closed. Not all streams will emit this.
### Event: 'finish'
When `end()` is called and there are no more chunks to write, this
event is emitted.
### Event: 'pipe'
* `source` {Readable Stream}
@ -538,7 +580,7 @@ initialized.
All Transform stream implementations must provide a `_transform`
method to accept input and produce output.
**This function MUST NOT be called directly.** It should be
Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Transform
class methods only.
@ -564,7 +606,7 @@ your own extension classes.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done flushing any remaining data.
**This function MUST NOT be called directly.** It MAY be implemented
Note: **This function MUST NOT be called directly.** It MAY be implemented
by child classes, and if so, will be called by the internal Transform
class methods only.
@ -592,3 +634,6 @@ This is a trivial implementation of a `Transform` stream that simply
passes the input bytes across to the output. Its purpose is mainly
for examples and testing, but there are occasionally use cases where
it can come in handy.
[EventEmitter]: http://nodejs.org/api/events.html#events_class_events_eventemitter

20
lib/_stream_readable.js

@ -39,18 +39,10 @@ function ReadableState(options, stream) {
var hwm = options.highWaterMark;
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
// the minimum number of bytes to buffer before emitting 'readable'
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.lowWaterMark || 0;
// cast to ints.
this.bufferSize = ~~this.bufferSize;
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
if (this.lowWaterMark > this.highWaterMark)
throw new Error('lowWaterMark cannot be higher than highWaterMark');
this.buffer = [];
this.length = 0;
this.pipes = null;
@ -111,15 +103,15 @@ Readable.prototype.push = function(chunk) {
rs.onread(null, chunk);
// if it's past the high water mark, we can push in some more.
// Also, if it's still within the lowWaterMark, we can stand some
// more bytes. This is to work around cases where hwm=0 and
// lwm=0, such as the repl. Also, if the push() triggered a
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
// such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
return rs.needReadable ||
rs.length < rs.highWaterMark ||
rs.length <= rs.lowWaterMark;
rs.length === 0;
};
// backwards compatibility.
@ -324,12 +316,12 @@ function onread(stream, er, chunk) {
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);
// if we haven't gotten enough to pass the lowWaterMark,
// if we haven't gotten any data,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, emitting 'readable'
// probably will trigger another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) {
if (state.length === 0) {
state.reading = true;
stream._read(state.bufferSize, state.onread);
return;

4
lib/_stream_transform.js

@ -60,9 +60,7 @@
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed. Because
// the transform happens on-demand, it will only transform as much as is
// necessary to fill the readable buffer to the specified lowWaterMark.
// the results of the previous transformed chunk were consumed.
module.exports = Transform;

10
lib/_stream_writable.js

@ -41,21 +41,13 @@ function WritableState(options, stream) {
var hwm = options.highWaterMark;
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
// the point that it has to get to before we call _write(chunk,cb)
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.lowWaterMark || 0;
// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;
// cast to ints.
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
if (this.lowWaterMark > this.highWaterMark)
throw new Error('lowWaterMark cannot be higher than highWaterMark');
this.needDrain = false;
// at the start of calling end()
this.ending = false;
@ -225,7 +217,7 @@ function onwrite(stream, er) {
return;
}
if (state.length <= state.lowWaterMark && state.needDrain) {
if (state.length === 0 && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.

1
lib/fs.js

@ -1390,7 +1390,6 @@ function ReadStream(path, options) {
// a little bit bigger buffer and water marks by default
options = util._extend({
bufferSize: 64 * 1024,
lowWaterMark: 16 * 1024,
highWaterMark: 64 * 1024
}, options || {});

1
lib/tty.js

@ -46,7 +46,6 @@ function ReadStream(fd, options) {
options = util._extend({
highWaterMark: 0,
lowWaterMark: 0,
readable: true,
writable: false,
handle: new TTY(fd, true)

19
lib/zlib.js

@ -309,24 +309,7 @@ Zlib.prototype.reset = function reset() {
};
Zlib.prototype._flush = function(output, callback) {
var rs = this._readableState;
var self = this;
this._transform(null, output, function(er) {
if (er)
return callback(er);
// now a weird thing happens... it could be that you called flush
// but everything had already actually been consumed, but it wasn't
// enough to get over the Readable class's lowWaterMark.
// In that case, we emit 'readable' now to make sure it's consumed.
if (rs.length &&
rs.length < rs.lowWaterMark &&
!rs.ended &&
rs.needReadable)
self.emit('readable');
callback();
});
this._transform(null, output, callback);
};
Zlib.prototype.flush = function(callback) {

1
src/node.js

@ -625,7 +625,6 @@
var tty = NativeModule.require('tty');
stdin = new tty.ReadStream(fd, {
highWaterMark: 0,
lowWaterMark: 0,
readable: true,
writable: false
});

1
test/simple/test-file-write-stream.js

@ -26,7 +26,6 @@ var path = require('path');
var fs = require('fs');
var fn = path.join(common.tmpDir, 'write.txt');
var file = fs.createWriteStream(fn, {
lowWaterMark: 0,
highWaterMark: 10
});

1
test/simple/test-file-write-stream2.js

@ -63,7 +63,6 @@ removeTestFile();
// drain at 0, return false at 10.
file = fs.createWriteStream(filepath, {
lowWaterMark: 0,
highWaterMark: 11
});

3
test/simple/test-fs-read-stream-err.js

@ -24,8 +24,7 @@ var assert = require('assert');
var fs = require('fs');
var stream = fs.createReadStream(__filename, {
bufferSize: 64,
lowWaterMark: 0
bufferSize: 64
});
var err = new Error('BAM');

1
test/simple/test-fs-write-stream-err.js

@ -24,7 +24,6 @@ var assert = require('assert');
var fs = require('fs');
var stream = fs.createWriteStream(common.tmpDir + '/out', {
lowWaterMark: 0,
highWaterMark: 10
});
var err = new Error('BAM');

3
test/simple/test-net-binary.js

@ -41,7 +41,6 @@ for (var i = 255; i >= 0; i--) {
// safe constructor
var echoServer = net.Server(function(connection) {
// connection._readableState.lowWaterMark = 0;
console.error('SERVER got connection');
connection.setEncoding('binary');
connection.on('data', function(chunk) {
@ -64,8 +63,6 @@ echoServer.on('listening', function() {
port: common.PORT
});
// c._readableState.lowWaterMark = 0;
c.setEncoding('binary');
c.on('data', function(chunk) {
console.error('CLIENT data %j', chunk);

15
test/simple/test-stream2-basic.js

@ -450,18 +450,3 @@ test('sync _read ending', function (t) {
t.end();
})
});
assert.throws(function() {
var bad = new R({
highWaterMark: 10,
lowWaterMark: 1000
});
});
assert.throws(function() {
var W = require('stream').Writable;
var bad = new W({
highWaterMark: 10,
lowWaterMark: 1000
});
});

82
test/simple/test-stream2-objects.js

@ -215,35 +215,8 @@ test('falsey values', function(t) {
}));
});
test('low watermark _read', function(t) {
var r = new Readable({
lowWaterMark: 2,
highWaterMark: 6,
objectMode: true
});
var calls = 0;
r._read = function(n, cb) {
calls++;
cb(null, 'foo');
};
// touch to cause it
r.read(0);
r.push(null);
r.pipe(toArray(function(list) {
assert.deepEqual(list, ['foo', 'foo', 'foo']);
t.end();
}));
});
test('high watermark _read', function(t) {
var r = new Readable({
lowWaterMark: 0,
highWaterMark: 6,
objectMode: true
});
@ -285,61 +258,6 @@ test('high watermark push', function(t) {
t.end();
});
test('low watermark push', function(t) {
var r = new Readable({
lowWaterMark: 2,
highWaterMark: 4,
objectMode: true
});
var l = console.log;
var called = 0;
var reading = false;
r._read = function() {
called++;
if (reading) {
assert.equal(r.push(42), false);
}
}
assert.equal(called, 0);
assert.equal(r.push(0), true);
assert.equal(called, 1);
assert.equal(r.push(1), true);
assert.equal(called, 2);
assert.equal(r.push(2), true);
assert.equal(called, 2);
assert.equal(r.push(3), false);
assert.equal(called, 2);
assert.equal(r.push(4), false);
assert.equal(called, 2);
assert.equal(r.push(5), false);
assert.equal(called, 2);
assert.deepEqual(r._readableState.buffer, [0, 1, 2, 3, 4, 5]);
reading = true;
assert.equal(r.read(), 0);
assert.equal(called, 2);
assert.equal(r.read(), 1);
assert.equal(called, 3);
assert.equal(r.read(), 2);
assert.equal(called, 4);
assert.equal(r.read(), 3);
assert.equal(called, 5);
assert.equal(r.read(), 4);
assert.equal(called, 6);
r.push(null);
r.pipe(toArray(function(array) {
assert.deepEqual(array, [5, 42, 42, 42, 42]);
t.end();
}));
});
test('stream of buffers converted to object halfway through', function(t) {
var r = new Readable();
r._read = noop;

1
test/simple/test-stream2-push.js

@ -32,7 +32,6 @@ var EE = require('events').EventEmitter;
// a mock thing a bit like the net.Socket/tcp_wrap.handle interaction
var stream = new Readable({
lowWaterMark: 0,
highWaterMark: 16,
encoding: 'utf8'
});

6
test/simple/test-stream2-transform.js

@ -212,8 +212,6 @@ test('assymetric transform (compress)', function(t) {
}.bind(this), 10);
};
pt._writableState.lowWaterMark = 3;
pt.write(new Buffer('aaaa'));
pt.write(new Buffer('bbbb'));
pt.write(new Buffer('cccc'));
@ -241,9 +239,7 @@ test('assymetric transform (compress)', function(t) {
test('passthrough event emission', function(t) {
var pt = new PassThrough({
lowWaterMark: 0
});
var pt = new PassThrough();
var emits = 0;
pt.on('readable', function() {
var state = pt._readableState;

6
test/simple/test-stream2-writable.js

@ -82,7 +82,6 @@ process.nextTick(run);
test('write fast', function(t) {
var tw = new TestWriter({
lowWaterMark: 5,
highWaterMark: 100
});
@ -100,7 +99,6 @@ test('write fast', function(t) {
test('write slow', function(t) {
var tw = new TestWriter({
lowWaterMark: 5,
highWaterMark: 100
});
@ -121,7 +119,6 @@ test('write slow', function(t) {
test('write backpressure', function(t) {
var tw = new TestWriter({
lowWaterMark: 5,
highWaterMark: 50
});
@ -154,7 +151,6 @@ test('write backpressure', function(t) {
test('write bufferize', function(t) {
var tw = new TestWriter({
lowWaterMark: 5,
highWaterMark: 100
});
@ -185,7 +181,6 @@ test('write bufferize', function(t) {
test('write no bufferize', function(t) {
var tw = new TestWriter({
lowWaterMark: 5,
highWaterMark: 100,
decodeStrings: false
});
@ -234,7 +229,6 @@ test('write callbacks', function (t) {
callbacks._called = [];
var tw = new TestWriter({
lowWaterMark: 5,
highWaterMark: 100
});

Loading…
Cancel
Save