From 049903e333f843d7587e7c40845a7d51ea5955f8 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sun, 3 Mar 2013 16:12:02 -0800 Subject: [PATCH] stream: Split Writable logic into small functions 1. Get rid of unnecessary 'finishing' flag 2. Dont check both ending and ended. Extraneous. Also: Remove extraneous 'finishing' flag, and don't check both 'ending' and 'ended', since checking just 'ending' is sufficient. --- lib/_stream_writable.js | 307 +++++++++++++++++++++------------------- lib/net.js | 3 +- lib/tls.js | 2 +- 3 files changed, 163 insertions(+), 149 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index debbab66de..2dff2d8c75 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -53,10 +53,8 @@ function WritableState(options, stream) { this.ending = false; // when end() has been called, and returned this.ended = false; - // when 'finish' has emitted + // when 'finish' is emitted this.finished = false; - // when 'finish' is being emitted - this.finishing = false; // should we decode strings into buffers before passing to _write? // this is here so that some node-core streams can optimize string @@ -116,183 +114,196 @@ Writable.prototype.pipe = function() { this.emit('error', new Error('Cannot pipe. Not readable.')); }; -// Override this method or _write(chunk, cb) -Writable.prototype.write = function(chunk, encoding, cb) { - var state = this._writableState; - - if (typeof encoding === 'function') { - cb = encoding; - encoding = null; - } - if (state.ended) { - var self = this; - var er = new Error('write after end'); - // TODO: defer error events consistently everywhere, not just the cb - self.emit('error', er); - if (typeof cb === 'function') { - process.nextTick(function() { - cb(er); - }); - } - return; - } +function writeAfterEnd(stream, state, cb) { + var er = new Error('write after end'); + // TODO: defer error events consistently everywhere, not just the cb + stream.emit('error', er); + process.nextTick(function() { + cb(er); + }); +} - // If we get something that is not a buffer, string, null, or undefined, - // and we're not in objectMode, then that's an error. - // Otherwise stream chunks are all considered to be of length=1, and the - // watermarks determine how many objects to keep in the buffer, rather than - // how many bytes or characters. +// If we get something that is not a buffer, string, null, or undefined, +// and we're not in objectMode, then that's an error. +// Otherwise stream chunks are all considered to be of length=1, and the +// watermarks determine how many objects to keep in the buffer, rather than +// how many bytes or characters. +function validChunk(stream, state, chunk, cb) { + var valid = true; if (!Buffer.isBuffer(chunk) && 'string' !== typeof chunk && chunk !== null && chunk !== undefined && !state.objectMode) { var er = new TypeError('Invalid non-string/buffer chunk'); - if (typeof cb === 'function') + stream.emit('error', er); + process.nextTick(function() { cb(er); - this.emit('error', er); - return; + }); + valid = false; } + return valid; +} - var len; - if (state.objectMode) - len = 1; - else { - len = chunk.length; - if (false === state.decodeStrings) - chunk = [chunk, encoding || 'utf8']; - else if (typeof chunk === 'string') { - chunk = new Buffer(chunk, encoding); - len = chunk.length; - } +function decodeChunk(state, chunk, encoding) { + if (!state.objectMode && + state.decodeStrings !== false && + typeof chunk === 'string') { + chunk = new Buffer(chunk, encoding); + } + return chunk; +} + +Writable.prototype.write = function(chunk, encoding, cb) { + var state = this._writableState; + var ret = false; + + if (typeof encoding === 'function') { + cb = encoding; + encoding = null; } + if (!encoding) + encoding = 'utf8'; + + if (typeof cb !== 'function') + cb = function() {}; + + if (state.ended) + writeAfterEnd(this, state, cb); + else if (validChunk(this, state, chunk, cb)) + ret = writeOrBuffer(this, state, chunk, encoding, cb); + + return ret; +}; + +// if we're already writing something, then just put this +// in the queue, and wait our turn. Otherwise, call _write +// If we return false, then we need a drain event, so set that flag. +function writeOrBuffer(stream, state, chunk, encoding, cb) { + chunk = decodeChunk(state, chunk, encoding); + var len = state.objectMode ? 1 : chunk.length; + + // XXX Remove. _write() should take an encoding. + if (state.decodeStrings === false) + chunk = [chunk, encoding]; state.length += len; var ret = state.length < state.highWaterMark; - if (ret === false) - state.needDrain = true; - - // if we're already writing something, then just put this - // in the queue, and wait our turn. - if (state.writing) { - state.buffer.push([chunk, cb]); - return ret; - } + state.needDrain = !ret; - state.writing = true; - state.sync = true; + if (state.writing) + state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb] + else + doWrite(stream, state, len, chunk, encoding, cb); + + return ret; +} + +function doWrite(stream, state, len, chunk, encoding, cb) { state.writelen = len; state.writecb = cb; - this._write(chunk, state.onwrite); + state.writing = true; + state.sync = true; + // XXX stream._write(chunk, encoding, state.onwrite) + stream._write(chunk, state.onwrite); state.sync = false; +} - return ret; -}; +function onwriteError(stream, state, sync, er, cb) { + if (sync) + process.nextTick(function() { + cb(er); + }); + else + cb(er); + + stream.emit('error', er); +} + +function onwriteStateUpdate(state) { + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; +} function onwrite(stream, er) { var state = stream._writableState; var sync = state.sync; var cb = state.writecb; - var len = state.writelen; - state.writing = false; - state.writelen = null; - state.writecb = null; + onwriteStateUpdate(state); - if (er) { - if (cb) { - // If _write(chunk,cb) calls cb() in this tick, we still defer - // the *user's* write callback to the next tick. - // Never present an external API that is *sometimes* async! - if (sync) - process.nextTick(function() { - cb(er); - }); - else - cb(er); - } + if (er) + onwriteError(stream, state, sync, er, cb); + else { + if (!finishMaybe(stream, state)) { + if (state.length === 0 && state.needDrain) + onwriteDrain(stream, state); - // backwards compatibility. still emit if there was a cb. - stream.emit('error', er); - return; - } - state.length -= len; + if (!state.bufferProcessing && state.buffer.length) + clearBuffer(stream, state); + } - if (cb) { - // Don't call the cb until the next tick if we're in sync mode. if (sync) process.nextTick(cb); else cb(); } +} - if (state.length === 0 && (state.ended || state.ending) && - !state.finished && !state.finishing) { - // emit 'finish' at the very end. - state.finishing = true; - stream.emit('finish'); - state.finished = true; - return; - } - - if (state.length === 0 && state.needDrain) { - // Must force callback to be called on nextTick, so that we don't - // emit 'drain' before the write() consumer gets the 'false' return - // value, and has a chance to attach a 'drain' listener. - process.nextTick(function() { - if (!state.needDrain) - return; +// Must force callback to be called on nextTick, so that we don't +// emit 'drain' before the write() consumer gets the 'false' return +// value, and has a chance to attach a 'drain' listener. +function onwriteDrain(stream, state) { + process.nextTick(function() { + if (state.needDrain) { state.needDrain = false; stream.emit('drain'); - }); - } - - // if there's something in the buffer waiting, then process it - // It would be nice if there were TCO in JS, and we could just - // shift the top off the buffer and _write that, but that approach - // causes RangeErrors when you have a very large number of very - // small writes, and is not very efficient otherwise. - if (!state.bufferProcessing && state.buffer.length) { - state.bufferProcessing = true; - - for (var c = 0; c < state.buffer.length; c++) { - var chunkCb = state.buffer[c]; - var chunk = chunkCb[0]; - cb = chunkCb[1]; - - if (state.objectMode) - len = 1; - else if (false === state.decodeStrings) - len = chunk[0].length; - else - len = chunk.length; - - state.writelen = len; - state.writecb = cb; - state.writechunk = chunk; - state.writing = true; - state.sync = true; - stream._write(chunk, state.onwrite); - state.sync = false; - - // if we didn't call the onwrite immediately, then - // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. - if (state.writing) { - c++; - break; - } } + }); +} - state.bufferProcessing = false; - if (c < state.buffer.length) - state.buffer = state.buffer.slice(c); - else - state.buffer.length = 0; + +// if there's something in the buffer waiting, then process it +function clearBuffer(stream, state) { + state.bufferProcessing = true; + + // XXX buffer entry should be [chunk, encoding, cb] + for (var c = 0; c < state.buffer.length; c++) { + var chunkCb = state.buffer[c]; + var chunk = chunkCb[0]; + var cb = chunkCb[1]; + var encoding = ''; + var len; + + if (state.objectMode) + len = 1; + else if (false === state.decodeStrings) { + len = chunk[0].length; + encoding = chunk[1]; + } else + len = chunk.length; + + doWrite(stream, state, len, chunk, encoding, cb); + + // if we didn't call the onwrite immediately, then + // it means that we need to wait until it does. + // also, that means that the chunk and cb are currently + // being processed, so move the buffer counter past them. + if (state.writing) { + c++; + break; + } } + + state.bufferProcessing = false; + if (c < state.buffer.length) + state.buffer = state.buffer.slice(c); + else + state.buffer.length = 0; } Writable.prototype._write = function(chunk, cb) { @@ -317,19 +328,23 @@ Writable.prototype.end = function(chunk, encoding, cb) { this.write(chunk, encoding); // ignore unnecessary end() calls. - if (!state.ending && !state.ended && !state.finished) + if (!state.ending && !state.finished) endWritable(this, state, cb); }; -function endWritable(stream, state, cb) { - state.ending = true; - if (state.length === 0 && !state.finishing) { - state.finishing = true; - stream.emit('finish'); +function finishMaybe(stream, state) { + if (state.ending && state.length === 0 && !state.finished) { state.finished = true; + stream.emit('finish'); } + return state.finished; +} + +function endWritable(stream, state, cb) { + state.ending = true; + finishMaybe(stream, state); if (cb) { - if (state.finished || state.finishing) + if (state.finished) process.nextTick(cb); else stream.once('finish', cb); diff --git a/lib/net.js b/lib/net.js index 4be6402d7f..35223c3396 100644 --- a/lib/net.js +++ b/lib/net.js @@ -391,7 +391,7 @@ Socket.prototype.destroySoon = function() { if (this.writable) this.end(); - if (this._writableState.finishing || this._writableState.finished) + if (this._writableState.finished) this.destroy(); else this.once('finish', this.destroy); @@ -748,7 +748,6 @@ Socket.prototype.connect = function(options, cb) { this._writableState.ended = false; this._writableState.ending = false; this._writableState.finished = false; - this._writableState.finishing = false; this.destroyed = false; this._handle = null; } diff --git a/lib/tls.js b/lib/tls.js index bb0a03af22..86ace15b16 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -573,7 +573,7 @@ CryptoStream.prototype.destroySoon = function(err) { if (this.writable) this.end(); - if (this._writableState.finishing || this._writableState.finished) + if (this._writableState.finished) this.destroy(); else this.once('finish', this.destroy);