diff --git a/lib/tls.js b/lib/tls.js index f5bb3337fd..8adb7a2267 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -52,6 +52,7 @@ function CryptoStream(pair) { this.readable = this.writable = true; this._paused = false; + this._needDrain = false; this._pending = []; this._pendingCallbacks = []; this._pendingBytes = 0; @@ -86,7 +87,7 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) { data = new Buffer(data, encoding); } - debug('clearIn data'); + debug((this === this.pair.cleartext ? 'clear' : 'encrypted') + 'In data'); this._pending.push(data); this._pendingCallbacks.push(cb); @@ -95,7 +96,26 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) { this.pair._writeCalled = true; this.pair.cycle(); - return this._pendingBytes < 128 * 1024; + // In the following cases, write() should return a false, + // then this stream should eventually emit 'drain' event. + // + // 1. There are pending data more than 128k bytes. + // 2. A forward stream shown below is paused. + // A) EncryptedStream for CleartextStream.write(). + // B) CleartextStream for EncryptedStream.write(). + // + if (!this._needDrain) { + if (this._pendingBytes >= 128 * 1024) { + this._needDrain = true; + } else { + if (this === this.pair.cleartext) { + this._needDrain = this.pair.encrypted._paused; + } else { + this._needDrain = this.pair.cleartext._paused; + } + } + } + return !this._needDrain; }; @@ -380,11 +400,25 @@ CryptoStream.prototype._pull = function() { assert(rv === tmp.length); } - // If we've cleared all of incoming encrypted data, emit drain. - if (havePending && this._pending.length === 0) { - debug('drain'); - this.emit('drain'); - if (this.__destroyOnDrain) this.end(); + // If pending data has cleared, 'drain' event should be emitted + // after write() returns a false. + // Except when a forward stream shown below is paused. + // A) EncryptedStream for CleartextStream._pull(). + // B) CleartextStream for EncryptedStream._pull(). + // + if (this._needDrain && this._pending.length === 0) { + var paused; + if (this === this.pair.cleartext) { + paused = this.pair.encrypted._paused; + } else { + paused = this.pair.cleartext._paused; + } + if (!paused) { + debug('drain'); + process.nextTick(this.emit.bind(this, 'drain')); + this._needDrain = false; + if (this.__destroyOnDrain) this.end(); + } } };