diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 744ec41945..cfcd2e25d7 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -27,20 +27,41 @@ module.exports = Writable; Writable.WritableState = WritableState; var util = require('util'); +var assert = require('assert'); var Stream = require('stream'); util.inherits(Writable, Stream); -function WritableState(options) { +function WritableState(options, stream) { options = options || {}; - this.highWaterMark = options.highWaterMark || 16 * 1024; + + // the point at which write() starts returning false this.highWaterMark = options.hasOwnProperty('highWaterMark') ? options.highWaterMark : 16 * 1024; + + // the point that it has to get to before we call _write(chunk,cb) + // default to pushing everything out as fast as possible. this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ? - options.lowWaterMark : 1024; + options.lowWaterMark : 0; + + // cast to ints. + assert(typeof this.lowWaterMark === 'number'); + assert(typeof this.highWaterMark === 'number'); + this.lowWaterMark = ~~this.lowWaterMark; + this.highWaterMark = ~~this.highWaterMark; + assert(this.lowWaterMark >= 0); + assert(this.highWaterMark >= this.lowWaterMark, + this.highWaterMark + '>=' + this.lowWaterMark); + this.needDrain = false; - this.ended = false; + // at the start of calling end() this.ending = false; + // when end() has been called, and returned + this.ended = false; + // when 'finish' has emitted + this.finished = false; + // when 'finish' is being emitted + this.finishing = false; // should we decode strings into buffers before passing to _write? // this is here so that some node-core streams can optimize string @@ -53,7 +74,22 @@ function WritableState(options) { // socket or file. this.length = 0; + // a flag to see when we're in the middle of a write. this.writing = false; + + // a flag to be able to tell if the onwrite cb is called immediately, + // or on a later tick. + this.sync = false; + + // the callback that's passed to _write(chunk,cb) + this.onwrite = onwrite.bind(stream); + + // the callback that the user supplies to write(chunk,encoding,cb) + this.writecb = null; + + // the amount that is being written when _write is called. + this.writelen = 0; + this.buffer = []; } @@ -63,7 +99,7 @@ function Writable(options) { if (!(this instanceof Writable) && !(this instanceof Stream.Duplex)) return new Writable(options); - this._writableState = new WritableState(options); + this._writableState = new WritableState(options, this); // legacy. this.writable = true; @@ -71,23 +107,26 @@ function Writable(options) { Stream.call(this); } -// Override this method for sync streams -// override the _write(chunk, cb) method for async streams +// Override this method or _write(chunk, cb) Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; - if (state.ended) { - this.emit('error', new Error('write after end')); - return; - } if (typeof encoding === 'function') { cb = encoding; encoding = null; } + if (state.ended) { + var er = new Error('write after end'); + if (typeof cb === 'function') + cb(er); + this.emit('error', er); + return; + } + var l = chunk.length; if (false === state.decodeStrings) - chunk = [chunk, encoding]; + chunk = [chunk, encoding || 'utf8']; else if (typeof chunk === 'string' || encoding) { chunk = new Buffer(chunk + '', encoding); l = chunk.length; @@ -107,70 +146,84 @@ Writable.prototype.write = function(chunk, encoding, cb) { } state.writing = true; - var sync = true; - this._write(chunk, writecb.bind(this)); - sync = false; + state.sync = true; + state.writelen = l; + state.writecb = cb; + this._write(chunk, state.onwrite); + state.sync = false; return ret; +}; - function writecb(er) { - state.writing = false; - if (er) { - if (cb) { - if (sync) - process.nextTick(cb.bind(null, er)); - else - cb(er); - } else - this.emit('error', er); - return; - } - state.length -= l; +function onwrite(er) { + var state = this._writableState; + var sync = state.sync; + var cb = state.writecb; + var l = state.writelen; + + state.writing = false; + state.writelen = null; + state.writecb = null; + if (er) { if (cb) { - // don't call the cb until the next tick if we're in sync mode. - // also, defer if we're about to write some more right now. - if (sync || state.buffer.length) - process.nextTick(cb); - else - cb(); - } - - if (state.length === 0 && (state.ended || state.ending)) { - // emit 'finish' at the very end. - this.emit('finish'); - return; - } - - // if there's something in the buffer waiting, then do that, too. - if (state.buffer.length) { - var chunkCb = state.buffer.shift(); - chunk = chunkCb[0]; - cb = chunkCb[1]; - - if (false === state.decodeStrings) - l = chunk[0].length; + if (sync) + process.nextTick(cb.bind(null, er)); else - l = chunk.length; - - state.writing = true; - this._write(chunk, writecb.bind(this)); - } - - if (state.length <= state.lowWaterMark && state.needDrain) { - // Must force callback to be called on nextTick, so that we don't - // emit 'drain' before the write() consumer gets the 'false' return - // value, and has a chance to attach a 'drain' listener. - process.nextTick(function() { - if (!state.needDrain) - return; - state.needDrain = false; - this.emit('drain'); - }.bind(this)); - } + cb(er); + } else + this.emit('error', er); + return; + } + state.length -= l; + + if (cb) { + // don't call the cb until the next tick if we're in sync mode. + // also, defer if we're about to write some more right now. + if (sync || state.buffer.length) + process.nextTick(cb); + else + cb(); } -}; + if (state.length === 0 && (state.ended || state.ending)) { + // emit 'finish' at the very end. + state.finishing = true; + this.emit('finish'); + state.finished = true; + return; + } + + // if there's something in the buffer waiting, then do that, too. + if (state.buffer.length) { + var chunkCb = state.buffer.shift(); + var chunk = chunkCb[0]; + cb = chunkCb[1]; + + if (false === state.decodeStrings) + l = chunk[0].length; + else + l = chunk.length; + + state.writelen = l; + state.writecb = cb; + state.writechunk = chunk; + state.writing = true; + this._write(chunk, state.onwrite); + } + + if (state.length <= state.lowWaterMark && state.needDrain) { + // Must force callback to be called on nextTick, so that we don't + // emit 'drain' before the write() consumer gets the 'false' return + // value, and has a chance to attach a 'drain' listener. + process.nextTick(function() { + if (!state.needDrain) + return; + state.needDrain = false; + this.emit('drain'); + }.bind(this)); + } +} Writable.prototype._write = function(chunk, cb) { process.nextTick(cb.bind(this, new Error('not implemented'))); @@ -178,10 +231,18 @@ Writable.prototype._write = function(chunk, cb) { Writable.prototype.end = function(chunk, encoding) { var state = this._writableState; + + // ignore unnecessary end() calls. + if (state.ending || state.ended || state.finished) + return; + state.ending = true; if (chunk) this.write(chunk, encoding); - else if (state.length === 0) + else if (state.length === 0) { + state.finishing = true; this.emit('finish'); + state.finished = true; + } state.ended = true; };