From d59beb9f686578a3c34606cba294df0fb7d844a9 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 31 Jan 2013 21:20:49 +0400 Subject: [PATCH] tls: port CryptoStream to streams2 --- lib/tls.js | 666 ++++++++++++++++++++++------------------------------- 1 file changed, 280 insertions(+), 386 deletions(-) diff --git a/lib/tls.js b/lib/tls.js index 3efa3e56e4..97b7f74843 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -24,8 +24,7 @@ var util = require('util'); var net = require('net'); var url = require('url'); var events = require('events'); -var Stream = require('stream'); -var END_OF_FILE = 42; +var stream = require('stream'); var assert = require('assert').ok; var constants = require('constants'); @@ -209,14 +208,17 @@ SlabBuffer.prototype.create = function create() { }; -SlabBuffer.prototype.use = function use(context, fn) { +SlabBuffer.prototype.use = function use(context, fn, size) { if (this.remaining === 0) { this.isFull = true; return 0; } - var bytes = fn.call(context, this.pool, this.offset, this.remaining); + var actualSize = this.remaining; + if (size !== null) actualSize = Math.min(size, actualSize); + + var bytes = fn.call(context, this.pool, this.offset, actualSize); if (bytes > 0) { this.offset += bytes; this.remaining -= bytes; @@ -232,93 +234,232 @@ var slabBuffer = null; // Base class of both CleartextStream and EncryptedStream -function CryptoStream(pair) { - Stream.call(this); +function CryptoStream(pair, options) { + stream.Duplex.call(this, options); this.pair = pair; + this._pending = null; + this._pendingCallback = null; + this._doneFlag = false; + this._resumingSession = false; + this._destroyed = false; + this._ended = false; + this._finished = false; + this._opposite = null; - this.readable = this.writable = true; - - this._paused = false; - this._needDrain = false; - this._pending = []; - this._pendingCallbacks = []; - this._pendingBytes = 0; if (slabBuffer === null) slabBuffer = new SlabBuffer(); this._buffer = slabBuffer; + + this.once('finish', onCryptoStreamFinish); + + // net.Socket calls .onend too + this.once('end', onCryptoStreamEnd); } -util.inherits(CryptoStream, Stream); +util.inherits(CryptoStream, stream.Duplex); + +function onCryptoStreamFinish() { + this._finished = true; -CryptoStream.prototype.write = function(data /* , encoding, cb */) { - if (this == this.pair.cleartext) { - debug('cleartext.write called with ' + data.length + ' bytes'); + if (this === this.pair.cleartext) { + debug('cleartext.onfinish'); + if (this.pair.ssl) { + // Generate close notify + // NOTE: first call checks if client has sent us shutdown, + // second call enqueues shutdown into the BIO. + if (this.pair.ssl.shutdown() !== 1) { + this.pair.ssl.shutdown(); + } + } } else { - debug('encrypted.write called with ' + data.length + ' bytes'); + debug('encrypted.onfinish'); } - if (!this.writable) { - throw new Error('CryptoStream is not writable'); - } + // Try to read just to get sure that we won't miss EOF + if (this._opposite.readable) this._opposite.read(0); - var encoding, cb; + if (this._opposite._ended) { + this._done(); - // parse arguments - if (typeof arguments[1] == 'string') { - encoding = arguments[1]; - cb = arguments[2]; - } else { - cb = arguments[1]; + // No half-close, sorry + if (this === this.pair.cleartext) this._opposite._done(); } +} - // Transform strings into buffers. - if (typeof data == 'string') { - data = new Buffer(data, encoding); +function onCryptoStreamEnd() { + this._ended = true; + if (this === this.pair.cleartext) { + debug('cleartext.onend'); + } else { + debug('encrypted.onend'); } - debug((this === this.pair.cleartext ? 'clear' : 'encrypted') + 'In data'); + if (this.onend) this.onend(); +} - this._pending.push(data); - this._pendingCallbacks.push(cb); - this._pendingBytes += data.length; - this.pair._writeCalled = true; - this.pair.cycle(); +CryptoStream.prototype._write = function write(data, cb) { + assert(this._pending === null); - // In the following cases, write() should return a false, - // then this stream should eventually emit 'drain' event. - // - // 1. There are pending data more than 128k bytes. - // 2. A forward stream shown below is paused. - // A) EncryptedStream for CleartextStream.write(). - // B) CleartextStream for EncryptedStream.write(). + // Black-hole data + if (!this.pair.ssl) return cb(null); + + // When resuming session don't accept any new data. + // And do not put too much data into openssl, before writing it from encrypted + // side. // - if (!this._needDrain) { - if (this._pendingBytes >= 128 * 1024) { - this._needDrain = true; + // TODO(indutny): Remove magic number, use watermark based limits + if (!this._resumingSession && + (this !== this.pair.cleartext || + this.pair.encrypted._internallyPendingBytes() < 128 * 1024)) { + // Write current buffer now + var written; + if (this === this.pair.cleartext) { + debug('cleartext.write called with ' + data.length + ' bytes'); + written = this.pair.ssl.clearIn(data, 0, data.length); } else { + debug('encrypted.write called with ' + data.length + ' bytes'); + written = this.pair.ssl.encIn(data, 0, data.length); + } + + var self = this; + + // Force SSL_read call to cycle some states/data inside OpenSSL + this.pair.cleartext.read(0); + + // Cycle encrypted data + if (this.pair.encrypted._internallyPendingBytes()) { + this.pair.encrypted.read(0); + } + + // Handle and report errors + if (this.pair.ssl && this.pair.ssl.error) { + return cb(this.pair.error()); + } + + // Get NPN and Server name when ready + this.pair.maybeInitFinished(); + + // Whole buffer was written + if (written === data.length) { if (this === this.pair.cleartext) { - this._needDrain = this.pair.encrypted._paused; + debug('cleartext.write succeed with ' + data.length + ' bytes'); } else { - this._needDrain = this.pair.cleartext._paused; + debug('encrypted.write succeed with ' + data.length + ' bytes'); } + + return cb(null); } + assert(written === 0 || written === -1); + } else { + debug('cleartext.write queue is full'); + + // Force SSL_read call to cycle some states/data inside OpenSSL + this.pair.cleartext.read(0); + } + + // No write has happened + this._pending = data; + this._pendingCallback = cb; + + if (this === this.pair.cleartext) { + debug('cleartext.write queued with ' + data.length + ' bytes'); + } else { + debug('encrypted.write queued with ' + data.length + ' bytes'); } - return !this._needDrain; }; -CryptoStream.prototype.pause = function() { - debug('paused ' + (this == this.pair.cleartext ? 'cleartext' : 'encrypted')); - this._paused = true; +CryptoStream.prototype._writePending = function writePending() { + var data = this._pending, + cb = this._pendingCallback; + + this._pending = null; + this._pendingCallback = null; + this._write(data, cb); }; -CryptoStream.prototype.resume = function() { - debug('resume ' + (this == this.pair.cleartext ? 'cleartext' : 'encrypted')); - this._paused = false; - this.pair.cycle(); +CryptoStream.prototype._read = function read(size, cb) { + // XXX: EOF?! + if (!this.pair.ssl) return cb(null, null); + + // Wait for session to be resumed + if (this._resumingSession) return cb(null, ''); + + var out; + if (this === this.pair.cleartext) { + debug('cleartext.read called with ' + size + ' bytes'); + out = this.pair.ssl.clearOut; + } else { + debug('encrypted.read called with ' + size + ' bytes'); + out = this.pair.ssl.encOut; + } + + var bytesRead = 0, + start = this._buffer.offset; + do { + var read = this._buffer.use(this.pair.ssl, out, size); + if (read > 0) { + bytesRead += read; + size -= read; + } + + // Handle and report errors + if (this.pair.ssl && this.pair.ssl.error) { + this.pair.error(); + break; + } + + // Get NPN and Server name when ready + this.pair.maybeInitFinished(); + } while (read > 0 && !this._buffer.isFull && bytesRead < size); + + // Create new buffer if previous was filled up + var pool = this._buffer.pool; + if (this._buffer.isFull) this._buffer.create(); + + assert(bytesRead >= 0); + + if (this === this.pair.cleartext) { + debug('cleartext.read succeed with ' + bytesRead + ' bytes'); + } else { + debug('encrypted.read succeed with ' + bytesRead + ' bytes'); + } + + // Try writing pending data + if (this._pending !== null) this._writePending(); + + if (bytesRead === 0) { + // EOF when cleartext has finished and we have nothing to read + if (this._opposite._finished && this._internallyPendingBytes() === 0) { + // Perform graceful shutdown + this._done(); + + // No half-open, sorry! + if (this === this.pair.cleartext) + this._opposite._done(); + + return cb(null, null); + } + + // Bail out + return cb(null, ''); + } + + // Give them requested data + if (this.ondata) { + var self = this; + this.ondata(pool, start, start + bytesRead); + + // Consume data automatically + // simple/test-https-drain fails without it + process.nextTick(function() { + self.read(bytesRead); + }); + } + return cb(null, pool.slice(start, start + bytesRead)); }; @@ -340,11 +481,6 @@ CryptoStream.prototype.__defineGetter__('bytesWritten', function() { return this.socket ? this.socket.bytesWritten : 0; }); -CryptoStream.prototype.setEncoding = function(encoding) { - var StringDecoder = require('string_decoder').StringDecoder; // lazy load - this._decoder = new StringDecoder(encoding); -}; - // Example: // C=US\nST=CA\nL=SF\nO=Joyent\nOU=Node.js\nCN=ca1\nemailAddress=ry@clouds.org @@ -409,53 +545,74 @@ CryptoStream.prototype.getCipher = function(err) { }; -CryptoStream.prototype.end = function(d) { - if (this.pair._doneFlag) return; - if (!this.writable) return; - - if (d) { - this.write(d); +CryptoStream.prototype.end = function(chunk, encoding) { + if (this === this.pair.cleartext) { + debug('cleartext.end'); + } else { + debug('encrypted.end'); } - this._pending.push(END_OF_FILE); - this._pendingCallbacks.push(null); - - // If this is an encrypted stream then we need to disable further 'data' - // events. + // Write pending data first + if (this._pending !== null) this._writePending(); this.writable = false; - this.pair.cycle(); + stream.Duplex.prototype.end.call(this, chunk, encoding); }; CryptoStream.prototype.destroySoon = function(err) { - if (this.writable) { - this.end(); + if (this === this.pair.cleartext) { + debug('cleartext.destroySoon'); } else { - this.destroy(); + debug('encrypted.destroySoon'); } + + if (this.writable) + this.end(); + + if (this._writableState.finishing || this._writableState.finished) + this.destroy(); + else + this.once('finish', this.destroy); }; CryptoStream.prototype.destroy = function(err) { - if (this.pair._doneFlag) return; - this.pair.destroy(); + if (this._destroyed) return; + this._destroyed = true; + this.readable = this.writable = false; + + // Destroy both ends + if (this === this.pair.cleartext) { + debug('cleartext.destroy'); + } else { + debug('encrypted.destroy'); + } + this._opposite.destroy(); + + var self = this; + process.nextTick(function() { + // Force EOF + self.push(null); + + // Emit 'close' event + self.emit('close', err ? true : false); + }); }; CryptoStream.prototype._done = function() { this._doneFlag = true; + if (this === this.pair.encrypted && !this.pair._secureEstablished) + return this.pair.error(); + if (this.pair.cleartext._doneFlag && this.pair.encrypted._doneFlag && !this.pair._doneFlag) { // If both streams are done: - if (!this.pair._secureEstablished) { - this.pair.error(); - } else { - this.pair.destroy(); - } + this.pair.destroy(); } }; @@ -478,182 +635,8 @@ Object.defineProperty(CryptoStream.prototype, 'readyState', { }); -// Move decrypted, clear data out into the application. -// From the user's perspective this occurs as a 'data' event -// on the pair.cleartext. -// also -// Move encrypted data to the stream. From the user's perspective this -// occurs as a 'data' event on the pair.encrypted. Usually the application -// will have some code which pipes the stream to a socket: -// -// pair.encrypted.on('data', function (d) { -// socket.write(d); -// }); -// -CryptoStream.prototype._push = function() { - if (this == this.pair.encrypted && !this.writable) { - // If the encrypted side got EOF, we do not attempt - // to write out data anymore. - return; - } - - while (!this._paused) { - var chunkBytes = 0, - bytesRead = 0, - start = this._buffer.offset; - - do { - chunkBytes = this._buffer.use(this, this._pusher); - if (chunkBytes > 0) bytesRead += chunkBytes; - - if (this.pair.ssl && this.pair.ssl.error) { - this.pair.error(); - return; - } - - this.pair.maybeInitFinished(); - - } while (chunkBytes > 0 && !this._buffer.isFull); - - var pool = this._buffer.pool; - - // Create new buffer if previous was filled up - if (this._buffer.isFull) this._buffer.create(); - - assert(bytesRead >= 0); - - // Bail out if we didn't read any data. - if (bytesRead == 0) { - if (this._internallyPendingBytes() == 0 && this._destroyAfterPush) { - this._done(); - } - return; - } - - var chunk = pool.slice(start, start + bytesRead); - - if (this === this.pair.cleartext) { - debug('cleartext emit "data" with ' + bytesRead + ' bytes'); - } else { - debug('encrypted emit "data" with ' + bytesRead + ' bytes'); - } - - if (this._decoder) { - var string = this._decoder.write(chunk); - if (string.length) this.emit('data', string); - } else { - this.emit('data', chunk); - } - - // Optimization: emit the original buffer with end points - if (this.ondata) this.ondata(pool, start, start + bytesRead); - } -}; - - -// Push in any clear data coming from the application. -// This arrives via some code like this: -// -// pair.cleartext.write("hello world"); -// -// also -// -// Push in incoming encrypted data from the socket. -// This arrives via some code like this: -// -// socket.on('data', function (d) { -// pair.encrypted.write(d) -// }); -// -CryptoStream.prototype._pull = function() { - var havePending = this._pending.length > 0; - - assert(havePending || this._pendingBytes == 0); - - while (this._pending.length > 0) { - if (!this.pair.ssl) break; - - var tmp = this._pending.shift(); - var cb = this._pendingCallbacks.shift(); - - assert(this._pending.length === this._pendingCallbacks.length); - - if (tmp === END_OF_FILE) { - // Sending EOF - if (this === this.pair.encrypted) { - debug('end encrypted ' + this.pair.fd); - this.pair.cleartext._destroyAfterPush = true; - } else { - // CleartextStream - assert(this === this.pair.cleartext); - debug('end cleartext'); - - this.pair.ssl.shutdown(); - - // TODO check if we get EAGAIN From shutdown, would have to do it - // again. should unshift END_OF_FILE back onto pending and wait for - // next cycle. - - this.pair.encrypted._destroyAfterPush = true; - } - this.pair.cycle(); - this._done(); - return; - } - - if (tmp.length == 0) continue; - - var rv = this._puller(tmp); - - if (this.pair.ssl && this.pair.ssl.error) { - this.pair.error(); - return; - } - - this.pair.maybeInitFinished(); - - if (rv === 0 || rv < 0) { - this._pending.unshift(tmp); - this._pendingCallbacks.unshift(cb); - break; - } - - this._pendingBytes -= tmp.length; - assert(this._pendingBytes >= 0); - - if (cb) cb(); - - assert(rv === tmp.length); - } - - // If pending data has cleared, 'drain' event should be emitted - // after write() returns a false. - // Except when a forward stream shown below is paused. - // A) EncryptedStream for CleartextStream._pull(). - // B) CleartextStream for EncryptedStream._pull(). - // - if (this._needDrain && this._pending.length === 0) { - var paused; - if (this === this.pair.cleartext) { - paused = this.pair.encrypted._paused; - } else { - paused = this.pair.cleartext._paused; - } - if (!paused) { - debug('drain ' + (this === this.pair.cleartext ? 'clear' : 'encrypted')); - var self = this; - process.nextTick(function() { - self.emit('drain'); - }); - this._needDrain = false; - if (this.__destroyOnDrain) this.end(); - } - } -}; - - -function CleartextStream(pair) { - CryptoStream.call(this, pair); +function CleartextStream(pair, options) { + CryptoStream.call(this, pair, options); } util.inherits(CleartextStream, CryptoStream); @@ -667,22 +650,11 @@ CleartextStream.prototype._internallyPendingBytes = function() { }; -CleartextStream.prototype._puller = function(b) { - debug('clearIn ' + b.length + ' bytes'); - return this.pair.ssl.clearIn(b, 0, b.length); -}; - - -CleartextStream.prototype._pusher = function(pool, offset, length) { - debug('reading from clearOut'); - if (!this.pair.ssl) return -1; - return this.pair.ssl.clearOut(pool, offset, length); -}; - CleartextStream.prototype.address = function() { return this.socket && this.socket.address(); }; + CleartextStream.prototype.__defineGetter__('remoteAddress', function() { return this.socket && this.socket.remoteAddress; }); @@ -692,8 +664,8 @@ CleartextStream.prototype.__defineGetter__('remotePort', function() { return this.socket && this.socket.remotePort; }); -function EncryptedStream(pair) { - CryptoStream.call(this, pair); +function EncryptedStream(pair, options) { + CryptoStream.call(this, pair, options); } util.inherits(EncryptedStream, CryptoStream); @@ -707,19 +679,6 @@ EncryptedStream.prototype._internallyPendingBytes = function() { }; -EncryptedStream.prototype._puller = function(b) { - debug('writing from encIn'); - return this.pair.ssl.encIn(b, 0, b.length); -}; - - -EncryptedStream.prototype._pusher = function(pool, offset, length) { - debug('reading from encOut'); - if (!this.pair.ssl) return -1; - return this.pair.ssl.encOut(pool, offset, length); -}; - - function onhandshakestart() { debug('onhandshakestart'); @@ -754,12 +713,12 @@ function onhandshakedone() { debug('onhandshakedone'); } + function onclienthello(hello) { var self = this, once = false; - this.encrypted.pause(); - this.cleartext.pause(); + this._resumingSession = true; function callback(err, session) { if (once) return; once = true; @@ -768,8 +727,10 @@ function onclienthello(hello) { self.ssl.loadSession(session); - self.encrypted.resume(); - self.cleartext.resume(); + // Cycle data + self._resumingSession = false; + self.cleartext.read(0); + self.encrypted.read(0); } if (hello.sessionId.length <= 0 || @@ -812,6 +773,7 @@ function SecurePair(credentials, isServer, requestCert, rejectUnauthorized, this._encWriteState = true; this._clearWriteState = true; this._doneFlag = false; + this._destroying = false; if (!credentials) { this.credentials = crypto.createCredentials(); @@ -856,17 +818,20 @@ function SecurePair(credentials, isServer, requestCert, rejectUnauthorized, } /* Acts as a r/w stream to the cleartext side of the stream. */ - this.cleartext = new CleartextStream(this); + this.cleartext = new CleartextStream(this, options.cleartext); /* Acts as a r/w stream to the encrypted side of the stream. */ - this.encrypted = new EncryptedStream(this); + this.encrypted = new EncryptedStream(this, options.encrypted); + + /* Let streams know about each other */ + this.cleartext._opposite = this.encrypted; + this.encrypted._opposite = this.cleartext; process.nextTick(function() { /* The Connection may be destroyed by an abort call */ if (self.ssl) { self.ssl.start(); } - self.cycle(); }); } @@ -885,81 +850,6 @@ exports.createSecurePair = function(credentials, }; - - -/* Attempt to cycle OpenSSLs buffers in various directions. - * - * An SSL Connection can be viewed as four separate piplines, - * interacting with one has no connection to the behavoir of - * any of the other 3 -- This might not sound reasonable, - * but consider things like mid-stream renegotiation of - * the ciphers. - * - * The four pipelines, using terminology of the client (server is just - * reversed): - * (1) Encrypted Output stream (Writing encrypted data to peer) - * (2) Encrypted Input stream (Reading encrypted data from peer) - * (3) Cleartext Output stream (Decrypted content from the peer) - * (4) Cleartext Input stream (Cleartext content to send to the peer) - * - * This function attempts to pull any available data out of the Cleartext - * input stream (4), and the Encrypted input stream (2). Then it pushes any - * data available from the cleartext output stream (3), and finally from the - * Encrypted output stream (1) - * - * It is called whenever we do something with OpenSSL -- post reciving - * content, trying to flush, trying to change ciphers, or shutting down the - * connection. - * - * Because it is also called everywhere, we also check if the connection has - * completed negotiation and emit 'secure' from here if it has. - */ -SecurePair.prototype.cycle = function(depth) { - if (this._doneFlag) return; - - depth = depth ? depth : 0; - - if (depth == 0) this._writeCalled = false; - - var established = this._secureEstablished; - - if (!this.cycleEncryptedPullLock) { - this.cycleEncryptedPullLock = true; - debug('encrypted._pull'); - this.encrypted._pull(); - this.cycleEncryptedPullLock = false; - } - - if (!this.cycleCleartextPullLock) { - this.cycleCleartextPullLock = true; - debug('cleartext._pull'); - this.cleartext._pull(); - this.cycleCleartextPullLock = false; - } - - if (!this.cycleCleartextPushLock) { - this.cycleCleartextPushLock = true; - debug('cleartext._push'); - this.cleartext._push(); - this.cycleCleartextPushLock = false; - } - - if (!this.cycleEncryptedPushLock) { - this.cycleEncryptedPushLock = true; - debug('encrypted._push'); - this.encrypted._push(); - this.cycleEncryptedPushLock = false; - } - - if ((!established && this._secureEstablished) || - (depth == 0 && this._writeCalled)) { - // If we were not established but now we are, let's cycle again. - // Or if there is some data to write... - this.cycle(depth + 1); - } -}; - - SecurePair.prototype.maybeInitFinished = function() { if (this.ssl && !this._secureEstablished && this.ssl.isInitFinished()) { if (process.features.tls_npn) { @@ -978,27 +868,20 @@ SecurePair.prototype.maybeInitFinished = function() { SecurePair.prototype.destroy = function() { - var self = this; + if (this._destroying) return; if (!this._doneFlag) { + debug('SecurePair.destroy'); + this._destroying = true; + + // SecurePair should be destroyed only after it's streams + this.cleartext.destroy(); + this.encrypted.destroy(); + this._doneFlag = true; this.ssl.error = null; this.ssl.close(); this.ssl = null; - - self.encrypted.writable = self.encrypted.readable = false; - self.cleartext.writable = self.cleartext.readable = false; - - process.nextTick(function() { - if (self.cleartext._decoder) { - var ret = self.cleartext._decoder.end(); - if (ret) - self.cleartext.emit('data', ret); - } - self.cleartext.emit('end'); - self.encrypted.emit('close'); - self.cleartext.emit('close'); - }); } }; @@ -1012,6 +895,7 @@ SecurePair.prototype.error = function() { } this.destroy(); this.emit('error', error); + return error; } else { var err = this.ssl.error; this.ssl.error = null; @@ -1024,6 +908,8 @@ SecurePair.prototype.error = function() { } else { this.cleartext.emit('error', err); } + + return err; } }; @@ -1155,7 +1041,11 @@ function Server(/* [options], listener */) { { server: self, NPNProtocols: self.NPNProtocols, - SNICallback: self.SNICallback + SNICallback: self.SNICallback, + + // Stream options + cleartext: self._cleartext, + encrypted: self._encrypted }); var cleartext = pipe(pair, socket); @@ -1254,6 +1144,8 @@ Server.prototype.setOptions = function(options) { .update(process.argv.join(' ')) .digest('hex'); } + if (options.cleartext) this.cleartext = options.cleartext; + if (options.encrypted) this.encrypted = options.encrypted; }; // SNI Contexts High-Level API @@ -1331,7 +1223,9 @@ exports.connect = function(/* [port, host], options, cb */) { options.rejectUnauthorized === true ? true : false, { NPNProtocols: this.NPNProtocols, - servername: hostname + servername: hostname, + cleartext: options.cleartext, + encrypted: options.encrypted }); if (options.session) {