diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 6fe9d19853..c9beafa9be 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -530,11 +530,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) { ondrain(); } + // If the user pushes more data while we're writing to dest then we'll end up + // in ondata again. However, we only want to increase awaitDrain once because + // dest will only emit one 'drain' event for the multiple writes. + // => Introduce a guard on increasing awaitDrain. + var increasedAwaitDrain = false; src.on('data', ondata); function ondata(chunk) { debug('ondata'); + increasedAwaitDrain = false; var ret = dest.write(chunk); - if (false === ret) { + if (false === ret && !increasedAwaitDrain) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write // also returned false. @@ -544,6 +550,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; + increasedAwaitDrain = true; } src.pause(); } diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js new file mode 100644 index 0000000000..1dfdfdb80c --- /dev/null +++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js @@ -0,0 +1,28 @@ +'use strict'; +const common = require('../common'); +const stream = require('stream'); + +// A writable stream which pushes data onto the stream which pipes into it, +// but only the first time it's written to. Since it's not paused at this time, +// a second write will occur. If the pipe increases awaitDrain twice, we'll +// never get subsequent chunks because 'drain' is only emitted once. +const writable = new stream.Writable({ + write: common.mustCall((chunk, encoding, cb) => { + if (chunk.length === 32 * 1024) { // first chunk + readable.push(new Buffer(33 * 1024)); // above hwm + } + cb(); + }, 3) +}); + +// A readable stream which produces two buffers. +const bufs = [new Buffer(32 * 1024), new Buffer(33 * 1024)]; // above hwm +const readable = new stream.Readable({ + read: function() { + while (bufs.length > 0) { + this.push(bufs.shift()); + } + } +}); + +readable.pipe(writable);