diff --git a/doc/api/stream.md b/doc/api/stream.md index 743bea7955..c26ab800fa 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1752,6 +1752,10 @@ constructor and implement *both* the `readable._read()` and * `writableObjectMode` {boolean} Defaults to `false`. Sets `objectMode` for writable side of the stream. Has no effect if `objectMode` is `true`. + * `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side + of the stream. Has no effect if `highWaterMark` is provided. + * `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side + of the stream. Has no effect if `highWaterMark` is provided. For example: diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 463ac3bfbc..1a0bc8902a 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -61,18 +61,32 @@ function prependListener(emitter, event, fn) { function ReadableState(options, stream) { options = options || {}; + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + var isDuplex = stream instanceof Stream.Duplex; + // object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away this.objectMode = !!options.objectMode; - if (stream instanceof Stream.Duplex) + if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; // the point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" var hwm = options.highWaterMark; + var readableHwm = options.readableHighWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; - this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; + + if (hwm || hwm === 0) + this.highWaterMark = hwm; + else if (isDuplex && (readableHwm || readableHwm === 0)) + this.highWaterMark = readableHwm; + else + this.highWaterMark = defaultHwm; // cast to ints. this.highWaterMark = Math.floor(this.highWaterMark); diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 334f492ba6..6e0eaf45b5 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -41,19 +41,33 @@ function nop() {} function WritableState(options, stream) { options = options || {}; + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + var isDuplex = stream instanceof Stream.Duplex; + // object stream flag to indicate whether or not this stream // contains buffers or objects. this.objectMode = !!options.objectMode; - if (stream instanceof Stream.Duplex) + if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; // the point at which write() starts returning false // Note: 0 is a valid value, means that we always return false if // the entire buffer is not flushed immediately on write() var hwm = options.highWaterMark; + var writableHwm = options.writableHighWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; - this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; + + if (hwm || hwm === 0) + this.highWaterMark = hwm; + else if (isDuplex && (writableHwm || writableHwm === 0)) + this.highWaterMark = writableHwm; + else + this.highWaterMark = defaultHwm; // cast to ints. this.highWaterMark = Math.floor(this.highWaterMark); diff --git a/test/parallel/test-stream-transform-split-highwatermark.js b/test/parallel/test-stream-transform-split-highwatermark.js new file mode 100644 index 0000000000..af2558ec6d --- /dev/null +++ b/test/parallel/test-stream-transform-split-highwatermark.js @@ -0,0 +1,71 @@ +'use strict'; +require('../common'); +const assert = require('assert'); + +const { Transform, Readable, Writable } = require('stream'); + +const DEFAULT = 16 * 1024; + +function testTransform(expectedReadableHwm, expectedWritableHwm, options) { + const t = new Transform(options); + assert.strictEqual(t._readableState.highWaterMark, expectedReadableHwm); + assert.strictEqual(t._writableState.highWaterMark, expectedWritableHwm); +} + +// test overriding defaultHwm +testTransform(666, DEFAULT, { readableHighWaterMark: 666 }); +testTransform(DEFAULT, 777, { writableHighWaterMark: 777 }); +testTransform(666, 777, { + readableHighWaterMark: 666, + writableHighWaterMark: 777, +}); + +// test 0 overriding defaultHwm +testTransform(0, DEFAULT, { readableHighWaterMark: 0 }); +testTransform(DEFAULT, 0, { writableHighWaterMark: 0 }); + +// test highWaterMark overriding +testTransform(555, 555, { + highWaterMark: 555, + readableHighWaterMark: 666, +}); +testTransform(555, 555, { + highWaterMark: 555, + writableHighWaterMark: 777, +}); +testTransform(555, 555, { + highWaterMark: 555, + readableHighWaterMark: 666, + writableHighWaterMark: 777, +}); + +// test highWaterMark = 0 overriding +testTransform(0, 0, { + highWaterMark: 0, + readableHighWaterMark: 666, +}); +testTransform(0, 0, { + highWaterMark: 0, + writableHighWaterMark: 777, +}); +testTransform(0, 0, { + highWaterMark: 0, + readableHighWaterMark: 666, + writableHighWaterMark: 777, +}); + +// test undefined, null, NaN +[undefined, null, NaN].forEach((v) => { + testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v }); + testTransform(DEFAULT, DEFAULT, { writableHighWaterMark: v }); + testTransform(666, DEFAULT, { highWaterMark: v, readableHighWaterMark: 666 }); + testTransform(DEFAULT, 777, { highWaterMark: v, writableHighWaterMark: 777 }); +}); + +// test non Duplex streams ignore the options +{ + const r = new Readable({ readableHighWaterMark: 666 }); + assert.strictEqual(r._readableState.highWaterMark, DEFAULT); + const w = new Writable({ writableHighWaterMark: 777 }); + assert.strictEqual(w._writableState.highWaterMark, DEFAULT); +}