diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 601f5b713f..403cb7b477 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -203,7 +203,9 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { state.length += len; var ret = state.length < state.highWaterMark; - state.needDrain = !ret; + // we must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; if (state.writing) state.buffer.push(new WriteReq(chunk, encoding, cb)); diff --git a/test/simple/test-stream-big-packet.js b/test/simple/test-stream-big-packet.js new file mode 100644 index 0000000000..9ec29ca0dc --- /dev/null +++ b/test/simple/test-stream-big-packet.js @@ -0,0 +1,73 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); +var util = require('util'); +var stream = require('stream'); + +var passed = false; + +function PassThrough () { + stream.Transform.call(this); +}; +util.inherits(PassThrough, stream.Transform); +PassThrough.prototype._transform = function (chunk, encoding, done) { + this.push(chunk); + done(); +}; + +function TestStream () { + stream.Transform.call(this); +}; +util.inherits(TestStream, stream.Transform); +TestStream.prototype._transform = function (chunk, encoding, done) { + if (!passed) { + // Char 'a' only exists in the last write + passed = chunk.toString().indexOf('a') >= 0; + } + done(); +}; + +var s1 = new PassThrough(); +var s2 = new PassThrough(); +var s3 = new TestStream(); +s1.pipe(s3); +// Don't let s2 auto close which may close s3 +s2.pipe(s3, {end: false}); + +// We must write a buffer larger than highWaterMark +var big = new Buffer(s1._writableState.highWaterMark + 1); +big.fill('x'); + +// Since big is larger than highWaterMark, it will be buffered internally. +assert(!s1.write(big)); +// 'tiny' is small enough to pass through internal buffer. +assert(s2.write('tiny')); + +// Write some small data in next IO loop, which will never be written to s3 +// Because 'drain' event is not emitted from s1 and s1 is still paused +setImmediate(s1.write.bind(s1), 'later'); + +// Assert after two IO loops when all operations have been done. +process.on('exit', function () { + assert(passed, 'Large buffer is not handled properly by Writable Stream'); +});