diff --git a/doc/api/stream.md b/doc/api/stream.md
index be36b64b9e..4892183428 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -2,61 +2,123 @@
 Stability: 2 - Stable

-A stream is an abstract interface implemented by various objects in
-Node.js. For example a [request to an HTTP server][http-incoming-message] is a
-stream, as is [`process.stdout`][]. Streams are readable, writable, or both. All
-streams are instances of [`EventEmitter`][].
+A stream is an abstract interface for working with streaming data in Node.js.
+The `stream` module provides a base API that makes it easy to build objects
+that implement the stream interface.

-You can load the Stream base classes by doing `require('stream')`.
-There are base classes provided for [Readable][] streams, [Writable][]
-streams, [Duplex][] streams, and [Transform][] streams.
+There are many stream objects provided by Node.js. For instance, a
+[request to an HTTP server][http-incoming-message] and [`process.stdout`][]
+are both stream instances.

-This document is split up into 3 sections:
+Streams can be readable, writable, or both. All streams are instances of
+[`EventEmitter`][].

-1. The first section explains the parts of the API that you need to be
-   aware of to use streams in your programs.
-2. The second section explains the parts of the API that you need to
-   use if you implement your own custom streams yourself. The API is designed to
-   make this easy for you to do.
-3. The third section goes into more depth about how streams work,
-   including some of the internal mechanisms and functions that you
-   should probably not modify unless you definitely know what you are
-   doing.
+The `stream` module can be accessed using:

+```js
+const stream = require('stream');
+```

-## API for Stream Consumers
+While it is important for all Node.js users to understand how streams work,
+the `stream` module itself is most useful for developers who are creating new
+types of stream instances. Developers who are primarily *consuming* stream
+objects will rarely (if ever) have need to use the `stream` module directly.
+
+## Organization of this document
+
+This document is divided into two primary sections and a third section for
+additional notes. The first section explains the elements of the stream API
+that are required to *use* streams within an application. The second section
+explains the elements of the API that are required to *implement* new types of
+streams.
+
+## Types of Streams
+
+There are four fundamental stream types within Node.js:
+
+* [Readable][] - streams from which data can be read (for example
+  [`fs.createReadStream()`][]).
+* [Writable][] - streams to which data can be written (for example
+  [`fs.createWriteStream()`][]).
+* [Duplex][] - streams that are both Readable and Writable (for example
+  [`net.Socket`][]).
+* [Transform][] - Duplex streams that can modify or transform the data as it
+  is written and read (for example [`zlib.createDeflate()`][]).
+
+### Object Mode
+
+All streams created by Node.js APIs operate exclusively on strings and `Buffer`
+objects. It is possible, however, for stream implementations to work with other
+types of JavaScript values (with the exception of `null`, which serves a
+special purpose within streams). Such streams are considered to operate in
+"object mode".
+
+Stream instances are switched into object mode using the `objectMode` option
+when the stream is created. Attempting to switch an existing stream into
+object mode is not safe.
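+
+As an illustrative sketch (the object payload here is arbitrary, and the
+constructor-options form used is described under "Simplified Construction"
+later in this document), a stream is placed in object mode by passing
+`objectMode: true` when it is created:
+
+```js
+const Readable = require('stream').Readable;
+
+// A Readable that produces JavaScript objects rather than strings
+// or Buffers. objectMode is set once, at construction time.
+const objectStream = new Readable({
+  objectMode: true,
+  read() {
+    this.push({ message: 'hello' });
+    this.push(null); // null signals end-of-stream
+  }
+});
+
+objectStream.on('data', (obj) => console.log(obj.message));
+```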
+
+### Buffering

-Streams can be either [Readable][], [Writable][], or both ([Duplex][]).
+Both [Writable][] and [Readable][] streams will store data in an internal
+buffer that can be retrieved using `writable._writableState.getBuffer()` or
+`readable._readableState.buffer`, respectively.
+
+The amount of data potentially buffered depends on the `highWaterMark` option
+passed into the stream's constructor. For normal streams, the `highWaterMark`
+option specifies a total number of bytes. For streams operating in object mode,
+the `highWaterMark` specifies a total number of objects.
+
+Data is buffered in Readable streams when the implementation calls
+[`stream.push(chunk)`][stream-push]. If the consumer of the stream does not
+call [`stream.read()`][stream-read], the data will sit in the internal
+queue until it is consumed.

-All streams are EventEmitters, but they also have other custom methods
-and properties depending on whether they are Readable, Writable, or
-Duplex.
+Once the total size of the internal read buffer reaches the threshold specified
+by `highWaterMark`, the stream will temporarily stop reading data from the
+underlying resource until the data currently buffered can be consumed (that is,
+the stream will stop calling the internal `readable._read()` method that is
+used to fill the read buffer).
+
+Data is buffered in Writable streams when the
+[`writable.write(chunk)`][stream-write] method is called repeatedly. While the
+total size of the internal write buffer is below the threshold set by
+`highWaterMark`, calls to `writable.write()` will return `true`. Once the
+size of the internal buffer reaches or exceeds the `highWaterMark`, `false`
+will be returned.
+
+A key goal of the `stream` API, and in particular the [`stream.pipe()`][]
+method, is to limit the buffering of data to acceptable levels such that
+sources and destinations of differing speeds will not overwhelm the available
+memory.
+
+Because [Duplex][] and [Transform][] streams are both Readable and Writable,
+each maintains *two* separate internal buffers used for reading and writing,
+allowing each side to operate independently of the other while maintaining an
+appropriate and efficient flow of data. For example, [`net.Socket`][] instances
+are [Duplex][] streams whose Readable side allows consumption of data received
+*from* the socket and whose Writable side allows writing data *to* the socket.
+Because data may be written to the socket at a faster or slower rate than data
+is received, it is important that each side operate (and buffer) independently
+of the other.

-If a stream is both Readable and Writable, then it implements all of
-the methods and events. So, a [Duplex][] or [Transform][] stream is
-fully described by this API, though their implementation may be
-somewhat different.
+## API for Stream Consumers

-It is not necessary to implement Stream interfaces in order to consume
-streams in your programs. If you **are** implementing streaming
-interfaces in your own program, please also refer to
-[API for Stream Implementors][].
+

-Almost all Node.js programs, no matter how simple, use Streams in some
-way. Here is an example of using Streams in an Node.js program:
+Almost all Node.js applications, no matter how simple, use streams in some
+manner.
+The following is an example of using streams in a Node.js application
+that implements an HTTP server:

 ```js
 const http = require('http');

-var server = http.createServer( (req, res) => {
+const server = http.createServer((req, res) => {
   // req is an http.IncomingMessage, which is a Readable Stream
   // res is an http.ServerResponse, which is a Writable Stream

   var body = '';
-  // we want to get the data as utf8 strings
-  // If you don't set an encoding, then you'll get Buffer objects
+  // Get the data as utf8 strings.
+  // If an encoding is not set, Buffer objects will be received.
   req.setEncoding('utf8');

   // Readable streams emit 'data' events once a listener is added
@@ -64,10 +126,10 @@ var server = http.createServer( (req, res) => {
     body += chunk;
   });

-  // the end event tells you that you have entire body
+  // the end event indicates that the entire body has been received
   req.on('end', () => {
     try {
-      var data = JSON.parse(body);
+      const data = JSON.parse(body);
     } catch (er) {
       // uh oh! bad json!
       res.statusCode = 400;
@@ -90,60 +152,272 @@ server.listen(1337);
 // error: Unexpected token o
 ```

-### Class: stream.Duplex
+[Writable][] streams (such as `res` in the example) expose methods such as
+`write()` and `end()` that are used to write data onto the stream.

-Duplex streams are streams that implement both the [Readable][] and
-[Writable][] interfaces.
+[Readable][] streams use the [`EventEmitter`][] API for notifying application
+code when data is available to be read off the stream. That available data can
+be read from the stream in multiple ways.

-Examples of Duplex streams include:
+Both [Writable][] and [Readable][] streams use the [`EventEmitter`][] API in
+various ways to communicate the current state of the stream.

-* [TCP sockets][]
+[Duplex][] and [Transform][] streams are both [Writable][] and [Readable][].
+
+Applications that are either writing data to or consuming data from a stream
+are not required to implement the stream interfaces directly and will generally
+have no reason to call `require('stream')`.
+
+Developers wishing to implement new types of streams should refer to the
+section [API for Stream Implementers][].
+
+### Writable Streams
+
+Writable streams are an abstraction for a *destination* to which data is
+written.
+
+Examples of [Writable][] streams include:
+
+* [HTTP requests, on the client][]
+* [HTTP responses, on the server][]
+* [fs write streams][]
 * [zlib streams][zlib]
 * [crypto streams][crypto]
+* [TCP sockets][]
+* [child process stdin][]
+* [`process.stdout`][], [`process.stderr`][]
+
+*Note*: Some of these examples are actually [Duplex][] streams that implement
+the [Writable][] interface.
+
+All [Writable][] streams implement the interface defined by the
+`stream.Writable` class.
+
+While specific instances of [Writable][] streams may differ in various ways,
+all Writable streams follow the same fundamental usage pattern as illustrated
+in the example below:
+
+```js
+const myStream = getWritableStreamSomehow();
+myStream.write('some data');
+myStream.write('some more data');
+myStream.end('done writing data');
+```

-### Class: stream.Readable
+#### Class: stream.Writable

-The Readable stream interface is the abstraction for a *source* of
-data that you are reading from. In other words, data comes *out* of a
-Readable stream.
+##### Event: 'close'

-A Readable stream will not start emitting data until you indicate that
-you are ready to receive it.
+The `'close'` event is emitted when the stream and any of its underlying
+resources (a file descriptor, for example) have been closed. The event
+indicates that no more events will be emitted, and no further computation will
+occur.

-Readable streams have two "modes": a **flowing mode** and a **paused
-mode**. When in flowing mode, data is read from the underlying system
-and provided to your program as fast as possible. In paused mode, you
-must explicitly call [`stream.read()`][stream-read] to get chunks of data out.
-Streams start out in paused mode.
+Not all Writable streams will emit the `'close'` event.

-**Note**: If no data event handlers are attached, and there are no
-[`stream.pipe()`][] destinations, and the stream is switched into flowing
-mode, then data will be lost.
+##### Event: 'drain'

-You can switch to flowing mode by doing any of the following:
+If a call to [`stream.write(chunk)`][stream-write] returns `false`, the
+`'drain'` event will be emitted when it is appropriate to resume writing data
+to the stream.

-* Adding a [`'data'`][] event handler to listen for data.
-* Calling the [`stream.resume()`][stream-resume] method to explicitly open the
-  flow.
-* Calling the [`stream.pipe()`][] method to send the data to a [Writable][].
+```js
+// Write the data to the supplied writable stream one million times.
+// Be attentive to back-pressure.
+function writeOneMillionTimes(writer, data, encoding, callback) {
+  var i = 1000000;
+  write();
+  function write() {
+    var ok = true;
+    do {
+      i--;
+      if (i === 0) {
+        // last time!
+        writer.write(data, encoding, callback);
+      } else {
+        // see if we should continue, or wait
+        // don't pass the callback, because we're not done yet.
+        ok = writer.write(data, encoding);
+      }
+    } while (i > 0 && ok);
+    if (i > 0) {
+      // had to stop early!
+      // write some more once it drains
+      writer.once('drain', write);
+    }
+  }
+}
+```

-You can switch back to paused mode by doing either of the following:
+##### Event: 'error'

-* If there are no pipe destinations, by calling the
-  [`stream.pause()`][stream-pause] method.
-* If there are pipe destinations, by removing any [`'data'`][] event
-  handlers, and removing all pipe destinations by calling the
-  [`stream.unpipe()`][] method.
+* {Error}
+
+The `'error'` event is emitted if an error occurred while writing or piping
+data. The listener callback is passed a single `Error` argument when called.
+
+*Note*: The stream is not closed when the `'error'` event is emitted.
+
+##### Event: 'finish'
+
+The `'finish'` event is emitted after the [`stream.end()`][stream-end] method
+has been called, and all data has been flushed to the underlying system.
+
+```js
+const writer = getWritableStreamSomehow();
+for (var i = 0; i < 100; i++) {
+  writer.write(`hello, #${i}!\n`);
+}
+writer.end('This is the end\n');
+writer.on('finish', () => {
+  console.error('All writes are now complete.');
+});
+```
+
+##### Event: 'pipe'
+
+* `src` {stream.Readable} source stream that is piping to this writable
+
+The `'pipe'` event is emitted when the [`stream.pipe()`][] method is called on
+a readable stream, adding this writable to its set of destinations.
+
+```js
+const writer = getWritableStreamSomehow();
+const reader = getReadableStreamSomehow();
+writer.on('pipe', (src) => {
+  console.error('something is piping into the writer');
+  assert.equal(src, reader);
+});
+reader.pipe(writer);
+```
+
+##### Event: 'unpipe'
+
+* `src` {[Readable][] Stream} The source stream that
+  [unpiped][`stream.unpipe()`] this writable
+
+The `'unpipe'` event is emitted when the [`stream.unpipe()`][] method is called
+on a [Readable][] stream, removing this [Writable][] from its set of
+destinations.
+
+```js
+const writer = getWritableStreamSomehow();
+const reader = getReadableStreamSomehow();
+writer.on('unpipe', (src) => {
+  console.error('Something has stopped piping into the writer.');
+  assert.equal(src, reader);
+});
+reader.pipe(writer);
+reader.unpipe(writer);
+```
+
+##### writable.cork()
+
+The `writable.cork()` method forces all written data to be buffered in memory.
+The buffered data will be flushed when either the [`stream.uncork()`][] or
+[`stream.end()`][stream-end] methods are called.
+
+The primary intent of `writable.cork()` is to ensure that writing many small
+chunks of data to a stream does not cause a backup in the internal buffer that
+would have an adverse impact on performance. In such situations,
+implementations that implement the `writable._writev()` method can perform
+buffered writes in a more optimized manner.
+
+##### writable.end([chunk][, encoding][, callback])
+
+* `chunk` {String|Buffer|any} Optional data to write. For streams not operating
+  in object mode, `chunk` must be a string or a `Buffer`. For object mode
+  streams, `chunk` may be any JavaScript value other than `null`.
+* `encoding` {String} The encoding, if `chunk` is a String
+* `callback` {Function} Optional callback for when the stream is finished
+
+Calling the `writable.end()` method signals that no more data will be written
+to the [Writable][]. The optional `chunk` and `encoding` arguments allow one
+final additional chunk of data to be written immediately before closing the
+stream. If provided, the optional `callback` function is attached as a listener
+for the [`'finish'`][] event.
+
+Calling the [`stream.write()`][stream-write] method after calling
+[`stream.end()`][stream-end] will raise an error.
+
+```js
+// write 'hello, ' and then end with 'world!'
+const file = fs.createWriteStream('example.txt');
+file.write('hello, ');
+file.end('world!');
+// writing more now is not allowed!
+```
+
+##### writable.setDefaultEncoding(encoding)
+
+* `encoding` {String} The new default encoding
+* Return: `this`
+
+The `writable.setDefaultEncoding()` method sets the default `encoding` for a
+[Writable][] stream.
+
+##### writable.uncork()
+
+The `writable.uncork()` method flushes all data buffered since
+[`stream.cork()`][] was called.
+
+When using `writable.cork()` and `writable.uncork()` to manage the buffering
+of writes to a stream, it is recommended that calls to `writable.uncork()` be
+deferred using `process.nextTick()`. Doing so allows batching of all
+`writable.write()` calls that occur within a given Node.js event loop phase.
+
+```js
+stream.cork();
+stream.write('some ');
+stream.write('data ');
+process.nextTick(() => stream.uncork());
+```
+
+If the `writable.cork()` method is called multiple times on a stream, the same
+number of calls to `writable.uncork()` must be made to flush the buffered
+data.
+
+```js
+stream.cork();
+stream.write('some ');
+stream.cork();
+stream.write('data ');
+process.nextTick(() => {
+  stream.uncork();
+  // The data will not be flushed until uncork() is called a second time.
+  stream.uncork();
+});
+```
+
+##### writable.write(chunk[, encoding][, callback])
+
+* `chunk` {String|Buffer} The data to write
+* `encoding` {String} The encoding, if `chunk` is a String
+* `callback` {Function} Callback for when this chunk of data is flushed
+* Returns: {Boolean} `false` if the stream wishes for the calling code to
+  wait for the `'drain'` event to be emitted before continuing to write
+  additional data; otherwise `true`.
+
+The `writable.write()` method writes some data to the stream, and calls the
+supplied `callback` once the data has been fully handled. If an error
+occurs, the `callback` *may or may not* be called with the error as its
+first argument. To reliably detect write errors, add a listener for the
+`'error'` event.

-Note that, for backwards compatibility reasons, removing [`'data'`][]
-event handlers will **not** automatically pause the stream. Also, if
-there are piped destinations, then calling [`stream.pause()`][stream-pause] will
-not guarantee that the stream will *remain* paused once those
-destinations drain and ask for more data.
+The return value indicates whether the written `chunk` was buffered internally
+and the buffer has exceeded the `highWaterMark` configured when the stream was
+created. If `false` is returned, further attempts to write data to the stream
+should be paused until the `'drain'` event is emitted.

-Examples of readable streams include:
+A Writable stream in object mode will always ignore the `encoding` argument.
+
+### Readable Streams
+
+Readable streams are an abstraction for a *source* from which data is
+consumed.
+
+Examples of Readable streams include:

 * [HTTP responses, on the client][http-incoming-message]
 * [HTTP requests, on the server][http-incoming-message]
@@ -154,89 +428,190 @@ Examples of readable streams include:
 * [child process stdout and stderr][]
 * [`process.stdin`][]

-#### Event: 'close'
+All [Readable][] streams implement the interface defined by the
+`stream.Readable` class.
+
+#### Two Modes
+
+Readable streams effectively operate in one of two modes: flowing and paused.
+
+When in flowing mode, data is read from the underlying system automatically
+and provided to an application as quickly as possible using events via the
+[`EventEmitter`][] interface.
+
+In paused mode, the [`stream.read()`][stream-read] method must be called
+explicitly to read chunks of data from the stream.
+
+All [Readable][] streams begin in paused mode but can be switched to flowing
+mode in one of the following ways:

-Emitted when the stream and any of its underlying resources (a file
-descriptor, for example) have been closed. The event indicates that
-no more events will be emitted, and no further computation will occur.
+* Adding a [`'data'`][] event handler.
+* Calling the [`stream.resume()`][stream-resume] method.
+* Calling the [`stream.pipe()`][] method to send the data to a [Writable][].
+
+The Readable can switch back to paused mode using one of the following:
+
+* If there are no pipe destinations, by calling the
+  [`stream.pause()`][stream-pause] method.
+* If there are pipe destinations, by removing any [`'data'`][] event
+  handlers, and removing all pipe destinations by calling the
+  [`stream.unpipe()`][] method.
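+
+For illustration (using the `getReadableStreamSomehow()` placeholder used
+throughout this document), the following sketch switches a stream into
+flowing mode and back:
+
+```js
+const readable = getReadableStreamSomehow();
+
+// Attaching a 'data' handler switches the stream into flowing mode.
+readable.on('data', (chunk) => {
+  console.log(`Received ${chunk.length} bytes of data.`);
+});
+
+// pause() switches back to paused mode; incoming data accumulates in
+// the internal buffer instead of being emitted.
+readable.pause();
+
+// resume() returns the stream to flowing mode.
+readable.resume();
+```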
+
+The important concept to remember is that a Readable will not generate data
+until a mechanism for either consuming or ignoring that data is provided. If
+the consuming mechanism is disabled or taken away, the Readable will *attempt*
+to stop generating the data.
+
+*Note*: For backwards compatibility reasons, removing [`'data'`][] event
+handlers will **not** automatically pause the stream. Also, if there are piped
+destinations, then calling [`stream.pause()`][stream-pause] will not guarantee
+that the stream will *remain* paused once those destinations drain and ask for
+more data.
+
+*Note*: If a [Readable][] is switched into flowing mode and there are no
+consumers available to handle the data, that data will be lost. This can occur,
+for instance, when the `readable.resume()` method is called without a listener
+attached to the `'data'` event, or when a `'data'` event handler is removed
+from the stream.
+
+#### Three States
+
+The "two modes" of operation for a Readable stream are a simplified abstraction
+for the more complicated internal state management that is happening within the
+Readable stream implementation.
+
+Specifically, at any given point in time, every Readable is in one of three
+possible states:
+
+* `readable._readableState.flowing = null`
+* `readable._readableState.flowing = false`
+* `readable._readableState.flowing = true`
+
+When `readable._readableState.flowing` is `null`, no mechanism for consuming
+the stream's data is provided, so the stream will not generate its data.

-Not all streams will emit the `'close'` event as the `'close'` event is
-optional.
+Attaching a listener for the `'data'` event, calling the `readable.pipe()`
+method, or calling the `readable.resume()` method will switch
+`readable._readableState.flowing` to `true`, causing the Readable to begin
+actively emitting events as data is generated.

-#### Event: 'data'
+Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
+will cause `readable._readableState.flowing` to be set to `false`,
+temporarily halting the flowing of events but *not* halting the generation of
+data.

-* `chunk` {Buffer|String} The chunk of data.
+While `readable._readableState.flowing` is `false`, data may be accumulating
+within the stream's internal buffer.

-Attaching a `'data'` event listener to a stream that has not been
-explicitly paused will switch the stream into flowing mode. Data will
-then be passed as soon as it is available.
+#### Choose One

-If you just want to get all the data out of the stream as fast as
-possible, this is the best way to do so.
+The Readable stream API evolved across multiple Node.js versions and provides
+multiple methods of consuming stream data. In general, developers should choose
+*one* of the methods of consuming data and *should never* use multiple methods
+to consume data from a single stream.
+
+Use of the `readable.pipe()` method is recommended for most users as it has
+been implemented to provide the easiest way of consuming stream data.
+Developers who require more fine-grained control over the transfer and
+generation of data can use the [`EventEmitter`][] and
+`readable.pause()`/`readable.resume()` APIs.
+
+#### Class: stream.Readable
+
+##### Event: 'close'
+
+The `'close'` event is emitted when the stream and any of its underlying
+resources (a file descriptor, for example) have been closed. The event
+indicates that no more events will be emitted, and no further computation will
+occur.
+
+Not all [Readable][] streams will emit the `'close'` event.
+
+##### Event: 'data'
+
+* `chunk` {Buffer|String|any} The chunk of data. For streams that are not
+  operating in object mode, the chunk will be either a string or `Buffer`.
+  For streams that are in object mode, the chunk can be any JavaScript value
+  other than `null`.
+
+The `'data'` event is emitted whenever the stream is relinquishing ownership of
+a chunk of data to a consumer. This may occur whenever the stream is switched
+into flowing mode by calling `readable.pipe()`, `readable.resume()`, or by
+attaching a listener callback to the `'data'` event. The `'data'` event will
+also be emitted whenever the `readable.read()` method is called and a chunk of
+data is available to be returned.
+
+Attaching a `'data'` event listener to a stream that has not been explicitly
+paused will switch the stream into flowing mode. Data will then be passed as
+soon as it is available.
+
+The listener callback will be passed the chunk of data as a string if a default
+encoding has been specified for the stream using the
+`readable.setEncoding()` method; otherwise the data will be passed as a
+`Buffer`.

 ```js
-var readable = getReadableStreamSomehow();
+const readable = getReadableStreamSomehow();
 readable.on('data', (chunk) => {
-  console.log('got %d bytes of data', chunk.length);
+  console.log(`Received ${chunk.length} bytes of data.`);
 });
 ```

-#### Event: 'end'
+##### Event: 'end'

-This event fires when there will be no more data to read.
+The `'end'` event is emitted when there is no more data to be consumed from
+the stream.

-Note that the `'end'` event **will not fire** unless the data is
-completely consumed. This can be done by switching into flowing mode,
-or by calling [`stream.read()`][stream-read] repeatedly until you get to the
-end.
+*Note*: The `'end'` event **will not be emitted** unless the data is
+completely consumed. This can be accomplished by switching the stream into
+flowing mode, or by calling [`stream.read()`][stream-read] repeatedly until
+all data has been consumed.

 ```js
-var readable = getReadableStreamSomehow();
+const readable = getReadableStreamSomehow();
 readable.on('data', (chunk) => {
-  console.log('got %d bytes of data', chunk.length);
+  console.log(`Received ${chunk.length} bytes of data.`);
 });
 readable.on('end', () => {
-  console.log('there will be no more data.');
+  console.log('There will be no more data.');
 });
 ```

-#### Event: 'error'
+##### Event: 'error'

 * {Error}

-Emitted if there was an error receiving data.
+The `'error'` event may be emitted by a Readable implementation at any time.
+Typically, this may occur if the underlying stream is unable to generate data
+due to an underlying internal failure, or when a stream implementation attempts
+to push an invalid chunk of data.

-#### Event: 'readable'
+The listener callback will be passed a single `Error` object.

-When a chunk of data can be read from the stream, it will emit a
-`'readable'` event.
+##### Event: 'readable'

-In some cases, listening for a `'readable'` event will cause some data
-to be read into the internal buffer from the underlying system, if it
-hadn't already.
+The `'readable'` event is emitted when there is data available to be read from
+the stream. In some cases, attaching a listener for the `'readable'` event will
+cause some amount of data to be read into an internal buffer.
```javascript -var readable = getReadableStreamSomehow(); +const readable = getReadableStreamSomehow(); readable.on('readable', () => { // there is some data to read now }); ``` +The `'readable'` event will also be emitted once the end of the stream data +has been reached but before the `'end'` event is emitted. -Once the internal buffer is drained, a `'readable'` event will fire -again when more data is available. - -The `'readable'` event is not emitted in the "flowing" mode with the -sole exception of the last one, on end-of-stream. - -The `'readable'` event indicates that the stream has new information: -either new data is available or the end of the stream has been reached. -In the former case, [`stream.read()`][stream-read] will return that data. In the -latter case, [`stream.read()`][stream-read] will return null. For instance, in -the following example, `foo.txt` is an empty file: +Effectively, the `'readable'` event indicates that the stream has new +information: either new data is available or the end of the stream has been +reached. In the former case, [`stream.read()`][stream-read] will return the +available data. In the latter case, [`stream.read()`][stream-read] will return +`null`. For instance, in the following example, `foo.txt` is an empty file: ```js const fs = require('fs'); -var rr = fs.createReadStream('foo.txt'); +const rr = fs.createReadStream('foo.txt'); rr.on('readable', () => { console.log('readable:', rr.read()); }); @@ -253,16 +628,20 @@ readable: null end ``` -#### readable.isPaused() +*Note*: In general, the `readable.pipe()` and `'data'` event mechanisms are +preferred over the use of the `'readable'` event. + +##### readable.isPaused() * Return: {Boolean} -This method returns whether or not the `readable` has been **explicitly** -paused by client code (using [`stream.pause()`][stream-pause] without a -corresponding [`stream.resume()`][stream-resume]). +The `readable.isPaused()` method returns the current operating state of the +Readable. This is used primarily by the mechanism that underlies the +`readable.pipe()` method. In most typical cases, there will be no reason to +use this method directly. ```js -var readable = new stream.Readable +const readable = new stream.Readable readable.isPaused() // === false readable.pause() @@ -271,68 +650,65 @@ readable.resume() readable.isPaused() // === false ``` -#### readable.pause() +##### readable.pause() * Return: `this` -This method will cause a stream in flowing mode to stop emitting -[`'data'`][] events, switching out of flowing mode. Any data that becomes -available will remain in the internal buffer. +The `readable.pause()` method will cause a stream in flowing mode to stop +emitting [`'data'`][] events, switching out of flowing mode. Any data that +becomes available will remain in the internal buffer. 
```js -var readable = getReadableStreamSomehow(); +const readable = getReadableStreamSomehow(); readable.on('data', (chunk) => { - console.log('got %d bytes of data', chunk.length); + console.log(`Received ${chunk.length} bytes of data.`); readable.pause(); - console.log('there will be no more data for 1 second'); + console.log('There will be no additional data for 1 second.'); setTimeout(() => { - console.log('now data will start flowing again'); + console.log('Now data will start flowing again.'); readable.resume(); }, 1000); }); ``` -#### readable.pipe(destination[, options]) +##### readable.pipe(destination[, options]) * `destination` {stream.Writable} The destination for writing data * `options` {Object} Pipe options - * `end` {Boolean} End the writer when the reader ends. Default = `true` + * `end` {Boolean} End the writer when the reader ends. Defaults to `true`. -This method pulls all the data out of a readable stream, and writes it -to the supplied destination, automatically managing the flow so that -the destination is not overwhelmed by a fast readable stream. +The `readable.pipe()` method attaches a [Writable][] stream to the `readable`, +causing it to switch automatically into flowing mode and push all of its data +to the attached [Writable][]. The flow of data will be automatically managed so +that the destination Writable stream is not overwhelmed by a faster Readable +stream. -Multiple destinations can be piped to safely. +The following example pipes all of the data from the `readable` into a file +named `file.txt`: ```js -var readable = getReadableStreamSomehow(); -var writable = fs.createWriteStream('file.txt'); +const readable = getReadableStreamSomehow(); +const writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt' readable.pipe(writable); ``` +It is possible to attach multiple Writable streams to a single Readable stream. -This function returns the destination stream, so you can set up pipe -chains like so: +The `readable.pipe()` method returns a reference to the *destination* stream +making it possible to set up chains of piped streams: ```js -var r = fs.createReadStream('file.txt'); -var z = zlib.createGzip(); -var w = fs.createWriteStream('file.txt.gz'); +const r = fs.createReadStream('file.txt'); +const z = zlib.createGzip(); +const w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w); ``` -For example, emulating the Unix `cat` command: - -```js -process.stdin.pipe(process.stdout); -``` - -By default [`stream.end()`][stream-end] is called on the destination when the -source stream emits [`'end'`][], so that `destination` is no longer writable. -Pass `{ end: false }` as `options` to keep the destination stream open. - -This keeps `writer` open so that "Goodbye" can be written at the -end. +By default, [`stream.end()`][stream-end] is called on the destination Writable +stream when the source Readable stream emits [`'end'`][], so that the +destination is no longer writable. To disable this default behavior, the `end` +option can be passed as `false`, causing the destination stream to remain open, +as illustrated in the following example: ```js reader.pipe(writer, { end: false }); @@ -341,88 +717,106 @@ reader.on('end', () => { }); ``` -Note that [`process.stderr`][] and [`process.stdout`][] are never closed until -the process exits, regardless of the specified options. +One important caveat is that if the Readable stream emits an error during +processing, the Writable destination *is not closed* automatically. 
+If an error occurs, it will be necessary to *manually* close each stream in
+order to prevent memory leaks.
+
+*Note*: The [`process.stderr`][] and [`process.stdout`][] Writable streams are
+never closed until the Node.js process exits, regardless of the specified
+options.
+
+##### readable.read([size])
+
+* `size` {Number} Optional argument to specify how much data to read.
+* Return: {String|Buffer|Null}
+
+The `readable.read()` method pulls some data out of the internal buffer and
+returns it. If no data is available to be read, `null` is returned. By default,
+the data will be returned as a `Buffer` object unless an encoding has been
+specified using the `readable.setEncoding()` method or the stream is operating
+in object mode.
+
+The optional `size` argument specifies a specific number of bytes to read. If
+`size` bytes are not available to be read, `null` will be returned *unless*
+the stream has ended, in which case all of the data remaining in the internal
+buffer will be returned (*even if it exceeds `size` bytes*).
+
+If the `size` argument is not specified, all of the data contained in the
+internal buffer will be returned.
+
+The `readable.read()` method should only be called on Readable streams
+operating in paused mode. In flowing mode, `readable.read()` is called
+automatically until the internal buffer is fully drained.

 ```js
-var readable = getReadableStreamSomehow();
+const readable = getReadableStreamSomehow();
 readable.on('readable', () => {
   var chunk;
   while (null !== (chunk = readable.read())) {
-    console.log('got %d bytes of data', chunk.length);
+    console.log(`Received ${chunk.length} bytes of data.`);
   }
 });
 ```

-If this method returns a data chunk, then it will also trigger the
-emission of a [`'data'`][] event.
+In general, it is recommended that developers avoid the use of the `'readable'`
+event and the `readable.read()` method in favor of using either
+`readable.pipe()` or the `'data'` event.
+
+A Readable stream in object mode will always return a single item from
+a call to [`readable.read(size)`][stream-read], regardless of the value of the
+`size` argument.
+
+*Note*: If the `readable.read()` method returns a chunk of data, a `'data'`
+event will also be emitted.

-Note that calling [`stream.read([size])`][stream-read] after the [`'end'`][]
-event has been triggered will return `null`. No runtime error will be raised.
+*Note*: Calling [`stream.read([size])`][stream-read] after the [`'end'`][]
+event has been emitted will return `null`. No runtime error will be raised.

-#### readable.resume()
+##### readable.resume()

 * Return: `this`

-This method will cause the readable stream to resume emitting [`'data'`][]
-events.
+The `readable.resume()` method causes an explicitly paused Readable stream to
+resume emitting [`'data'`][] events, switching the stream into flowing mode.

-This method will switch the stream into flowing mode. If you do *not*
-want to consume the data from a stream, but you *do* want to get to
-its [`'end'`][] event, you can call [`stream.resume()`][stream-resume] to open
-the flow of data.
+The `readable.resume()` method can be used to fully consume the data from a
+stream without actually processing any of that data, as illustrated in the
+following example:

 ```js
-var readable = getReadableStreamSomehow();
-readable.resume();
-readable.on('end', () => {
-  console.log('got to the end, but did not read anything');
-});
+getReadableStreamSomehow()
+  .resume()
+  .on('end', () => {
+    console.log('Reached the end, but did not read anything.');
+  });
 ```

-#### readable.setEncoding(encoding)
+##### readable.setEncoding(encoding)

 * `encoding` {String} The encoding to use.
 * Return: `this`

-Call this function to cause the stream to return strings of the specified
-encoding instead of Buffer objects. For example, if you do
-`readable.setEncoding('utf8')`, then the output data will be interpreted as
-UTF-8 data, and returned as strings. If you do `readable.setEncoding('hex')`,
-then the data will be encoded in hexadecimal string format.
+The `readable.setEncoding()` method sets the default character encoding for
+data read from the Readable stream.
+
+Setting an encoding causes the stream data to be returned as strings of the
+specified encoding rather than as `Buffer` objects. For instance, calling
+`readable.setEncoding('utf8')` will cause the output data to be interpreted
+as UTF-8 data, and passed as strings. Calling `readable.setEncoding('hex')`
+will cause the data to be encoded in hexadecimal string format.

-This properly handles multi-byte characters that would otherwise be
-potentially mangled if you simply pulled the Buffers directly and
-called [`buf.toString(encoding)`][] on them. If you want to read the data
-as strings, always use this method.
+The Readable stream will properly handle multi-byte characters delivered
+through the stream that would otherwise become improperly decoded if simply
+pulled from the stream as `Buffer` objects.

-Also you can disable any encoding at all with `readable.setEncoding(null)`.
-This approach is very useful if you deal with binary data or with large
-multi-byte strings spread out over multiple chunks.
+Encoding can be disabled by calling `readable.setEncoding(null)`. This approach
+is useful when working with binary data or with large multi-byte strings spread
+out over multiple chunks.

 ```js
-var readable = getReadableStreamSomehow();
+const readable = getReadableStreamSomehow();
 readable.setEncoding('utf8');
 readable.on('data', (chunk) => {
   assert.equal(typeof chunk, 'string');
@@ -430,47 +824,47 @@ readable.on('data', (chunk) => {
 });
 ```

-#### readable.unpipe([destination])
+##### readable.unpipe([destination])

 * `destination` {stream.Writable} Optional specific stream to unpipe

-This method will remove the hooks set up for a previous [`stream.pipe()`][]
-call.
+The `readable.unpipe()` method detaches a Writable stream previously attached
+using the [`stream.pipe()`][] method.

-If the destination is not specified, then all pipes are removed.
+If the `destination` is not specified, then *all* pipes are detached.

-If the destination is specified, but no pipe is set up for it, then
-this is a no-op.
+If the `destination` is specified, but no pipe is set up for it, then
+the method does nothing.
 ```js
-var readable = getReadableStreamSomehow();
-var writable = fs.createWriteStream('file.txt');
+const readable = getReadableStreamSomehow();
+const writable = fs.createWriteStream('file.txt');
 // All the data from readable goes into 'file.txt',
 // but only for the first second
 readable.pipe(writable);
 setTimeout(() => {
-  console.log('stop writing to file.txt');
+  console.log('Stop writing to file.txt');
   readable.unpipe(writable);
-  console.log('manually close the file stream');
+  console.log('Manually close the file stream');
   writable.end();
 }, 1000);
 ```

-#### readable.unshift(chunk)
+##### readable.unshift(chunk)

 * `chunk` {Buffer|String} Chunk of data to unshift onto the read queue

-This is useful in certain cases where a stream is being consumed by a
-parser, which needs to "un-consume" some data that it has
-optimistically pulled out of the source, so that the stream can be
-passed on to some other party.
+The `readable.unshift()` method pushes a chunk of data back into the internal
+buffer. This is useful in certain situations where a stream is being consumed by
+code that needs to "un-consume" some amount of data that it has optimistically
+pulled out of the source, so that the data can be passed on to some other party.

-Note that `stream.unshift(chunk)` cannot be called after the [`'end'`][] event
-has been triggered; a runtime error will be raised.
+*Note*: The `stream.unshift(chunk)` method cannot be called after the
+[`'end'`][] event has been emitted or a runtime error will be thrown.

-If you find that you must often call `stream.unshift(chunk)` in your
-programs, consider implementing a [Transform][] stream instead. (See [API
-for Stream Implementors][].)
+Developers using `stream.unshift()` often should consider switching to
+use of a [Transform][] stream instead. See the [API for Stream Implementers][]
+section for more information.

 ```js
 // Pull off a header delimited by \n\n
@@ -480,7 +874,7 @@ const StringDecoder = require('string_decoder').StringDecoder;
 function parseHeader(stream, callback) {
   stream.on('error', callback);
   stream.on('readable', onReadable);
-  var decoder = new StringDecoder('utf8');
+  const decoder = new StringDecoder('utf8');
   var header = '';
   function onReadable() {
     var chunk;
@@ -490,8 +884,8 @@ function parseHeader(stream, callback) {
       // found the header boundary
       var split = str.split(/\n\n/);
       header += split.shift();
-      var remaining = split.join('\n\n');
-      var buf = Buffer.from(remaining, 'utf8');
+      const remaining = split.join('\n\n');
+      const buf = Buffer.from(remaining, 'utf8');
       if (buf.length)
         stream.unshift(buf);
       stream.removeListener('error', callback);
@@ -507,30 +901,31 @@ function parseHeader(stream, callback) {
 }
 ```

-Note that, unlike [`stream.push(chunk)`][stream-push], `stream.unshift(chunk)`
+*Note*: Unlike [`stream.push(chunk)`][stream-push], `stream.unshift(chunk)`
 will not end the reading process by resetting the internal reading state of the
-stream. This can cause unexpected results if `unshift()` is called during a
-read (i.e. from within a [`stream._read()`][stream-_read] implementation on a
-custom stream). Following the call to `unshift()` with an immediate
-[`stream.push('')`][stream-push] will reset the reading state appropriately,
-however it is best to simply avoid calling `unshift()` while in the process of
-performing a read.
+stream. This can cause unexpected results if `readable.unshift()` is called
+during a read (i.e. from within a [`stream._read()`][stream-_read]
+implementation on a custom stream).
+Following the call to `readable.unshift()` with an immediate
+[`stream.push('')`][stream-push] will reset the reading state appropriately;
+however, it is best to simply avoid calling `readable.unshift()` while in the
+process of performing a read.

-#### readable.wrap(stream)
+##### readable.wrap(stream)

 * `stream` {Stream} An "old style" readable stream

 Versions of Node.js prior to v0.10 had streams that did not implement the
-entire Streams API as it is today. (See [Compatibility][] for
-more information.)
+entire `stream` module API as it is currently defined. (See [Compatibility][]
+for more information.)

-If you are using an older Node.js library that emits [`'data'`][] events and
-has a [`stream.pause()`][stream-pause] method that is advisory only, then you
-can use the `wrap()` method to create a [Readable][] stream that uses the old
-stream as its data source.
+When using an older Node.js library that emits [`'data'`][] events and has a
+[`stream.pause()`][stream-pause] method that is advisory only, the
+`readable.wrap()` method can be used to create a [Readable][] stream that uses
+the old stream as its data source.

-You will very rarely ever need to call this function, but it exists
-as a convenience for interacting with old Node.js programs and libraries.
+It will rarely be necessary to use `readable.wrap()`, but the method has been
+provided as a convenience for interacting with older Node.js applications and
+libraries.

 For example:

@@ -545,210 +940,59 @@ myReader.on('readable', () => {
 });
 ```

-### Class: stream.Transform
+### Duplex and Transform Streams

-Transform streams are [Duplex][] streams where the output is in some way
-computed from the input. They implement both the [Readable][] and
+#### Class: stream.Duplex
+
+Duplex streams are streams that implement both the [Readable][] and
 [Writable][] interfaces.

-Examples of Transform streams include:
+Examples of Duplex streams include:

+* [TCP sockets][]
 * [zlib streams][zlib]
 * [crypto streams][crypto]

-### Class: stream.Writable
+#### Class: stream.Transform

-The Writable stream interface is an abstraction for a *destination*
-that you are writing data *to*.
+Transform streams are [Duplex][] streams where the output is in some way
+related to the input. Like all [Duplex][] streams, Transform streams
+implement both the [Readable][] and [Writable][] interfaces.

-Examples of writable streams include:
+Examples of Transform streams include:

-* [HTTP requests, on the client][]
-* [HTTP responses, on the server][]
-* [fs write streams][]
 * [zlib streams][zlib]
 * [crypto streams][crypto]
-* [TCP sockets][]
-* [child process stdin][]
-* [`process.stdout`][], [`process.stderr`][]

-#### Event: 'close'

-Emitted when the stream and any of its underlying resources (a file descriptor,
-for example) have been closed. The event indicates that no more events will be
-emitted, and no further computation will occur.
+## API for Stream Implementers

-Not all streams will emit the `'close'` event as the `'close'` event is
-optional.
+

-#### Event: 'drain'
+The `stream` module API has been designed to make it possible to easily
+implement streams using JavaScript's prototypical inheritance model.

-If a [`stream.write(chunk)`][stream-write] call returns `false`, then the
-`'drain'` event will indicate when it is appropriate to begin writing more data
-to the stream.
+First, a stream developer would declare a new JavaScript class that extends one
+of the four basic stream classes (`stream.Writable`, `stream.Readable`,
+`stream.Duplex`, or `stream.Transform`), making sure to call the appropriate
+parent class constructor:
+First, a stream developer would declare a new JavaScript class that extends one +of the four basic stream classes (`stream.Writable`, `stream.Readable`, +`stream.Duplex`, or `stream.Transform`), making sure the call the appropriate +parent class constructor: ```js -// Write the data to the supplied writable stream one million times. -// Be attentive to back-pressure. -function writeOneMillionTimes(writer, data, encoding, callback) { - var i = 1000000; - write(); - function write() { - var ok = true; - do { - i -= 1; - if (i === 0) { - // last time! - writer.write(data, encoding, callback); - } else { - // see if we should continue, or wait - // don't pass the callback, because we're not done yet. - ok = writer.write(data, encoding); - } - } while (i > 0 && ok); - if (i > 0) { - // had to stop early! - // write some more once it drains - writer.once('drain', write); - } +const Writable = require('stream').Writable; + +class MyWritable extends Writable { + constructor(options) { + super(options); } } ``` -#### Event: 'error' - -* {Error} - -Emitted if there was an error when writing or piping data. - -#### Event: 'finish' - -When the [`stream.end()`][stream-end] method has been called, and all data has -been flushed to the underlying system, this event is emitted. - -```javascript -var writer = getWritableStreamSomehow(); -for (var i = 0; i < 100; i ++) { - writer.write('hello, #${i}!\n'); -} -writer.end('this is the end\n'); -writer.on('finish', () => { - console.error('all writes are now complete.'); -}); -``` - -#### Event: 'pipe' - -* `src` {stream.Readable} source stream that is piping to this writable - -This is emitted whenever the [`stream.pipe()`][] method is called on a readable -stream, adding this writable to its set of destinations. - -```js -var writer = getWritableStreamSomehow(); -var reader = getReadableStreamSomehow(); -writer.on('pipe', (src) => { - console.error('something is piping into the writer'); - assert.equal(src, reader); -}); -reader.pipe(writer); -``` - -#### Event: 'unpipe' - -* `src` {[Readable][] Stream} The source stream that - [unpiped][`stream.unpipe()`] this writable - -This is emitted whenever the [`stream.unpipe()`][] method is called on a -readable stream, removing this writable from its set of destinations. - -```js -var writer = getWritableStreamSomehow(); -var reader = getReadableStreamSomehow(); -writer.on('unpipe', (src) => { - console.error('something has stopped piping into the writer'); - assert.equal(src, reader); -}); -reader.pipe(writer); -reader.unpipe(writer); -``` - -#### writable.cork() - -Forces buffering of all writes. - -Buffered data will be flushed either at [`stream.uncork()`][] or at -[`stream.end()`][stream-end] call. - -#### writable.end([chunk][, encoding][, callback]) - -* `chunk` {String|Buffer} Optional data to write -* `encoding` {String} The encoding, if `chunk` is a String -* `callback` {Function} Optional callback for when the stream is finished - -Call this method when no more data will be written to the stream. If supplied, -the callback is attached as a listener on the [`'finish'`][] event. - -Calling [`stream.write()`][stream-write] after calling -[`stream.end()`][stream-end] will raise an error. - -```js -// write 'hello, ' and then end with 'world!' -var file = fs.createWriteStream('example.txt'); -file.write('hello, '); -file.end('world!'); -// writing more now is not allowed! 
-``` - -#### writable.setDefaultEncoding(encoding) - -* `encoding` {String} The new default encoding -* Return: `this` - -Sets the default encoding for a writable stream. - -#### writable.uncork() - -Flush all data, buffered since [`stream.cork()`][] call. - -#### writable.write(chunk[, encoding][, callback]) - -* `chunk` {String|Buffer} The data to write -* `encoding` {String} The encoding, if `chunk` is a String -* `callback` {Function} Callback for when this chunk of data is flushed -* Returns: {Boolean} `true` if the data was handled completely. - -This method writes some data to the underlying system, and calls the -supplied callback once the data has been fully handled. If an error -occurs, the callback may or may not be called with the error as its -first argument. To detect write errors, listen for the `'error'` event. - -The return value indicates if you should continue writing right now. -If the data had to be buffered internally, then it will return -`false`. Otherwise, it will return `true`. - -This return value is strictly advisory. You MAY continue to write, -even if it returns `false`. However, writes will be buffered in -memory, so it is best not to do this excessively. Instead, wait for -the [`'drain'`][] event before writing more data. - - -## API for Stream Implementors - - - -To implement any sort of stream, the pattern is the same: - -1. Extend the appropriate parent class in your own subclass via the `extends` - keyword. -2. Call the appropriate parent class constructor in your constructor, - to be sure that the internal mechanisms are set up properly. -3. Implement one or more specific methods, as detailed below. - -The class to extend and the method(s) to implement depend on the sort -of stream class you are writing: +The new stream class must then implement one or more specific methods, depending +on the type of stream being created, as detailed in the chart below: @@ -810,156 +1054,347 @@ of stream class you are writing:
-In your implementation code, it is very important to never call the methods
-described in [API for Stream Consumers][]. Otherwise, you can potentially cause
-adverse side effects in programs that consume your streaming interfaces.
+*Note*: The implementation code for a stream should *never* call the "public"
+methods of a stream that are intended for use by consumers (as described in
+the [API for Stream Consumers][] section). Doing so may lead to adverse
+side effects in application code consuming the stream.

-### Class: stream.Duplex
+### Simplified Construction

-
+For many simple cases, it is possible to construct a stream without relying on
+inheritance. This can be accomplished by directly creating instances of the
+`stream.Writable`, `stream.Readable`, `stream.Duplex`, or `stream.Transform`
+objects and passing appropriate methods as constructor options.

-A "duplex" stream is one that is both Readable and Writable, such as a TCP
-socket connection.
+For example:

-Note that `stream.Duplex` is an abstract class designed to be extended
-with an underlying implementation of the [`stream._read(size)`][stream-_read]
-and [`stream._write(chunk, encoding, callback)`][stream-_write] methods as you
-would with a Readable or Writable stream class.
+```js
+const Writable = require('stream').Writable;

-Since JavaScript doesn't have multiple prototypal inheritance, this class
-prototypally inherits from Readable, and then parasitically from Writable. It is
-thus up to the user to implement both the low-level
-[`stream._read(n)`][stream-_read] method as well as the low-level
-[`stream._write(chunk, encoding, callback)`][stream-_write] method on extension
-duplex classes.
+const myWritable = new Writable({
+  write(chunk, encoding, callback) {
+    // ...
+  }
+});
+```

-#### new stream.Duplex(options)
+### Implementing a Writable Stream

-* `options` {Object} Passed to both Writable and Readable
-  constructors. Also has the following fields:
-  * `allowHalfOpen` {Boolean} Default = `true`. If set to `false`, then
-    the stream will automatically end the readable side when the
-    writable side ends and vice versa.
-  * `readableObjectMode` {Boolean} Default = `false`. Sets `objectMode`
-    for readable side of the stream. Has no effect if `objectMode`
-    is `true`.
-  * `writableObjectMode` {Boolean} Default = `false`. Sets `objectMode`
-    for writable side of the stream. Has no effect if `objectMode`
-    is `true`.
+The `stream.Writable` class is extended to implement a [Writable][] stream.

-In classes that extend the Duplex class, make sure to call the
-constructor so that the buffering settings can be properly
-initialized.
+Custom Writable streams *must* call the `new stream.Writable([options])`
+constructor and implement the `writable._write()` method. The
+`writable._writev()` method *may* also be implemented.

-### Class: stream.PassThrough
+#### Constructor: new stream.Writable([options])

-This is a trivial implementation of a [Transform][] stream that simply
-passes the input bytes across to the output. Its purpose is mainly
-for examples and testing, but there are occasionally use cases where
-it can come in handy as a building block for novel sorts of streams.
+* `options` {Object}
+  * `highWaterMark` {Number} Buffer level when
+    [`stream.write()`][stream-write] starts returning `false`. Defaults to
+    `16384` (16kb), or `16` for `objectMode` streams.
+  * `decodeStrings` {Boolean} Whether or not to decode strings into
+    Buffers before passing them to [`stream._write()`][stream-_write].
+    Defaults to `true`.
+  * `objectMode` {Boolean} Whether or not the
+    [`stream.write(anyObj)`][stream-write] is a valid operation. When set,
+    it becomes possible to write JavaScript values other than string or
+    `Buffer` if supported by the stream implementation. Defaults to `false`
+  * `write` {Function} Implementation for the
+    [`stream._write()`][stream-_write] method.
+  * `writev` {Function} Implementation for the
+    [`stream._writev()`][stream-_writev] method.

-### Class: stream.Readable
+For example:

-
+```js
+const Writable = require('stream').Writable;
+
+class MyWritable extends Writable {
+  constructor(options) {
+    // Calls the stream.Writable() constructor
+    super(options);
+  }
+}
+```
+
+Or, when using pre-ES6 style constructors:
+
+```js
+const Writable = require('stream').Writable;
+const util = require('util');
+
+function MyWritable(options) {
+  if (!(this instanceof MyWritable))
+    return new MyWritable(options);
+  Writable.call(this, options);
+}
+util.inherits(MyWritable, Writable);
+```
+
+Or, using the Simplified Constructor approach:
+
+```js
+const Writable = require('stream').Writable;
+
+const myWritable = new Writable({
+  write(chunk, encoding, callback) {
+    // ...
+  },
+  writev(chunks, callback) {
+    // ...
+  }
+});
+```
+
+#### writable.\_write(chunk, encoding, callback)
+
+* `chunk` {Buffer|String} The chunk to be written. Will **always**
+  be a buffer unless the `decodeStrings` option was set to `false`.
+* `encoding` {String} If the chunk is a string, then `encoding` is the
+  character encoding of that string. If chunk is a `Buffer`, or if the
+  stream is operating in object mode, `encoding` may be ignored.
+* `callback` {Function} Call this function (optionally with an error
+  argument) when processing is complete for the supplied chunk.
+
+All Writable stream implementations must provide a
+[`writable._write()`][stream-_write] method to send data to the underlying
+resource.
+
+*Note*: [Transform][] streams provide their own implementation of the
+[`writable._write()`][stream-_write] method.
+
+*Note*: **This function MUST NOT be called by application code directly.** It
+should be implemented by child classes, and called only by the internal
+Writable class methods.
+
+The `callback` method must be called to signal either that the write completed
+successfully or failed with an error. The first argument passed to the
+`callback` must be the `Error` object if the call failed or `null` if the
+write succeeded.
+
+It is important to note that all calls to `writable.write()` that occur between
+the time `writable._write()` is called and the `callback` is called will cause
+the written data to be buffered. Once the `callback` is invoked, the stream
+will emit a `'drain'` event. If a stream implementation is capable of
+processing multiple chunks of data at once, the `writable._writev()` method
+should be implemented.
+
+If the `decodeStrings` option is explicitly set to `false` in the constructor
+options, then `chunk` may be a string rather than a Buffer, and `encoding`
+will indicate the character encoding of the string. This is to support
+implementations that have an optimized handling for certain string data
+encodings. Otherwise, the `encoding` argument can be safely ignored, and
+`chunk` will always be a `Buffer`.
+
+The `writable._write()` method is prefixed with an underscore because it is
+internal to the class that defines it, and should never be called directly by
+user programs.
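+
+As a minimal sketch of this contract (the in-memory array here is a
+hypothetical stand-in for a real underlying resource), a custom `_write()`
+stores each chunk and then invokes the callback to signal completion:
+
+```js
+const Writable = require('stream').Writable;
+
+class ArrayWritable extends Writable {
+  constructor(options) {
+    super(options);
+    this.chunks = [];
+  }
+
+  _write(chunk, encoding, callback) {
+    // Store the chunk, then signal success by calling the callback with
+    // no error. Passing an Error instead would emit an 'error' event.
+    this.chunks.push(chunk);
+    callback();
+  }
+}
+
+const myWritable = new ArrayWritable();
+myWritable.write('hello ');
+myWritable.end('world');
+```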
+
+#### writable.\_writev(chunks, callback)
+
+* `chunks` {Array} The chunks to be written. Each chunk has the following
+  format: `{ chunk: ..., encoding: ... }`.
+* `callback` {Function} A callback function (optionally with an error
+  argument) to be invoked when processing is complete for the supplied chunks.
+
+*Note*: **This function MUST NOT be called by application code directly.** It
+should be implemented by child classes, and called only by the internal
+Writable class methods.
+
+The `writable._writev()` method may be implemented in addition to
+`writable._write()` in stream implementations that are capable of processing
+multiple chunks of data at once. If implemented, the method will be called with
+all chunks of data currently buffered in the write queue.
+
+The `writable._writev()` method is prefixed with an underscore because it is
+internal to the class that defines it, and should never be called directly by
+user programs.
+
+#### Errors While Writing
+
+It is recommended that errors occurring during the processing of the
+`writable._write()` and `writable._writev()` methods are reported by invoking
+the callback and passing the error as the first argument. This will cause an
+`'error'` event to be emitted by the Writable. Throwing an Error from within
+`writable._write()` can result in unexpected and inconsistent behavior
+depending on how the stream is being used. Using the callback ensures
+consistent and predictable handling of errors.
+
+```js
+const Writable = require('stream').Writable;
+
+const myWritable = new Writable({
+  write(chunk, encoding, callback) {
+    if (chunk.toString().indexOf('a') >= 0) {
+      callback(new Error('chunk is invalid'));
+    } else {
+      callback();
+    }
+  }
+});
+```
+
+#### An Example Writable Stream
+
+The following illustrates a rather simplistic (and somewhat pointless) custom
+Writable stream implementation. While this particular Writable stream instance
+is not especially useful, the example illustrates each of the required elements
+of a custom [Writable][] stream instance:
+
+```js
+const Writable = require('stream').Writable;
+
+class MyWritable extends Writable {
+  constructor(options) {
+    super(options);
+  }
+
+  _write(chunk, encoding, callback) {
+    if (chunk.toString().indexOf('a') >= 0) {
+      callback(new Error('chunk is invalid'));
+    } else {
+      callback();
+    }
+  }
+}
+```
+
+### Implementing a Readable Stream

-`stream.Readable` is an abstract class designed to be extended with an
-underlying implementation of the [`stream._read(size)`][stream-_read] method.

+The `stream.Readable` class is extended to implement a [Readable][] stream.

-Please see [API for Stream Consumers][] for how to consume
-streams in your programs. What follows is an explanation of how to
-implement Readable streams in your programs.

+Custom Readable streams *must* call the `new stream.Readable([options])`
+constructor and implement the `readable._read()` method.

#### new stream.Readable([options])

* `options` {Object}
  * `highWaterMark` {Number} The maximum number of bytes to store
    in the internal buffer before ceasing to read from the underlying
-    resource. Default = `16384` (16kb), or `16` for `objectMode` streams
+    resource. Defaults to `16384` (16kb), or `16` for `objectMode` streams
  * `encoding` {String} If specified, then buffers will be decoded to
-    strings using the specified encoding. Default = `null`
+    strings using the specified encoding.
Defaults to `null`
  * `objectMode` {Boolean} Whether this stream should behave
    as a stream of objects. Meaning that [`stream.read(n)`][stream-read] returns
-    a single value instead of a Buffer of size n. Default = `false`
+    a single value instead of a Buffer of size n. Defaults to `false`
  * `read` {Function} Implementation for the [`stream._read()`][stream-_read]
    method.

-In classes that extend the Readable class, make sure to call the
-Readable constructor so that the buffering settings can be properly
-initialized.

+For example:
+
+```js
+const Readable = require('stream').Readable;
+
+class MyReadable extends Readable {
+  constructor(options) {
+    // Calls the stream.Readable(options) constructor
+    super(options);
+  }
+}
+```
+
+Or, when using pre-ES6 style constructors:
+
+```js
+const Readable = require('stream').Readable;
+const util = require('util');
+
+function MyReadable(options) {
+  if (!(this instanceof MyReadable))
+    return new MyReadable(options);
+  Readable.call(this, options);
+}
+util.inherits(MyReadable, Readable);
+```
+
+Or, using the Simplified Constructor approach:
+
+```js
+const Readable = require('stream').Readable;
+
+const myReadable = new Readable({
+  read(size) {
+    // ...
+  }
+});
+```

#### readable.\_read(size)

* `size` {Number} Number of bytes to read asynchronously

-Note: **Implement this method, but do NOT call it directly.**
-
-This method is prefixed with an underscore because it is internal to the
-class that defines it and should only be called by the internal Readable
-class methods. All Readable stream implementations must provide a \_read
-method to fetch data from the underlying resource.
-
-When `_read()` is called, if data is available from the resource, the `_read()`
-implementation should start pushing that data into the read queue by calling
-[`this.push(dataChunk)`][stream-push]. `_read()` should continue reading from
-the resource and pushing data until push returns `false`, at which point it
-should stop reading from the resource. Only when `_read()` is called again after
-it has stopped should it start reading more data from the resource and pushing
-that data onto the queue.
-
-Note: once the `_read()` method is called, it will not be called again until
-the [`stream.push()`][stream-push] method is called.
-
-The `size` argument is advisory. Implementations where a "read" is a
-single call that returns data can use this to know how much data to
-fetch. Implementations where that is not relevant, such as TCP or
-TLS, may ignore this argument, and simply provide data whenever it
-becomes available. There is no need, for example to "wait" until
+*Note*: **This function MUST NOT be called by application code directly.** It
+should be implemented by child classes, and called only by the internal
+Readable class methods.
+
+All Readable stream implementations must provide an implementation of the
+`readable._read()` method to fetch data from the underlying resource.
+
+When `readable._read()` is called, if data is available from the resource, the
+implementation should begin pushing that data into the read queue using the
+[`this.push(dataChunk)`][stream-push] method. `_read()` should continue reading
+from the resource and pushing data until `readable.push()` returns `false`. Only
+when `_read()` is called again after it has stopped should it resume pushing
+additional data onto the queue.
+
+*Note*: Once the `readable._read()` method has been called, it will not be
+called again until the [`readable.push()`][stream-push] method is called.
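+
+A minimal sketch of that pattern, assuming a hypothetical synchronous
+`getNextChunkFromResource()` helper that returns `null` once the resource is
+exhausted, might look as follows:
+
+```js
+const Readable = require('stream').Readable;
+
+class MyReadable extends Readable {
+  _read(size) {
+    // Keep pushing until the internal buffer is full (push() returns false).
+    // The hypothetical helper returns null at EOF; readable.push(null)
+    // signals end-of-stream and also returns false, ending the loop.
+    let chunk;
+    do {
+      chunk = getNextChunkFromResource(size);
+    } while (this.push(chunk));
+  }
+}
+```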
+
+The `size` argument is advisory. Implementations for which a "read" is a
+single operation that returns data can use the `size` argument to determine how
+much data to fetch. Other implementations may ignore this argument and simply
+provide data whenever it becomes available. There is no need to "wait" until
`size` bytes are available before calling
[`stream.push(chunk)`][stream-push].

-#### readable.push(chunk[, encoding])

+The `readable._read()` method is prefixed with an underscore because it is
+internal to the class that defines it, and should never be called directly by
+user programs.

+#### readable.push(chunk[, encoding])
* `chunk` {Buffer|Null|String} Chunk of data to push into the read queue
* `encoding` {String} Encoding of String chunks.  Must be a valid
  Buffer encoding, such as `'utf8'` or `'ascii'`
-* return {Boolean} Whether or not more pushes should be performed
+* Returns {Boolean} `true` if additional chunks of data may continue to be
+  pushed; `false` otherwise.

-Note: **This method should be called by Readable implementors, NOT
-by consumers of Readable streams.**
+When `chunk` is a `Buffer` or `string`, the `chunk` of data will be added to the
+internal queue for users of the stream to consume. Passing `chunk` as `null`
+signals the end of the stream (EOF), after which no more data can be written.

-If a value other than null is passed, The `push()` method adds a chunk of data
-into the queue for subsequent stream processors to consume. If `null` is
-passed, it signals the end of the stream (EOF), after which no more data
-can be written.

+When the Readable is operating in paused mode, the data added with
+`readable.push()` can be read out by calling the
+[`readable.read()`][stream-read] method when the [`'readable'`][] event is
+emitted.

-The data added with `push()` can be pulled out by calling the
-[`stream.read()`][stream-read] method when the [`'readable'`][] event fires.

+When the Readable is operating in flowing mode, the data added with
+`readable.push()` will be delivered by emitting a `'data'` event.

-This API is designed to be as flexible as possible. For example,
-you may be wrapping a lower-level source which has some sort of
-pause/resume mechanism, and a data callback. In those cases, you
-could wrap the low-level source object by doing something like this:

+The `readable.push()` method is designed to be as flexible as possible. For
+example, when wrapping a lower-level source that provides some form of
+pause/resume mechanism, and a data callback, the low-level source can be wrapped
+by the custom Readable instance as illustrated in the following example:

```js
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
-
class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

-    // Every time there's data, we push it into the internal buffer.
+    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
-      // if push() returns false, then we need to stop reading from source
+      // if push() returns false, then stop reading from source
      if (!this.push(chunk))
        this._source.readStop();
    };

-    // When the source ends, we push the EOF-signaling `null` chunk
+    // When the source ends, push the EOF-signaling `null` chunk
    this._source.onend = () => {
      this.push(null);
    };
@@ -971,12 +1406,37 @@ class SourceWrapper extends Readable {
  }
}
```
+*Note*: The `readable.push()` method is intended to be called only by Readable
+stream implementers, and only from within the `readable._read()` method.
+
+#### Errors While Reading
+
+It is recommended that errors occurring during the processing of the
+`readable._read()` method are emitted using the `'error'` event rather than
+being thrown. Throwing an Error from within `readable._read()` can result in
+unexpected and inconsistent behavior depending on whether the stream is
+operating in flowing or paused mode. Using the `'error'` event ensures
+consistent and predictable handling of errors.

-#### Example: A Counting Stream
+```js
+const Readable = require('stream').Readable;
+
+const myReadable = new Readable({
+  read(size) {
+    if (checkSomeErrorCondition()) {
+      process.nextTick(() => this.emit('error', new Error('invalid data')));
+      return;
+    }
+    // do some work
+  }
+});
+```
+
+#### An Example Counting Stream

-This is a basic example of a Readable stream. It emits the numerals
+The following is a basic example of a Readable stream that emits the numerals
from 1 to 1,000,000 in ascending order, and then ends.

```js
@@ -1002,527 +1462,323 @@ class Counter extends Readable {
}
```

-#### Example: SimpleProtocol v1 (Sub-optimal)
-
-This is similar to the `parseHeader` function described
-[here](#stream_readable_unshift_chunk), but implemented as a custom stream.
-Also, note that this implementation does not convert the incoming data to a
-string.

+### Implementing a Duplex Stream

-However, this would be better implemented as a [Transform][] stream. See
-[SimpleProtocol v2][] for a better implementation.

+A [Duplex][] stream is one that implements both [Readable][] and [Writable][],
+such as a TCP socket connection.

-```js
-// A parser for a simple data protocol.
-// The "header" is a JSON object, followed by 2 \n characters, and
-// then a message body.
-//
-// NOTE: This can be done more simply as a Transform stream!
-// Using Readable directly for this is sub-optimal. See the
-// alternative example below under the Transform section.

+Because JavaScript does not have support for multiple inheritance, the
+`stream.Duplex` class is extended to implement a [Duplex][] stream (as opposed
+to extending the `stream.Readable` *and* `stream.Writable` classes).

-const Readable = require('stream').Readable;
-
-class SimpleProtocol extends Readable {
-  constructor(source, options) {
-    super(options);

+*Note*: The `stream.Duplex` class prototypically inherits from `stream.Readable`
+and parasitically from `stream.Writable`.

-    this._inBody = false;
-    this._sawFirstCr = false;

+Custom Duplex streams *must* call the `new stream.Duplex([options])`
+constructor and implement *both* the `readable._read()` and
+`writable._write()` methods.
- // source is a readable stream, such as a socket or file - this._source = source; - - source.on('end', () => { - this.push(null); - }); +#### new stream.Duplex(options) - // give it a kick whenever the source is readable - // read(0) will not consume any bytes - source.on('readable', () => { - this.read(0); - }); +* `options` {Object} Passed to both Writable and Readable + constructors. Also has the following fields: + * `allowHalfOpen` {Boolean} Defaults to `true`. If set to `false`, then + the stream will automatically end the readable side when the + writable side ends and vice versa. + * `readableObjectMode` {Boolean} Defaults to `false`. Sets `objectMode` + for readable side of the stream. Has no effect if `objectMode` + is `true`. + * `writableObjectMode` {Boolean} Defaults to `false`. Sets `objectMode` + for writable side of the stream. Has no effect if `objectMode` + is `true`. - this._rawHeader = []; - this.header = null; - } +For example: - _read(n) { - if (!this._inBody) { - var chunk = this._source.read(); - - // if the source doesn't have data, we don't have data yet. - if (chunk === null) - return this.push(''); - - // check if the chunk has a \n\n - var split = -1; - for (var i = 0; i < chunk.length; i++) { - if (chunk[i] === 10) { // '\n' - if (this._sawFirstCr) { - split = i; - break; - } else { - this._sawFirstCr = true; - } - } else { - this._sawFirstCr = false; - } - } +```js +const Duplex = require('stream').Duplex; - if (split === -1) { - // still waiting for the \n\n - // stash the chunk, and try again. - this._rawHeader.push(chunk); - this.push(''); - } else { - this._inBody = true; - var h = chunk.slice(0, split); - this._rawHeader.push(h); - var header = Buffer.concat(this._rawHeader).toString(); - try { - this.header = JSON.parse(header); - } catch (er) { - this.emit('error', new Error('invalid simple protocol data')); - return; - } - // now, because we got some extra data, unshift the rest - // back into the read queue so that our consumer will see it. - var b = chunk.slice(split); - this.unshift(b); - // calling unshift by itself does not reset the reading state - // of the stream; since we're inside _read, doing an additional - // push('') will reset the state appropriately. - this.push(''); - - // and let them know that we are done parsing the header. - this.emit('header', this.header); - } - } else { - // from there on, just provide the data to our consumer. - // careful not to push(null), since that would indicate EOF. - var chunk = this._source.read(); - if (chunk) this.push(chunk); - } +class MyDuplex extends Duplex { + constructor(options) { + super(options); } } -// Usage: -// var parser = new SimpleProtocol(source); -// Now parser is a readable stream that will emit 'header' -// with the parsed header data. ``` -### Class: stream.Transform - -A "transform" stream is a duplex stream where the output is causally -connected in some way to the input, such as a [zlib][] stream or a -[crypto][] stream. - -There is no requirement that the output be the same size as the input, -the same number of chunks, or arrive at the same time. For example, a -Hash stream will only ever have a single chunk of output which is -provided when the input is ended. A zlib stream will produce output -that is either much smaller or much larger than its input. 
- -Rather than implement the [`stream._read()`][stream-_read] and -[`stream._write()`][stream-_write] methods, Transform classes must implement the -[`stream._transform()`][stream-_transform] method, and may optionally -also implement the [`stream._flush()`][stream-_flush] method. (See below.) - -#### new stream.Transform([options]) - -* `options` {Object} Passed to both Writable and Readable - constructors. Also has the following fields: - * `transform` {Function} Implementation for the - [`stream._transform()`][stream-_transform] method. - * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush] - method. - -In classes that extend the Transform class, make sure to call the -constructor so that the buffering settings can be properly -initialized. - -#### Events: 'finish' and 'end' - -The [`'finish'`][] and [`'end'`][] events are from the parent Writable -and Readable classes respectively. The `'finish'` event is fired after -[`stream.end()`][stream-end] is called and all chunks have been processed by -[`stream._transform()`][stream-_transform], `'end'` is fired after all data has -been output which is after the callback in [`stream._flush()`][stream-_flush] -has been called. - -#### transform.\_flush(callback) - -* `callback` {Function} Call this function (optionally with an error - argument) when you are done flushing any remaining data. - -Note: **This function MUST NOT be called directly.** It MAY be implemented -by child classes, and if so, will be called by the internal Transform -class methods only. - -In some cases, your transform operation may need to emit a bit more -data at the end of the stream. For example, a `Zlib` compression -stream will store up some internal state so that it can optimally -compress the output. At the end, however, it needs to do the best it -can with what is left, so that the data will be complete. - -In those cases, you can implement a `_flush()` method, which will be -called at the very end, after all the written data is consumed, but -before emitting [`'end'`][] to signal the end of the readable side. Just -like with [`stream._transform()`][stream-_transform], call -`transform.push(chunk)` zero or more times, as appropriate, and call `callback` -when the flush operation is complete. - -This method is prefixed with an underscore because it is internal to -the class that defines it, and should not be called directly by user -programs. However, you **are** expected to override this method in -your own extension classes. - -#### transform.\_transform(chunk, encoding, callback) - -* `chunk` {Buffer|String} The chunk to be transformed. Will **always** - be a buffer unless the `decodeStrings` option was set to `false`. -* `encoding` {String} If the chunk is a string, then this is the - encoding type. If chunk is a buffer, then this is the special - value - 'buffer', ignore it in this case. -* `callback` {Function} Call this function (optionally with an error - argument and data) when you are done processing the supplied chunk. - -Note: **This function MUST NOT be called directly.** It should be -implemented by child classes, and called by the internal Transform -class methods only. +Or, when using pre-ES6 style constructors: -All Transform stream implementations must provide a `_transform()` -method to accept input and produce output. - -`_transform()` should do whatever has to be done in this specific -Transform class, to handle the bytes being written, and pass them off -to the readable portion of the interface. 
Do asynchronous I/O,
-process things, and so on.

+```js
+const Duplex = require('stream').Duplex;
+const util = require('util');

-Call `transform.push(outputChunk)` 0 or more times to generate output
-from this input chunk, depending on how much data you want to output
-as a result of this chunk.

+function MyDuplex(options) {
+  if (!(this instanceof MyDuplex))
+    return new MyDuplex(options);
+  Duplex.call(this, options);
+}
+util.inherits(MyDuplex, Duplex);
+```

-Call the callback function only when the current chunk is completely
-consumed. Note that there may or may not be output as a result of any
-particular input chunk. If you supply a second argument to the callback
-it will be passed to the push method. In other words the following are
-equivalent:

+Or, using the Simplified Constructor approach:

```js
-transform.prototype._transform = function (data, encoding, callback) {
-  this.push(data);
-  callback();
-};
+const Duplex = require('stream').Duplex;

-transform.prototype._transform = function (data, encoding, callback) {
-  callback(null, data);
-};
+const myDuplex = new Duplex({
+  read(size) {
+    // ...
+  },
+  write(chunk, encoding, callback) {
+    // ...
+  }
+});
```

-This method is prefixed with an underscore because it is internal to
-the class that defines it, and should not be called directly by user
-programs. However, you **are** expected to override this method in
-your own extension classes.

-#### Example: `SimpleProtocol` parser v2
+#### An Example Duplex Stream

-The example [here](#stream_example_simpleprotocol_v1_sub_optimal) of a simple
-protocol parser can be implemented simply by using the higher level
-[Transform][] stream class, similar to the `parseHeader` and `SimpleProtocol
-v1` examples.

+The following illustrates a simple example of a Duplex stream that wraps a
+hypothetical lower-level source object to which data can be written, and
+from which data can be read, albeit using an API that is not compatible with
+Node.js streams.

-In this example, rather than providing the input as an argument, it
-would be piped into the parser, which is a more idiomatic Node.js stream
-approach.
-
-```javascript
-const Transform = require('stream').Transform;

+```js
+const Duplex = require('stream').Duplex;
+const kSource = Symbol('source');

-class SimpleProtocol extends Transform {
-  constructor(options) {
+class MyDuplex extends Duplex {
+  constructor(source, options) {
    super(options);
-
-    this._inBody = false;
-    this._sawFirstCr = false;
-    this._rawHeader = [];
-    this.header = null;
+    this[kSource] = source;
  }

-  _transform(chunk, encoding, done) {
-    if (!this._inBody) {
-      // check if the chunk has a \n\n
-      var split = -1;
-      for (var i = 0; i < chunk.length; i++) {
-        if (chunk[i] === 10) { // '\n'
-          if (this._sawFirstCr) {
-            split = i;
-            break;
-          } else {
-            this._sawFirstCr = true;
-          }
-        } else {
-          this._sawFirstCr = false;
-        }
-      }
+  _write(chunk, encoding, callback) {
+    // The underlying source only deals with strings
+    if (Buffer.isBuffer(chunk))
+      chunk = chunk.toString(encoding);
+    this[kSource].writeSomeData(chunk, encoding);
+    callback();
+  }

-      if (split === -1) {
-        // still waiting for the \n\n
-        // stash the chunk, and try again.
-        this._rawHeader.push(chunk);
-      } else {
-        this._inBody = true;
-        var h = chunk.slice(0, split);
-        this._rawHeader.push(h);
-        var header = Buffer.concat(this._rawHeader).toString();
-        try {
-          this.header = JSON.parse(header);
-        } catch (er) {
-          this.emit('error', new Error('invalid simple protocol data'));
-          return;
-        }
-        // and let them know that we are done parsing the header.
-        this.emit('header', this.header);
-
-        // now, because we got some extra data, emit this first.
-        this.push(chunk.slice(split));
-      }
-    } else {
-      // from there on, just provide the data to our consumer as-is.
-      this.push(chunk);
-    }
-    done();
+  _read(size) {
+    this[kSource].fetchSomeData(size, (data, encoding) => {
+      this.push(Buffer.from(data, encoding));
+    });
  }
}
-// Usage:
-// var parser = new SimpleProtocol();
-// source.pipe(parser)
-// Now parser is a readable stream that will emit 'header'
-// with the parsed header data.
```
-### Class: stream.Writable
+The most important aspect of a Duplex stream is that the Readable and Writable
+sides operate independently of one another despite co-existing within a single
+object instance.

- 

+#### Object Mode Duplex Streams

-`stream.Writable` is an abstract class designed to be extended with an
-underlying implementation of the
-[`stream._write(chunk, encoding, callback)`][stream-_write] method.

+For Duplex streams, `objectMode` can be set exclusively for either the Readable
+or Writable side using the `readableObjectMode` and `writableObjectMode`
+options, respectively.

-Please see [API for Stream Consumers][] for how to consume
-writable streams in your programs. What follows is an explanation of
-how to implement Writable streams in your programs.

+In the following example, for instance, a new Transform stream (which is a
+type of [Duplex][] stream) is created that has an object mode Writable side
+that accepts JavaScript numbers that are converted to hexadecimal strings on
+the Readable side.

-#### new stream.Writable([options])
-
-* `options` {Object}
-  * `highWaterMark` {Number} Buffer level when
-    [`stream.write()`][stream-write] starts returning `false`. Default = `16384`
-    (16kb), or `16` for `objectMode` streams.
-  * `decodeStrings` {Boolean} Whether or not to decode strings into
-    Buffers before passing them to [`stream._write()`][stream-_write].
-    Default = `true`
-  * `objectMode` {Boolean} Whether or not the
-    [`stream.write(anyObj)`][stream-write] is a valid operation. If set you can
-    write arbitrary data instead of only `Buffer` / `String` data.
-    Default = `false`
-  * `write` {Function} Implementation for the
-    [`stream._write()`][stream-_write] method.
-  * `writev` {Function} Implementation for the
-    [`stream._writev()`][stream-_writev] method.
-
-In classes that extend the Writable class, make sure to call the
-constructor so that the buffering settings can be properly
-initialized.
-
-#### writable.\_write(chunk, encoding, callback)
-
-* `chunk` {Buffer|String} The chunk to be written. Will **always**
-  be a buffer unless the `decodeStrings` option was set to `false`.
-* `encoding` {String} If the chunk is a string, then this is the
-  encoding type. If chunk is a buffer, then this is the special
-  value - 'buffer', ignore it in this case.
-* `callback` {Function} Call this function (optionally with an error
-  argument) when you are done processing the supplied chunk.
-
-All Writable stream implementations must provide a
-[`stream._write()`][stream-_write] method to send data to the underlying
-resource.
+```js
+const Transform = require('stream').Transform;

-Note: **This function MUST NOT be called directly.** It should be
-implemented by child classes, and called by the internal Writable
-class methods only.

+// All Transform streams are also Duplex Streams
+const myTransform = new Transform({
+  writableObjectMode: true,

-Call the callback using the standard `callback(error)` pattern to
-signal that the write completed successfully or with an error.

+  transform(chunk, encoding, callback) {
+    // Coerce the chunk to a number if necessary
+    chunk |= 0;

-If the `decodeStrings` flag is set in the constructor options, then
-`chunk` may be a string rather than a Buffer, and `encoding` will
-indicate the sort of string that it is. This is to support
-implementations that have an optimized handling for certain string
-data encodings. If you do not explicitly set the `decodeStrings`
-option to `false`, then you can safely ignore the `encoding` argument,
-and assume that `chunk` will always be a Buffer.

+    // Transform the chunk into something else.
+    const data = chunk.toString(16);

-This method is prefixed with an underscore because it is internal to
-the class that defines it, and should not be called directly by user
-programs. However, you **are** expected to override this method in
-your own extension classes.

+    // Push the data onto the readable queue.
+    callback(null, '0'.repeat(data.length % 2) + data);
+  }
+});

-#### writable.\_writev(chunks, callback)

+myTransform.setEncoding('ascii');
+myTransform.on('data', (chunk) => console.log(chunk));

-* `chunks` {Array} The chunks to be written. Each chunk has following
-  format: `{ chunk: ..., encoding: ... }`.
-* `callback` {Function} Call this function (optionally with an error
-  argument) when you are done processing the supplied chunks.

+myTransform.write(1);
+  // Prints: 01
+myTransform.write(10);
+  // Prints: 0a
+myTransform.write(100);
+  // Prints: 64
+```

-Note: **This function MUST NOT be called directly.** It may be
-implemented by child classes, and called by the internal Writable
-class methods only.

+### Implementing a Transform Stream

-This function is completely optional to implement. In most cases it is
-unnecessary. If implemented, it will be called with all the chunks
-that are buffered in the write queue.

+A [Transform][] stream is a [Duplex][] stream where the output is computed
+in some way from the input. Examples include [zlib][] streams or [crypto][]
+streams that compress, encrypt, or decrypt data.
+
+*Note*: There is no requirement that the output be the same size as the input,
+the same number of chunks, or arrive at the same time. For example, a
+Hash stream will only ever have a single chunk of output which is
+provided when the input is ended. A `zlib` stream will produce output
+that is either much smaller or much larger than its input.

-## Simplified Constructor API

+The `stream.Transform` class is extended to implement a [Transform][] stream.

- 

+The `stream.Transform` class prototypically inherits from `stream.Duplex` and
+implements its own versions of the `writable._write()` and `readable._read()`
+methods. Custom Transform implementations *must* implement the
+[`transform._transform()`][stream-_transform] method and *may* also implement
+the [`transform._flush()`][stream-_flush] method.

-In simple cases there is now the added benefit of being able to construct a
-stream without inheritance.
+*Note*: Care must be taken when using Transform streams because data written
+to the stream can cause the Writable side of the stream to become paused if
+the output on the Readable side is not consumed.

-This can be done by passing the appropriate methods as constructor options:

+#### new stream.Transform([options])

-Examples:

+* `options` {Object} Passed to both Writable and Readable
+  constructors. Also has the following fields:
+  * `transform` {Function} Implementation for the
+    [`stream._transform()`][stream-_transform] method.
+  * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush]
+    method.

-### Duplex

+For example:

```js
-var duplex = new stream.Duplex({
-  read: function(n) {
-    // sets this._read under the hood
-
-    // push data onto the read queue, passing null
-    // will signal the end of the stream (EOF)
-    this.push(chunk);
-  },
-  write: function(chunk, encoding, next) {
-    // sets this._write under the hood
+const Transform = require('stream').Transform;

-    // An optional error can be passed as the first argument
-    next()
+class MyTransform extends Transform {
+  constructor(options) {
+    super(options);
  }
-});
-
-// or
+}
+```

-var duplex = new stream.Duplex({
-  read: function(n) {
-    // sets this._read under the hood
+Or, when using pre-ES6 style constructors:

-    // push data onto the read queue, passing null
-    // will signal the end of the stream (EOF)
-    this.push(chunk);
-  },
-  writev: function(chunks, next) {
-    // sets this._writev under the hood
+```js
+const Transform = require('stream').Transform;
+const util = require('util');

-    // An optional error can be passed as the first argument
-    next()
-  }
-});
+function MyTransform(options) {
+  if (!(this instanceof MyTransform))
+    return new MyTransform(options);
+  Transform.call(this, options);
+}
+util.inherits(MyTransform, Transform);
```

-### Readable

+Or, using the Simplified Constructor approach:

```js
-var readable = new stream.Readable({
-  read: function(n) {
-    // sets this._read under the hood
+const Transform = require('stream').Transform;

-    // push data onto the read queue, passing null
-    // will signal the end of the stream (EOF)
-    this.push(chunk);
+const myTransform = new Transform({
+  transform(chunk, encoding, callback) {
+    // ...
  }
});
```

-### Transform

+#### Events: 'finish' and 'end'

-```js
-var transform = new stream.Transform({
-  transform: function(chunk, encoding, next) {
-    // sets this._transform under the hood

+The [`'finish'`][] and [`'end'`][] events are from the `stream.Writable`
+and `stream.Readable` classes, respectively. The `'finish'` event is emitted
+after [`stream.end()`][stream-end] is called and all chunks have been processed
+by [`stream._transform()`][stream-_transform]. The `'end'` event is emitted
+after all data has been output, which occurs after the callback in
+[`transform._flush()`][stream-_flush] has been called.

-    // generate output as many times as needed
-    // this.push(chunk);

+#### transform.\_flush(callback)

-    // call when the current chunk is consumed
-    next();
-  },
-  flush: function(done) {
-    // sets this._flush under the hood

+* `callback` {Function} A callback function (optionally with an error
+  argument) to be called when remaining data has been flushed.

-    // generate output as many times as needed
-    // this.push(chunk);

+*Note*: **This function MUST NOT be called by application code directly.** It
+should be implemented by child classes, and called only by the internal
+Readable class methods.
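+
+As a sketch of the behavior described below, a pass-through Transform might use
+the `flush` option (which supplies `transform._flush()`) to emit one final
+summary chunk once all input has been consumed. The `byteCounter` name and the
+byte-counting logic here are purely illustrative:
+
+```js
+const Transform = require('stream').Transform;
+
+const byteCounter = new Transform({
+  transform(chunk, encoding, callback) {
+    // Track how many bytes have passed through, then pass the chunk along
+    this.byteLength = (this.byteLength || 0) + chunk.length;
+    callback(null, chunk);
+  },
+  flush(callback) {
+    // Called after all written data has been consumed, before 'end'
+    this.push(`\ntotal bytes: ${this.byteLength || 0}\n`);
+    callback();
+  }
+});
+```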
-    done();
-  }
-});
-```
+In some cases, a transform operation may need to emit an additional bit of
+data at the end of the stream. For example, a `zlib` compression stream will
+store an amount of internal state used to optimally compress the output. When
+the stream ends, however, that additional data needs to be flushed so that the
+compressed data will be complete.

-### Writable

+Custom [Transform][] implementations *may* implement the `transform._flush()`
+method. This will be called when there is no more written data to be consumed,
+but before the [`'end'`][] event is emitted signaling the end of the
+[Readable][] stream.

-```js
-var writable = new stream.Writable({
-  write: function(chunk, encoding, next) {
-    // sets this._write under the hood

+Within the `transform._flush()` implementation, the `readable.push()` method
+may be called zero or more times, as appropriate. The `callback` function must
+be called when the flush operation is complete.

-    // An optional error can be passed as the first argument
-    next()
-  }
-});

+The `transform._flush()` method is prefixed with an underscore because it is
+internal to the class that defines it, and should never be called directly by
+user programs.

-// or

+#### transform.\_transform(chunk, encoding, callback)

-var writable = new stream.Writable({
-  writev: function(chunks, next) {
-    // sets this._writev under the hood

+* `chunk` {Buffer|String} The chunk to be transformed. Will **always**
+  be a buffer unless the `decodeStrings` option was set to `false`.
+* `encoding` {String} If the chunk is a string, then this is the
+  character encoding of the string. If the chunk is a buffer, then this is
+  the special value `'buffer'`; it can be ignored in that case.
+* `callback` {Function} A callback function (optionally with an error
+  argument and data) to be called after the supplied `chunk` has been
+  processed.

-    // An optional error can be passed as the first argument
-    next()
-  }
-});
-```

+*Note*: **This function MUST NOT be called by application code directly.** It
+should be implemented by child classes, and called only by the internal
+Readable class methods.
+
+All Transform stream implementations must provide a `_transform()`
+method to accept input and produce output. The `transform._transform()`
+implementation handles the bytes being written, computes an output, then passes
+that output off to the readable portion using the `readable.push()` method.

-## Streams: Under the Hood

+The `transform.push()` method may be called zero or more times to generate
+output from a single input chunk, depending on how much is to be output
+as a result of the chunk.

- 

+It is possible that no output is generated from any given chunk of input data.

-### Buffering

+The `callback` function must be called only when the current chunk is completely
+consumed. The first argument passed to the `callback` must be an `Error` object
+if an error occurred while processing the input or `null` otherwise. If a second
+argument is passed to the `callback`, it will be forwarded on to the
+`readable.push()` method. In other words, the following are equivalent:

- 

+```js
+transform.prototype._transform = function (data, encoding, callback) {
+  this.push(data);
+  callback();
+};

-Both Writable and Readable streams will buffer data on an internal
-object which can be retrieved from `_writableState.getBuffer()` or
-`_readableState.buffer`, respectively.
+transform.prototype._transform = function (data, encoding, callback) {
+  callback(null, data);
+};
+```

-The amount of data that will potentially be buffered depends on the
-`highWaterMark` option which is passed into the constructor.

+The `transform._transform()` method is prefixed with an underscore because it
+is internal to the class that defines it, and should never be called directly by
+user programs.

-Buffering in Readable streams happens when the implementation calls
-[`stream.push(chunk)`][stream-push]. If the consumer of the Stream does not
-call [`stream.read()`][stream-read], then the data will sit in the internal
-queue until it is consumed.

+#### Class: stream.PassThrough
+
+The `stream.PassThrough` class is a trivial implementation of a [Transform][]
+stream that simply passes the input bytes across to the output. Its purpose is
+primarily for examples and testing, but there are some use cases where
+`stream.PassThrough` is useful as a building block for novel sorts of streams.

-Buffering in Writable streams happens when the user calls
-[`stream.write(chunk)`][stream-write] repeatedly, even when it returns `false`.

+## Additional Notes

-The purpose of streams, especially with the [`stream.pipe()`][] method, is to
-limit the buffering of data to acceptable levels, so that sources and
-destinations of varying speed will not overwhelm the available memory.

+ 

### Compatibility with Older Node.js Versions

@@ -1531,26 +1787,26 @@ destinations of varying speed will not overwhelm the available memory.

In versions of Node.js prior to v0.10, the Readable stream interface was
simpler, but also less powerful and less useful.

-* Rather than waiting for you to call the [`stream.read()`][stream-read] method,
-  [`'data'`][] events would start emitting immediately. If you needed to do
-  some I/O to decide how to handle data, then you had to store the chunks
-  in some kind of buffer so that they would not be lost.
+* Rather than waiting for calls to the [`stream.read()`][stream-read] method,
+  [`'data'`][] events would begin emitting immediately. Applications that
+  would need to perform some amount of work to decide how to handle data
+  were required to store read data into buffers so the data would not be lost.
* The [`stream.pause()`][stream-pause] method was advisory, rather than
-  guaranteed. This meant that you still had to be prepared to receive
-  [`'data'`][] events even when the stream was in a paused state.
-
-In Node.js v0.10, the [Readable][] class was added.
-For backwards compatibility with older Node.js programs, Readable streams
-switch into "flowing mode" when a [`'data'`][] event handler is added, or
-when the [`stream.resume()`][stream-resume] method is called. The effect is
-that, even if you are not using the new [`stream.read()`][stream-read] method
-and [`'readable'`][] event, you no longer have to worry about losing
+  guaranteed. This meant that it was still necessary to be prepared to receive
+  [`'data'`][] events *even when the stream was in a paused state*.
+
+In Node.js v0.10, the [Readable][] class was added. For backwards compatibility
+with older Node.js programs, Readable streams switch into "flowing mode" when a
+[`'data'`][] event handler is added, or when the
+[`stream.resume()`][stream-resume] method is called. The effect is that, even
+when not using the new [`stream.read()`][stream-read] method and
+[`'readable'`][] event, it is no longer necessary to worry about losing
[`'data'`][] chunks.

-Most programs will continue to function normally.
However, this -introduces an edge case in the following conditions: +While most applications will continue to function normally, this introduces an +edge case in the following conditions: -* No [`'data'`][] event handler is added. +* No [`'data'`][] event listener is added. * The [`stream.resume()`][stream-resume] method is never called. * The stream is not piped to any writable destination. @@ -1563,25 +1819,25 @@ net.createServer((socket) => { // we add an 'end' method, but never consume the data socket.on('end', () => { // It will never get here. - socket.end('I got your message (but didnt read it)\n'); + socket.end('The message was received but was not processed.\n'); }); }).listen(1337); ``` In versions of Node.js prior to v0.10, the incoming message data would be -simply discarded. However, in Node.js v0.10 and beyond, -the socket will remain paused forever. +simply discarded. However, in Node.js v0.10 and beyond, the socket remains +paused forever. The workaround in this situation is to call the -[`stream.resume()`][stream-resume] method to start the flow of data: +[`stream.resume()`][stream-resume] method to begin the flow of data: ```js // Workaround net.createServer((socket) => { socket.on('end', () => { - socket.end('I got your message (but didnt read it)\n'); + socket.end('The message was received but was not processed.\n'); }); // start the flow of data, discarding it. @@ -1592,126 +1848,33 @@ net.createServer((socket) => { In addition to new Readable streams switching into flowing mode, pre-v0.10 style streams can be wrapped in a Readable class using the -[`stream.wrap()`][] method. - +[`readable.wrap()`][] method. -### Object Mode - - -Normally, Streams operate on Strings and Buffers exclusively. +### `readable.read(0)` -Streams that are in **object mode** can emit generic JavaScript values -other than Buffers and Strings. - -A Readable stream in object mode will always return a single item from -a call to [`stream.read(size)`][stream-read], regardless of what the size -argument is. - -A Writable stream in object mode will always ignore the `encoding` -argument to [`stream.write(data, encoding)`][stream-write]. - -The special value `null` still retains its special value for object -mode streams. That is, for object mode readable streams, `null` as a -return value from [`stream.read()`][stream-read] indicates that there is no more -data, and [`stream.push(null)`][stream-push] will signal the end of stream data -(`EOF`). - -No streams in Node.js core are object mode streams. This pattern is only -used by userland streaming libraries. - -You should set `objectMode` in your stream child class constructor on -the options object. Setting `objectMode` mid-stream is not safe. - -For Duplex streams `objectMode` can be set exclusively for readable or -writable side with `readableObjectMode` and `writableObjectMode` -respectively. These options can be used to implement parsers and -serializers with Transform streams. 
- -```js -const StringDecoder = require('string_decoder').StringDecoder; -const Transform = require('stream').Transform; - -// Gets \n-delimited JSON string data, and emits the parsed objects -class JSONParseStream extends Transform { - constructor() { - super({ readableObjectMode : true }); - - this._buffer = ''; - this._decoder = new StringDecoder('utf8'); - } - - _transform(chunk, encoding, cb) { - this._buffer += this._decoder.write(chunk); - // split on newlines - var lines = this._buffer.split(/\r?\n/); - // keep the last partial line buffered - this._buffer = lines.pop(); - for (var l = 0; l < lines.length; l++) { - var line = lines[l]; - try { - var obj = JSON.parse(line); - } catch (er) { - this.emit('error', er); - return; - } - // push the parsed object out to the readable consumer - this.push(obj); - } - cb(); - } - - _flush(cb) { - // Just handle any leftover - var rem = this._buffer.trim(); - if (rem) { - try { - var obj = JSON.parse(rem); - } catch (er) { - this.emit('error', er); - return; - } - // push the parsed object out to the readable consumer - this.push(obj); - } - cb(); - } -} -``` - -### `stream.read(0)` - -There are some cases where you want to trigger a refresh of the +There are some cases where it is necessary to trigger a refresh of the underlying readable stream mechanisms, without actually consuming any -data. In that case, you can call `stream.read(0)`, which will always -return null. +data. In such cases, it is possible to call `readable.read(0)`, which will +always return null. If the internal read buffer is below the `highWaterMark`, and the stream is not currently reading, then calling `stream.read(0)` will trigger a low-level [`stream._read()`][stream-_read] call. -There is almost never a need to do this. However, you will see some -cases in Node.js's internals where this is done, particularly in the +While most applications will almost never need to do this, there are +situations within Node.js where this is done, particularly in the Readable stream class internals. -### `stream.push('')` - -Pushing a zero-byte string or Buffer (when not in [Object mode][]) has an -interesting side effect. Because it *is* a call to -[`stream.push()`][stream-push], it will end the `reading` process. However, it -does *not* add any data to the readable buffer, so there's nothing for -a user to consume. +### `readable.push('')` -Very rarely, there are cases where you have no data to provide now, -but the consumer of your stream (or, perhaps, another bit of your own -code) will know when to check again, by calling [`stream.read(0)`][stream-read]. -In those cases, you *may* call `stream.push('')`. +Use of `readable.push('')` is not recommended. -So far, the only use case for this functionality is in the -[`tls.CryptoStream`][] class, which is deprecated in Node.js/io.js v1.0. If you -find that you have to use `stream.push('')`, please consider another -approach, because it almost certainly indicates that something is -horribly wrong. +Pushing a zero-byte string or `Buffer` to a stream that is not in object mode +has an interesting side effect. Because it *is* a call to +[`readable.push()`][stream-push], the call will end the reading process. +However, because the argument is an empty string, no data is added to the +readable buffer so there is nothing for a user to consume. [`'data'`]: #stream_event_data [`'drain'`]: #stream_event_drain @@ -1730,7 +1893,7 @@ horribly wrong. 
[`stream.wrap()`]: #stream_readable_wrap_stream
[`tls.CryptoStream`]: tls.html#tls_class_cryptostream
[API for Stream Consumers]: #stream_api_for_stream_consumers
-[API for Stream Implementors]: #stream_api_for_stream_implementors
+[API for Stream Implementers]: #stream_api_for_stream_implementers
[child process stdin]: child_process.html#child_process_child_stdin
[child process stdout and stderr]: child_process.html#child_process_child_stdout
[Compatibility]: #stream_compatibility_with_older_node_js_versions
@@ -1738,6 +1901,10 @@ horribly wrong.
[Duplex]: #stream_class_stream_duplex
[fs read streams]: fs.html#fs_class_fs_readstream
[fs write streams]: fs.html#fs_class_fs_writestream
+[`fs.createReadStream()`]: fs.html#fs_fs_createreadstream_path_options
+[`fs.createWriteStream()`]: fs.html#fs_fs_createwritestream_path_options
+[`net.Socket`]: net.html#net_class_net_socket
+[`zlib.createDeflate()`]: zlib.html#zlib_zlib_createdeflate_options
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
[http-incoming-message]: http.html#http_class_http_incomingmessage