|
|
@ -741,8 +741,8 @@ the [`'drain'`][] event before writing more data. |
|
|
|
|
|
|
|
To implement any sort of stream, the pattern is the same: |
|
|
|
|
|
|
|
1. Extend the appropriate parent class in your own subclass. (The |
|
|
|
[`util.inherits()`][] method is particularly helpful for this.) |
|
|
|
1. Extend the appropriate parent class in your own subclass via the `extends` |
|
|
|
keyword. |
|
|
|
2. Call the appropriate parent class constructor in your constructor, |
|
|
|
to be sure that the internal mechanisms are set up properly. |
|
|
|
3. Implement one or more specific methods, as detailed below. |
|
|
@ -945,31 +945,31 @@ could wrap the low-level source object by doing something like this: |
|
|
|
// and an `ondata` member that gets called when it has data, and |
|
|
|
// an `onend` member that gets called when the data is over. |
|
|
|
|
|
|
|
util.inherits(SourceWrapper, Readable); |
|
|
|
|
|
|
|
function SourceWrapper(options) { |
|
|
|
Readable.call(this, options); |
|
|
|
class SourceWrapper extends Readable { |
|
|
|
constructor(options) { |
|
|
|
super(options); |
|
|
|
|
|
|
|
this._source = getLowlevelSourceObject(); |
|
|
|
this._source = getLowlevelSourceObject(); |
|
|
|
|
|
|
|
// Every time there's data, we push it into the internal buffer. |
|
|
|
this._source.ondata = (chunk) => { |
|
|
|
// if push() returns false, then we need to stop reading from source |
|
|
|
if (!this.push(chunk)) |
|
|
|
this._source.readStop(); |
|
|
|
}; |
|
|
|
// Every time there's data, we push it into the internal buffer. |
|
|
|
this._source.ondata = (chunk) => { |
|
|
|
// if push() returns false, then we need to stop reading from source |
|
|
|
if (!this.push(chunk)) |
|
|
|
this._source.readStop(); |
|
|
|
}; |
|
|
|
|
|
|
|
// When the source ends, we push the EOF-signaling `null` chunk |
|
|
|
this._source.onend = () => { |
|
|
|
this.push(null); |
|
|
|
}; |
|
|
|
// When the source ends, we push the EOF-signaling `null` chunk |
|
|
|
this._source.onend = () => { |
|
|
|
this.push(null); |
|
|
|
}; |
|
|
|
} |
|
|
|
// _read will be called when the stream wants to pull more data in |
|
|
|
// the advisory size argument is ignored in this case. |
|
|
|
_read(size) { |
|
|
|
this._source.readStart(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// _read will be called when the stream wants to pull more data in |
|
|
|
// the advisory size argument is ignored in this case. |
|
|
|
SourceWrapper.prototype._read = function(size) { |
|
|
|
this._source.readStart(); |
|
|
|
}; |
|
|
|
``` |
|
|
|
|
|
|
|
#### Example: A Counting Stream |
|
|
@ -981,25 +981,25 @@ from 1 to 1,000,000 in ascending order, and then ends. |
|
|
|
|
|
|
|
```js |
|
|
|
const Readable = require('stream').Readable; |
|
|
|
const util = require('util'); |
|
|
|
util.inherits(Counter, Readable); |
|
|
|
|
|
|
|
function Counter(opt) { |
|
|
|
Readable.call(this, opt); |
|
|
|
this._max = 1000000; |
|
|
|
this._index = 1; |
|
|
|
} |
|
|
|
class Counter extends Readable { |
|
|
|
constructor(opt) { |
|
|
|
super(opt); |
|
|
|
this._max = 1000000; |
|
|
|
this._index = 1; |
|
|
|
} |
|
|
|
|
|
|
|
Counter.prototype._read = function() { |
|
|
|
var i = this._index++; |
|
|
|
if (i > this._max) |
|
|
|
this.push(null); |
|
|
|
else { |
|
|
|
var str = '' + i; |
|
|
|
var buf = Buffer.from(str, 'ascii'); |
|
|
|
this.push(buf); |
|
|
|
_read() { |
|
|
|
var i = this._index++; |
|
|
|
if (i > this._max) |
|
|
|
this.push(null); |
|
|
|
else { |
|
|
|
var str = '' + i; |
|
|
|
var buf = Buffer.from(str, 'ascii'); |
|
|
|
this.push(buf); |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
#### Example: SimpleProtocol v1 (Sub-optimal) |
|
|
@ -1022,94 +1022,90 @@ However, this would be better implemented as a [Transform][] stream. See |
|
|
|
// alternative example below under the Transform section. |
|
|
|
|
|
|
|
const Readable = require('stream').Readable; |
|
|
|
const util = require('util'); |
|
|
|
|
|
|
|
util.inherits(SimpleProtocol, Readable); |
|
|
|
|
|
|
|
function SimpleProtocol(source, options) { |
|
|
|
if (!(this instanceof SimpleProtocol)) |
|
|
|
return new SimpleProtocol(source, options); |
|
|
|
class SimpleProtocol extends Readable { |
|
|
|
constructor(source, options) { |
|
|
|
super(options); |
|
|
|
|
|
|
|
Readable.call(this, options); |
|
|
|
this._inBody = false; |
|
|
|
this._sawFirstCr = false; |
|
|
|
this._inBody = false; |
|
|
|
this._sawFirstCr = false; |
|
|
|
|
|
|
|
// source is a readable stream, such as a socket or file |
|
|
|
this._source = source; |
|
|
|
// source is a readable stream, such as a socket or file |
|
|
|
this._source = source; |
|
|
|
|
|
|
|
source.on('end', () => { |
|
|
|
this.push(null); |
|
|
|
}); |
|
|
|
source.on('end', () => { |
|
|
|
this.push(null); |
|
|
|
}); |
|
|
|
|
|
|
|
// give it a kick whenever the source is readable |
|
|
|
// read(0) will not consume any bytes |
|
|
|
source.on('readable', () => { |
|
|
|
this.read(0); |
|
|
|
}); |
|
|
|
// give it a kick whenever the source is readable |
|
|
|
// read(0) will not consume any bytes |
|
|
|
source.on('readable', () => { |
|
|
|
this.read(0); |
|
|
|
}); |
|
|
|
|
|
|
|
this._rawHeader = []; |
|
|
|
this.header = null; |
|
|
|
} |
|
|
|
this._rawHeader = []; |
|
|
|
this.header = null; |
|
|
|
} |
|
|
|
|
|
|
|
SimpleProtocol.prototype._read = function(n) { |
|
|
|
if (!this._inBody) { |
|
|
|
var chunk = this._source.read(); |
|
|
|
|
|
|
|
// if the source doesn't have data, we don't have data yet. |
|
|
|
if (chunk === null) |
|
|
|
return this.push(''); |
|
|
|
|
|
|
|
// check if the chunk has a \n\n |
|
|
|
var split = -1; |
|
|
|
for (var i = 0; i < chunk.length; i++) { |
|
|
|
if (chunk[i] === 10) { // '\n' |
|
|
|
if (this._sawFirstCr) { |
|
|
|
split = i; |
|
|
|
break; |
|
|
|
_read(n) { |
|
|
|
if (!this._inBody) { |
|
|
|
var chunk = this._source.read(); |
|
|
|
|
|
|
|
// if the source doesn't have data, we don't have data yet. |
|
|
|
if (chunk === null) |
|
|
|
return this.push(''); |
|
|
|
|
|
|
|
// check if the chunk has a \n\n |
|
|
|
var split = -1; |
|
|
|
for (var i = 0; i < chunk.length; i++) { |
|
|
|
if (chunk[i] === 10) { // '\n' |
|
|
|
if (this._sawFirstCr) { |
|
|
|
split = i; |
|
|
|
break; |
|
|
|
} else { |
|
|
|
this._sawFirstCr = true; |
|
|
|
} |
|
|
|
} else { |
|
|
|
this._sawFirstCr = true; |
|
|
|
this._sawFirstCr = false; |
|
|
|
} |
|
|
|
} else { |
|
|
|
this._sawFirstCr = false; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (split === -1) { |
|
|
|
// still waiting for the \n\n |
|
|
|
// stash the chunk, and try again. |
|
|
|
this._rawHeader.push(chunk); |
|
|
|
this.push(''); |
|
|
|
} else { |
|
|
|
this._inBody = true; |
|
|
|
var h = chunk.slice(0, split); |
|
|
|
this._rawHeader.push(h); |
|
|
|
var header = Buffer.concat(this._rawHeader).toString(); |
|
|
|
try { |
|
|
|
this.header = JSON.parse(header); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', new Error('invalid simple protocol data')); |
|
|
|
return; |
|
|
|
if (split === -1) { |
|
|
|
// still waiting for the \n\n |
|
|
|
// stash the chunk, and try again. |
|
|
|
this._rawHeader.push(chunk); |
|
|
|
this.push(''); |
|
|
|
} else { |
|
|
|
this._inBody = true; |
|
|
|
var h = chunk.slice(0, split); |
|
|
|
this._rawHeader.push(h); |
|
|
|
var header = Buffer.concat(this._rawHeader).toString(); |
|
|
|
try { |
|
|
|
this.header = JSON.parse(header); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', new Error('invalid simple protocol data')); |
|
|
|
return; |
|
|
|
} |
|
|
|
// now, because we got some extra data, unshift the rest |
|
|
|
// back into the read queue so that our consumer will see it. |
|
|
|
var b = chunk.slice(split); |
|
|
|
this.unshift(b); |
|
|
|
// calling unshift by itself does not reset the reading state |
|
|
|
// of the stream; since we're inside _read, doing an additional |
|
|
|
// push('') will reset the state appropriately. |
|
|
|
this.push(''); |
|
|
|
|
|
|
|
// and let them know that we are done parsing the header. |
|
|
|
this.emit('header', this.header); |
|
|
|
} |
|
|
|
// now, because we got some extra data, unshift the rest |
|
|
|
// back into the read queue so that our consumer will see it. |
|
|
|
var b = chunk.slice(split); |
|
|
|
this.unshift(b); |
|
|
|
// calling unshift by itself does not reset the reading state |
|
|
|
// of the stream; since we're inside _read, doing an additional |
|
|
|
// push('') will reset the state appropriately. |
|
|
|
this.push(''); |
|
|
|
|
|
|
|
// and let them know that we are done parsing the header. |
|
|
|
this.emit('header', this.header); |
|
|
|
} else { |
|
|
|
// from there on, just provide the data to our consumer. |
|
|
|
// careful not to push(null), since that would indicate EOF. |
|
|
|
var chunk = this._source.read(); |
|
|
|
if (chunk) this.push(chunk); |
|
|
|
} |
|
|
|
} else { |
|
|
|
// from there on, just provide the data to our consumer. |
|
|
|
// careful not to push(null), since that would indicate EOF. |
|
|
|
var chunk = this._source.read(); |
|
|
|
if (chunk) this.push(chunk); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
} |
|
|
|
// Usage: |
|
|
|
// var parser = new SimpleProtocol(source); |
|
|
|
// Now parser is a readable stream that will emit 'header' |
|
|
@ -1242,66 +1238,63 @@ would be piped into the parser, which is a more idiomatic Node.js stream |
|
|
|
approach. |
|
|
|
|
|
|
|
```javascript |
|
|
|
const util = require('util'); |
|
|
|
const Transform = require('stream').Transform; |
|
|
|
util.inherits(SimpleProtocol, Transform); |
|
|
|
|
|
|
|
function SimpleProtocol(options) { |
|
|
|
if (!(this instanceof SimpleProtocol)) |
|
|
|
return new SimpleProtocol(options); |
|
|
|
class SimpleProtocol extends Transform { |
|
|
|
constructor(options) { |
|
|
|
super(options); |
|
|
|
|
|
|
|
Transform.call(this, options); |
|
|
|
this._inBody = false; |
|
|
|
this._sawFirstCr = false; |
|
|
|
this._rawHeader = []; |
|
|
|
this.header = null; |
|
|
|
} |
|
|
|
this._inBody = false; |
|
|
|
this._sawFirstCr = false; |
|
|
|
this._rawHeader = []; |
|
|
|
this.header = null; |
|
|
|
} |
|
|
|
|
|
|
|
SimpleProtocol.prototype._transform = function(chunk, encoding, done) { |
|
|
|
if (!this._inBody) { |
|
|
|
// check if the chunk has a \n\n |
|
|
|
var split = -1; |
|
|
|
for (var i = 0; i < chunk.length; i++) { |
|
|
|
if (chunk[i] === 10) { // '\n' |
|
|
|
if (this._sawFirstCr) { |
|
|
|
split = i; |
|
|
|
break; |
|
|
|
_transform(chunk, encoding, done) { |
|
|
|
if (!this._inBody) { |
|
|
|
// check if the chunk has a \n\n |
|
|
|
var split = -1; |
|
|
|
for (var i = 0; i < chunk.length; i++) { |
|
|
|
if (chunk[i] === 10) { // '\n' |
|
|
|
if (this._sawFirstCr) { |
|
|
|
split = i; |
|
|
|
break; |
|
|
|
} else { |
|
|
|
this._sawFirstCr = true; |
|
|
|
} |
|
|
|
} else { |
|
|
|
this._sawFirstCr = true; |
|
|
|
this._sawFirstCr = false; |
|
|
|
} |
|
|
|
} else { |
|
|
|
this._sawFirstCr = false; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (split === -1) { |
|
|
|
// still waiting for the \n\n |
|
|
|
// stash the chunk, and try again. |
|
|
|
this._rawHeader.push(chunk); |
|
|
|
} else { |
|
|
|
this._inBody = true; |
|
|
|
var h = chunk.slice(0, split); |
|
|
|
this._rawHeader.push(h); |
|
|
|
var header = Buffer.concat(this._rawHeader).toString(); |
|
|
|
try { |
|
|
|
this.header = JSON.parse(header); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', new Error('invalid simple protocol data')); |
|
|
|
return; |
|
|
|
} |
|
|
|
// and let them know that we are done parsing the header. |
|
|
|
this.emit('header', this.header); |
|
|
|
if (split === -1) { |
|
|
|
// still waiting for the \n\n |
|
|
|
// stash the chunk, and try again. |
|
|
|
this._rawHeader.push(chunk); |
|
|
|
} else { |
|
|
|
this._inBody = true; |
|
|
|
var h = chunk.slice(0, split); |
|
|
|
this._rawHeader.push(h); |
|
|
|
var header = Buffer.concat(this._rawHeader).toString(); |
|
|
|
try { |
|
|
|
this.header = JSON.parse(header); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', new Error('invalid simple protocol data')); |
|
|
|
return; |
|
|
|
} |
|
|
|
// and let them know that we are done parsing the header. |
|
|
|
this.emit('header', this.header); |
|
|
|
|
|
|
|
// now, because we got some extra data, emit this first. |
|
|
|
this.push(chunk.slice(split)); |
|
|
|
// now, because we got some extra data, emit this first. |
|
|
|
this.push(chunk.slice(split)); |
|
|
|
} |
|
|
|
} else { |
|
|
|
// from there on, just provide the data to our consumer as-is. |
|
|
|
this.push(chunk); |
|
|
|
} |
|
|
|
} else { |
|
|
|
// from there on, just provide the data to our consumer as-is. |
|
|
|
this.push(chunk); |
|
|
|
done(); |
|
|
|
} |
|
|
|
done(); |
|
|
|
}; |
|
|
|
|
|
|
|
} |
|
|
|
// Usage: |
|
|
|
// var parser = new SimpleProtocol(); |
|
|
|
// source.pipe(parser) |
|
|
@ -1636,57 +1629,54 @@ respectively. These options can be used to implement parsers and |
|
|
|
serializers with Transform streams. |
|
|
|
|
|
|
|
```js |
|
|
|
const util = require('util'); |
|
|
|
const StringDecoder = require('string_decoder').StringDecoder; |
|
|
|
const Transform = require('stream').Transform; |
|
|
|
util.inherits(JSONParseStream, Transform); |
|
|
|
|
|
|
|
// Gets \n-delimited JSON string data, and emits the parsed objects |
|
|
|
function JSONParseStream() { |
|
|
|
if (!(this instanceof JSONParseStream)) |
|
|
|
return new JSONParseStream(); |
|
|
|
|
|
|
|
Transform.call(this, { readableObjectMode : true }); |
|
|
|
class JSONParseStream extends Transform { |
|
|
|
constructor() { |
|
|
|
super({ readableObjectMode : true }); |
|
|
|
|
|
|
|
this._buffer = ''; |
|
|
|
this._decoder = new StringDecoder('utf8'); |
|
|
|
} |
|
|
|
this._buffer = ''; |
|
|
|
this._decoder = new StringDecoder('utf8'); |
|
|
|
} |
|
|
|
|
|
|
|
JSONParseStream.prototype._transform = function(chunk, encoding, cb) { |
|
|
|
this._buffer += this._decoder.write(chunk); |
|
|
|
// split on newlines |
|
|
|
var lines = this._buffer.split(/\r?\n/); |
|
|
|
// keep the last partial line buffered |
|
|
|
this._buffer = lines.pop(); |
|
|
|
for (var l = 0; l < lines.length; l++) { |
|
|
|
var line = lines[l]; |
|
|
|
try { |
|
|
|
var obj = JSON.parse(line); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', er); |
|
|
|
return; |
|
|
|
_transform(chunk, encoding, cb) { |
|
|
|
this._buffer += this._decoder.write(chunk); |
|
|
|
// split on newlines |
|
|
|
var lines = this._buffer.split(/\r?\n/); |
|
|
|
// keep the last partial line buffered |
|
|
|
this._buffer = lines.pop(); |
|
|
|
for (var l = 0; l < lines.length; l++) { |
|
|
|
var line = lines[l]; |
|
|
|
try { |
|
|
|
var obj = JSON.parse(line); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', er); |
|
|
|
return; |
|
|
|
} |
|
|
|
// push the parsed object out to the readable consumer |
|
|
|
this.push(obj); |
|
|
|
} |
|
|
|
// push the parsed object out to the readable consumer |
|
|
|
this.push(obj); |
|
|
|
cb(); |
|
|
|
} |
|
|
|
cb(); |
|
|
|
}; |
|
|
|
|
|
|
|
JSONParseStream.prototype._flush = function(cb) { |
|
|
|
// Just handle any leftover |
|
|
|
var rem = this._buffer.trim(); |
|
|
|
if (rem) { |
|
|
|
try { |
|
|
|
var obj = JSON.parse(rem); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', er); |
|
|
|
return; |
|
|
|
_flush(cb) { |
|
|
|
// Just handle any leftover |
|
|
|
var rem = this._buffer.trim(); |
|
|
|
if (rem) { |
|
|
|
try { |
|
|
|
var obj = JSON.parse(rem); |
|
|
|
} catch (er) { |
|
|
|
this.emit('error', er); |
|
|
|
return; |
|
|
|
} |
|
|
|
// push the parsed object out to the readable consumer |
|
|
|
this.push(obj); |
|
|
|
} |
|
|
|
// push the parsed object out to the readable consumer |
|
|
|
this.push(obj); |
|
|
|
cb(); |
|
|
|
} |
|
|
|
cb(); |
|
|
|
}; |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
### `stream.read(0)` |
|
|
@ -1739,7 +1729,6 @@ horribly wrong. |
|
|
|
[`stream.unpipe()`]: #stream_readable_unpipe_destination |
|
|
|
[`stream.wrap()`]: #stream_readable_wrap_stream |
|
|
|
[`tls.CryptoStream`]: tls.html#tls_class_cryptostream |
|
|
|
[`util.inherits()`]: util.html#util_util_inherits_constructor_superconstructor |
|
|
|
[API for Stream Consumers]: #stream_api_for_stream_consumers |
|
|
|
[API for Stream Implementors]: #stream_api_for_stream_implementors |
|
|
|
[child process stdin]: child_process.html#child_process_child_stdin |
|
|
|