|
|
@ -52,6 +52,7 @@ function CryptoStream(pair) { |
|
|
|
this.readable = this.writable = true; |
|
|
|
|
|
|
|
this._paused = false; |
|
|
|
this._needDrain = false; |
|
|
|
this._pending = []; |
|
|
|
this._pendingCallbacks = []; |
|
|
|
this._pendingBytes = 0; |
|
|
@ -86,7 +87,7 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) { |
|
|
|
data = new Buffer(data, encoding); |
|
|
|
} |
|
|
|
|
|
|
|
debug('clearIn data'); |
|
|
|
debug((this === this.pair.cleartext ? 'clear' : 'encrypted') + 'In data'); |
|
|
|
|
|
|
|
this._pending.push(data); |
|
|
|
this._pendingCallbacks.push(cb); |
|
|
@ -95,7 +96,26 @@ CryptoStream.prototype.write = function(data /* , encoding, cb */) { |
|
|
|
this.pair._writeCalled = true; |
|
|
|
this.pair.cycle(); |
|
|
|
|
|
|
|
return this._pendingBytes < 128 * 1024; |
|
|
|
// In the following cases, write() should return a false,
|
|
|
|
// then this stream should eventually emit 'drain' event.
|
|
|
|
//
|
|
|
|
// 1. There are pending data more than 128k bytes.
|
|
|
|
// 2. A forward stream shown below is paused.
|
|
|
|
// A) EncryptedStream for CleartextStream.write().
|
|
|
|
// B) CleartextStream for EncryptedStream.write().
|
|
|
|
//
|
|
|
|
if (!this._needDrain) { |
|
|
|
if (this._pendingBytes >= 128 * 1024) { |
|
|
|
this._needDrain = true; |
|
|
|
} else { |
|
|
|
if (this === this.pair.cleartext) { |
|
|
|
this._needDrain = this.pair.encrypted._paused; |
|
|
|
} else { |
|
|
|
this._needDrain = this.pair.cleartext._paused; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return !this._needDrain; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
@ -380,11 +400,25 @@ CryptoStream.prototype._pull = function() { |
|
|
|
assert(rv === tmp.length); |
|
|
|
} |
|
|
|
|
|
|
|
// If we've cleared all of incoming encrypted data, emit drain.
|
|
|
|
if (havePending && this._pending.length === 0) { |
|
|
|
debug('drain'); |
|
|
|
this.emit('drain'); |
|
|
|
if (this.__destroyOnDrain) this.end(); |
|
|
|
// If pending data has cleared, 'drain' event should be emitted
|
|
|
|
// after write() returns a false.
|
|
|
|
// Except when a forward stream shown below is paused.
|
|
|
|
// A) EncryptedStream for CleartextStream._pull().
|
|
|
|
// B) CleartextStream for EncryptedStream._pull().
|
|
|
|
//
|
|
|
|
if (this._needDrain && this._pending.length === 0) { |
|
|
|
var paused; |
|
|
|
if (this === this.pair.cleartext) { |
|
|
|
paused = this.pair.encrypted._paused; |
|
|
|
} else { |
|
|
|
paused = this.pair.cleartext._paused; |
|
|
|
} |
|
|
|
if (!paused) { |
|
|
|
debug('drain'); |
|
|
|
process.nextTick(this.emit.bind(this, 'drain')); |
|
|
|
this._needDrain = false; |
|
|
|
if (this.__destroyOnDrain) this.end(); |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|