From a58454226f4833e40c6a9040c813abd0c0e10893 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 8 May 2013 13:29:54 -0700 Subject: [PATCH] stream: Handle multi-corking properly This adds proper support for the following situation: w.cork(); w.write(...); w.cork(); w.write(...); w.uncork(); w.write(...); w.uncork(); This is relevant when you have a function (as we do in HTTP) that wants to use cork, but in some cases, want to have a cork/uncork *around* that function, without losing the benefits of writev. --- lib/_stream_writable.js | 16 ++-- test/simple/test-stream-writev.js | 121 ++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 test/simple/test-stream-writev.js diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index f0034e3ba5..e40239ba86 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -77,7 +77,7 @@ function WritableState(options, stream) { this.writing = false; // when true all writes will be buffered until .uncork() call - this.corked = false; + this.corked = 0; // a flag to be able to tell if the onwrite cb is called immediately, // or on a later tick. We set this to true at first, becuase any @@ -190,16 +190,17 @@ Writable.prototype.write = function(chunk, encoding, cb) { Writable.prototype.cork = function() { var state = this._writableState; - state.corked = true; + state.corked++; }; Writable.prototype.uncork = function() { var state = this._writableState; if (state.corked) { - state.corked = false; + state.corked--; if (!state.writing && + !state.corked && !state.finished && !state.bufferProcessing && state.buffer.length) @@ -221,6 +222,8 @@ function decodeChunk(state, chunk, encoding) { // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); + if (Buffer.isBuffer(chunk)) + encoding = 'buffer'; var len = state.objectMode ? 1 : chunk.length; state.length += len; @@ -392,8 +395,11 @@ Writable.prototype.end = function(chunk, encoding, cb) { if (typeof chunk !== 'undefined' && chunk !== null) this.write(chunk, encoding); - // .end() should .uncork() - this.uncork(); + // .end() fully uncorks + if (state.corked) { + state.corked = 1; + this.uncork(); + } // ignore unnecessary end() calls. if (!state.ending && !state.finished) diff --git a/test/simple/test-stream-writev.js b/test/simple/test-stream-writev.js new file mode 100644 index 0000000000..5b49e6e416 --- /dev/null +++ b/test/simple/test-stream-writev.js @@ -0,0 +1,121 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +var stream = require('stream'); + +var queue = []; +for (var decode = 0; decode < 2; decode++) { + for (var uncork = 0; uncork < 2; uncork++) { + for (var multi = 0; multi < 2; multi++) { + queue.push([!!decode, !!uncork, !!multi]); + } + } +} + +run(); + +function run() { + var t = queue.pop(); + if (t) + test(t[0], t[1], t[2], run); + else + console.log('ok'); +} + +function test(decode, uncork, multi, next) { + console.log('# decode=%j uncork=%j multi=%j', decode, uncork, multi); + var counter = 0; + var expectCount = 0; + function cnt(msg) { + expectCount++; + var expect = expectCount; + var called = false; + return function(er) { + if (er) + throw er; + called = true; + counter++; + assert.equal(counter, expect); + }; + } + + var w = new stream.Writable({ decodeStrings: decode }); + w._write = function(chunk, e, cb) { + assert(false, 'Should not call _write'); + }; + + var expectChunks = decode ? + [{ encoding: 'buffer', + chunk: [104, 101, 108, 108, 111, 44, 32] }, + { encoding: 'buffer', chunk: [119, 111, 114, 108, 100] }, + { encoding: 'buffer', chunk: [33] }, + { encoding: 'buffer', + chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] }, + { encoding: 'buffer', + chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173] }] : + [{ encoding: 'ascii', chunk: 'hello, ' }, + { encoding: 'utf8', chunk: 'world' }, + { encoding: 'buffer', chunk: [33] }, + { encoding: 'binary', chunk: '\nand then...' }, + { encoding: 'hex', chunk: 'facebea7deadbeefdecafbad' }]; + + var actualChunks; + w._writev = function(chunks, cb) { + actualChunks = chunks.map(function(chunk) { + return { + encoding: chunk.encoding, + chunk: Buffer.isBuffer(chunk.chunk) ? + Array.prototype.slice.call(chunk.chunk) : chunk.chunk + }; + }); + cb(); + }; + + w.cork(); + w.write('hello, ', 'ascii', cnt('hello')); + w.write('world', 'utf8', cnt('world')); + + if (multi) + w.cork(); + + w.write(new Buffer('!'), 'buffer', cnt('!')); + w.write('\nand then...', 'binary', cnt('and then')); + + if (multi) + w.uncork(); + + w.write('facebea7deadbeefdecafbad', 'hex', cnt('hex')); + + if (uncork) + w.uncork(); + + w.end(cnt('end')); + + w.on('finish', function() { + // make sure finish comes after all the write cb + cnt('finish')(); + assert.deepEqual(expectChunks, actualChunks); + next(); + }); +}