diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 4eb2a5f981..a84cbc3785 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -73,13 +73,18 @@ function Writable(options) { // Override this method for sync streams // override the _write(chunk, cb) method for async streams -Writable.prototype.write = function(chunk, encoding) { +Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; if (state.ended) { this.emit('error', new Error('write after end')); return; } + if (typeof encoding === 'function') { + cb = encoding; + encoding = null; + } + var l = chunk.length; if (false === state.decodeStrings) chunk = [chunk, encoding]; @@ -94,24 +99,43 @@ Writable.prototype.write = function(chunk, encoding) { if (ret === false) state.needDrain = true; + // if we're already writing something, then just put this + // in the queue, and wait our turn. if (state.writing) { - state.buffer.push(chunk); + state.buffer.push([chunk, cb]); return ret; } state.writing = true; + var sync = true; this._write(chunk, writecb.bind(this)); + sync = false; return ret; function writecb(er) { state.writing = false; if (er) { - this.emit('error', er); + if (cb) { + if (sync) + process.nextTick(cb.bind(null, er)); + else + cb(er); + } else + this.emit('error', er); return; } state.length -= l; + if (cb) { + // don't call the cb until the next tick if we're in sync mode. + // also, defer if we're about to write some more right now. + if (sync || state.buffer.length) + process.nextTick(cb); + else + cb(); + } + if (state.length === 0 && (state.ended || state.ending)) { // emit 'finish' at the very end. this.emit('finish'); @@ -120,8 +144,15 @@ Writable.prototype.write = function(chunk, encoding) { // if there's something in the buffer waiting, then do that, too. if (state.buffer.length) { - chunk = state.buffer.shift(); - l = chunk.length; + var chunkCb = state.buffer.shift(); + chunk = chunkCb[0]; + cb = chunkCb[1]; + + if (false === state.decodeStrings) + l = chunk[0].length; + else + l = chunk.length; + state.writing = true; this._write(chunk, writecb.bind(this)); }