From 444bbd4fa7315423a6b55aba0e0c12ea6534b2cb Mon Sep 17 00:00:00 2001 From: Raynos Date: Fri, 11 Jan 2013 20:59:57 -0800 Subject: [PATCH] streams: Support objects other than Buffers We detect for non-string and non-buffer values in onread and turn the stream into an "objectMode" stream. If we are in "objectMode" mode then howMuchToRead will always return 1, state.length will always have 1 appended to it when there is a new item and fromList always takes the first value from the list. This means that for object streams, the n in read(n) is ignored and read() will always return a single value Fixed a bug with unpipe where the pipe would break because the flowing state was not reset to false. Fixed a bug with sync cb(null, null) in _read which would forget to end the readable stream --- doc/api/stream.markdown | 3 + lib/_stream_readable.js | 52 +- lib/_stream_writable.js | 48 +- test/simple/test-stream2-basic.js | 134 ++++- test/simple/test-stream2-objects.js | 484 ++++++++++++++++++ .../simple/test-stream2-readable-from-list.js | 29 +- test/simple/test-stream2-set-encoding.js | 17 +- test/simple/test-stream2-transform.js | 13 +- test/simple/test-stream2-writable.js | 19 +- 9 files changed, 757 insertions(+), 42 deletions(-) create mode 100644 test/simple/test-stream2-objects.js diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index bf9da43e16..7cff67ea21 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -99,6 +99,9 @@ method. (See below.) resource. Default=16kb * `encoding` {String} If specified, then buffers will be decoded to strings using the specified encoding. Default=null + * `objectMode` {Boolean} Whether this stream should behave + as a stream of objects. Meaning that stream.read(n) returns + a single value instead of a Buffer of size n In classes that extend the Readable class, make sure to call the constructor so that the buffering settings can be properly diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index a6943c9f87..a5921805c6 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -69,6 +69,11 @@ function ReadableState(options, stream) { this.needReadable = false; this.emittedReadable = false; + + // object stream flag. Used to make read(n) ignore n and to + // make all the buffer merging and length checks go away + this.objectMode = !!options.objectMode; + // when piping, we only care about 'readable' events that happen // after read()ing all the bytes and not getting any pushback. this.ranOut = false; @@ -129,6 +134,9 @@ function howMuchToRead(n, state) { if (state.length === 0 && state.ended) return 0; + if (state.objectMode) + return n === 0 ? 0 : 1; + if (isNaN(n) || n === null) return state.length; @@ -217,11 +225,11 @@ Readable.prototype.read = function(n) { var ret; if (n > 0) - ret = fromList(n, state.buffer, state.length, !!state.decoder); + ret = fromList(n, state); else ret = null; - if (ret === null || ret.length === 0) { + if (ret === null || (!state.objectMode && ret.length === 0)) { state.needReadable = true; n = 0; } @@ -246,20 +254,36 @@ function onread(stream, er, chunk) { var state = stream._readableState; var sync = state.sync; + // If we get something that is not a buffer, string, null, or undefined, + // then switch into objectMode. Now stream chunks are all considered + // to be of length=1, and the watermarks determine how many objects to + // keep in the buffer, rather than how many bytes or characters. + if (!Buffer.isBuffer(chunk) && + 'string' !== typeof chunk && + chunk !== null && + chunk !== undefined) { + state.objectMode = true; + state.length = state.buffer.length; + state.decoder = null; + } + state.reading = false; if (er) return stream.emit('error', er); - if (!chunk || !chunk.length) { + if (chunk === null || + chunk === undefined || + (!state.objectMode && !chunk.length)) { // eof state.ended = true; if (state.decoder) { chunk = state.decoder.end(); if (chunk && chunk.length) { state.buffer.push(chunk); - state.length += chunk.length; + state.length += state.objectMode ? 1 : chunk.length; } } + // if we've ended and we have some data left, then emit // 'readable' now to make sure it gets picked up. if (!sync) { @@ -271,7 +295,8 @@ function onread(stream, er, chunk) { } } else endReadable(stream); - } + } else + endReadable(stream); return; } @@ -279,8 +304,8 @@ function onread(stream, er, chunk) { chunk = state.decoder.write(chunk); // update the buffer info. - if (chunk) { - state.length += chunk.length; + if (chunk || (state.objectMode && chunk !== undefined && chunk !== null)) { + state.length += state.objectMode ? 1 : chunk.length; state.buffer.push(chunk); } @@ -502,6 +527,7 @@ Readable.prototype.unpipe = function(dest) { state.pipes = null; state.pipesCount = 0; this.removeListener('readable', pipeOnReadable); + state.flowing = false; if (dest) dest.emit('unpipe', this); return this; @@ -516,6 +542,7 @@ Readable.prototype.unpipe = function(dest) { state.pipes = null; state.pipesCount = 0; this.removeListener('readable', pipeOnReadable); + state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this); @@ -680,16 +707,21 @@ Readable._fromList = fromList; // Pluck off n bytes from an array of buffers. // Length is the combined lengths of all the buffers in the list. -function fromList(n, list, length, stringMode) { +function fromList(n, state) { + var list = state.buffer; + var length = state.length; + var stringMode = !!state.decoder; + var objectMode = !!state.objectMode; var ret; // nothing in the list, definitely empty. - if (list.length === 0) { + if (list.length === 0) return null; - } if (length === 0) ret = null; + else if (objectMode) + ret = list.shift(); else if (!n || n >= length) { // read it all, truncate the array. if (stringMode) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2d63c4d9e8..53c67ecbcc 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -45,6 +45,10 @@ function WritableState(options, stream) { // default to pushing everything out as fast as possible. this.lowWaterMark = options.lowWaterMark || 0; + // object stream flag to indicate whether or not this stream + // contains buffers or objects. + this.objectMode = !!options.objectMode; + // cast to ints. this.lowWaterMark = ~~this.lowWaterMark; this.highWaterMark = ~~this.highWaterMark; @@ -130,15 +134,29 @@ Writable.prototype.write = function(chunk, encoding, cb) { return; } - var l = chunk.length; - if (false === state.decodeStrings) - chunk = [chunk, encoding || 'utf8']; - else if (typeof chunk === 'string' || encoding) { - chunk = new Buffer(chunk + '', encoding); - l = chunk.length; + // Writing something other than a string or buffer will switch + // the stream into objectMode. + if (!state.objectMode && + typeof chunk !== 'string' && + chunk !== null && + chunk !== undefined && + !Buffer.isBuffer(chunk)) + state.objectMode = true; + + var len; + if (state.objectMode) + len = 1; + else { + len = chunk.length; + if (false === state.decodeStrings) + chunk = [chunk, encoding || 'utf8']; + else if (typeof chunk === 'string' || encoding) { + chunk = new Buffer(chunk + '', encoding); + len = chunk.length; + } } - state.length += l; + state.length += len; var ret = state.length < state.highWaterMark; if (ret === false) @@ -153,7 +171,7 @@ Writable.prototype.write = function(chunk, encoding, cb) { state.writing = true; state.sync = true; - state.writelen = l; + state.writelen = len; state.writecb = cb; this._write(chunk, state.onwrite); state.sync = false; @@ -165,7 +183,7 @@ function onwrite(stream, er) { var state = stream._writableState; var sync = state.sync; var cb = state.writecb; - var l = state.writelen; + var len = state.writelen; state.writing = false; state.writelen = null; @@ -188,7 +206,7 @@ function onwrite(stream, er) { stream.emit('error', er); return; } - state.length -= l; + state.length -= len; if (cb) { // Don't call the cb until the next tick if we're in sync mode. @@ -232,12 +250,14 @@ function onwrite(stream, er) { var chunk = chunkCb[0]; cb = chunkCb[1]; - if (false === state.decodeStrings) - l = chunk[0].length; + if (state.objectMode) + len = 1; + else if (false === state.decodeStrings) + len = chunk[0].length; else - l = chunk.length; + len = chunk.length; - state.writelen = l; + state.writelen = len; state.writecb = cb; state.writechunk = chunk; state.writing = true; diff --git a/test/simple/test-stream2-basic.js b/test/simple/test-stream2-basic.js index dff3340a48..ab4b066b34 100644 --- a/test/simple/test-stream2-basic.js +++ b/test/simple/test-stream2-basic.js @@ -96,7 +96,10 @@ TestWriter.prototype.end = function(c) { // tiny node-tap lookalike. var tests = []; +var count = 0; + function test(name, fn) { + count++; tests.push([name, fn]); } @@ -111,10 +114,18 @@ function run() { fn({ same: assert.deepEqual, equal: assert.equal, - end: run + end: function () { + count--; + run(); + } }); } +// ensure all tests have run +process.on("exit", function () { + assert.equal(count, 0); +}); + process.nextTick(run); @@ -319,6 +330,127 @@ test('multipipe', function(t) { }); }); +test('back pressure respected', function (t) { + function noop() {} + + var r = new R(); + var counter = 0; + r.push(["one"]); + r.push(["two"]); + r.push(["three"]); + r.push(["four"]); + r.push(null); + r._read = noop; + + var w1 = new R(); + w1.write = function (chunk) { + assert.equal(chunk[0], "one"); + w1.emit("close"); + process.nextTick(function () { + r.pipe(w2); + r.pipe(w3); + }) + }; + w1.end = noop; + + r.pipe(w1); + + var expected = ["two", "two", "three", "three", "four", "four"]; + + var w2 = new R(); + w2.write = function (chunk) { + assert.equal(chunk[0], expected.shift()); + assert.equal(counter, 0); + + counter++; + + if (chunk[0] === "four") { + return true; + } + + setTimeout(function () { + counter--; + w2.emit("drain"); + }, 10); + + return false; + } + w2.end = noop; + + var w3 = new R(); + w3.write = function (chunk) { + assert.equal(chunk[0], expected.shift()); + assert.equal(counter, 1); + + counter++; + + if (chunk[0] === "four") { + return true; + } + + setTimeout(function () { + counter--; + w3.emit("drain"); + }, 50); + + return false; + }; + w3.end = function () { + assert.equal(counter, 2); + assert.equal(expected.length, 0); + t.end(); + }; +}); + +test('read(0) for ended streams', function (t) { + var r = new R(); + var written = false; + var ended = false; + r._read = function () {}; + + r.push(new Buffer("foo")); + r.push(null); + + var v = r.read(0); + + assert.equal(v, null); + + var w = new R(); + + w.write = function (buffer) { + written = true; + assert.equal(ended, false); + assert.equal(buffer.toString(), "foo") + }; + + w.end = function () { + ended = true; + assert.equal(written, true); + t.end(); + }; + + r.pipe(w); +}) + +test('sync _read ending', function (t) { + var r = new R(); + var called = false; + r._read = function (n, cb) { + cb(null, null); + }; + + r.once('end', function () { + called = true; + }) + + r.read(); + + process.nextTick(function () { + assert.equal(called, true); + t.end(); + }) +}); + assert.throws(function() { var bad = new R({ highWaterMark: 10, diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js new file mode 100644 index 0000000000..5579a00453 --- /dev/null +++ b/test/simple/test-stream2-objects.js @@ -0,0 +1,484 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + + +var common = require('../common.js'); +var Readable = require('_stream_readable'); +var Writable = require('_stream_writable'); +var assert = require('assert'); + +// tiny node-tap lookalike. +var tests = []; +var count = 0; + +function test(name, fn) { + count++; + tests.push([name, fn]); +} + +function run() { + var next = tests.shift(); + if (!next) + return console.error('ok'); + + var name = next[0]; + var fn = next[1]; + console.log('# %s', name); + fn({ + same: assert.deepEqual, + equal: assert.equal, + end: function() { + count--; + run(); + } + }); +} + +// ensure all tests have run +process.on('exit', function() { + assert.equal(count, 0); +}); + +process.nextTick(run); + +function toArray(callback) { + var stream = new Writable(); + var list = []; + stream.write = function(chunk) { + list.push(chunk); + }; + + stream.end = function() { + callback(list); + }; + + return stream; +} + +function fromArray(list) { + var r = new Readable(); + list.forEach(function(chunk) { + r.push(chunk); + }); + r.push(null); + r._read = noop; + + return r; +} + +function noop() {} + +test('can read objects from stream', function(t) { + var r = fromArray([{ one: '1'}, { two: '2' }]); + + var v1 = r.read(); + var v2 = r.read(); + var v3 = r.read(); + + assert.deepEqual(v1, { one: '1' }); + assert.deepEqual(v2, { two: '2' }); + assert.deepEqual(v3, null); + + t.end(); +}); + +test('can pipe objects into stream', function(t) { + var r = fromArray([{ one: '1'}, { two: '2' }]); + + r.pipe(toArray(function(list) { + assert.deepEqual(list, [ + { one: '1' }, + { two: '2' } + ]); + + t.end(); + })); +}); + +test('read(n) is ignored', function(t) { + var r = fromArray([{ one: '1'}, { two: '2' }]); + + var value = r.read(2); + + assert.deepEqual(value, { one: '1' }); + + t.end(); +}); + +test('can read objects from _read (sync)', function(t) { + var r = new Readable(); + var list = [{ one: '1'}, { two: '2' }]; + r._read = function(n, cb) { + var item = list.shift(); + cb(null, item || null); + }; + + r.pipe(toArray(function(list) { + assert.deepEqual(list, [ + { one: '1' }, + { two: '2' } + ]); + + t.end(); + })); +}); + +test('can read objects from _read (async)', function(t) { + var r = new Readable(); + var list = [{ one: '1'}, { two: '2' }]; + r._read = function(n, cb) { + var item = list.shift(); + process.nextTick(function() { + cb(null, item || null); + }); + }; + + r.pipe(toArray(function(list) { + assert.deepEqual(list, [ + { one: '1' }, + { two: '2' } + ]); + + t.end(); + })); +}); + +test('can read strings as objects', function(t) { + var r = new Readable({ + objectMode: true + }); + r._read = noop; + var list = ['one', 'two', 'three']; + list.forEach(function(str) { + r.push(str); + }); + r.push(null); + + r.pipe(toArray(function(array) { + assert.deepEqual(array, list); + + t.end(); + })); +}); + +test('read(0) for object streams', function(t) { + var r = new Readable({ + objectMode: true + }); + r._read = noop; + + r.push('foobar'); + r.push(null); + + var v = r.read(0); + + r.pipe(toArray(function(array) { + assert.deepEqual(array, ['foobar']); + + t.end(); + })); +}); + +test('falsey values', function(t) { + var r = new Readable({ + objectMode: true + }); + r._read = noop; + + r.push(false); + r.push(0); + r.push(''); + r.push(null); + + r.pipe(toArray(function(array) { + assert.deepEqual(array, [false, 0, '']); + + t.end(); + })); +}); + +test('low watermark _read', function(t) { + var r = new Readable({ + lowWaterMark: 2, + highWaterMark: 6, + objectMode: true + }); + + var calls = 0; + + r._read = function(n, cb) { + calls++; + cb(null, 'foo'); + }; + + // touch to cause it + r.read(0); + + r.push(null); + + r.pipe(toArray(function(list) { + assert.deepEqual(list, ['foo', 'foo', 'foo']); + + t.end(); + })); +}); + +test('high watermark _read', function(t) { + var r = new Readable({ + lowWaterMark: 0, + highWaterMark: 6, + objectMode: true + }); + var calls = 0; + var list = ['1', '2', '3', '4', '5', '6', '7', '8']; + + r._read = function() { + calls++; + }; + + list.forEach(function(c) { + r.push(c); + }); + + var v = r.read(); + + assert.equal(calls, 0); + assert.equal(v, '1'); + + var v2 = r.read(); + + assert.equal(calls, 1); + assert.equal(v2, '2'); + + t.end(); +}); + +test('high watermark push', function(t) { + var r = new Readable({ + highWaterMark: 6, + objectMode: true + }); + r._read = function() {}; + for (var i = 0; i < 6; i++) { + var bool = r.push(i); + assert.equal(bool, i === 5 ? false : true); + } + + t.end(); +}); + +test('low watermark push', function(t) { + var r = new Readable({ + lowWaterMark: 2, + highWaterMark: 4, + objectMode: true + }); + var l = console.log; + + var called = 0; + var reading = false; + + r._read = function() { + called++; + + if (reading) { + assert.equal(r.push(42), false); + } + } + + assert.equal(called, 0); + assert.equal(r.push(0), true); + assert.equal(called, 1); + assert.equal(r.push(1), true); + assert.equal(called, 2); + assert.equal(r.push(2), true); + assert.equal(called, 2); + assert.equal(r.push(3), false); + assert.equal(called, 2); + assert.equal(r.push(4), false); + assert.equal(called, 2); + assert.equal(r.push(5), false); + assert.equal(called, 2); + assert.deepEqual(r._readableState.buffer, [0, 1, 2, 3, 4, 5]); + + reading = true; + + assert.equal(r.read(), 0); + assert.equal(called, 2); + assert.equal(r.read(), 1); + assert.equal(called, 3); + assert.equal(r.read(), 2); + assert.equal(called, 4); + assert.equal(r.read(), 3); + assert.equal(called, 5); + assert.equal(r.read(), 4); + assert.equal(called, 6); + r.push(null); + + r.pipe(toArray(function(array) { + assert.deepEqual(array, [5, 42, 42, 42, 42]); + + t.end(); + })); +}); + +test('stream of buffers converted to object halfway through', function(t) { + var r = new Readable(); + r._read = noop; + + r.push(new Buffer('fus')); + r.push(new Buffer('do')); + r.push(new Buffer('rah')); + + var str = r.read(4); + + assert.equal(str, 'fusd'); + + r.push({ foo: 'bar' }); + r.push(null); + + r.pipe(toArray(function(list) { + assert.deepEqual(list, [ + new Buffer('o'), + new Buffer('rah'), + { foo: 'bar'} + ]); + + t.end(); + })); +}); + +test('stream of strings converted to objects halfway through', function(t) { + var r = new Readable({ + encoding: 'utf8' + }); + r._read = noop; + + r.push('fus'); + r.push('do'); + r.push('rah'); + + var str = r.read(4); + + assert.equal(str, 'fusd'); + + r.push({ foo: 'bar' }); + r.push(null); + + r.pipe(toArray(function(list) { + assert.deepEqual(list, [ + 'o', + 'rah', + { foo: 'bar'} + ]); + + t.end(); + })); +}); + +test('can write objects to stream', function(t) { + var w = new Writable(); + + w._write = function(chunk, cb) { + assert.deepEqual(chunk, { foo: 'bar' }); + cb(); + }; + + w.on('finish', function() { + t.end(); + }); + + w.write({ foo: 'bar' }); + w.end(); +}); + +test('can write multiple objects to stream', function(t) { + var w = new Writable(); + var list = []; + + w._write = function(chunk, cb) { + list.push(chunk); + cb(); + }; + + w.on('finish', function() { + assert.deepEqual(list, [0, 1, 2, 3, 4]); + + t.end(); + }); + + w.write(0); + w.write(1); + w.write(2); + w.write(3); + w.write(4); + w.end(); +}); + +test('can write strings as objects', function(t) { + var w = new Writable({ + objectMode: true + }); + var list = []; + + w._write = function(chunk, cb) { + list.push(chunk); + process.nextTick(cb); + }; + + w.on('finish', function() { + assert.deepEqual(list, ['0', '1', '2', '3', '4']); + + t.end(); + }); + + w.write('0'); + w.write('1'); + w.write('2'); + w.write('3'); + w.write('4'); + w.end(); +}); + +test('buffers finish until cb is called', function(t) { + var w = new Writable({ + objectMode: true + }); + var called = false; + + w._write = function(chunk, cb) { + assert.equal(chunk, 'foo'); + + process.nextTick(function() { + called = true; + cb(); + }); + }; + + w.on('finish', function() { + assert.equal(called, true); + + t.end(); + }); + + w.write('foo'); + w.end(); +}); diff --git a/test/simple/test-stream2-readable-from-list.js b/test/simple/test-stream2-readable-from-list.js index a28fe343ee..7c96ffe00c 100644 --- a/test/simple/test-stream2-readable-from-list.js +++ b/test/simple/test-stream2-readable-from-list.js @@ -25,7 +25,10 @@ var fromList = require('_stream_readable')._fromList; // tiny node-tap lookalike. var tests = []; +var count = 0; + function test(name, fn) { + count++; tests.push([name, fn]); } @@ -40,10 +43,18 @@ function run() { fn({ same: assert.deepEqual, equal: assert.equal, - end: run + end: function () { + count--; + run(); + } }); } +// ensure all tests have run +process.on("exit", function () { + assert.equal(count, 0); +}); + process.nextTick(run); @@ -57,19 +68,19 @@ test('buffers', function(t) { new Buffer('kuel') ]; // read more than the first element. - var ret = fromList(6, list, 16); + var ret = fromList(6, { buffer: list, length: 16 }); t.equal(ret.toString(), 'foogba'); // read exactly the first element. - ret = fromList(2, list, 10); + ret = fromList(2, { buffer: list, length: 10 }); t.equal(ret.toString(), 'rk'); // read less than the first element. - ret = fromList(2, list, 8); + ret = fromList(2, { buffer: list, length: 8 }); t.equal(ret.toString(), 'ba'); // read more than we have. - ret = fromList(100, list, 6); + ret = fromList(100, { buffer: list, length: 6 }); t.equal(ret.toString(), 'zykuel'); // all consumed. @@ -87,19 +98,19 @@ test('strings', function(t) { 'kuel' ]; // read more than the first element. - var ret = fromList(6, list, 16, true); + var ret = fromList(6, { buffer: list, length: 16, decoder: true }); t.equal(ret, 'foogba'); // read exactly the first element. - ret = fromList(2, list, 10, true); + ret = fromList(2, { buffer: list, length: 10, decoder: true }); t.equal(ret, 'rk'); // read less than the first element. - ret = fromList(2, list, 8, true); + ret = fromList(2, { buffer: list, length: 8, decoder: true }); t.equal(ret, 'ba'); // read more than we have. - ret = fromList(100, list, 6, true); + ret = fromList(100, { buffer: list, length: 6, decoder: true }); t.equal(ret, 'zykuel'); // all consumed. diff --git a/test/simple/test-stream2-set-encoding.js b/test/simple/test-stream2-set-encoding.js index a1883fbbb8..3571bac4f1 100644 --- a/test/simple/test-stream2-set-encoding.js +++ b/test/simple/test-stream2-set-encoding.js @@ -27,7 +27,10 @@ var util = require('util'); // tiny node-tap lookalike. var tests = []; +var count = 0; + function test(name, fn) { + count++; tests.push([name, fn]); } @@ -42,10 +45,18 @@ function run() { fn({ same: assert.deepEqual, equal: assert.equal, - end: run + end: function () { + count--; + run(); + } }); } +// ensure all tests have run +process.on("exit", function () { + assert.equal(count, 0); +}); + process.nextTick(run); ///// @@ -77,6 +88,8 @@ TestReader.prototype._read = function(n, cb) { var ret = new Buffer(n); ret.fill('a'); + console.log("cb(null, ret)", ret) + return cb(null, ret); }.bind(this), 1); }; @@ -177,12 +190,14 @@ test('setEncoding hex with read(13)', function(t) { "16161" ]; tr.on('readable', function flow() { + console.log("readable once") var chunk; while (null !== (chunk = tr.read(13))) out.push(chunk); }); tr.on('end', function() { + console.log("END") t.same(out, expect); t.end(); }); diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 9087080751..0aa620026a 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -26,7 +26,10 @@ var Transform = require('_stream_transform'); // tiny node-tap lookalike. var tests = []; +var count = 0; + function test(name, fn) { + count++; tests.push([name, fn]); } @@ -41,10 +44,18 @@ function run() { fn({ same: assert.deepEqual, equal: assert.equal, - end: run + end: function () { + count--; + run(); + } }); } +// ensure all tests have run +process.on("exit", function () { + assert.equal(count, 0); +}); + process.nextTick(run); ///// diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js index be40664edf..b9d2be98ae 100644 --- a/test/simple/test-stream2-writable.js +++ b/test/simple/test-stream2-writable.js @@ -48,29 +48,36 @@ for (var i = 0; i < chunks.length; i++) { // tiny node-tap lookalike. var tests = []; +var count = 0; + function test(name, fn) { + count++; tests.push([name, fn]); } function run() { var next = tests.shift(); if (!next) - return console.log('ok'); + return console.error('ok'); var name = next[0]; var fn = next[1]; - - if (!fn) - return run(); - console.log('# %s', name); fn({ same: assert.deepEqual, equal: assert.equal, - end: run + end: function () { + count--; + run(); + } }); } +// ensure all tests have run +process.on("exit", function () { + assert.equal(count, 0); +}); + process.nextTick(run); test('write fast', function(t) {