From 420e07c5777bdb2e493147d296abfc102f725015 Mon Sep 17 00:00:00 2001 From: isaacs Date: Tue, 2 Oct 2012 15:44:50 -0700 Subject: [PATCH] streams2: The new stream base classes --- lib/_stream_duplex.js | 81 +++++++ lib/_stream_passthrough.js | 39 ++++ lib/_stream_readable.js | 429 +++++++++++++++++++++++++++++++++++++ lib/_stream_transform.js | 123 +++++++++++ lib/_stream_writable.js | 135 ++++++++++++ lib/stream.js | 21 +- node.gyp | 5 + 7 files changed, 829 insertions(+), 4 deletions(-) create mode 100644 lib/_stream_duplex.js create mode 100644 lib/_stream_passthrough.js create mode 100644 lib/_stream_readable.js create mode 100644 lib/_stream_transform.js create mode 100644 lib/_stream_writable.js diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js new file mode 100644 index 0000000000..0256b0f2f2 --- /dev/null +++ b/lib/_stream_duplex.js @@ -0,0 +1,81 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// a duplex stream is just a stream that is both readable and writable. +// Since JS doesn't have multiple prototypal inheritance, this class +// prototypally inherits from Readable, and then parasitically from +// Writable. + +module.exports = Duplex; +var util = require('util'); +var Readable = require('_stream_readable'); +var Writable = require('_stream_writable'); + +util.inherits(Duplex, Readable); + +Object.keys(Writable.prototype).forEach(function(method) { + if (!Duplex.prototype[method]) + Duplex.prototype[method] = Writable.prototype[method]; +}); + +function Duplex(options) { + Readable.call(this, options); + Writable.call(this, options); + + this.allowHalfOpen = true; + if (options && options.allowHalfOpen === false) + this.allowHalfOpen = false; + + this.once('finish', onfinish); + this.once('end', onend); +} + +// the no-half-open enforcers. +function onfinish() { + // if we allow half-open state, or if the readable side ended, + // then we're ok. + if (this.allowHalfOpen || this._readableState.ended) + return; + + // mark that we're done. + this._readableState.ended = true; + + // tell the user + if (this._readableState.length === 0) + this.emit('end'); + else + this.emit('readable'); +} + +function onend() { + // if we allow half-open state, or if the writable side ended, + // then we're ok. + if (this.allowHalfOpen || this._writableState.ended) + return; + + // just in case the user is about to call write() again. + this.write = function() { + return false; + }; + + // no more data can be written. + this.end(); +} diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js new file mode 100644 index 0000000000..dd6390fc6e --- /dev/null +++ b/lib/_stream_passthrough.js @@ -0,0 +1,39 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// a passthrough stream. +// basically just the most minimal sort of Transform stream. +// Every written chunk gets output as-is. + +module.exports = PassThrough; + +var Transform = require('_stream_transform'); +var util = require('util'); +util.inherits(PassThrough, Transform); + +function PassThrough(options) { + Transform.call(this, options); +} + +PassThrough.prototype._transform = function(chunk, output, cb) { + output(chunk); + cb(); +}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js new file mode 100644 index 0000000000..b71c22aecf --- /dev/null +++ b/lib/_stream_readable.js @@ -0,0 +1,429 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +module.exports = Readable; + +var Stream = require('stream'); +var util = require('util'); +var assert = require('assert'); + +util.inherits(Readable, Stream); + +function ReadableState(options, stream) { + options = options || {}; + + this.bufferSize = options.bufferSize || 16 * 1024; + assert(typeof this.bufferSize === 'number'); + // cast to an int + this.bufferSize = ~~this.bufferSize; + + this.lowWaterMark = options.lowWaterMark || 1024; + this.buffer = []; + this.length = 0; + this.pipes = []; + this.flowing = false; + this.ended = false; + this.stream = stream; + this.reading = false; + + // whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + this.needReadable = false; +} + +function Readable(options) { + this._readableState = new ReadableState(options, this); + Stream.apply(this); +} + +// you can override either this method, or _read(n, cb) below. +Readable.prototype.read = function(n) { + var state = this._readableState; + + if (state.length === 0 && state.ended) { + process.nextTick(this.emit.bind(this, 'end')); + return null; + } + + if (isNaN(n) || n <= 0) + n = state.length + + // XXX: controversial. + // don't have that much. return null, unless we've ended. + // However, if the low water mark is lower than the number of bytes, + // then we still need to return what we have, or else it won't kick + // off another _read() call. For example, + // lwm=5 + // len=9 + // read(10) + // We don't have that many bytes, so it'd be tempting to return null, + // but then it won't ever cause _read to be called, so in that case, + // we just return what we have, and let the programmer deal with it. + if (n > state.length) { + if (!state.ended && state.length < state.lowWaterMark) { + state.needReadable = true; + n = 0; + } else + n = state.length; + } + + var ret = n > 0 ? fromList(n, state.buffer, state.length) : null; + + if (ret === null || ret.length === 0) + state.needReadable = true; + + state.length -= n; + + if (!state.ended && + state.length < state.lowWaterMark && + !state.reading) { + state.reading = true; + // call internal read method + this._read(state.bufferSize, function onread(er, chunk) { + state.reading = false; + if (er) + return this.emit('error', er); + + if (!chunk || !chunk.length) { + state.ended = true; + // if we've ended and we have some data left, then emit + // 'readable' now to make sure it gets picked up. + if (state.length > 0) + this.emit('readable'); + return; + } + + state.length += chunk.length; + state.buffer.push(chunk); + if (state.length < state.lowWaterMark) + this._read(state.bufferSize, onread.bind(this)); + + // now we have something to call this.read() to get. + if (state.needReadable) { + state.needReadable = false; + this.emit('readable'); + } + }.bind(this)); + } + + return ret; +}; + +// abstract method. to be overridden in specific implementation classes. +// call cb(er, data) where data is <= n in length. +// for virtual (non-string, non-buffer) streams, "length" is somewhat +// arbitrary, and perhaps not very meaningful. +Readable.prototype._read = function(n, cb) { + process.nextTick(cb.bind(this, new Error('not implemented'))); +}; + +Readable.prototype.pipe = function(dest, pipeOpts) { + var src = this; + var state = this._readableState; + if (!pipeOpts) + pipeOpts = {}; + state.pipes.push(dest); + + if ((!pipeOpts || pipeOpts.end !== false) && + dest !== process.stdout && + dest !== process.stderr) { + src.once('end', onend); + dest.on('unpipe', function(readable) { + if (readable === src) + src.removeListener('end', onend); + }); + } + + function onend() { + dest.end(); + } + + dest.emit('pipe', src); + + // start the flow. + if (!state.flowing) + process.nextTick(flow.bind(null, src, pipeOpts)); + + return dest; +}; + +function flow(src, pipeOpts) { + var state = src._readableState; + var chunk; + var dest; + var needDrain = 0; + + function ondrain() { + needDrain--; + if (needDrain === 0) + flow(src, pipeOpts); + } + + while (state.pipes.length && + null !== (chunk = src.read(pipeOpts.chunkSize))) { + state.pipes.forEach(function(dest, i, list) { + var written = dest.write(chunk); + if (false === written) { + needDrain++; + dest.once('drain', ondrain); + } + }); + src.emit('data', chunk); + + // if anyone needs a drain, then we have to wait for that. + if (needDrain > 0) + return; + } + + // if every destination was unpiped, either before entering this + // function, or in the while loop, then stop flowing. + // + // NB: This is a pretty rare edge case. + if (state.pipes.length === 0) { + state.flowing = false; + + // if there were data event listeners added, then switch to old mode. + if (this.listeners('data').length) + emitDataEvents(this); + return; + } + + // at this point, no one needed a drain, so we just ran out of data + // on the next readable event, start it over again. + src.once('readable', flow.bind(null, src, pipeOpts)); +} + +Readable.prototype.unpipe = function(dest) { + var state = this._readableState; + if (!dest) { + // remove all of them. + state.pipes.forEach(function(dest, i, list) { + dest.emit('unpipe', this); + }, this); + state.pipes.length = 0; + } else { + var i = state.pipes.indexOf(dest); + if (i !== -1) { + dest.emit('unpipe', this); + state.pipes.splice(i, 1); + } + } + return this; +}; + +// kludge for on('data', fn) consumers. Sad. +// This is *not* part of the new readable stream interface. +// It is an ugly unfortunate mess of history. +Readable.prototype.on = function(ev, fn) { + // https://github.com/isaacs/readable-stream/issues/16 + // if we're already flowing, then no need to set up data events. + if (ev === 'data' && !this._readableState.flowing) + emitDataEvents(this); + + return Stream.prototype.on.call(this, ev, fn); +}; +Readable.prototype.addListener = Readable.prototype.on; + +// pause() and resume() are remnants of the legacy readable stream API +// If the user uses them, then switch into old mode. +Readable.prototype.resume = function() { + emitDataEvents(this); + return this.resume(); +}; + +Readable.prototype.pause = function() { + emitDataEvents(this); + return this.pause(); +}; + +function emitDataEvents(stream) { + var state = stream._readableState; + + if (state.flowing) { + // https://github.com/isaacs/readable-stream/issues/16 + throw new Error('Cannot switch to old mode now.'); + } + + var paused = false; + var readable = false; + + // convert to an old-style stream. + stream.readable = true; + stream.pipe = Stream.prototype.pipe; + stream.on = stream.addEventListener = Stream.prototype.on; + + stream.on('readable', function() { + readable = true; + var c; + while (!paused && (null !== (c = stream.read()))) + stream.emit('data', c); + + if (c === null) { + readable = false; + stream._readableState.needReadable = true; + } + }); + + stream.pause = function() { + paused = true; + }; + + stream.resume = function() { + paused = false; + if (readable) + stream.emit('readable'); + }; + + // now make it start, just in case it hadn't already. + process.nextTick(function() { + stream.emit('readable'); + }); +} + +// wrap an old-style stream as the async data source. +// This is *not* part of the readable stream interface. +// It is an ugly unfortunate mess of history. +Readable.prototype.wrap = function(stream) { + var state = this._readableState; + var paused = false; + + stream.on('end', function() { + state.ended = true; + if (state.length === 0) + this.emit('end'); + }.bind(this)); + + stream.on('data', function(chunk) { + state.buffer.push(chunk); + state.length += chunk.length; + this.emit('readable'); + + // if not consumed, then pause the stream. + if (state.length > state.lowWaterMark && !paused) { + paused = true; + stream.pause(); + } + }.bind(this)); + + // proxy all the other methods. + // important when wrapping filters and duplexes. + for (var i in stream) { + if (typeof stream[i] === 'function' && + typeof this[i] === 'undefined') { + this[i] = function(method) { return function() { + return stream[method].apply(stream, arguments); + }}(i); + } + } + + // proxy certain important events. + var events = ['error', 'close', 'destroy', 'pause', 'resume']; + events.forEach(function(ev) { + stream.on(ev, this.emit.bind(this, ev)); + }.bind(this)); + + // consume some bytes. if not all is consumed, then + // pause the underlying stream. + this.read = function(n) { + if (state.length === 0) { + state.needReadable = true; + return null; + } + + if (isNaN(n) || n <= 0) + n = state.length; + + if (n > state.length) { + if (!state.ended) { + state.needReadable = true; + return null; + } else + n = state.length; + } + + var ret = fromList(n, state.buffer, state.length); + state.length -= n; + + if (state.length < state.lowWaterMark && paused) { + stream.resume(); + paused = false; + } + + if (state.length === 0 && state.ended) + process.nextTick(this.emit.bind(this, 'end')); + + return ret; + }; +}; + + + +// exposed for testing purposes only. +Readable._fromList = fromList; + +// Pluck off n bytes from an array of buffers. +// Length is the combined lengths of all the buffers in the list. +// If there's no data, then +function fromList(n, list, length) { + var ret; + + // nothing in the list, definitely empty. + if (list.length === 0) { + return null; + } + + if (length === 0) { + ret = null; + } else if (!n || n >= length) { + // read it all, truncate the array. + ret = Buffer.concat(list, length); + list.length = 0; + } else { + // read just some of it. + if (n < list[0].length) { + // just take a part of the first list item. + var buf = list[0]; + ret = buf.slice(0, n); + list[0] = buf.slice(n); + } else if (n === list[0].length) { + // first list is a perfect match + ret = list.shift(); + } else { + // complex case. + // we have enough to cover it, but it spans past the first buffer. + ret = new Buffer(n); + var c = 0; + for (var i = 0, l = list.length; i < l && c < n; i++) { + var buf = list[0]; + var cpy = Math.min(n - c, buf.length); + buf.copy(ret, c, 0, cpy); + if (cpy < buf.length) { + list[0] = buf.slice(cpy); + } else { + list.shift(); + } + c += cpy; + } + } + } + + return ret; +} diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js new file mode 100644 index 0000000000..79d40cffab --- /dev/null +++ b/lib/_stream_transform.js @@ -0,0 +1,123 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// a transform stream is a readable/writable stream where you do +// something with the data. Sometimes it's called a "filter", +// but that's not a great name for it, since that implies a thing where +// some bits pass through, and others are simply ignored. (That would +// be a valid example of a transform, of course.) +// +// While the output is causally related to the input, it's not a +// necessarily symmetric or synchronous transformation. For example, +// a zlib stream might take multiple plain-text writes(), and then +// emit a single compressed chunk some time in the future. + +module.exports = Transform; + +var Duplex = require('_stream_duplex'); +var util = require('util'); +util.inherits(Transform, Duplex); + +function Transform(options) { + Duplex.call(this, options); + + // bind output so that it can be passed around as a regular function. + this._output = this._output.bind(this); + + // when the writable side finishes, then flush out anything remaining. + this.once('finish', function() { + if ('function' === typeof this._flush) + this._flush(this._output, done.bind(this)); + else + done.call(this); + }); +} + +// This is the part where you do stuff! +// override this function in implementation classes. +// 'chunk' is an input chunk. +// +// Call `output(newChunk)` to pass along transformed output +// to the readable side. You may call 'output' zero or more times. +// +// Call `cb(err)` when you are done with this chunk. If you pass +// an error, then that'll put the hurt on the whole operation. If you +// never call cb(), then you'll never get another chunk. +Transform.prototype._transform = function(chunk, output, cb) { + throw new Error('not implemented'); +}; + + +Transform.prototype._write = function(chunk, cb) { + this._transform(chunk, this._output, cb); +}; + +Transform.prototype._read = function(n, cb) { + var ws = this._writableState; + var rs = this._readableState; + + // basically a no-op, since the _transform will fill the + // _readableState.buffer and emit 'readable' for us, and set ended + // Usually, we want to just not call the cb, and set the reading + // flag to false, so that another _read will happen next time, + // but no state changes. + rs.reading = false; + + // however, if the writable side has ended, and its buffer is clear, + // then that means that the input has all been consumed, and no more + // will ever be provide. treat this as an EOF, and pass back 0 bytes. + if ((ws.ended || ws.ending) && ws.length === 0) + cb(); +}; + +Transform.prototype._output = function(chunk) { + if (!chunk || !chunk.length) + return; + + var state = this._readableState; + var len = state.length; + state.buffer.push(chunk); + state.length += chunk.length; + if (state.needReadable) { + state.needReadable = false; + this.emit('readable'); + } +}; + +function done(er) { + if (er) + return this.emit('error', er); + + // if there's nothing in the write buffer, then that means + // that nothing more will ever be provided + var ws = this._writableState; + var rs = this._readableState; + + rs.ended = true; + // we may have gotten a 'null' read before, and since there is + // no more data coming from the writable side, we need to emit + // now so that the consumer knows to pick up the tail bits. + if (rs.length && rs.needReadable) + this.emit('readable'); + else if (rs.length === 0) { + this.emit('end'); + } +} diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js new file mode 100644 index 0000000000..e2343e63f9 --- /dev/null +++ b/lib/_stream_writable.js @@ -0,0 +1,135 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// A bit simpler than readable streams. +// Implement an async ._write(chunk, cb), and it'll handle all +// the drain event emission and buffering. + +module.exports = Writable + +var util = require('util'); +var Stream = require('stream'); + +util.inherits(Writable, Stream); + +function WritableState(options) { + options = options || {}; + this.highWaterMark = options.highWaterMark || 16 * 1024; + this.lowWaterMark = options.lowWaterMark || 1024; + this.needDrain = false; + this.ended = false; + this.ending = false; + + // not an actual buffer we keep track of, but a measurement + // of how much we're waiting to get pushed to some underlying + // socket or file. + this.length = 0; + + this.writing = false; + this.buffer = []; +} + +function Writable(options) { + this._writableState = new WritableState(options); + + // legacy. + this.writable = true; + + Stream.call(this); +} + +// Override this method for sync streams +// override the _write(chunk, cb) method for async streams +Writable.prototype.write = function(chunk, encoding) { + var state = this._writableState; + if (state.ended) { + this.emit('error', new Error('write after end')); + return; + } + + if (typeof chunk === 'string' && encoding) + chunk = new Buffer(chunk, encoding); + + var ret = state.length >= state.highWaterMark; + if (ret === false) + state.needDrain = true; + + var l = chunk.length; + state.length += l; + + if (state.writing) { + state.buffer.push(chunk); + return ret; + } + + state.writing = true; + this._write(chunk, function writecb(er) { + state.writing = false; + if (er) { + this.emit('error', er); + return; + } + state.length -= l; + + if (state.length === 0 && (state.ended || state.ending)) { + // emit 'finish' at the very end. + this.emit('finish'); + return; + } + + // if there's something in the buffer waiting, then do that, too. + if (state.buffer.length) { + chunk = state.buffer.shift(); + l = chunk.length; + state.writing = true; + this._write(chunk, writecb.bind(this)); + } + + if (state.length < state.lowWaterMark && state.needDrain) { + // Must force callback to be called on nextTick, so that we don't + // emit 'drain' before the write() consumer gets the 'false' return + // value, and has a chance to attach a 'drain' listener. + process.nextTick(function() { + if (!state.needDrain) + return; + state.needDrain = false; + this.emit('drain'); + }.bind(this)); + } + + }.bind(this)); + + return ret; +}; + +Writable.prototype._write = function(chunk, cb) { + process.nextTick(cb.bind(this, new Error('not implemented'))); +}; + +Writable.prototype.end = function(chunk, encoding) { + var state = this._writableState; + state.ending = true; + if (chunk) + this.write(chunk, encoding); + else if (state.length === 0) + this.emit('finish'); + state.ended = true; +}; diff --git a/lib/stream.js b/lib/stream.js index 16e2e0e723..481d7644e5 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -19,17 +19,30 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +module.exports = Stream; + var events = require('events'); var util = require('util'); -function Stream() { - events.EventEmitter.call(this); -} util.inherits(Stream, events.EventEmitter); -module.exports = Stream; +Stream.Readable = require('_stream_readable'); +Stream.Writable = require('_stream_writable'); +Stream.Duplex = require('_stream_duplex'); +Stream.Transform = require('_stream_transform'); +Stream.PassThrough = require('_stream_passthrough'); + // Backwards-compat with node 0.4.x Stream.Stream = Stream; + + +// old-style streams. Note that the pipe method (the only relevant +// part of this class) is overridden in the Readable class. + +function Stream() { + events.EventEmitter.call(this); +} + Stream.prototype.pipe = function(dest, options) { var source = this; diff --git a/node.gyp b/node.gyp index f6651db83d..14058eb960 100644 --- a/node.gyp +++ b/node.gyp @@ -44,6 +44,11 @@ 'lib/readline.js', 'lib/repl.js', 'lib/stream.js', + 'lib/_stream_readable.js', + 'lib/_stream_writable.js', + 'lib/_stream_duplex.js', + 'lib/_stream_transform.js', + 'lib/_stream_passthrough.js', 'lib/string_decoder.js', 'lib/sys.js', 'lib/timers.js',