Browse Source

streams: introduce .cork/.uncork/._writev

v0.11.2-release
Fedor Indutny 12 years ago
parent
commit
21ed8df696
  1. 21
      doc/api/stream.markdown
  2. 103
      lib/_stream_writable.js

21
doc/api/stream.markdown

@ -495,6 +495,17 @@ programs. However, you **are** expected to override this method in
your own extension classes. your own extension classes.
### writable.\_writev(chunks, callback)
* `chunks` {Array} The chunks to be written, each chunk should have following
format: `{ chunk: ..., encoding: ... }`.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunks.
NOTE: This function is completely optional to implement. Even more, in the most
of the cases you won't need it.
### writable.write(chunk, [encoding], [callback]) ### writable.write(chunk, [encoding], [callback])
* `chunk` {Buffer | String} Data to be written * `chunk` {Buffer | String} Data to be written
@ -512,6 +523,16 @@ the buffer is full, and the data will be sent out in the future. The
The specifics of when `write()` will return false, is determined by The specifics of when `write()` will return false, is determined by
the `highWaterMark` option provided to the constructor. the `highWaterMark` option provided to the constructor.
### writable.cork()
Forces buffering of all writes.
NOTE: buffered data will be flushed either at `.uncork()` or at `.end()` call.
### writable.uncork()
Flush all data, buffered since `.cork()` call.
### writable.end([chunk], [encoding], [callback]) ### writable.end([chunk], [encoding], [callback])
* `chunk` {Buffer | String} Optional final data to be written * `chunk` {Buffer | String} Optional final data to be written

103
lib/_stream_writable.js

@ -76,6 +76,9 @@ function WritableState(options, stream) {
// a flag to see when we're in the middle of a write. // a flag to see when we're in the middle of a write.
this.writing = false; this.writing = false;
// when true all writes will be buffered until .uncork() call
this.corked = false;
// a flag to be able to tell if the onwrite cb is called immediately, // a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick. We set this to true at first, becuase any // or on a later tick. We set this to true at first, becuase any
// actions that shouldn't happen until "later" should generally also // actions that shouldn't happen until "later" should generally also
@ -174,6 +177,26 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret; return ret;
}; };
Writable.prototype.cork = function() {
var state = this._writableState;
state.corked = true;
};
Writable.prototype.uncork = function() {
var state = this._writableState;
if (state.corked) {
state.corked = false;
if (!state.writing &&
!state.finished &&
!state.bufferProcessing &&
state.buffer.length)
clearBuffer(this, state);
}
};
function decodeChunk(state, chunk, encoding) { function decodeChunk(state, chunk, encoding) {
if (!state.objectMode && if (!state.objectMode &&
state.decodeStrings !== false && state.decodeStrings !== false &&
@ -195,20 +218,23 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
var ret = state.length < state.highWaterMark; var ret = state.length < state.highWaterMark;
state.needDrain = !ret; state.needDrain = !ret;
if (state.writing) if (state.writing || state.corked)
state.buffer.push(new WriteReq(chunk, encoding, cb)); state.buffer.push(new WriteReq(chunk, encoding, cb));
else else
doWrite(stream, state, len, chunk, encoding, cb); doWrite(stream, state, false, len, chunk, encoding, cb);
return ret; return ret;
} }
function doWrite(stream, state, len, chunk, encoding, cb) { function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len; state.writelen = len;
state.writecb = cb; state.writecb = cb;
state.writing = true; state.writing = true;
state.sync = true; state.sync = true;
stream._write(chunk, encoding, state.onwrite); if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
state.sync = false; state.sync = false;
} }
@ -243,8 +269,12 @@ function onwrite(stream, er) {
// Check if we're actually ready to finish, but don't emit yet // Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(stream, state); var finished = needFinish(stream, state);
if (!finished && !state.bufferProcessing && state.buffer.length) if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.buffer.length) {
clearBuffer(stream, state); clearBuffer(stream, state);
}
if (sync) { if (sync) {
process.nextTick(function() { process.nextTick(function() {
@ -279,36 +309,56 @@ function onwriteDrain(stream, state) {
function clearBuffer(stream, state) { function clearBuffer(stream, state) {
state.bufferProcessing = true; state.bufferProcessing = true;
for (var c = 0; c < state.buffer.length; c++) { if (stream._writev && state.buffer.length > 1) {
var entry = state.buffer[c]; // Fast case, write everything using _writev()
var chunk = entry.chunk; var cbs = [];
var encoding = entry.encoding; for (var c = 0; c < state.buffer.length; c++)
var cb = entry.callback; cbs.push(state.buffer[c].callback);
var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
doWrite(stream, state, len, chunk, encoding, cb); for (var i = 0; i < cbs.length; i++)
cbs[i](err);
// if we didn't call the onwrite immediately, then });
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently // Clear buffer
// being processed, so move the buffer counter past them. state.buffer = [];
if (state.writing) { } else {
c++; // Slow case, write chunks one-by-one
break; for (var c = 0; c < state.buffer.length; c++) {
var entry = state.buffer[c];
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, cb);
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
c++;
break;
}
} }
if (c < state.buffer.length)
state.buffer = state.buffer.slice(c);
else
state.buffer.length = 0;
} }
state.bufferProcessing = false; state.bufferProcessing = false;
if (c < state.buffer.length)
state.buffer = state.buffer.slice(c);
else
state.buffer.length = 0;
} }
Writable.prototype._write = function(chunk, encoding, cb) { Writable.prototype._write = function(chunk, encoding, cb) {
cb(new Error('not implemented')); cb(new Error('not implemented'));
}; };
Writable.prototype._writev = null;
Writable.prototype.end = function(chunk, encoding, cb) { Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState; var state = this._writableState;
@ -324,6 +374,9 @@ Writable.prototype.end = function(chunk, encoding, cb) {
if (typeof chunk !== 'undefined' && chunk !== null) if (typeof chunk !== 'undefined' && chunk !== null)
this.write(chunk, encoding); this.write(chunk, encoding);
// .end() should .uncork()
this.uncork();
// ignore unnecessary end() calls. // ignore unnecessary end() calls.
if (!state.ending && !state.finished) if (!state.ending && !state.finished)
endWritable(this, state, cb); endWritable(this, state, cb);

Loading…
Cancel
Save