diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index ce7656b8d7..8b85fcc4e5 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -495,6 +495,17 @@ programs. However, you **are** expected to override this method in your own extension classes. +### writable.\_writev(chunks, callback) + +* `chunks` {Array} The chunks to be written, each chunk should have following + format: `{ chunk: ..., encoding: ... }`. +* `callback` {Function} Call this function (optionally with an error + argument) when you are done processing the supplied chunks. + +NOTE: This function is completely optional to implement. Even more, in the most +of the cases you won't need it. + + ### writable.write(chunk, [encoding], [callback]) * `chunk` {Buffer | String} Data to be written @@ -512,6 +523,16 @@ the buffer is full, and the data will be sent out in the future. The The specifics of when `write()` will return false, is determined by the `highWaterMark` option provided to the constructor. +### writable.cork() + +Forces buffering of all writes. + +NOTE: buffered data will be flushed either at `.uncork()` or at `.end()` call. + +### writable.uncork() + +Flush all data, buffered since `.cork()` call. + ### writable.end([chunk], [encoding], [callback]) * `chunk` {Buffer | String} Optional final data to be written diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c060e015a0..9042df721b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -76,6 +76,9 @@ function WritableState(options, stream) { // a flag to see when we're in the middle of a write. this.writing = false; + // when true all writes will be buffered until .uncork() call + this.corked = false; + // a flag to be able to tell if the onwrite cb is called immediately, // or on a later tick. We set this to true at first, becuase any // actions that shouldn't happen until "later" should generally also @@ -174,6 +177,26 @@ Writable.prototype.write = function(chunk, encoding, cb) { return ret; }; +Writable.prototype.cork = function() { + var state = this._writableState; + + state.corked = true; +}; + +Writable.prototype.uncork = function() { + var state = this._writableState; + + if (state.corked) { + state.corked = false; + + if (!state.writing && + !state.finished && + !state.bufferProcessing && + state.buffer.length) + clearBuffer(this, state); + } +}; + function decodeChunk(state, chunk, encoding) { if (!state.objectMode && state.decodeStrings !== false && @@ -195,20 +218,23 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { var ret = state.length < state.highWaterMark; state.needDrain = !ret; - if (state.writing) + if (state.writing || state.corked) state.buffer.push(new WriteReq(chunk, encoding, cb)); else - doWrite(stream, state, len, chunk, encoding, cb); + doWrite(stream, state, false, len, chunk, encoding, cb); return ret; } -function doWrite(stream, state, len, chunk, encoding, cb) { +function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; state.writecb = cb; state.writing = true; state.sync = true; - stream._write(chunk, encoding, state.onwrite); + if (writev) + stream._writev(chunk, state.onwrite); + else + stream._write(chunk, encoding, state.onwrite); state.sync = false; } @@ -243,8 +269,12 @@ function onwrite(stream, er) { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(stream, state); - if (!finished && !state.bufferProcessing && state.buffer.length) + if (!finished && + !state.corked && + !state.bufferProcessing && + state.buffer.length) { clearBuffer(stream, state); + } if (sync) { process.nextTick(function() { @@ -279,36 +309,56 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; - for (var c = 0; c < state.buffer.length; c++) { - var entry = state.buffer[c]; - var chunk = entry.chunk; - var encoding = entry.encoding; - var cb = entry.callback; - var len = state.objectMode ? 1 : chunk.length; - - doWrite(stream, state, len, chunk, encoding, cb); - - // if we didn't call the onwrite immediately, then - // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. - if (state.writing) { - c++; - break; + if (stream._writev && state.buffer.length > 1) { + // Fast case, write everything using _writev() + var cbs = []; + for (var c = 0; c < state.buffer.length; c++) + cbs.push(state.buffer[c].callback); + + doWrite(stream, state, true, state.length, state.buffer, '', function(err) { + for (var i = 0; i < cbs.length; i++) + cbs[i](err); + }); + + // Clear buffer + state.buffer = []; + } else { + // Slow case, write chunks one-by-one + for (var c = 0; c < state.buffer.length; c++) { + var entry = state.buffer[c]; + var chunk = entry.chunk; + var encoding = entry.encoding; + var cb = entry.callback; + var len = state.objectMode ? 1 : chunk.length; + + doWrite(stream, state, false, len, chunk, encoding, cb); + + // if we didn't call the onwrite immediately, then + // it means that we need to wait until it does. + // also, that means that the chunk and cb are currently + // being processed, so move the buffer counter past them. + if (state.writing) { + c++; + break; + } } + + if (c < state.buffer.length) + state.buffer = state.buffer.slice(c); + else + state.buffer.length = 0; } state.bufferProcessing = false; - if (c < state.buffer.length) - state.buffer = state.buffer.slice(c); - else - state.buffer.length = 0; } Writable.prototype._write = function(chunk, encoding, cb) { cb(new Error('not implemented')); + }; +Writable.prototype._writev = null; + Writable.prototype.end = function(chunk, encoding, cb) { var state = this._writableState; @@ -324,6 +374,9 @@ Writable.prototype.end = function(chunk, encoding, cb) { if (typeof chunk !== 'undefined' && chunk !== null) this.write(chunk, encoding); + // .end() should .uncork() + this.uncork(); + // ignore unnecessary end() calls. if (!state.ending && !state.finished) endWritable(this, state, cb);