Browse Source

streams: Support objects other than Buffers

We detect for non-string and non-buffer values in onread and
turn the stream into an "objectMode" stream.

If we are in "objectMode" mode then howMuchToRead will
always return 1, state.length will always have 1 appended
to it when there is a new item and fromList always takes
the first value from the list.

This means that for object streams, the n in read(n) is
ignored and read() will always return a single value

Fixed a bug with unpipe where the pipe would break because
the flowing state was not reset to false.

Fixed a bug with sync cb(null, null) in _read which would
forget to end the readable stream
v0.9.8-release
Raynos 12 years ago
committed by isaacs
parent
commit
444bbd4fa7
  1. 3
      doc/api/stream.markdown
  2. 52
      lib/_stream_readable.js
  3. 48
      lib/_stream_writable.js
  4. 134
      test/simple/test-stream2-basic.js
  5. 484
      test/simple/test-stream2-objects.js
  6. 29
      test/simple/test-stream2-readable-from-list.js
  7. 17
      test/simple/test-stream2-set-encoding.js
  8. 13
      test/simple/test-stream2-transform.js
  9. 19
      test/simple/test-stream2-writable.js

3
doc/api/stream.markdown

@ -99,6 +99,9 @@ method. (See below.)
resource. Default=16kb
* `encoding` {String} If specified, then buffers will be decoded to
strings using the specified encoding. Default=null
* `objectMode` {Boolean} Whether this stream should behave
as a stream of objects. Meaning that stream.read(n) returns
a single value instead of a Buffer of size n
In classes that extend the Readable class, make sure to call the
constructor so that the buffering settings can be properly

52
lib/_stream_readable.js

@ -69,6 +69,11 @@ function ReadableState(options, stream) {
this.needReadable = false;
this.emittedReadable = false;
// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;
// when piping, we only care about 'readable' events that happen
// after read()ing all the bytes and not getting any pushback.
this.ranOut = false;
@ -129,6 +134,9 @@ function howMuchToRead(n, state) {
if (state.length === 0 && state.ended)
return 0;
if (state.objectMode)
return n === 0 ? 0 : 1;
if (isNaN(n) || n === null)
return state.length;
@ -217,11 +225,11 @@ Readable.prototype.read = function(n) {
var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
ret = fromList(n, state);
else
ret = null;
if (ret === null || ret.length === 0) {
if (ret === null || (!state.objectMode && ret.length === 0)) {
state.needReadable = true;
n = 0;
}
@ -246,20 +254,36 @@ function onread(stream, er, chunk) {
var state = stream._readableState;
var sync = state.sync;
// If we get something that is not a buffer, string, null, or undefined,
// then switch into objectMode. Now stream chunks are all considered
// to be of length=1, and the watermarks determine how many objects to
// keep in the buffer, rather than how many bytes or characters.
if (!Buffer.isBuffer(chunk) &&
'string' !== typeof chunk &&
chunk !== null &&
chunk !== undefined) {
state.objectMode = true;
state.length = state.buffer.length;
state.decoder = null;
}
state.reading = false;
if (er)
return stream.emit('error', er);
if (!chunk || !chunk.length) {
if (chunk === null ||
chunk === undefined ||
(!state.objectMode && !chunk.length)) {
// eof
state.ended = true;
if (state.decoder) {
chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += chunk.length;
state.length += state.objectMode ? 1 : chunk.length;
}
}
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
@ -271,7 +295,8 @@ function onread(stream, er, chunk) {
}
} else
endReadable(stream);
}
} else
endReadable(stream);
return;
}
@ -279,8 +304,8 @@ function onread(stream, er, chunk) {
chunk = state.decoder.write(chunk);
// update the buffer info.
if (chunk) {
state.length += chunk.length;
if (chunk || (state.objectMode && chunk !== undefined && chunk !== null)) {
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);
}
@ -502,6 +527,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipes = null;
state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
return this;
@ -516,6 +542,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipes = null;
state.pipesCount = 0;
this.removeListener('readable', pipeOnReadable);
state.flowing = false;
for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
@ -680,16 +707,21 @@ Readable._fromList = fromList;
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
function fromList(n, list, length, stringMode) {
function fromList(n, state) {
var list = state.buffer;
var length = state.length;
var stringMode = !!state.decoder;
var objectMode = !!state.objectMode;
var ret;
// nothing in the list, definitely empty.
if (list.length === 0) {
if (list.length === 0)
return null;
}
if (length === 0)
ret = null;
else if (objectMode)
ret = list.shift();
else if (!n || n >= length) {
// read it all, truncate the array.
if (stringMode)

48
lib/_stream_writable.js

@ -45,6 +45,10 @@ function WritableState(options, stream) {
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.lowWaterMark || 0;
// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;
// cast to ints.
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
@ -130,15 +134,29 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return;
}
var l = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
l = chunk.length;
// Writing something other than a string or buffer will switch
// the stream into objectMode.
if (!state.objectMode &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!Buffer.isBuffer(chunk))
state.objectMode = true;
var len;
if (state.objectMode)
len = 1;
else {
len = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
len = chunk.length;
}
}
state.length += l;
state.length += len;
var ret = state.length < state.highWaterMark;
if (ret === false)
@ -153,7 +171,7 @@ Writable.prototype.write = function(chunk, encoding, cb) {
state.writing = true;
state.sync = true;
state.writelen = l;
state.writelen = len;
state.writecb = cb;
this._write(chunk, state.onwrite);
state.sync = false;
@ -165,7 +183,7 @@ function onwrite(stream, er) {
var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;
var l = state.writelen;
var len = state.writelen;
state.writing = false;
state.writelen = null;
@ -188,7 +206,7 @@ function onwrite(stream, er) {
stream.emit('error', er);
return;
}
state.length -= l;
state.length -= len;
if (cb) {
// Don't call the cb until the next tick if we're in sync mode.
@ -232,12 +250,14 @@ function onwrite(stream, er) {
var chunk = chunkCb[0];
cb = chunkCb[1];
if (false === state.decodeStrings)
l = chunk[0].length;
if (state.objectMode)
len = 1;
else if (false === state.decodeStrings)
len = chunk[0].length;
else
l = chunk.length;
len = chunk.length;
state.writelen = l;
state.writelen = len;
state.writecb = cb;
state.writechunk = chunk;
state.writing = true;

134
test/simple/test-stream2-basic.js

@ -96,7 +96,10 @@ TestWriter.prototype.end = function(c) {
// tiny node-tap lookalike.
var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);
}
@ -111,10 +114,18 @@ function run() {
fn({
same: assert.deepEqual,
equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);
@ -319,6 +330,127 @@ test('multipipe', function(t) {
});
});
test('back pressure respected', function (t) {
function noop() {}
var r = new R();
var counter = 0;
r.push(["one"]);
r.push(["two"]);
r.push(["three"]);
r.push(["four"]);
r.push(null);
r._read = noop;
var w1 = new R();
w1.write = function (chunk) {
assert.equal(chunk[0], "one");
w1.emit("close");
process.nextTick(function () {
r.pipe(w2);
r.pipe(w3);
})
};
w1.end = noop;
r.pipe(w1);
var expected = ["two", "two", "three", "three", "four", "four"];
var w2 = new R();
w2.write = function (chunk) {
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 0);
counter++;
if (chunk[0] === "four") {
return true;
}
setTimeout(function () {
counter--;
w2.emit("drain");
}, 10);
return false;
}
w2.end = noop;
var w3 = new R();
w3.write = function (chunk) {
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 1);
counter++;
if (chunk[0] === "four") {
return true;
}
setTimeout(function () {
counter--;
w3.emit("drain");
}, 50);
return false;
};
w3.end = function () {
assert.equal(counter, 2);
assert.equal(expected.length, 0);
t.end();
};
});
test('read(0) for ended streams', function (t) {
var r = new R();
var written = false;
var ended = false;
r._read = function () {};
r.push(new Buffer("foo"));
r.push(null);
var v = r.read(0);
assert.equal(v, null);
var w = new R();
w.write = function (buffer) {
written = true;
assert.equal(ended, false);
assert.equal(buffer.toString(), "foo")
};
w.end = function () {
ended = true;
assert.equal(written, true);
t.end();
};
r.pipe(w);
})
test('sync _read ending', function (t) {
var r = new R();
var called = false;
r._read = function (n, cb) {
cb(null, null);
};
r.once('end', function () {
called = true;
})
r.read();
process.nextTick(function () {
assert.equal(called, true);
t.end();
})
});
assert.throws(function() {
var bad = new R({
highWaterMark: 10,

484
test/simple/test-stream2-objects.js

@ -0,0 +1,484 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common.js');
var Readable = require('_stream_readable');
var Writable = require('_stream_writable');
var assert = require('assert');
// tiny node-tap lookalike.
var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);
}
function run() {
var next = tests.shift();
if (!next)
return console.error('ok');
var name = next[0];
var fn = next[1];
console.log('# %s', name);
fn({
same: assert.deepEqual,
equal: assert.equal,
end: function() {
count--;
run();
}
});
}
// ensure all tests have run
process.on('exit', function() {
assert.equal(count, 0);
});
process.nextTick(run);
function toArray(callback) {
var stream = new Writable();
var list = [];
stream.write = function(chunk) {
list.push(chunk);
};
stream.end = function() {
callback(list);
};
return stream;
}
function fromArray(list) {
var r = new Readable();
list.forEach(function(chunk) {
r.push(chunk);
});
r.push(null);
r._read = noop;
return r;
}
function noop() {}
test('can read objects from stream', function(t) {
var r = fromArray([{ one: '1'}, { two: '2' }]);
var v1 = r.read();
var v2 = r.read();
var v3 = r.read();
assert.deepEqual(v1, { one: '1' });
assert.deepEqual(v2, { two: '2' });
assert.deepEqual(v3, null);
t.end();
});
test('can pipe objects into stream', function(t) {
var r = fromArray([{ one: '1'}, { two: '2' }]);
r.pipe(toArray(function(list) {
assert.deepEqual(list, [
{ one: '1' },
{ two: '2' }
]);
t.end();
}));
});
test('read(n) is ignored', function(t) {
var r = fromArray([{ one: '1'}, { two: '2' }]);
var value = r.read(2);
assert.deepEqual(value, { one: '1' });
t.end();
});
test('can read objects from _read (sync)', function(t) {
var r = new Readable();
var list = [{ one: '1'}, { two: '2' }];
r._read = function(n, cb) {
var item = list.shift();
cb(null, item || null);
};
r.pipe(toArray(function(list) {
assert.deepEqual(list, [
{ one: '1' },
{ two: '2' }
]);
t.end();
}));
});
test('can read objects from _read (async)', function(t) {
var r = new Readable();
var list = [{ one: '1'}, { two: '2' }];
r._read = function(n, cb) {
var item = list.shift();
process.nextTick(function() {
cb(null, item || null);
});
};
r.pipe(toArray(function(list) {
assert.deepEqual(list, [
{ one: '1' },
{ two: '2' }
]);
t.end();
}));
});
test('can read strings as objects', function(t) {
var r = new Readable({
objectMode: true
});
r._read = noop;
var list = ['one', 'two', 'three'];
list.forEach(function(str) {
r.push(str);
});
r.push(null);
r.pipe(toArray(function(array) {
assert.deepEqual(array, list);
t.end();
}));
});
test('read(0) for object streams', function(t) {
var r = new Readable({
objectMode: true
});
r._read = noop;
r.push('foobar');
r.push(null);
var v = r.read(0);
r.pipe(toArray(function(array) {
assert.deepEqual(array, ['foobar']);
t.end();
}));
});
test('falsey values', function(t) {
var r = new Readable({
objectMode: true
});
r._read = noop;
r.push(false);
r.push(0);
r.push('');
r.push(null);
r.pipe(toArray(function(array) {
assert.deepEqual(array, [false, 0, '']);
t.end();
}));
});
test('low watermark _read', function(t) {
var r = new Readable({
lowWaterMark: 2,
highWaterMark: 6,
objectMode: true
});
var calls = 0;
r._read = function(n, cb) {
calls++;
cb(null, 'foo');
};
// touch to cause it
r.read(0);
r.push(null);
r.pipe(toArray(function(list) {
assert.deepEqual(list, ['foo', 'foo', 'foo']);
t.end();
}));
});
test('high watermark _read', function(t) {
var r = new Readable({
lowWaterMark: 0,
highWaterMark: 6,
objectMode: true
});
var calls = 0;
var list = ['1', '2', '3', '4', '5', '6', '7', '8'];
r._read = function() {
calls++;
};
list.forEach(function(c) {
r.push(c);
});
var v = r.read();
assert.equal(calls, 0);
assert.equal(v, '1');
var v2 = r.read();
assert.equal(calls, 1);
assert.equal(v2, '2');
t.end();
});
test('high watermark push', function(t) {
var r = new Readable({
highWaterMark: 6,
objectMode: true
});
r._read = function() {};
for (var i = 0; i < 6; i++) {
var bool = r.push(i);
assert.equal(bool, i === 5 ? false : true);
}
t.end();
});
test('low watermark push', function(t) {
var r = new Readable({
lowWaterMark: 2,
highWaterMark: 4,
objectMode: true
});
var l = console.log;
var called = 0;
var reading = false;
r._read = function() {
called++;
if (reading) {
assert.equal(r.push(42), false);
}
}
assert.equal(called, 0);
assert.equal(r.push(0), true);
assert.equal(called, 1);
assert.equal(r.push(1), true);
assert.equal(called, 2);
assert.equal(r.push(2), true);
assert.equal(called, 2);
assert.equal(r.push(3), false);
assert.equal(called, 2);
assert.equal(r.push(4), false);
assert.equal(called, 2);
assert.equal(r.push(5), false);
assert.equal(called, 2);
assert.deepEqual(r._readableState.buffer, [0, 1, 2, 3, 4, 5]);
reading = true;
assert.equal(r.read(), 0);
assert.equal(called, 2);
assert.equal(r.read(), 1);
assert.equal(called, 3);
assert.equal(r.read(), 2);
assert.equal(called, 4);
assert.equal(r.read(), 3);
assert.equal(called, 5);
assert.equal(r.read(), 4);
assert.equal(called, 6);
r.push(null);
r.pipe(toArray(function(array) {
assert.deepEqual(array, [5, 42, 42, 42, 42]);
t.end();
}));
});
test('stream of buffers converted to object halfway through', function(t) {
var r = new Readable();
r._read = noop;
r.push(new Buffer('fus'));
r.push(new Buffer('do'));
r.push(new Buffer('rah'));
var str = r.read(4);
assert.equal(str, 'fusd');
r.push({ foo: 'bar' });
r.push(null);
r.pipe(toArray(function(list) {
assert.deepEqual(list, [
new Buffer('o'),
new Buffer('rah'),
{ foo: 'bar'}
]);
t.end();
}));
});
test('stream of strings converted to objects halfway through', function(t) {
var r = new Readable({
encoding: 'utf8'
});
r._read = noop;
r.push('fus');
r.push('do');
r.push('rah');
var str = r.read(4);
assert.equal(str, 'fusd');
r.push({ foo: 'bar' });
r.push(null);
r.pipe(toArray(function(list) {
assert.deepEqual(list, [
'o',
'rah',
{ foo: 'bar'}
]);
t.end();
}));
});
test('can write objects to stream', function(t) {
var w = new Writable();
w._write = function(chunk, cb) {
assert.deepEqual(chunk, { foo: 'bar' });
cb();
};
w.on('finish', function() {
t.end();
});
w.write({ foo: 'bar' });
w.end();
});
test('can write multiple objects to stream', function(t) {
var w = new Writable();
var list = [];
w._write = function(chunk, cb) {
list.push(chunk);
cb();
};
w.on('finish', function() {
assert.deepEqual(list, [0, 1, 2, 3, 4]);
t.end();
});
w.write(0);
w.write(1);
w.write(2);
w.write(3);
w.write(4);
w.end();
});
test('can write strings as objects', function(t) {
var w = new Writable({
objectMode: true
});
var list = [];
w._write = function(chunk, cb) {
list.push(chunk);
process.nextTick(cb);
};
w.on('finish', function() {
assert.deepEqual(list, ['0', '1', '2', '3', '4']);
t.end();
});
w.write('0');
w.write('1');
w.write('2');
w.write('3');
w.write('4');
w.end();
});
test('buffers finish until cb is called', function(t) {
var w = new Writable({
objectMode: true
});
var called = false;
w._write = function(chunk, cb) {
assert.equal(chunk, 'foo');
process.nextTick(function() {
called = true;
cb();
});
};
w.on('finish', function() {
assert.equal(called, true);
t.end();
});
w.write('foo');
w.end();
});

29
test/simple/test-stream2-readable-from-list.js

@ -25,7 +25,10 @@ var fromList = require('_stream_readable')._fromList;
// tiny node-tap lookalike.
var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);
}
@ -40,10 +43,18 @@ function run() {
fn({
same: assert.deepEqual,
equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);
@ -57,19 +68,19 @@ test('buffers', function(t) {
new Buffer('kuel') ];
// read more than the first element.
var ret = fromList(6, list, 16);
var ret = fromList(6, { buffer: list, length: 16 });
t.equal(ret.toString(), 'foogba');
// read exactly the first element.
ret = fromList(2, list, 10);
ret = fromList(2, { buffer: list, length: 10 });
t.equal(ret.toString(), 'rk');
// read less than the first element.
ret = fromList(2, list, 8);
ret = fromList(2, { buffer: list, length: 8 });
t.equal(ret.toString(), 'ba');
// read more than we have.
ret = fromList(100, list, 6);
ret = fromList(100, { buffer: list, length: 6 });
t.equal(ret.toString(), 'zykuel');
// all consumed.
@ -87,19 +98,19 @@ test('strings', function(t) {
'kuel' ];
// read more than the first element.
var ret = fromList(6, list, 16, true);
var ret = fromList(6, { buffer: list, length: 16, decoder: true });
t.equal(ret, 'foogba');
// read exactly the first element.
ret = fromList(2, list, 10, true);
ret = fromList(2, { buffer: list, length: 10, decoder: true });
t.equal(ret, 'rk');
// read less than the first element.
ret = fromList(2, list, 8, true);
ret = fromList(2, { buffer: list, length: 8, decoder: true });
t.equal(ret, 'ba');
// read more than we have.
ret = fromList(100, list, 6, true);
ret = fromList(100, { buffer: list, length: 6, decoder: true });
t.equal(ret, 'zykuel');
// all consumed.

17
test/simple/test-stream2-set-encoding.js

@ -27,7 +27,10 @@ var util = require('util');
// tiny node-tap lookalike.
var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);
}
@ -42,10 +45,18 @@ function run() {
fn({
same: assert.deepEqual,
equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);
/////
@ -77,6 +88,8 @@ TestReader.prototype._read = function(n, cb) {
var ret = new Buffer(n);
ret.fill('a');
console.log("cb(null, ret)", ret)
return cb(null, ret);
}.bind(this), 1);
};
@ -177,12 +190,14 @@ test('setEncoding hex with read(13)', function(t) {
"16161" ];
tr.on('readable', function flow() {
console.log("readable once")
var chunk;
while (null !== (chunk = tr.read(13)))
out.push(chunk);
});
tr.on('end', function() {
console.log("END")
t.same(out, expect);
t.end();
});

13
test/simple/test-stream2-transform.js

@ -26,7 +26,10 @@ var Transform = require('_stream_transform');
// tiny node-tap lookalike.
var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);
}
@ -41,10 +44,18 @@ function run() {
fn({
same: assert.deepEqual,
equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);
/////

19
test/simple/test-stream2-writable.js

@ -48,29 +48,36 @@ for (var i = 0; i < chunks.length; i++) {
// tiny node-tap lookalike.
var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);
}
function run() {
var next = tests.shift();
if (!next)
return console.log('ok');
return console.error('ok');
var name = next[0];
var fn = next[1];
if (!fn)
return run();
console.log('# %s', name);
fn({
same: assert.deepEqual,
equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);
test('write fast', function(t) {

Loading…
Cancel
Save