From b922b5e90d2c14dd332b95827c2533e083df7e55 Mon Sep 17 00:00:00 2001 From: ayanamist Date: Fri, 3 Jan 2014 19:37:16 +0800 Subject: [PATCH] stream: writes may return false but forget to emit drain If a write is above the highWaterMark, _write still manages to fully send it synchronously, _writableState.length will be adjusted down to 0 synchronously with the write returning false, but 'drain' will not be emitted until process.nextTick. If another small write which is below highWaterMark is issued before process.nextTick happens, _writableState.needDrain will be reset to false, and the drain event will never be fired. So we should check needDrain before setting it up, which prevents it from inproperly resetting to false. --- lib/_stream_writable.js | 4 +- test/simple/test-stream-big-packet.js | 73 +++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 test/simple/test-stream-big-packet.js diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 601f5b713f..403cb7b477 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -203,7 +203,9 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { state.length += len; var ret = state.length < state.highWaterMark; - state.needDrain = !ret; + // we must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; if (state.writing) state.buffer.push(new WriteReq(chunk, encoding, cb)); diff --git a/test/simple/test-stream-big-packet.js b/test/simple/test-stream-big-packet.js new file mode 100644 index 0000000000..9ec29ca0dc --- /dev/null +++ b/test/simple/test-stream-big-packet.js @@ -0,0 +1,73 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); +var util = require('util'); +var stream = require('stream'); + +var passed = false; + +function PassThrough () { + stream.Transform.call(this); +}; +util.inherits(PassThrough, stream.Transform); +PassThrough.prototype._transform = function (chunk, encoding, done) { + this.push(chunk); + done(); +}; + +function TestStream () { + stream.Transform.call(this); +}; +util.inherits(TestStream, stream.Transform); +TestStream.prototype._transform = function (chunk, encoding, done) { + if (!passed) { + // Char 'a' only exists in the last write + passed = chunk.toString().indexOf('a') >= 0; + } + done(); +}; + +var s1 = new PassThrough(); +var s2 = new PassThrough(); +var s3 = new TestStream(); +s1.pipe(s3); +// Don't let s2 auto close which may close s3 +s2.pipe(s3, {end: false}); + +// We must write a buffer larger than highWaterMark +var big = new Buffer(s1._writableState.highWaterMark + 1); +big.fill('x'); + +// Since big is larger than highWaterMark, it will be buffered internally. +assert(!s1.write(big)); +// 'tiny' is small enough to pass through internal buffer. +assert(s2.write('tiny')); + +// Write some small data in next IO loop, which will never be written to s3 +// Because 'drain' event is not emitted from s1 and s1 is still paused +setImmediate(s1.write.bind(s1), 'later'); + +// Assert after two IO loops when all operations have been done. +process.on('exit', function () { + assert(passed, 'Large buffer is not handled properly by Writable Stream'); +});