Browse Source

stream: Split Writable logic into small functions

1. Get rid of unnecessary 'finishing' flag
2. Dont check both ending and ended. Extraneous.

Also: Remove extraneous 'finishing' flag, and don't check both 'ending'
and 'ended', since checking just 'ending' is sufficient.
v0.9.12-release
isaacs 12 years ago
parent
commit
049903e333
  1. 307
      lib/_stream_writable.js
  2. 3
      lib/net.js
  3. 2
      lib/tls.js

307
lib/_stream_writable.js

@ -53,10 +53,8 @@ function WritableState(options, stream) {
this.ending = false; this.ending = false;
// when end() has been called, and returned // when end() has been called, and returned
this.ended = false; this.ended = false;
// when 'finish' has emitted // when 'finish' is emitted
this.finished = false; this.finished = false;
// when 'finish' is being emitted
this.finishing = false;
// should we decode strings into buffers before passing to _write? // should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string // this is here so that some node-core streams can optimize string
@ -116,183 +114,196 @@ Writable.prototype.pipe = function() {
this.emit('error', new Error('Cannot pipe. Not readable.')); this.emit('error', new Error('Cannot pipe. Not readable.'));
}; };
// Override this method or _write(chunk, cb)
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
if (state.ended) { function writeAfterEnd(stream, state, cb) {
var self = this; var er = new Error('write after end');
var er = new Error('write after end'); // TODO: defer error events consistently everywhere, not just the cb
// TODO: defer error events consistently everywhere, not just the cb stream.emit('error', er);
self.emit('error', er); process.nextTick(function() {
if (typeof cb === 'function') { cb(er);
process.nextTick(function() { });
cb(er); }
});
}
return;
}
// If we get something that is not a buffer, string, null, or undefined, // If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error. // and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the // Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than // watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters. // how many bytes or characters.
function validChunk(stream, state, chunk, cb) {
var valid = true;
if (!Buffer.isBuffer(chunk) && if (!Buffer.isBuffer(chunk) &&
'string' !== typeof chunk && 'string' !== typeof chunk &&
chunk !== null && chunk !== null &&
chunk !== undefined && chunk !== undefined &&
!state.objectMode) { !state.objectMode) {
var er = new TypeError('Invalid non-string/buffer chunk'); var er = new TypeError('Invalid non-string/buffer chunk');
if (typeof cb === 'function') stream.emit('error', er);
process.nextTick(function() {
cb(er); cb(er);
this.emit('error', er); });
return; valid = false;
} }
return valid;
}
var len; function decodeChunk(state, chunk, encoding) {
if (state.objectMode) if (!state.objectMode &&
len = 1; state.decodeStrings !== false &&
else { typeof chunk === 'string') {
len = chunk.length; chunk = new Buffer(chunk, encoding);
if (false === state.decodeStrings) }
chunk = [chunk, encoding || 'utf8']; return chunk;
else if (typeof chunk === 'string') { }
chunk = new Buffer(chunk, encoding);
len = chunk.length; Writable.prototype.write = function(chunk, encoding, cb) {
} var state = this._writableState;
var ret = false;
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
} }
if (!encoding)
encoding = 'utf8';
if (typeof cb !== 'function')
cb = function() {};
if (state.ended)
writeAfterEnd(this, state, cb);
else if (validChunk(this, state, chunk, cb))
ret = writeOrBuffer(this, state, chunk, encoding, cb);
return ret;
};
// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk = decodeChunk(state, chunk, encoding);
var len = state.objectMode ? 1 : chunk.length;
// XXX Remove. _write() should take an encoding.
if (state.decodeStrings === false)
chunk = [chunk, encoding];
state.length += len; state.length += len;
var ret = state.length < state.highWaterMark; var ret = state.length < state.highWaterMark;
if (ret === false) state.needDrain = !ret;
state.needDrain = true;
// if we're already writing something, then just put this
// in the queue, and wait our turn.
if (state.writing) {
state.buffer.push([chunk, cb]);
return ret;
}
state.writing = true; if (state.writing)
state.sync = true; state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb]
else
doWrite(stream, state, len, chunk, encoding, cb);
return ret;
}
function doWrite(stream, state, len, chunk, encoding, cb) {
state.writelen = len; state.writelen = len;
state.writecb = cb; state.writecb = cb;
this._write(chunk, state.onwrite); state.writing = true;
state.sync = true;
// XXX stream._write(chunk, encoding, state.onwrite)
stream._write(chunk, state.onwrite);
state.sync = false; state.sync = false;
}
return ret; function onwriteError(stream, state, sync, er, cb) {
}; if (sync)
process.nextTick(function() {
cb(er);
});
else
cb(er);
stream.emit('error', er);
}
function onwriteStateUpdate(state) {
state.writing = false;
state.writecb = null;
state.length -= state.writelen;
state.writelen = 0;
}
function onwrite(stream, er) { function onwrite(stream, er) {
var state = stream._writableState; var state = stream._writableState;
var sync = state.sync; var sync = state.sync;
var cb = state.writecb; var cb = state.writecb;
var len = state.writelen;
state.writing = false; onwriteStateUpdate(state);
state.writelen = null;
state.writecb = null;
if (er) { if (er)
if (cb) { onwriteError(stream, state, sync, er, cb);
// If _write(chunk,cb) calls cb() in this tick, we still defer else {
// the *user's* write callback to the next tick. if (!finishMaybe(stream, state)) {
// Never present an external API that is *sometimes* async! if (state.length === 0 && state.needDrain)
if (sync) onwriteDrain(stream, state);
process.nextTick(function() {
cb(er);
});
else
cb(er);
}
// backwards compatibility. still emit if there was a cb. if (!state.bufferProcessing && state.buffer.length)
stream.emit('error', er); clearBuffer(stream, state);
return; }
}
state.length -= len;
if (cb) {
// Don't call the cb until the next tick if we're in sync mode.
if (sync) if (sync)
process.nextTick(cb); process.nextTick(cb);
else else
cb(); cb();
} }
}
if (state.length === 0 && (state.ended || state.ending) && // Must force callback to be called on nextTick, so that we don't
!state.finished && !state.finishing) { // emit 'drain' before the write() consumer gets the 'false' return
// emit 'finish' at the very end. // value, and has a chance to attach a 'drain' listener.
state.finishing = true; function onwriteDrain(stream, state) {
stream.emit('finish'); process.nextTick(function() {
state.finished = true; if (state.needDrain) {
return;
}
if (state.length === 0 && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process.nextTick(function() {
if (!state.needDrain)
return;
state.needDrain = false; state.needDrain = false;
stream.emit('drain'); stream.emit('drain');
});
}
// if there's something in the buffer waiting, then process it
// It would be nice if there were TCO in JS, and we could just
// shift the top off the buffer and _write that, but that approach
// causes RangeErrors when you have a very large number of very
// small writes, and is not very efficient otherwise.
if (!state.bufferProcessing && state.buffer.length) {
state.bufferProcessing = true;
for (var c = 0; c < state.buffer.length; c++) {
var chunkCb = state.buffer[c];
var chunk = chunkCb[0];
cb = chunkCb[1];
if (state.objectMode)
len = 1;
else if (false === state.decodeStrings)
len = chunk[0].length;
else
len = chunk.length;
state.writelen = len;
state.writecb = cb;
state.writechunk = chunk;
state.writing = true;
state.sync = true;
stream._write(chunk, state.onwrite);
state.sync = false;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
c++;
break;
}
} }
});
}
state.bufferProcessing = false;
if (c < state.buffer.length) // if there's something in the buffer waiting, then process it
state.buffer = state.buffer.slice(c); function clearBuffer(stream, state) {
else state.bufferProcessing = true;
state.buffer.length = 0;
// XXX buffer entry should be [chunk, encoding, cb]
for (var c = 0; c < state.buffer.length; c++) {
var chunkCb = state.buffer[c];
var chunk = chunkCb[0];
var cb = chunkCb[1];
var encoding = '';
var len;
if (state.objectMode)
len = 1;
else if (false === state.decodeStrings) {
len = chunk[0].length;
encoding = chunk[1];
} else
len = chunk.length;
doWrite(stream, state, len, chunk, encoding, cb);
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
c++;
break;
}
} }
state.bufferProcessing = false;
if (c < state.buffer.length)
state.buffer = state.buffer.slice(c);
else
state.buffer.length = 0;
} }
Writable.prototype._write = function(chunk, cb) { Writable.prototype._write = function(chunk, cb) {
@ -317,19 +328,23 @@ Writable.prototype.end = function(chunk, encoding, cb) {
this.write(chunk, encoding); this.write(chunk, encoding);
// ignore unnecessary end() calls. // ignore unnecessary end() calls.
if (!state.ending && !state.ended && !state.finished) if (!state.ending && !state.finished)
endWritable(this, state, cb); endWritable(this, state, cb);
}; };
function endWritable(stream, state, cb) { function finishMaybe(stream, state) {
state.ending = true; if (state.ending && state.length === 0 && !state.finished) {
if (state.length === 0 && !state.finishing) {
state.finishing = true;
stream.emit('finish');
state.finished = true; state.finished = true;
stream.emit('finish');
} }
return state.finished;
}
function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state);
if (cb) { if (cb) {
if (state.finished || state.finishing) if (state.finished)
process.nextTick(cb); process.nextTick(cb);
else else
stream.once('finish', cb); stream.once('finish', cb);

3
lib/net.js

@ -391,7 +391,7 @@ Socket.prototype.destroySoon = function() {
if (this.writable) if (this.writable)
this.end(); this.end();
if (this._writableState.finishing || this._writableState.finished) if (this._writableState.finished)
this.destroy(); this.destroy();
else else
this.once('finish', this.destroy); this.once('finish', this.destroy);
@ -748,7 +748,6 @@ Socket.prototype.connect = function(options, cb) {
this._writableState.ended = false; this._writableState.ended = false;
this._writableState.ending = false; this._writableState.ending = false;
this._writableState.finished = false; this._writableState.finished = false;
this._writableState.finishing = false;
this.destroyed = false; this.destroyed = false;
this._handle = null; this._handle = null;
} }

2
lib/tls.js

@ -573,7 +573,7 @@ CryptoStream.prototype.destroySoon = function(err) {
if (this.writable) if (this.writable)
this.end(); this.end();
if (this._writableState.finishing || this._writableState.finished) if (this._writableState.finished)
this.destroy(); this.destroy();
else else
this.once('finish', this.destroy); this.once('finish', this.destroy);

Loading…
Cancel
Save