# Stream

Stability: 2 - Stable

A stream is an abstract interface implemented by various objects in
Node.js. For example, a [request to an HTTP server][http-incoming-message] is a
stream, as is [`process.stdout`][]. Streams are readable, writable, or both. All
streams are instances of [`EventEmitter`][].

You can load the Stream base classes by doing `require('stream')`.
There are base classes provided for [Readable][] streams, [Writable][]
streams, [Duplex][] streams, and [Transform][] streams.

This document is split up into 3 sections:

1. The first section explains the parts of the API that you need to be
   aware of to use streams in your programs.
2. The second section explains the parts of the API that you need to
   use if you implement your own custom streams. The API is designed to
   make this easy for you to do.
3. The third section goes into more depth about how streams work,
   including some of the internal mechanisms and functions that you
   should probably not modify unless you definitely know what you are
   doing.

## API for Stream Consumers

<!--type=misc-->

Streams can be either [Readable][], [Writable][], or both ([Duplex][]).

All streams are EventEmitters, but they also have other custom methods
and properties depending on whether they are Readable, Writable, or
Duplex.

If a stream is both Readable and Writable, then it implements all of
the methods and events. So, a [Duplex][] or [Transform][] stream is
fully described by this API, though their implementation may be
somewhat different.

It is not necessary to implement Stream interfaces in order to consume
streams in your programs. If you **are** implementing streaming
interfaces in your own program, please also refer to
[API for Stream Implementors][].

Almost all Node.js programs, no matter how simple, use Streams in some
way. Here is an example of using Streams in a Node.js program:

```js
const http = require('http');

var server = http.createServer((req, res) => {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', (chunk) => {
    body += chunk;
  });

  // the end event tells you that you have the entire body
  req.on('end', () => {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
```

### Class: stream.Duplex

Duplex streams are streams that implement both the [Readable][] and
[Writable][] interfaces.

Examples of Duplex streams include:

* [TCP sockets][]
* [zlib streams][zlib]
* [crypto streams][crypto]

### Class: stream.Readable

<!--type=class-->

The Readable stream interface is the abstraction for a *source* of
data that you are reading from. In other words, data comes *out* of a
Readable stream.

A Readable stream will not start emitting data until you indicate that
you are ready to receive it.

Readable streams have two "modes": a **flowing mode** and a **paused
mode**. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In paused mode, you
must explicitly call [`stream.read()`][stream-read] to get chunks of data out.
Streams start out in paused mode.

**Note**: If no data event handlers are attached, and there are no
[`stream.pipe()`][] destinations, and the stream is switched into flowing
mode, then data will be lost.

You can switch to flowing mode by doing any of the following:

* Adding a [`'data'`][] event handler to listen for data.
* Calling the [`stream.resume()`][stream-resume] method to explicitly open the
  flow.
* Calling the [`stream.pipe()`][] method to send the data to a [Writable][].

You can switch back to paused mode by doing either of the following:

* If there are no pipe destinations, by calling the
  [`stream.pause()`][stream-pause] method.
* If there are pipe destinations, by removing any [`'data'`][] event
  handlers, and removing all pipe destinations by calling the
  [`stream.unpipe()`][] method.

Note that, for backwards compatibility reasons, removing [`'data'`][]
event handlers will **not** automatically pause the stream. Also, if
there are piped destinations, then calling [`stream.pause()`][stream-pause] will
not guarantee that the stream will *remain* paused once those
destinations drain and ask for more data.

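
The mode switches described above can be observed directly. The following is a minimal sketch, not a definitive recipe. It assumes a hypothetical Readable whose `read` implementation does nothing, and it assumes the `readableFlowing` property, which newer Node.js releases expose and which this section does not otherwise describe:

```js
const Readable = require('stream').Readable;

// Hypothetical source for illustration: it never produces data on its own.
const readable = new Readable({ read() {} });

const states = [];
// Assumption: readableFlowing reports the current mode (newer Node.js only).
states.push(readable.readableFlowing);  // initial state, before any consumer

// Adding a 'data' handler switches the stream into flowing mode.
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});
states.push(readable.readableFlowing);

// With no pipe destinations, pause() switches back to paused mode.
readable.pause();
states.push(readable.readableFlowing);

// resume() explicitly opens the flow again.
readable.resume();
states.push(readable.readableFlowing);

console.log(states);
```

Recording the state after each call makes it easier to see which of the listed actions actually changed the mode.
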

Examples of readable streams include:

* [HTTP responses, on the client][http-incoming-message]
* [HTTP requests, on the server][http-incoming-message]
* [fs read streams][]
* [zlib streams][zlib]
* [crypto streams][crypto]
* [TCP sockets][]
* [child process stdout and stderr][]
* [`process.stdin`][]

#### Event: 'close'

Emitted when the stream and any of its underlying resources (a file
descriptor, for example) have been closed. The event indicates that
no more events will be emitted, and no further computation will occur.

Not all streams will emit the `'close'` event.

#### Event: 'data'

* `chunk` {Buffer|String} The chunk of data.

Attaching a `'data'` event listener to a stream that has not been
explicitly paused will switch the stream into flowing mode. Data will
then be passed as soon as it is available.

If you just want to get all the data out of the stream as fast as
possible, this is the best way to do so.

```js
var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});
```

#### Event: 'end'

This event fires when there will be no more data to read.

Note that the `'end'` event **will not fire** unless the data is
completely consumed. This can be done by switching into flowing mode,
or by calling [`stream.read()`][stream-read] repeatedly until you get to the
end.

```js
var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', () => {
  console.log('there will be no more data.');
});
```

#### Event: 'error'

* {Error}

Emitted if there was an error receiving data.

#### Event: 'readable'

When a chunk of data can be read from the stream, it will emit a
`'readable'` event.

In some cases, listening for a `'readable'` event will cause some data
to be read into the internal buffer from the underlying system, if it
hadn't already.

```js
var readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // there is some data to read now
});
```

Once the internal buffer is drained, a `'readable'` event will fire
again when more data is available.

The `'readable'` event is not emitted in the "flowing" mode with the
sole exception of the last one, on end-of-stream.

The `'readable'` event indicates that the stream has new information:
either new data is available or the end of the stream has been reached.
In the former case, [`stream.read()`][stream-read] will return that data. In the
latter case, [`stream.read()`][stream-read] will return null. For instance, in
the following example, `foo.txt` is an empty file:

```js
const fs = require('fs');
var rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});
```

The output of running this script is:

```
$ node test.js
readable: null
end
```

#### readable.isPaused()

* Return: {Boolean}

This method returns whether or not the `readable` has been **explicitly**
paused by client code (using [`stream.pause()`][stream-pause] without a
corresponding [`stream.resume()`][stream-resume]).

```js
const stream = require('stream');
var readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
```

#### readable.pause()

* Return: `this`

This method will cause a stream in flowing mode to stop emitting
[`'data'`][] events, switching out of flowing mode. Any data that becomes
available will remain in the internal buffer.

```js
var readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(() => {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});
```

#### readable.pipe(destination[, options])

* `destination` {stream.Writable} The destination for writing data
* `options` {Object} Pipe options
  * `end` {Boolean} End the writer when the reader ends. Default = `true`

This method pulls all the data out of a readable stream, and writes it
to the supplied destination, automatically managing the flow so that
the destination is not overwhelmed by a fast readable stream.

Multiple destinations can be piped to safely.

```js
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
```

This function returns the destination stream, so you can set up pipe
chains like so:

```js
const fs = require('fs');
const zlib = require('zlib');

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
```

For example, emulating the Unix `cat` command:

```js
process.stdin.pipe(process.stdout);
```

By default [`stream.end()`][stream-end] is called on the destination when the
source stream emits [`'end'`][], so that `destination` is no longer writable.
Pass `{ end: false }` as `options` to keep the destination stream open.

In the following example, `writer` is kept open so that "Goodbye" can be
written at the end:

```js
reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});
```

Note that [`process.stderr`][] and [`process.stdout`][] are never closed until
the process exits, regardless of the specified options.

#### readable.read([size])

* `size` {Number} Optional argument to specify how much data to read.
* Return: {String|Buffer|Null}

The `read()` method pulls some data out of the internal buffer and
returns it. If there is no data available, then it will return
`null`.

If you pass in a `size` argument, then it will return that many
bytes. If `size` bytes are not available, then it will return `null`,
unless the stream has ended, in which case it will return the data remaining
in the buffer.

If you do not specify a `size` argument, then it will return all the
data in the internal buffer.

This method should only be called in paused mode. In flowing mode,
this method is called automatically until the internal buffer is
drained.

```js
var readable = getReadableStreamSomehow();
readable.on('readable', () => {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});
```

If this method returns a data chunk, then it will also trigger the
emission of a [`'data'`][] event.

Note that calling [`stream.read([size])`][stream-read] after the [`'end'`][]
event has been triggered will return `null`. No runtime error will be raised.

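
The effect of the `size` argument can be illustrated with a small sketch. It assumes a hypothetical Readable with a do-nothing `read` implementation, into which data is pushed by hand; a real source would push data from its own `_read()`:

```js
const Readable = require('stream').Readable;

// Hypothetical source for illustration: data is pushed in by hand.
const readable = new Readable({ read() {} });
readable.push('abc');

// Only 3 bytes are buffered and the stream has not ended,
// so asking for 5 bytes yields null.
const tooMuch = readable.read(5);

// 2 bytes are available, so exactly 2 bytes are returned.
const firstTwo = readable.read(2);

// Once the source has ended, read(size) returns whatever remains.
readable.push(null);
const rest = readable.read(5);

console.log(tooMuch, firstTwo.toString(), rest.toString());
```
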
#### readable.resume()

* Return: `this`

This method will cause the readable stream to resume emitting [`'data'`][]
events.

This method will switch the stream into flowing mode. If you do *not*
want to consume the data from a stream, but you *do* want to get to
its [`'end'`][] event, you can call [`stream.resume()`][stream-resume] to open
the flow of data.

```js
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', () => {
  console.log('got to the end, but did not read anything');
});
```

#### readable.setEncoding(encoding)

* `encoding` {String} The encoding to use.
* Return: `this`

Call this function to cause the stream to return strings of the specified
encoding instead of Buffer objects. For example, if you do
`readable.setEncoding('utf8')`, then the output data will be interpreted as
UTF-8 data, and returned as strings. If you do `readable.setEncoding('hex')`,
then the data will be encoded in hexadecimal string format.

This properly handles multi-byte characters that would otherwise be
potentially mangled if you simply pulled the Buffers directly and
called [`buf.toString(encoding)`][] on them. If you want to read the data
as strings, always use this method.

You can also disable encoding entirely with `readable.setEncoding(null)`.
This approach is useful when dealing with binary data or with large
multi-byte strings spread out over multiple chunks.

```js
const assert = require('assert');

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
```
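
The multi-byte handling mentioned above can be seen when a single character arrives split across two chunks. This is a minimal sketch under stated assumptions: the Readable with a do-nothing `read` implementation is hypothetical, and the chunks are pushed by hand to force the split:

```js
const Readable = require('stream').Readable;

// The three UTF-8 bytes of the euro sign, split across two chunks.
const first = Buffer.from([0xE2, 0x82]);
const second = Buffer.from([0xAC]);

// Decoding each chunk on its own mangles the character.
const naive = first.toString('utf8') + second.toString('utf8');

// Hypothetical source for illustration: chunks are pushed in by hand.
const readable = new Readable({ read() {} });
readable.setEncoding('utf8');
readable.push(first);
readable.push(second);

// The stream holds back the incomplete bytes until the character is whole.
const decoded = readable.read();

console.log(naive === '€', decoded === '€');
```
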
#### readable.unpipe([destination])

* `destination` {stream.Writable} Optional specific stream to unpipe

This method will remove the hooks set up for a previous [`stream.pipe()`][]
call.

If the destination is not specified, then all pipes are removed.

If the destination is specified, but no pipe is set up for it, then
this is a no-op.

```js
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);
```

#### readable.unshift(chunk)

* `chunk` {Buffer|String} Chunk of data to unshift onto the read queue

This is useful in certain cases where a stream is being consumed by a
parser, which needs to "un-consume" some data that it has
optimistically pulled out of the source, so that the stream can be
passed on to some other party.

Note that `stream.unshift(chunk)` cannot be called after the [`'end'`][] event
has been triggered; a runtime error will be raised.

If you find that you must often call `stream.unshift(chunk)` in your
programs, consider implementing a [Transform][] stream instead. (See [API
for Stream Implementors][].)

```js
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        // Buffer.from() replaces the deprecated `new Buffer()` constructor
        var buf = Buffer.from(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}
```

Note that, unlike [`stream.push(chunk)`][stream-push], `stream.unshift(chunk)`
will not end the reading process by resetting the internal reading state of the
stream. This can cause unexpected results if `unshift()` is called during a
read (i.e. from within a [`stream._read()`][stream-_read] implementation on a
custom stream). Following the call to `unshift()` with an immediate
[`stream.push('')`][stream-push] will reset the reading state appropriately,
however it is best to simply avoid calling `unshift()` while in the process of
performing a read.

#### readable.wrap(stream)

* `stream` {Stream} An "old style" readable stream

Versions of Node.js prior to v0.10 had streams that did not implement the
entire Streams API as it is today. (See [Compatibility][] for
more information.)

If you are using an older Node.js library that emits [`'data'`][] events and
has a [`stream.pause()`][stream-pause] method that is advisory only, then you
can use the `wrap()` method to create a [Readable][] stream that uses the old
stream as its data source.

You will very rarely ever need to call this function, but it exists
as a convenience for interacting with old Node.js programs and libraries.

For example:

```js
const OldReader = require('./old-api-module.js').OldReader;
const Readable = require('stream').Readable;
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
```

### Class: stream.Transform

Transform streams are [Duplex][] streams where the output is in some way
computed from the input. They implement both the [Readable][] and
[Writable][] interfaces.

Examples of Transform streams include:

* [zlib streams][zlib]
* [crypto streams][crypto]

### Class: stream.Writable

<!--type=class-->

The Writable stream interface is an abstraction for a *destination*
that you are writing data *to*.

Examples of writable streams include:

* [HTTP requests, on the client][]
* [HTTP responses, on the server][]
* [fs write streams][]
* [zlib streams][zlib]
* [crypto streams][crypto]
* [TCP sockets][]
* [child process stdin][]
* [`process.stdout`][], [`process.stderr`][]

#### Event: 'drain'

If a [`stream.write(chunk)`][stream-write] call returns `false`, then the
`'drain'` event will indicate when it is appropriate to begin writing more data
to the stream.

```js
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
```

#### Event: 'error'

* {Error}

Emitted if there was an error when writing or piping data.

#### Event: 'finish'

When the [`stream.end()`][stream-end] method has been called, and all data has
been flushed to the underlying system, this event is emitted.

```js
var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i++) {
  // a template literal (backticks) is needed for ${i} to be interpolated
  writer.write(`hello, #${i}!\n`);
}
writer.end('this is the end\n');
writer.on('finish', () => {
  console.error('all writes are now complete.');
});
```

#### Event: 'pipe'

* `src` {stream.Readable} source stream that is piping to this writable

This is emitted whenever the [`stream.pipe()`][] method is called on a readable
stream, adding this writable to its set of destinations.

```js
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
```

#### Event: 'unpipe'

* `src` {[Readable][] Stream} The source stream that
  [unpiped][`stream.unpipe()`] this writable

This is emitted whenever the [`stream.unpipe()`][] method is called on a
readable stream, removing this writable from its set of destinations.

```js
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('something has stopped piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
```

#### writable.cork()

Forces buffering of all writes.

Buffered data will be flushed when either [`stream.uncork()`][] or
[`stream.end()`][stream-end] is called.

#### writable.end([chunk][, encoding][, callback])

* `chunk` {String|Buffer} Optional data to write
* `encoding` {String} The encoding, if `chunk` is a String
* `callback` {Function} Optional callback for when the stream is finished

Call this method when no more data will be written to the stream. If supplied,
the callback is attached as a listener on the [`'finish'`][] event.

Calling [`stream.write()`][stream-write] after calling
[`stream.end()`][stream-end] will raise an error.

```js
// write 'hello, ' and then end with 'world!'
var file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// writing more now is not allowed!
```

#### writable.setDefaultEncoding(encoding)

* `encoding` {String} The new default encoding

Sets the default encoding for a writable stream.
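
As a rough illustration, the default encoding determines how strings written without an explicit `encoding` argument are interpreted. The sketch below assumes a hypothetical Writable that simply records every chunk it receives:

```js
const Writable = require('stream').Writable;

const received = [];
// Hypothetical destination for illustration: it records each chunk it is given.
const writable = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString('utf8'));
    callback();
  }
});

// Strings written without an explicit encoding are now decoded as hex.
writable.setDefaultEncoding('hex');
writable.write('68656c6c6f');

console.log(received);
```
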
#### writable.uncork()

Flushes all data buffered since the [`stream.cork()`][] call.
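
The interplay of `cork()` and `uncork()` can be sketched as follows. The Writable here is a hypothetical destination that records how chunks reach it; it implements `writev` only so that the batching becomes visible:

```js
const Writable = require('stream').Writable;

const batches = [];
// Hypothetical destination for illustration: it records how chunks arrive.
const writable = new Writable({
  write(chunk, encoding, callback) {
    batches.push([chunk.toString()]);
    callback();
  },
  // Receives all buffered chunks at once when more than one is pending.
  writev(chunks, callback) {
    batches.push(chunks.map((entry) => entry.chunk.toString()));
    callback();
  }
});

writable.cork();
writable.write('hello, ');
writable.write('world!');
// While corked, nothing has reached the destination yet.
const beforeUncork = batches.length;

// Uncorking flushes everything buffered since cork() was called.
writable.uncork();

console.log(beforeUncork, batches);
```

Corking is mainly worthwhile when many small writes happen in quick succession and the destination can handle them more efficiently as one batch.
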
#### writable.write(chunk[, encoding][, callback])

* `chunk` {String|Buffer} The data to write
* `encoding` {String} The encoding, if `chunk` is a String
* `callback` {Function} Callback for when this chunk of data is flushed
* Return: {Boolean} `true` if the data was handled completely.

This method writes some data to the underlying system, and calls the
supplied callback once the data has been fully handled.

The return value indicates if you should continue writing right now.
If the data had to be buffered internally, then it will return
`false`. Otherwise, it will return `true`.

This return value is strictly advisory. You MAY continue to write,
even if it returns `false`. However, writes will be buffered in
memory, so it is best not to do this excessively. Instead, wait for
the [`'drain'`][] event before writing more data.
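
To make the advisory return value concrete, here is a minimal sketch. It assumes a hypothetical slow destination with a deliberately tiny `highWaterMark` of 3 bytes, chosen only so that the second write reaches the limit:

```js
const Writable = require('stream').Writable;

// Hypothetical slow destination for illustration, with a 3-byte buffer limit.
const writable = new Writable({
  highWaterMark: 3,
  write(chunk, encoding, callback) {
    setImmediate(callback);  // pretend the underlying system is slow
  }
});

const first = writable.write('ab');   // 2 bytes pending, below the limit
const second = writable.write('cd');  // 4 bytes pending, limit reached

// Rather than writing more right away, wait for 'drain'.
writable.once('drain', () => {
  console.log('drained, it is appropriate to write again');
});

console.log(first, second);
```
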
## API for Stream Implementors

<!--type=misc-->

To implement any sort of stream, the pattern is the same:

1. Extend the appropriate parent class in your own subclass. (The
   [`util.inherits()`][] method is particularly helpful for this.)
2. Call the appropriate parent class constructor in your constructor,
   to be sure that the internal mechanisms are set up properly.
3. Implement one or more specific methods, as detailed below.
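
The three steps above can be sketched as follows. `Counter` is a hypothetical class invented for this illustration; it is a Readable that produces the numbers 1 through 5 as strings and then ends:

```js
const Readable = require('stream').Readable;
const util = require('util');

// Hypothetical example class: emits the numbers 1 through 5 as strings.
function Counter(options) {
  // Step 2: call the parent class constructor.
  Readable.call(this, options);
  this._index = 1;
  this._max = 5;
}

// Step 1: extend the appropriate parent class.
util.inherits(Counter, Readable);

// Step 3: implement the specific method for this kind of stream.
Counter.prototype._read = function() {
  const i = this._index++;
  // Pushing null signals that there is no more data.
  this.push(i > this._max ? null : String(i));
};

const counter = new Counter();
counter.on('data', (chunk) => {
  console.log(chunk.toString());
});
```
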

The class to extend and the method(s) to implement depend on the sort
of stream class you are writing:

<table>
  <thead>
    <tr>
      <th>
        <p>Use-case</p>
      </th>
      <th>
        <p>Class</p>
      </th>
      <th>
        <p>Method(s) to implement</p>
      </th>
    </tr>
  </thead>
  <tr>
    <td>
      <p>Reading only</p>
    </td>
    <td>
      <p>[Readable](#stream_class_stream_readable_1)</p>
    </td>
    <td>
      <p><code>[_read][stream-_read]</code></p>
    </td>
  </tr>
  <tr>
    <td>
      <p>Writing only</p>
    </td>
    <td>
      <p>[Writable](#stream_class_stream_writable_1)</p>
    </td>
    <td>
      <p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
    </td>
  </tr>
  <tr>
    <td>
      <p>Reading and writing</p>
    </td>
    <td>
      <p>[Duplex](#stream_class_stream_duplex_1)</p>
    </td>
    <td>
      <p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
    </td>
  </tr>
  <tr>
    <td>
      <p>Operate on written data, then read the result</p>
    </td>
    <td>
      <p>[Transform](#stream_class_stream_transform_1)</p>
    </td>
    <td>
      <p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code></p>
    </td>
  </tr>
</table>

In your implementation code, it is very important to never call the methods
described in [API for Stream Consumers][]. Otherwise, you can potentially cause
adverse side effects in programs that consume your streaming interfaces.

### Class: stream.Duplex

<!--type=class-->

A "duplex" stream is one that is both Readable and Writable, such as a TCP
socket connection.

Note that `stream.Duplex` is an abstract class designed to be extended
with an underlying implementation of the [`stream._read(size)`][stream-_read]
and [`stream._write(chunk, encoding, callback)`][stream-_write] methods as you
would with a Readable or Writable stream class.

Since JavaScript doesn't have multiple prototypal inheritance, this class
prototypally inherits from Readable, and then parasitically from Writable. It is
thus up to the user to implement both the low-level
[`stream._read(n)`][stream-_read] method as well as the low-level
[`stream._write(chunk, encoding, callback)`][stream-_write] method on extension
duplex classes.

#### new stream.Duplex(options)

* `options` {Object} Passed to both Writable and Readable
  constructors. Also has the following fields:
  * `allowHalfOpen` {Boolean} Default = `true`. If set to `false`, then
    the stream will automatically end the readable side when the
    writable side ends and vice versa.
  * `readableObjectMode` {Boolean} Default = `false`. Sets `objectMode`
    for readable side of the stream. Has no effect if `objectMode`
    is `true`.
  * `writableObjectMode` {Boolean} Default = `false`. Sets `objectMode`
    for writable side of the stream. Has no effect if `objectMode`
    is `true`.

In classes that extend the Duplex class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
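
As an illustration of the per-side object mode options, the sketch below builds a hypothetical Duplex whose writable side accepts objects while its readable side produces bytes. Passing `read` and `write` functions as constructor options is assumed here as a shorthand for implementing `_read()` and `_write()` in a subclass:

```js
const Duplex = require('stream').Duplex;

// Hypothetical example: objects are written in, JSON text is read out.
const duplex = new Duplex({
  writableObjectMode: true,   // the writable side accepts arbitrary objects
  readableObjectMode: false,  // the readable side produces bytes
  read(size) {
    // Data is pushed from write() below, so there is nothing to do here.
  },
  write(chunk, encoding, callback) {
    // Serialize each written object and queue it on the readable side.
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

duplex.write({ hello: 'world' });
const output = duplex.read();

console.log(output.toString());
```
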
### Class: stream.PassThrough

This is a trivial implementation of a [Transform][] stream that simply
passes the input bytes across to the output. Its purpose is mainly
for examples and testing, but there are occasionally use cases where
it can come in handy as a building block for novel sorts of streams.
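
A short sketch of that behavior, written for illustration only, writes two chunks into a PassThrough and reads them back unchanged:

```js
const PassThrough = require('stream').PassThrough;

const passThrough = new PassThrough();
passThrough.write('hello, ');
passThrough.write('world!');
passThrough.end();

// The bytes come out exactly as they went in.
const output = passThrough.read();

console.log(output.toString());
```
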
|
|
|
|
|
|
|
|
### Class: stream.Readable
|
|
|
|
|
|
|
|
<!--type=class-->
|
|
|
|
|
|
|
|
`stream.Readable` is an abstract class designed to be extended with an
|
|
|
|
underlying implementation of the [`stream._read(size)`][stream-_read] method.
|
|
|
|
|
|
|
|
Please see [API for Stream Consumers][] for how to consume
|
|
|
|
streams in your programs. What follows is an explanation of how to
|
|
|
|
implement Readable streams in your programs.
|
|
|
|
|
|
|
|
#### new stream.Readable([options])
|
|
|
|
|
|
|
|
* `options` {Object}
|
|
|
|
* `highWaterMark` {Number} The maximum number of bytes to store in
|
|
|
|
the internal buffer before ceasing to read from the underlying
|
|
|
|
resource. Default = `16384` (16 KB), or `16` for `objectMode` streams.
|
|
|
|
* `encoding` {String} If specified, then buffers will be decoded to
|
|
|
|
strings using the specified encoding. Default = `null`
|
|
|
|
* `objectMode` {Boolean} Whether this stream should behave
|
|
|
|
as a stream of objects, meaning that [`stream.read(n)`][stream-read] returns
|
|
|
|
a single value instead of a Buffer of size `n`. Default = `false`
|
|
|
|
* `read` {Function} Implementation for the [`stream._read()`][stream-_read]
|
|
|
|
method.
|
|
|
|
|
|
|
|
In classes that extend the Readable class, make sure to call the
|
|
|
|
Readable constructor so that the buffering settings can be properly
|
|
|
|
initialized.
|
|
|
|
|
|
|
|
#### readable.\_read(size)
|
|
|
|
|
|
|
|
* `size` {Number} Number of bytes to read asynchronously
|
|
|
|
|
|
|
|
Note: **Implement this method, but do NOT call it directly.**
|
|
|
|
|
|
|
|
This method is prefixed with an underscore because it is internal to the
|
|
|
|
class that defines it and should only be called by the internal Readable
|
|
|
|
class methods. All Readable stream implementations must provide a `_read()`
|
|
|
|
method to fetch data from the underlying resource.
|
|
|
|
|
|
|
|
When `_read()` is called, if data is available from the resource, the `_read()`
|
|
|
|
implementation should start pushing that data into the read queue by calling
|
|
|
|
[`this.push(dataChunk)`][stream-push]. `_read()` should continue reading from
|
|
|
|
the resource and pushing data until `push()` returns `false`, at which point it
|
|
|
|
should stop reading from the resource. Only when `_read()` is called again after
|
|
|
|
it has stopped should it start reading more data from the resource and pushing
|
|
|
|
that data onto the queue.
|
|
|
|
|
|
|
|
Note: once the `_read()` method is called, it will not be called again until
|
|
|
|
the [`stream.push()`][stream-push] method is called.
|
|
|
|
|
|
|
|
The `size` argument is advisory. Implementations where a "read" is a
|
|
|
|
single call that returns data can use this to know how much data to
|
|
|
|
fetch. Implementations where that is not relevant, such as TCP or
|
|
|
|
TLS, may ignore this argument, and simply provide data whenever it
|
|
|
|
becomes available. There is no need, for example, to "wait" until
|
|
|
|
`size` bytes are available before calling [`stream.push(chunk)`][stream-push].
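
The push-until-`false` contract described above can be sketched as follows.
`ArraySource` is a hypothetical class that serves chunks from an in-memory
array (an assumption made to keep the example self-contained); a real
implementation would read from a file descriptor, socket, or similar resource.

```js
const Readable = require('stream').Readable;
const util = require('util');

util.inherits(ArraySource, Readable);

// Hypothetical example class, for illustration only.
function ArraySource(chunks, options) {
  Readable.call(this, options);
  this._chunks = chunks.slice();
}

ArraySource.prototype._read = function(size) {
  // keep pushing while data is available and push() asks for more
  while (this._chunks.length) {
    if (!this.push(this._chunks.shift()))
      return; // the internal buffer is full; wait for the next _read() call
  }
  // nothing left in the array: signal EOF
  this.push(null);
};

const source = new ArraySource(['one', 'two', 'three'], { encoding: 'utf8' });
const firstRead = source.read();
```

The advisory `size` argument is ignored in this sketch, because the data is
already split into chunks.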
|
|
|
|
|
|
|
|
#### readable.push(chunk[, encoding])
|
|
|
|
|
|
|
|
|
|
|
|
* `chunk` {Buffer|Null|String} Chunk of data to push into the read queue
|
|
|
|
* `encoding` {String} Encoding of String chunks. Must be a valid
|
|
|
|
Buffer encoding, such as `'utf8'` or `'ascii'`
|
|
|
|
* return {Boolean} Whether or not more pushes should be performed
|
|
|
|
|
|
|
|
Note: **This method should be called by Readable implementors, NOT
|
|
|
|
by consumers of Readable streams.**
|
|
|
|
|
|
|
|
If a value other than `null` is passed, the `push()` method adds a chunk of data
|
|
|
|
into the queue for subsequent stream processors to consume. If `null` is
|
|
|
|
passed, it signals the end of the stream (EOF), after which no more data
|
|
|
|
can be written.
|
|
|
|
|
|
|
|
The data added with `push()` can be pulled out by calling the
|
|
|
|
[`stream.read()`][stream-read] method when the [`'readable'`][] event fires.
|
|
|
|
|
|
|
|
This API is designed to be as flexible as possible. For example,
|
|
|
|
you may be wrapping a lower-level source which has some sort of
|
|
|
|
pause/resume mechanism, and a data callback. In those cases, you
|
|
|
|
could wrap the low-level source object by doing something like this:
|
|
|
|
|
|
|
|
```js
|
|
|
|
// source is an object with readStop() and readStart() methods,
|
|
|
|
// and an `ondata` member that gets called when it has data, and
|
|
|
|
// an `onend` member that gets called when the data is over.
|
|
|
|
|
|
|
|
const Readable = require('stream').Readable;
const util = require('util');

util.inherits(SourceWrapper, Readable);
|
|
|
|
|
|
|
|
function SourceWrapper(options) {
|
|
|
|
Readable.call(this, options);
|
|
|
|
|
|
|
|
this._source = getLowlevelSourceObject();
|
|
|
|
|
|
|
|
// Every time there's data, we push it into the internal buffer.
|
|
|
|
this._source.ondata = (chunk) => {
|
|
|
|
// if push() returns false, then we need to stop reading from source
|
|
|
|
if (!this.push(chunk))
|
|
|
|
this._source.readStop();
|
|
|
|
};
|
|
|
|
|
|
|
|
// When the source ends, we push the EOF-signaling `null` chunk
|
|
|
|
this._source.onend = () => {
|
|
|
|
this.push(null);
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
// _read will be called when the stream wants to pull more data in
|
|
|
|
// the advisory size argument is ignored in this case.
|
|
|
|
SourceWrapper.prototype._read = function(size) {
|
|
|
|
this._source.readStart();
|
|
|
|
};
|
|
|
|
```
|
|
|
|
|
|
|
|
#### Example: A Counting Stream
|
|
|
|
|
|
|
|
<!--type=example-->
|
|
|
|
|
|
|
|
This is a basic example of a Readable stream. It emits the numerals
|
|
|
|
from 1 to 1,000,000 in ascending order, and then ends.
|
|
|
|
|
|
|
|
```js
|
|
|
|
const Readable = require('stream').Readable;
|
|
|
|
const util = require('util');
|
|
|
|
util.inherits(Counter, Readable);
|
|
|
|
|
|
|
|
function Counter(opt) {
|
|
|
|
Readable.call(this, opt);
|
|
|
|
this._max = 1000000;
|
|
|
|
this._index = 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
Counter.prototype._read = function() {
|
|
|
|
var i = this._index++;
|
|
|
|
if (i > this._max)
|
|
|
|
this.push(null);
|
|
|
|
else {
|
|
|
|
var str = '' + i;
|
|
|
|
var buf = Buffer.from(str, 'ascii');
|
|
|
|
this.push(buf);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
```
|
|
|
|
|
|
|
|
#### Example: SimpleProtocol v1 (Sub-optimal)
|
|
|
|
|
|
|
|
This is similar to the `parseHeader` function described
|
|
|
|
[here](#stream_readable_unshift_chunk), but implemented as a custom stream.
|
|
|
|
Also, note that this implementation does not convert the incoming data to a
|
|
|
|
string.
|
|
|
|
|
|
|
|
However, this would be better implemented as a [Transform][] stream. See
|
|
|
|
[SimpleProtocol v2][] for a better implementation.
|
|
|
|
|
|
|
|
```js
|
|
|
|
// A parser for a simple data protocol.
|
|
|
|
// The "header" is a JSON object, followed by 2 \n characters, and
|
|
|
|
// then a message body.
|
|
|
|
//
|
|
|
|
// NOTE: This can be done more simply as a Transform stream!
|
|
|
|
// Using Readable directly for this is sub-optimal. See the
|
|
|
|
// alternative example below under the Transform section.
|
|
|
|
|
|
|
|
const Readable = require('stream').Readable;
|
|
|
|
const util = require('util');
|
|
|
|
|
|
|
|
util.inherits(SimpleProtocol, Readable);
|
|
|
|
|
|
|
|
function SimpleProtocol(source, options) {
|
|
|
|
if (!(this instanceof SimpleProtocol))
|
|
|
|
return new SimpleProtocol(source, options);
|
|
|
|
|
|
|
|
Readable.call(this, options);
|
|
|
|
this._inBody = false;
|
|
|
|
this._sawFirstCr = false;
|
|
|
|
|
|
|
|
// source is a readable stream, such as a socket or file
|
|
|
|
this._source = source;
|
|
|
|
|
|
|
|
var self = this;
|
|
|
|
source.on('end', () => {
|
|
|
|
self.push(null);
|
|
|
|
});
|
|
|
|
|
|
|
|
// give it a kick whenever the source is readable
|
|
|
|
// read(0) will not consume any bytes
|
|
|
|
source.on('readable', () => {
|
|
|
|
self.read(0);
|
|
|
|
});
|
|
|
|
|
|
|
|
this._rawHeader = [];
|
|
|
|
this.header = null;
|
|
|
|
}
|
|
|
|
|
|
|
|
SimpleProtocol.prototype._read = function(n) {
|
|
|
|
if (!this._inBody) {
|
|
|
|
var chunk = this._source.read();
|
|
|
|
|
|
|
|
// if the source doesn't have data, we don't have data yet.
|
|
|
|
if (chunk === null)
|
|
|
|
return this.push('');
|
|
|
|
|
|
|
|
// check if the chunk has a \n\n
|
|
|
|
var split = -1;
|
|
|
|
for (var i = 0; i < chunk.length; i++) {
|
|
|
|
if (chunk[i] === 10) { // '\n'
|
|
|
|
if (this._sawFirstCr) {
|
|
|
|
split = i;
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
this._sawFirstCr = true;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
this._sawFirstCr = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (split === -1) {
|
|
|
|
// still waiting for the \n\n
|
|
|
|
// stash the chunk, and try again.
|
|
|
|
this._rawHeader.push(chunk);
|
|
|
|
this.push('');
|
|
|
|
} else {
|
|
|
|
this._inBody = true;
|
|
|
|
var h = chunk.slice(0, split);
|
|
|
|
this._rawHeader.push(h);
|
|
|
|
var header = Buffer.concat(this._rawHeader).toString();
|
|
|
|
try {
|
|
|
|
this.header = JSON.parse(header);
|
|
|
|
} catch (er) {
|
|
|
|
this.emit('error', new Error('invalid simple protocol data'));
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// now, because we got some extra data, unshift the rest
|
|
|
|
// back into the read queue so that our consumer will see it.
|
|
|
|
var b = chunk.slice(split);
|
|
|
|
this.unshift(b);
|
|
|
|
// calling unshift by itself does not reset the reading state
|
|
|
|
// of the stream; since we're inside _read, doing an additional
|
|
|
|
// push('') will reset the state appropriately.
|
|
|
|
this.push('');
|
|
|
|
|
|
|
|
// and let them know that we are done parsing the header.
|
|
|
|
this.emit('header', this.header);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// from there on, just provide the data to our consumer.
|
|
|
|
// careful not to push(null), since that would indicate EOF.
|
|
|
|
var chunk = this._source.read();
|
|
|
|
if (chunk) this.push(chunk);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// Usage:
|
|
|
|
// var parser = new SimpleProtocol(source);
|
|
|
|
// Now parser is a readable stream that will emit 'header'
|
|
|
|
// with the parsed header data.
|
|
|
|
```
|
|
|
|
|
|
|
|
### Class: stream.Transform
|
|
|
|
|
|
|
|
A "transform" stream is a duplex stream where the output is causally
|
|
|
|
connected in some way to the input, such as a [zlib][] stream or a
|
|
|
|
[crypto][] stream.
|
|
|
|
|
|
|
|
There is no requirement that the output be the same size as the input,
|
|
|
|
the same number of chunks, or arrive at the same time. For example, a
|
|
|
|
Hash stream will only ever have a single chunk of output which is
|
|
|
|
provided when the input is ended. A zlib stream will produce output
|
|
|
|
that is either much smaller or much larger than its input.
|
|
|
|
|
|
|
|
Rather than implement the [`stream._read()`][stream-_read] and
|
|
|
|
[`stream._write()`][stream-_write] methods, Transform classes must implement the
|
|
|
|
[`stream._transform()`][stream-_transform] method, and may optionally
|
|
|
|
also implement the [`stream._flush()`][stream-_flush] method. (See below.)
|
|
|
|
|
|
|
|
#### new stream.Transform([options])
|
|
|
|
|
|
|
|
* `options` {Object} Passed to both Writable and Readable
|
|
|
|
constructors. Also has the following fields:
|
|
|
|
* `transform` {Function} Implementation for the
|
|
|
|
[`stream._transform()`][stream-_transform] method.
|
|
|
|
* `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush]
|
|
|
|
method.
|
|
|
|
|
|
|
|
In classes that extend the Transform class, make sure to call the
|
|
|
|
constructor so that the buffering settings can be properly
|
|
|
|
initialized.
|
|
|
|
|
|
|
|
#### Events: 'finish' and 'end'
|
|
|
|
|
|
|
|
The [`'finish'`][] and [`'end'`][] events are from the parent Writable
|
|
|
|
and Readable classes respectively. The `'finish'` event is fired after
|
|
|
|
[`stream.end()`][stream-end] is called and all chunks have been processed by
|
|
|
|
[`stream._transform()`][stream-_transform]. `'end'` is fired after all data has
|
|
|
|
been output, which is after the callback in [`stream._flush()`][stream-_flush]
|
|
|
|
has been called.
|
|
|
|
|
|
|
|
#### transform.\_flush(callback)
|
|
|
|
|
|
|
|
* `callback` {Function} Call this function (optionally with an error
|
|
|
|
argument) when you are done flushing any remaining data.
|
|
|
|
|
|
|
|
Note: **This function MUST NOT be called directly.** It MAY be implemented
|
|
|
|
by child classes, and if so, will be called by the internal Transform
|
|
|
|
class methods only.
|
|
|
|
|
|
|
|
In some cases, your transform operation may need to emit a bit more
|
|
|
|
data at the end of the stream. For example, a `Zlib` compression
|
|
|
|
stream will store up some internal state so that it can optimally
|
|
|
|
compress the output. At the end, however, it needs to do the best it
|
|
|
|
can with what is left, so that the data will be complete.
|
|
|
|
|
|
|
|
In those cases, you can implement a `_flush()` method, which will be
|
|
|
|
called at the very end, after all the written data is consumed, but
|
|
|
|
before emitting [`'end'`][] to signal the end of the readable side. Just
|
|
|
|
like with [`stream._transform()`][stream-_transform], call
|
|
|
|
`transform.push(chunk)` zero or more times, as appropriate, and call `callback`
|
|
|
|
when the flush operation is complete.
|
|
|
|
|
|
|
|
This method is prefixed with an underscore because it is internal to
|
|
|
|
the class that defines it, and should not be called directly by user
|
|
|
|
programs. However, you **are** expected to override this method in
|
|
|
|
your own extension classes.
|
|
|
|
|
|
|
|
#### transform.\_transform(chunk, encoding, callback)
|
|
|
|
|
|
|
|
* `chunk` {Buffer|String} The chunk to be transformed. Will **always**
|
|
|
|
be a buffer unless the `decodeStrings` option was set to `false`.
|
|
|
|
* `encoding` {String} If the chunk is a string, then this is the
|
|
|
|
encoding type. If chunk is a buffer, then this is the special
|
|
|
|
value `'buffer'`; ignore it in this case.
|
|
|
|
* `callback` {Function} Call this function (optionally with an error
|
|
|
|
argument and data) when you are done processing the supplied chunk.
|
|
|
|
|
|
|
|
Note: **This function MUST NOT be called directly.** It should be
|
|
|
|
implemented by child classes, and called by the internal Transform
|
|
|
|
class methods only.
|
|
|
|
|
|
|
|
All Transform stream implementations must provide a `_transform()`
|
|
|
|
method to accept input and produce output.
|
|
|
|
|
|
|
|
`_transform()` should do whatever has to be done in this specific
|
|
|
|
Transform class, to handle the bytes being written, and pass them off
|
|
|
|
to the readable portion of the interface. Do asynchronous I/O,
|
|
|
|
process things, and so on.
|
|
|
|
|
|
|
|
Call `transform.push(outputChunk)` 0 or more times to generate output
|
|
|
|
from this input chunk, depending on how much data you want to output
|
|
|
|
as a result of this chunk.
|
|
|
|
|
|
|
|
Call the callback function only when the current chunk is completely
|
|
|
|
consumed. Note that there may or may not be output as a result of any
|
|
|
|
particular input chunk. If you supply a second argument to the callback
|
|
|
|
it will be passed to the `push()` method. In other words, the following are
|
|
|
|
equivalent:
|
|
|
|
|
|
|
|
```js
|
|
|
|
transform.prototype._transform = function (data, encoding, callback) {
|
|
|
|
this.push(data);
|
|
|
|
callback();
|
|
|
|
};
|
|
|
|
|
|
|
|
transform.prototype._transform = function (data, encoding, callback) {
|
|
|
|
callback(null, data);
|
|
|
|
};
|
|
|
|
```
|
|
|
|
|
|
|
|
This method is prefixed with an underscore because it is internal to
|
|
|
|
the class that defines it, and should not be called directly by user
|
|
|
|
programs. However, you **are** expected to override this method in
|
|
|
|
your own extension classes.
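
To illustrate that an input chunk may produce no output at all, here is a
sketch of a hypothetical `ShoutFilter` class (an illustrative name). It drops
whitespace-only chunks and upper-cases everything else, passing the result as
the second argument to the callback.

```js
const Transform = require('stream').Transform;
const util = require('util');

util.inherits(ShoutFilter, Transform);

// Hypothetical example class, for illustration only.
function ShoutFilter(options) {
  Transform.call(this, options);
}

ShoutFilter.prototype._transform = function(chunk, encoding, callback) {
  var text = chunk.toString();
  if (text.trim() === '') {
    // consume the chunk without producing any output
    callback();
    return;
  }
  // same effect as this.push(...) followed by callback()
  callback(null, text.toUpperCase());
};

const shout = new ShoutFilter();
shout.write('   ');
const nothingYet = shout.read(); // no output was generated for this chunk
shout.write('hi');
const shouted = shout.read();    // a Buffer holding the transformed chunk
```

Note that this sketch treats each chunk in isolation. A transform that cares
about content spanning chunk boundaries has to keep its own state between calls.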
|
|
|
|
|
|
|
|
#### Example: `SimpleProtocol` parser v2
|
|
|
|
|
|
|
|
The example [here](#stream_example_simpleprotocol_v1_sub_optimal) of a simple
|
|
|
|
protocol parser can be implemented more simply by using the higher-level
|
|
|
|
[Transform][] stream class, similar to the `parseHeader` and `SimpleProtocol
|
|
|
|
v1` examples.
|
|
|
|
|
|
|
|
In this example, rather than providing the input as an argument, it
|
|
|
|
would be piped into the parser, which is a more idiomatic Node.js stream
|
|
|
|
approach.
|
|
|
|
|
|
|
|
```js
|
|
|
|
const util = require('util');
|
|
|
|
const Transform = require('stream').Transform;
|
|
|
|
util.inherits(SimpleProtocol, Transform);
|
|
|
|
|
|
|
|
function SimpleProtocol(options) {
|
|
|
|
if (!(this instanceof SimpleProtocol))
|
|
|
|
return new SimpleProtocol(options);
|
|
|
|
|
|
|
|
Transform.call(this, options);
|
|
|
|
this._inBody = false;
|
|
|
|
this._sawFirstCr = false;
|
|
|
|
this._rawHeader = [];
|
|
|
|
this.header = null;
|
|
|
|
}
|
|
|
|
|
|
|
|
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
|
|
|
|
if (!this._inBody) {
|
|
|
|
// check if the chunk has a \n\n
|
|
|
|
var split = -1;
|
|
|
|
for (var i = 0; i < chunk.length; i++) {
|
|
|
|
if (chunk[i] === 10) { // '\n'
|
|
|
|
if (this._sawFirstCr) {
|
|
|
|
split = i;
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
this._sawFirstCr = true;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
this._sawFirstCr = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (split === -1) {
|
|
|
|
// still waiting for the \n\n
|
|
|
|
// stash the chunk, and try again.
|
|
|
|
this._rawHeader.push(chunk);
|
|
|
|
} else {
|
|
|
|
this._inBody = true;
|
|
|
|
var h = chunk.slice(0, split);
|
|
|
|
this._rawHeader.push(h);
|
|
|
|
var header = Buffer.concat(this._rawHeader).toString();
|
|
|
|
try {
|
|
|
|
this.header = JSON.parse(header);
|
|
|
|
} catch (er) {
|
|
|
|
done(new Error('invalid simple protocol data'));
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// and let them know that we are done parsing the header.
|
|
|
|
this.emit('header', this.header);
|
|
|
|
|
|
|
|
// now, because we got some extra data, emit this first.
|
|
|
|
this.push(chunk.slice(split));
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// from there on, just provide the data to our consumer as-is.
|
|
|
|
this.push(chunk);
|
|
|
|
}
|
|
|
|
done();
|
|
|
|
};
|
|
|
|
|
|
|
|
// Usage:
|
|
|
|
// var parser = new SimpleProtocol();
|
|
|
|
// source.pipe(parser)
|
|
|
|
// Now parser is a readable stream that will emit 'header'
|
|
|
|
// with the parsed header data.
|
|
|
|
```
|
|
|
|
|
|
|
|
### Class: stream.Writable
|
|
|
|
|
|
|
|
<!--type=class-->
|
|
|
|
|
|
|
|
`stream.Writable` is an abstract class designed to be extended with an
|
|
|
|
underlying implementation of the
|
|
|
|
[`stream._write(chunk, encoding, callback)`][stream-_write] method.
|
|
|
|
|
|
|
|
Please see [API for Stream Consumers][] for how to consume
|
|
|
|
writable streams in your programs. What follows is an explanation of
|
|
|
|
how to implement Writable streams in your programs.
|
|
|
|
|
|
|
|
#### new stream.Writable([options])
|
|
|
|
|
|
|
|
* `options` {Object}
|
|
|
|
* `highWaterMark` {Number} Buffer level when
|
|
|
|
[`stream.write()`][stream-write] starts returning `false`. Default = `16384`
|
|
|
|
(16 KB), or `16` for `objectMode` streams.
|
|
|
|
* `decodeStrings` {Boolean} Whether or not to decode strings into
|
|
|
|
Buffers before passing them to [`stream._write()`][stream-_write].
|
|
|
|
Default = `true`
|
|
|
|
* `objectMode` {Boolean} Whether or not the
|
|
|
|
[`stream.write(anyObj)`][stream-write] is a valid operation. If set you can
|
|
|
|
write arbitrary data instead of only `Buffer` / `String` data.
|
|
|
|
Default = `false`
|
|
|
|
* `write` {Function} Implementation for the
|
|
|
|
[`stream._write()`][stream-_write] method.
|
|
|
|
* `writev` {Function} Implementation for the
|
|
|
|
[`stream._writev()`][stream-_writev] method.
|
|
|
|
|
|
|
|
In classes that extend the Writable class, make sure to call the
|
|
|
|
constructor so that the buffering settings can be properly
|
|
|
|
initialized.
|
|
|
|
|
|
|
|
#### writable.\_write(chunk, encoding, callback)
|
|
|
|
|
|
|
|
* `chunk` {Buffer|String} The chunk to be written. Will **always**
|
|
|
|
be a buffer unless the `decodeStrings` option was set to `false`.
|
|
|
|
* `encoding` {String} If the chunk is a string, then this is the
|
|
|
|
encoding type. If chunk is a buffer, then this is the special
|
|
|
|
value `'buffer'`; ignore it in this case.
|
|
|
|
* `callback` {Function} Call this function (optionally with an error
|
|
|
|
argument) when you are done processing the supplied chunk.
|
|
|
|
|
|
|
|
All Writable stream implementations must provide a
|
|
|
|
[`stream._write()`][stream-_write] method to send data to the underlying
|
|
|
|
resource.
|
|
|
|
|
|
|
|
Note: **This function MUST NOT be called directly.** It should be
|
|
|
|
implemented by child classes, and called by the internal Writable
|
|
|
|
class methods only.
|
|
|
|
|
|
|
|
Call the callback using the standard `callback(error)` pattern to
|
|
|
|
signal that the write completed successfully or with an error.
|
|
|
|
|
|
|
|
If the `decodeStrings` option is set to `false` in the constructor options, then
|
|
|
|
`chunk` may be a string rather than a Buffer, and `encoding` will
|
|
|
|
indicate the sort of string that it is. This is to support
|
|
|
|
implementations that have an optimized handling for certain string
|
|
|
|
data encodings. If you do not explicitly set the `decodeStrings`
|
|
|
|
option to `false`, then you can safely ignore the `encoding` argument,
|
|
|
|
and assume that `chunk` will always be a Buffer.
|
|
|
|
|
|
|
|
This method is prefixed with an underscore because it is internal to
|
|
|
|
the class that defines it, and should not be called directly by user
|
|
|
|
programs. However, you **are** expected to override this method in
|
|
|
|
your own extension classes.
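
A minimal sketch of a `_write()` implementation, assuming a hypothetical
`MemoryWriter` class whose "underlying resource" is simply an array in memory:

```js
const Writable = require('stream').Writable;
const util = require('util');

util.inherits(MemoryWriter, Writable);

// Hypothetical example class, for illustration only.
function MemoryWriter(options) {
  Writable.call(this, options);
  this.chunks = [];
}

MemoryWriter.prototype._write = function(chunk, encoding, callback) {
  // with the default decodeStrings setting, chunk is always a Buffer here
  this.chunks.push(chunk);
  // report success; pass an Error instead to report a failure
  callback();
};

const writer = new MemoryWriter();
writer.write('some ');
writer.write('data');
const stored = Buffer.concat(writer.chunks).toString();
```

A real implementation would normally call `callback` asynchronously, once the
underlying resource has accepted the data.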
|
|
|
|
|
|
|
|
#### writable.\_writev(chunks, callback)
|
|
|
|
|
|
|
|
* `chunks` {Array} The chunks to be written. Each chunk has the following
|
|
|
|
format: `{ chunk: ..., encoding: ... }`.
|
|
|
|
* `callback` {Function} Call this function (optionally with an error
|
|
|
|
argument) when you are done processing the supplied chunks.
|
|
|
|
|
|
|
|
Note: **This function MUST NOT be called directly.** It may be
|
|
|
|
implemented by child classes, and called by the internal Writable
|
|
|
|
class methods only.
|
|
|
|
|
|
|
|
This function is completely optional to implement. In most cases it is
|
|
|
|
unnecessary. If implemented, it will be called with all the chunks
|
|
|
|
that are buffered in the write queue.
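
The following sketch shows `_writev()` receiving several queued chunks in one
call. `BatchWriter` is a hypothetical class, and `cork()`/`uncork()` are used
only to force writes to queue up so that the batch is visible.

```js
const Writable = require('stream').Writable;
const util = require('util');

util.inherits(BatchWriter, Writable);

// Hypothetical example class, for illustration only.
function BatchWriter(options) {
  Writable.call(this, options);
  this.batches = [];
}

BatchWriter.prototype._write = function(chunk, encoding, callback) {
  // a single chunk is treated as a batch of one
  this._writev([{ chunk: chunk, encoding: encoding }], callback);
};

BatchWriter.prototype._writev = function(chunks, callback) {
  // each entry has the form { chunk: ..., encoding: ... }
  this.batches.push(chunks.map((entry) => entry.chunk.toString()));
  callback();
};

const batcher = new BatchWriter();
batcher.cork();      // hold subsequent writes in the queue
batcher.write('a');
batcher.write('b');
batcher.write('c');
batcher.uncork();    // flush the queue; _writev() receives all three chunks
```

Implementing `_write()` as well keeps the class usable when only one chunk is
pending.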
|
|
|
|
|
|
|
|
|
|
|
|
## Simplified Constructor API
|
|
|
|
|
|
|
|
<!--type=misc-->
|
|
|
|
|
|
|
|
In simple cases there is now the added benefit of being able to construct a
|
|
|
|
stream without inheritance.
|
|
|
|
|
|
|
|
This can be done by passing the appropriate methods as constructor options:
|
|
|
|
|
|
|
|
Examples:
|
|
|
|
|
|
|
|
### Duplex
|
|
|
|
|
|
|
|
```js
|
|
|
|
var duplex = new stream.Duplex({
|
|
|
|
read: function(n) {
|
|
|
|
// sets this._read under the hood
|
|
|
|
|
|
|
|
// push data onto the read queue, passing null
|
|
|
|
// will signal the end of the stream (EOF)
|
|
|
|
this.push(chunk);
|
|
|
|
},
|
|
|
|
write: function(chunk, encoding, next) {
|
|
|
|
// sets this._write under the hood
|
|
|
|
|
|
|
|
// An optional error can be passed as the first argument
|
|
|
|
next();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
// or
|
|
|
|
|
|
|
|
var duplex = new stream.Duplex({
|
|
|
|
read: function(n) {
|
|
|
|
// sets this._read under the hood
|
|
|
|
|
|
|
|
// push data onto the read queue, passing null
|
|
|
|
// will signal the end of the stream (EOF)
|
|
|
|
this.push(chunk);
|
|
|
|
},
|
|
|
|
writev: function(chunks, next) {
|
|
|
|
// sets this._writev under the hood
|
|
|
|
|
|
|
|
// An optional error can be passed as the first argument
|
|
|
|
next();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
```
|
|
|
|
|
|
|
|
### Readable
|
|
|
|
|
|
|
|
```js
|
|
|
|
var readable = new stream.Readable({
|
|
|
|
read: function(n) {
|
|
|
|
// sets this._read under the hood
|
|
|
|
|
|
|
|
// push data onto the read queue, passing null
|
|
|
|
// will signal the end of the stream (EOF)
|
|
|
|
this.push(chunk);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
```
|
|
|
|
|
|
|
|
### Transform
|
|
|
|
|
|
|
|
```js
|
|
|
|
var transform = new stream.Transform({
|
|
|
|
transform: function(chunk, encoding, next) {
|
|
|
|
// sets this._transform under the hood
|
|
|
|
|
|
|
|
// generate output as many times as needed
|
|
|
|
// this.push(chunk);
|
|
|
|
|
|
|
|
// call when the current chunk is consumed
|
|
|
|
next();
|
|
|
|
},
|
|
|
|
flush: function(done) {
|
|
|
|
// sets this._flush under the hood
|
|
|
|
|
|
|
|
// generate output as many times as needed
|
|
|
|
// this.push(chunk);
|
|
|
|
|
|
|
|
done();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
```
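
For a concrete, runnable variant of the outline above, here is a sketch of an
upper-casing Transform built without inheritance. The `upperCase` name is
illustrative only.

```js
const stream = require('stream');

// A sketch: the transform function is passed as a constructor option.
const upperCase = new stream.Transform({
  transform: function(chunk, encoding, next) {
    // pass the output as the second argument instead of calling this.push()
    next(null, chunk.toString().toUpperCase());
  }
});

upperCase.write('hello');
const result = upperCase.read(); // a Buffer holding the transformed data
```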
|
|
|
|
|
|
|
|
### Writable
|
|
|
|
|
|
|
|
```js
|
|
|
|
var writable = new stream.Writable({
|
|
|
|
write: function(chunk, encoding, next) {
|
|
|
|
// sets this._write under the hood
|
|
|
|
|
|
|
|
// An optional error can be passed as the first argument
|
|
|
|
next();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
// or
|
|
|
|
|
|
|
|
var writable = new stream.Writable({
|
|
|
|
writev: function(chunks, next) {
|
|
|
|
// sets this._writev under the hood
|
|
|
|
|
|
|
|
// An optional error can be passed as the first argument
|
|
|
|
next();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
```
|
|
|
|
|
|
|
|
## Streams: Under the Hood
|
|
|
|
|
|
|
|
<!--type=misc-->
|
|
|
|
|
|
|
|
### Buffering
|
|
|
|
|
|
|
|
<!--type=misc-->
|
|
|
|
|
|
|
|
Both Writable and Readable streams will buffer data on an internal
|
|
|
|
object which can be retrieved from `_writableState.getBuffer()` or
|
|
|
|
`_readableState.buffer`, respectively.
|
|
|
|
|
|
|
|
The amount of data that will potentially be buffered depends on the
|
|
|
|
`highWaterMark` option which is passed into the constructor.
|
|
|
|
|
|
|
|
Buffering in Readable streams happens when the implementation calls
|
|
|
|
[`stream.push(chunk)`][stream-push]. If the consumer of the Stream does not
|
|
|
|
call [`stream.read()`][stream-read], then the data will sit in the internal
|
|
|
|
queue until it is consumed.
|
|
|
|
|
|
|
|
Buffering in Writable streams happens when the user calls
|
|
|
|
[`stream.write(chunk)`][stream-write] repeatedly, even when it returns `false`.
|
|
|
|
|
|
|
|
The purpose of streams, especially with the [`stream.pipe()`][] method, is to
|
|
|
|
limit the buffering of data to acceptable levels, so that sources and
|
|
|
|
destinations of varying speed will not overwhelm the available memory.
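
As a rough illustration of how `highWaterMark` drives the return value of
`write()`, the sketch below uses a deliberately tiny limit of 4 bytes and a
hypothetical slow sink that completes each write on a later turn of the event
loop. The variable names are assumptions made for the example.

```js
const stream = require('stream');

const slow = new stream.Writable({
  highWaterMark: 4,
  write: function(chunk, encoding, next) {
    // pretend the underlying resource takes a while
    setImmediate(next);
  }
});

const firstOk = slow.write('ab');    // 2 bytes queued: below the mark
const secondOk = slow.write('cdef'); // 6 bytes queued: the mark is reached

slow.on('drain', () => {
  // the queue has emptied; it is safe to write again
});
```

Here `firstOk` is `true` and `secondOk` is `false`. The second chunk is still
accepted and buffered, which is why a well-behaved producer stops writing and
waits for `'drain'` once `write()` returns `false`.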
|
|
|
|
|
|
|
|
### Compatibility with Older Node.js Versions
|
|
|
|
|
|
|
|
<!--type=misc-->
|
|
|
|
|
|
|
|
In versions of Node.js prior to v0.10, the Readable stream interface was
|
|
|
|
simpler, but also less powerful and less useful.
|
|
|
|
|
|
|
|
* Rather than waiting for you to call the [`stream.read()`][stream-read] method,
|
|
|
|
[`'data'`][] events would start emitting immediately. If you needed to do
|
|
|
|
some I/O to decide how to handle data, then you had to store the chunks
|
|
|
|
in some kind of buffer so that they would not be lost.
|
|
|
|
* The [`stream.pause()`][stream-pause] method was advisory, rather than
|
|
|
|
guaranteed. This meant that you still had to be prepared to receive
|
|
|
|
[`'data'`][] events even when the stream was in a paused state.
|
|
|
|
|
|
|
|
In Node.js v0.10, the [Readable][] class was added.
|
|
|
|
For backwards compatibility with older Node.js programs, Readable streams
|
|
|
|
switch into "flowing mode" when a [`'data'`][] event handler is added, or
|
|
|
|
when the [`stream.resume()`][stream-resume] method is called. The effect is
|
|
|
|
that, even if you are not using the new [`stream.read()`][stream-read] method
|
|
|
|
and [`'readable'`][] event, you no longer have to worry about losing
|
|
|
|
[`'data'`][] chunks.
|
|
|
|
|
|
|
|
Most programs will continue to function normally. However, this
|
|
|
|
introduces an edge case in the following conditions:
|
|
|
|
|
|
|
|
* No [`'data'`][] event handler is added.
|
|
|
|
* The [`stream.resume()`][stream-resume] method is never called.
|
|
|
|
* The stream is not piped to any writable destination.
|
|
|
|
|
|
|
|
For example, consider the following code:
|
|
|
|
|
|
|
|
```js
|
|
|
|
// WARNING! BROKEN!
|
|
|
|
net.createServer((socket) => {
|
|
|
|
|
|
|
|
// we add an 'end' event handler, but never consume the data
|
|
|
|
socket.on('end', () => {
|
|
|
|
// It will never get here.
|
|
|
|
socket.end('I got your message (but didnt read it)\n');
|
|
|
|
});
|
|
|
|
|
|
|
|
}).listen(1337);
|
|
|
|
```
|
|
|
|
|
|
|
|
In versions of Node.js prior to v0.10, the incoming message data would be
|
|
|
|
simply discarded. However, in Node.js v0.10 and beyond,
|
|
|
|
the socket will remain paused forever.
|
|
|
|
|
|
|
|
The workaround in this situation is to call the
|
|
|
|
[`stream.resume()`][stream-resume] method to start the flow of data:
|
|
|
|
|
|
|
|
```js
|
|
|
|
// Workaround
|
|
|
|
net.createServer((socket) => {
|
|
|
|
|
|
|
|
socket.on('end', () => {
|
|
|
|
socket.end('I got your message (but didnt read it)\n');
|
|
|
|
});
|
|
|
|
|
|
|
|
// start the flow of data, discarding it.
|
|
|
|
socket.resume();
|
|
|
|
|
|
|
|
}).listen(1337);
|
|
|
|
```
|
|
|
|
|
|
|
|
In addition to new Readable streams switching into flowing mode,
|
|
|
|
pre-v0.10 style streams can be wrapped in a Readable class using the
|
|
|
|
[`stream.wrap()`][] method.
|
|
|
|
|
|
|
|
|
|
|
|
### Object Mode
|
|
|
|
|
|
|
|
<!--type=misc-->
|
|
|
|
|
|
|
|
Normally, Streams operate on Strings and Buffers exclusively.
|
|
|
|
|
|
|
|
Streams that are in **object mode** can emit generic JavaScript values
|
|
|
|
other than Buffers and Strings.
|
|
|
|
|
|
|
|
A Readable stream in object mode will always return a single item from
|
|
|
|
a call to [`stream.read(size)`][stream-read], regardless of what the size
|
|
|
|
argument is.
|
|
|
|
|
|
|
|
A Writable stream in object mode will always ignore the `encoding`
|
|
|
|
argument to [`stream.write(data, encoding)`][stream-write].
|
|
|
|
|
|
|
|
The special value `null` still retains its special meaning for object
|
|
|
|
mode streams. That is, for object mode readable streams, `null` as a
|
|
|
|
return value from [`stream.read()`][stream-read] indicates that there is no more
|
|
|
|
data, and [`stream.push(null)`][stream-push] will signal the end of stream data
|
|
|
|
(`EOF`).
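
A short sketch of these rules, assuming a hypothetical list of `records` that
is served one object at a time:

```js
const stream = require('stream');

var records = [{ id: 1 }, { id: 2 }];

const objects = new stream.Readable({
  objectMode: true,
  read: function(size) {
    // hand out one record per call, then null to signal EOF
    this.push(records.length ? records.shift() : null);
  }
});

const first = objects.read();    // a single object
const second = objects.read(10); // still a single object; size is ignored
const third = objects.read();    // null: there is no more data
```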
|
|
|
|
|
|
|
|
No streams in Node.js core are object mode streams. This pattern is only
|
|
|
|
used by userland streaming libraries.
|
|
|
|
|
|
|
|
You should set `objectMode` in your stream child class constructor on
|
|
|
|
the options object. Setting `objectMode` mid-stream is not safe.
|
|
|
|
|
|
|
|
For Duplex streams, `objectMode` can be set exclusively for the readable or
|
|
|
|
writable side with `readableObjectMode` and `writableObjectMode`
|
|
|
|
respectively. These options can be used to implement parsers and
|
|
|
|
serializers with Transform streams.
|
|
|
|
|
|
|
|
```js
|
|
|
|
const util = require('util');
|
|
|
|
const StringDecoder = require('string_decoder').StringDecoder;
|
|
|
|
const Transform = require('stream').Transform;
|
|
|
|
util.inherits(JSONParseStream, Transform);
|
|
|
|
|
|
|
|
// Gets \n-delimited JSON string data, and emits the parsed objects
|
|
|
|
function JSONParseStream() {
|
|
|
|
if (!(this instanceof JSONParseStream))
|
|
|
|
return new JSONParseStream();
|
|
|
|
|
|
|
|
Transform.call(this, { readableObjectMode : true });
|
|
|
|
|
|
|
|
this._buffer = '';
|
|
|
|
this._decoder = new StringDecoder('utf8');
|
|
|
|
}
|
|
|
|
|
|
|
|
JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
|
|
|
|
this._buffer += this._decoder.write(chunk);
|
|
|
|
// split on newlines
|
|
|
|
var lines = this._buffer.split(/\r?\n/);
|
|
|
|
// keep the last partial line buffered
|
|
|
|
this._buffer = lines.pop();
|
|
|
|
for (var l = 0; l < lines.length; l++) {
|
|
|
|
var line = lines[l];
|
|
|
|
try {
|
|
|
|
var obj = JSON.parse(line);
|
|
|
|
} catch (er) {
|
|
|
|
cb(er);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// push the parsed object out to the readable consumer
|
|
|
|
this.push(obj);
|
|
|
|
}
|
|
|
|
cb();
|
|
|
|
};
|
|
|
|
|
|
|
|
JSONParseStream.prototype._flush = function(cb) {
|
|
|
|
// Just handle any leftover
|
|
|
|
var rem = this._buffer.trim();
|
|
|
|
if (rem) {
|
|
|
|
try {
|
|
|
|
var obj = JSON.parse(rem);
|
|
|
|
} catch (er) {
|
|
|
|
this.emit('error', er);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
// push the parsed object out to the readable consumer
|
|
|
|
this.push(obj);
|
|
|
|
}
|
|
|
|
cb();
|
|
|
|
};
|
|
|
|
```
|
|
|
|
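The parser above covers the `readableObjectMode` case. A serializer going the
other way might look like the following sketch, where `JSONStringifyStream` is
a hypothetical name rather than an existing class. It sets only
`writableObjectMode`, so it accepts objects and emits ordinary Buffers.

```js
const util = require('util');
const Transform = require('stream').Transform;

// Hypothetical counterpart to a JSON parser: accepts objects and
// emits \n-delimited JSON text.
function JSONStringifyStream() {
  if (!(this instanceof JSONStringifyStream))
    return new JSONStringifyStream();

  // Only the writable side is in object mode
  Transform.call(this, { writableObjectMode: true });
}
util.inherits(JSONStringifyStream, Transform);

JSONStringifyStream.prototype._transform = function(obj, encoding, cb) {
  var line;
  try {
    line = JSON.stringify(obj) + '\n';
  } catch (er) {
    // For example, circular structures cannot be serialized
    return cb(er);
  }
  // Hand the serialized line to the readable side
  cb(null, line);
};

const serializer = new JSONStringifyStream();
serializer.write({ a: 1 });
serializer.write([1, 2]);

// The readable side is not in object mode, so read() returns a Buffer
const output = serializer.read().toString();
console.log(output);
```

Keeping each direction in its own Transform means the two can be piped
together or used separately.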
|
|
|
|
### `stream.read(0)`
|
|
|
|
|
|
|
|
There are some cases where you want to trigger a refresh of the
underlying readable stream mechanisms without actually consuming any
data. In that case, you can call `stream.read(0)`, which will always
return `null`.
|
|
|
|
|
|
|
|
If the internal read buffer is below the `highWaterMark`, and the
stream is not currently reading, then calling `stream.read(0)` will trigger
a low-level [`stream._read()`][stream-_read] call.
|
|
|
|
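The behavior described above can be observed with a small sketch. The
`CountingSource` class and its `readCalls` counter are hypothetical and exist
only to make the `_read()` calls visible. The sketch assumes that `_read()`
pushes its data synchronously.

```js
const util = require('util');
const Readable = require('stream').Readable;

// Hypothetical source that counts how often _read() is invoked.
function CountingSource() {
  Readable.call(this);
  this.readCalls = 0;
}
util.inherits(CountingSource, Readable);

CountingSource.prototype._read = function(size) {
  this.readCalls++;
  if (this.readCalls === 1) {
    this.push('hello');
  } else {
    this.push(null);
  }
};

const source = new CountingSource();

// The buffer is empty and below the highWaterMark, so this triggers
// _read(), but read(0) itself consumes nothing and returns null.
const probe = source.read(0);
const callsAfterProbe = source.readCalls;

// The data pushed during the probe is still buffered for a normal read()
const data = source.read();
console.log(probe, callsAfterProbe, data.toString());
```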
|
|
|
|
There is almost never a need to do this. However, you will see some
cases in Node.js's internals where this is done, particularly in the
Readable stream class internals.
|
|
|
|
|
|
|
|
### `stream.push('')`
|
|
|
|
|
|
|
|
Pushing a zero-byte string or Buffer (when not in [Object mode][]) has an
interesting side effect. Because it *is* a call to
[`stream.push()`][stream-push], it will end the `reading` process. However, it
does *not* add any data to the readable buffer, so there's nothing for
a user to consume.
|
|
|
|
|
|
|
|
Very rarely, there are cases where you have no data to provide now,
but the consumer of your stream (or, perhaps, another bit of your own
code) will know when to check again, by calling [`stream.read(0)`][stream-read].
In those cases, you *may* call `stream.push('')`.
|
|
|
|
|
|
|
|
So far, the only use case for this functionality is in the
[`tls.CryptoStream`][] class, which is deprecated in Node.js/io.js v1.0. If you
find that you have to use `stream.push('')`, please consider another
approach, because it almost certainly indicates that something is
horribly wrong.
|
|
|
|
|
|
|
|
[`'data'`]: #stream_event_data
[`'drain'`]: #stream_event_drain
[`'end'`]: #stream_event_end
[`'finish'`]: #stream_event_finish
[`'readable'`]: #stream_event_readable
[`buf.toString(encoding)`]: https://nodejs.org/docs/v5.8.0/api/buffer.html#buffer_buf_tostring_encoding_start_end
[`EventEmitter`]: https://nodejs.org/docs/v5.8.0/api/events.html#events_class_eventemitter
[`process.stderr`]: https://nodejs.org/docs/v5.8.0/api/process.html#process_process_stderr
[`process.stdin`]: https://nodejs.org/docs/v5.8.0/api/process.html#process_process_stdin
[`process.stdout`]: https://nodejs.org/docs/v5.8.0/api/process.html#process_process_stdout
[`stream.cork()`]: #stream_writable_cork
[`stream.pipe()`]: #stream_readable_pipe_destination_options
[`stream.uncork()`]: #stream_writable_uncork
[`stream.unpipe()`]: #stream_readable_unpipe_destination
[`stream.wrap()`]: #stream_readable_wrap_stream
[`tls.CryptoStream`]: https://nodejs.org/docs/v5.8.0/api/tls.html#tls_class_cryptostream
[`util.inherits()`]: https://nodejs.org/docs/v5.8.0/api/util.html#util_util_inherits_constructor_superconstructor
[API for Stream Consumers]: #stream_api_for_stream_consumers
[API for Stream Implementors]: #stream_api_for_stream_implementors
[child process stdin]: https://nodejs.org/docs/v5.8.0/api/child_process.html#child_process_child_stdin
[child process stdout and stderr]: https://nodejs.org/docs/v5.8.0/api/child_process.html#child_process_child_stdout
[Compatibility]: #stream_compatibility_with_older_node_js_versions
[crypto]: crypto.html
[Duplex]: #stream_class_stream_duplex
[fs read streams]: https://nodejs.org/docs/v5.8.0/api/fs.html#fs_class_fs_readstream
[fs write streams]: https://nodejs.org/docs/v5.8.0/api/fs.html#fs_class_fs_writestream
[HTTP requests, on the client]: https://nodejs.org/docs/v5.8.0/api/http.html#http_class_http_clientrequest
[HTTP responses, on the server]: https://nodejs.org/docs/v5.8.0/api/http.html#http_class_http_serverresponse
[http-incoming-message]: https://nodejs.org/docs/v5.8.0/api/http.html#http_class_http_incomingmessage
[Object mode]: #stream_object_mode
[Readable]: #stream_class_stream_readable
[SimpleProtocol v2]: #stream_example_simpleprotocol_parser_v2
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
[stream-_writev]: #stream_writable_writev_chunks_callback
[stream-end]: #stream_writable_end_chunk_encoding_callback
[stream-pause]: #stream_readable_pause
[stream-push]: #stream_readable_push_chunk_encoding
[stream-read]: #stream_readable_read_size
[stream-resume]: #stream_readable_resume
[stream-write]: #stream_writable_write_chunk_encoding_callback
[TCP sockets]: https://nodejs.org/docs/v5.8.0/api/net.html#net_class_net_socket
[Transform]: #stream_class_stream_transform
[Writable]: #stream_class_stream_writable
[zlib]: zlib.html
|