Browse Source

tls: port CryptoStream to streams2

v0.9.9-release
Fedor Indutny 12 years ago
parent
commit
d59beb9f68
  1. 666
      lib/tls.js

666
lib/tls.js

@ -24,8 +24,7 @@ var util = require('util');
var net = require('net');
var url = require('url');
var events = require('events');
var Stream = require('stream');
var END_OF_FILE = 42;
var stream = require('stream');
var assert = require('assert').ok;
var constants = require('constants');
@ -209,14 +208,17 @@ SlabBuffer.prototype.create = function create() {
};
SlabBuffer.prototype.use = function use(context, fn) {
SlabBuffer.prototype.use = function use(context, fn, size) {
if (this.remaining === 0) {
this.isFull = true;
return 0;
}
var bytes = fn.call(context, this.pool, this.offset, this.remaining);
var actualSize = this.remaining;
if (size !== null) actualSize = Math.min(size, actualSize);
var bytes = fn.call(context, this.pool, this.offset, actualSize);
if (bytes > 0) {
this.offset += bytes;
this.remaining -= bytes;
@ -232,93 +234,232 @@ var slabBuffer = null;
// Base class of both CleartextStream and EncryptedStream
function CryptoStream(pair) {
Stream.call(this);
function CryptoStream(pair, options) {
stream.Duplex.call(this, options);
this.pair = pair;
this._pending = null;
this._pendingCallback = null;
this._doneFlag = false;
this._resumingSession = false;
this._destroyed = false;
this._ended = false;
this._finished = false;
this._opposite = null;
this.readable = this.writable = true;
this._paused = false;
this._needDrain = false;
this._pending = [];
this._pendingCallbacks = [];
this._pendingBytes = 0;
if (slabBuffer === null) slabBuffer = new SlabBuffer();
this._buffer = slabBuffer;
this.once('finish', onCryptoStreamFinish);
// net.Socket calls .onend too
this.once('end', onCryptoStreamEnd);
}
util.inherits(CryptoStream, Stream);
util.inherits(CryptoStream, stream.Duplex);
function onCryptoStreamFinish() {
this._finished = true;
CryptoStream.prototype.write = function(data /* , encoding, cb */) {
if (this == this.pair.cleartext) {
debug('cleartext.write called with ' + data.length + ' bytes');
if (this === this.pair.cleartext) {
debug('cleartext.onfinish');
if (this.pair.ssl) {
// Generate close notify
// NOTE: first call checks if client has sent us shutdown,
// second call enqueues shutdown into the BIO.
if (this.pair.ssl.shutdown() !== 1) {
this.pair.ssl.shutdown();
}
}
} else {
debug('encrypted.write called with ' + data.length + ' bytes');
debug('encrypted.onfinish');
}
if (!this.writable) {
throw new Error('CryptoStream is not writable');
}
// Try to read just to get sure that we won't miss EOF
if (this._opposite.readable) this._opposite.read(0);
var encoding, cb;
if (this._opposite._ended) {
this._done();
// parse arguments
if (typeof arguments[1] == 'string') {
encoding = arguments[1];
cb = arguments[2];
} else {
cb = arguments[1];
// No half-close, sorry
if (this === this.pair.cleartext) this._opposite._done();
}
}
// Transform strings into buffers.
if (typeof data == 'string') {
data = new Buffer(data, encoding);
function onCryptoStreamEnd() {
this._ended = true;
if (this === this.pair.cleartext) {
debug('cleartext.onend');
} else {
debug('encrypted.onend');
}
debug((this === this.pair.cleartext ? 'clear' : 'encrypted') + 'In data');
if (this.onend) this.onend();
}
this._pending.push(data);
this._pendingCallbacks.push(cb);
this._pendingBytes += data.length;
this.pair._writeCalled = true;
this.pair.cycle();
CryptoStream.prototype._write = function write(data, cb) {
assert(this._pending === null);
// In the following cases, write() should return a false,
// then this stream should eventually emit 'drain' event.
//
// 1. There are pending data more than 128k bytes.
// 2. A forward stream shown below is paused.
// A) EncryptedStream for CleartextStream.write().
// B) CleartextStream for EncryptedStream.write().
// Black-hole data
if (!this.pair.ssl) return cb(null);
// When resuming session don't accept any new data.
// And do not put too much data into openssl, before writing it from encrypted
// side.
//
if (!this._needDrain) {
if (this._pendingBytes >= 128 * 1024) {
this._needDrain = true;
// TODO(indutny): Remove magic number, use watermark based limits
if (!this._resumingSession &&
(this !== this.pair.cleartext ||
this.pair.encrypted._internallyPendingBytes() < 128 * 1024)) {
// Write current buffer now
var written;
if (this === this.pair.cleartext) {
debug('cleartext.write called with ' + data.length + ' bytes');
written = this.pair.ssl.clearIn(data, 0, data.length);
} else {
debug('encrypted.write called with ' + data.length + ' bytes');
written = this.pair.ssl.encIn(data, 0, data.length);
}
var self = this;
// Force SSL_read call to cycle some states/data inside OpenSSL
this.pair.cleartext.read(0);
// Cycle encrypted data
if (this.pair.encrypted._internallyPendingBytes()) {
this.pair.encrypted.read(0);
}
// Handle and report errors
if (this.pair.ssl && this.pair.ssl.error) {
return cb(this.pair.error());
}
// Get NPN and Server name when ready
this.pair.maybeInitFinished();
// Whole buffer was written
if (written === data.length) {
if (this === this.pair.cleartext) {
this._needDrain = this.pair.encrypted._paused;
debug('cleartext.write succeed with ' + data.length + ' bytes');
} else {
this._needDrain = this.pair.cleartext._paused;
debug('encrypted.write succeed with ' + data.length + ' bytes');
}
return cb(null);
}
assert(written === 0 || written === -1);
} else {
debug('cleartext.write queue is full');
// Force SSL_read call to cycle some states/data inside OpenSSL
this.pair.cleartext.read(0);
}
// No write has happened
this._pending = data;
this._pendingCallback = cb;
if (this === this.pair.cleartext) {
debug('cleartext.write queued with ' + data.length + ' bytes');
} else {
debug('encrypted.write queued with ' + data.length + ' bytes');
}
return !this._needDrain;
};
CryptoStream.prototype.pause = function() {
debug('paused ' + (this == this.pair.cleartext ? 'cleartext' : 'encrypted'));
this._paused = true;
CryptoStream.prototype._writePending = function writePending() {
var data = this._pending,
cb = this._pendingCallback;
this._pending = null;
this._pendingCallback = null;
this._write(data, cb);
};
CryptoStream.prototype.resume = function() {
debug('resume ' + (this == this.pair.cleartext ? 'cleartext' : 'encrypted'));
this._paused = false;
this.pair.cycle();
CryptoStream.prototype._read = function read(size, cb) {
// XXX: EOF?!
if (!this.pair.ssl) return cb(null, null);
// Wait for session to be resumed
if (this._resumingSession) return cb(null, '');
var out;
if (this === this.pair.cleartext) {
debug('cleartext.read called with ' + size + ' bytes');
out = this.pair.ssl.clearOut;
} else {
debug('encrypted.read called with ' + size + ' bytes');
out = this.pair.ssl.encOut;
}
var bytesRead = 0,
start = this._buffer.offset;
do {
var read = this._buffer.use(this.pair.ssl, out, size);
if (read > 0) {
bytesRead += read;
size -= read;
}
// Handle and report errors
if (this.pair.ssl && this.pair.ssl.error) {
this.pair.error();
break;
}
// Get NPN and Server name when ready
this.pair.maybeInitFinished();
} while (read > 0 && !this._buffer.isFull && bytesRead < size);
// Create new buffer if previous was filled up
var pool = this._buffer.pool;
if (this._buffer.isFull) this._buffer.create();
assert(bytesRead >= 0);
if (this === this.pair.cleartext) {
debug('cleartext.read succeed with ' + bytesRead + ' bytes');
} else {
debug('encrypted.read succeed with ' + bytesRead + ' bytes');
}
// Try writing pending data
if (this._pending !== null) this._writePending();
if (bytesRead === 0) {
// EOF when cleartext has finished and we have nothing to read
if (this._opposite._finished && this._internallyPendingBytes() === 0) {
// Perform graceful shutdown
this._done();
// No half-open, sorry!
if (this === this.pair.cleartext)
this._opposite._done();
return cb(null, null);
}
// Bail out
return cb(null, '');
}
// Give them requested data
if (this.ondata) {
var self = this;
this.ondata(pool, start, start + bytesRead);
// Consume data automatically
// simple/test-https-drain fails without it
process.nextTick(function() {
self.read(bytesRead);
});
}
return cb(null, pool.slice(start, start + bytesRead));
};
@ -340,11 +481,6 @@ CryptoStream.prototype.__defineGetter__('bytesWritten', function() {
return this.socket ? this.socket.bytesWritten : 0;
});
CryptoStream.prototype.setEncoding = function(encoding) {
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
this._decoder = new StringDecoder(encoding);
};
// Example:
// C=US\nST=CA\nL=SF\nO=Joyent\nOU=Node.js\nCN=ca1\nemailAddress=ry@clouds.org
@ -409,53 +545,74 @@ CryptoStream.prototype.getCipher = function(err) {
};
CryptoStream.prototype.end = function(d) {
if (this.pair._doneFlag) return;
if (!this.writable) return;
if (d) {
this.write(d);
CryptoStream.prototype.end = function(chunk, encoding) {
if (this === this.pair.cleartext) {
debug('cleartext.end');
} else {
debug('encrypted.end');
}
this._pending.push(END_OF_FILE);
this._pendingCallbacks.push(null);
// If this is an encrypted stream then we need to disable further 'data'
// events.
// Write pending data first
if (this._pending !== null) this._writePending();
this.writable = false;
this.pair.cycle();
stream.Duplex.prototype.end.call(this, chunk, encoding);
};
CryptoStream.prototype.destroySoon = function(err) {
if (this.writable) {
this.end();
if (this === this.pair.cleartext) {
debug('cleartext.destroySoon');
} else {
this.destroy();
debug('encrypted.destroySoon');
}
if (this.writable)
this.end();
if (this._writableState.finishing || this._writableState.finished)
this.destroy();
else
this.once('finish', this.destroy);
};
CryptoStream.prototype.destroy = function(err) {
if (this.pair._doneFlag) return;
this.pair.destroy();
if (this._destroyed) return;
this._destroyed = true;
this.readable = this.writable = false;
// Destroy both ends
if (this === this.pair.cleartext) {
debug('cleartext.destroy');
} else {
debug('encrypted.destroy');
}
this._opposite.destroy();
var self = this;
process.nextTick(function() {
// Force EOF
self.push(null);
// Emit 'close' event
self.emit('close', err ? true : false);
});
};
CryptoStream.prototype._done = function() {
this._doneFlag = true;
if (this === this.pair.encrypted && !this.pair._secureEstablished)
return this.pair.error();
if (this.pair.cleartext._doneFlag &&
this.pair.encrypted._doneFlag &&
!this.pair._doneFlag) {
// If both streams are done:
if (!this.pair._secureEstablished) {
this.pair.error();
} else {
this.pair.destroy();
}
this.pair.destroy();
}
};
@ -478,182 +635,8 @@ Object.defineProperty(CryptoStream.prototype, 'readyState', {
});
// Move decrypted, clear data out into the application.
// From the user's perspective this occurs as a 'data' event
// on the pair.cleartext.
// also
// Move encrypted data to the stream. From the user's perspective this
// occurs as a 'data' event on the pair.encrypted. Usually the application
// will have some code which pipes the stream to a socket:
//
// pair.encrypted.on('data', function (d) {
// socket.write(d);
// });
//
CryptoStream.prototype._push = function() {
if (this == this.pair.encrypted && !this.writable) {
// If the encrypted side got EOF, we do not attempt
// to write out data anymore.
return;
}
while (!this._paused) {
var chunkBytes = 0,
bytesRead = 0,
start = this._buffer.offset;
do {
chunkBytes = this._buffer.use(this, this._pusher);
if (chunkBytes > 0) bytesRead += chunkBytes;
if (this.pair.ssl && this.pair.ssl.error) {
this.pair.error();
return;
}
this.pair.maybeInitFinished();
} while (chunkBytes > 0 && !this._buffer.isFull);
var pool = this._buffer.pool;
// Create new buffer if previous was filled up
if (this._buffer.isFull) this._buffer.create();
assert(bytesRead >= 0);
// Bail out if we didn't read any data.
if (bytesRead == 0) {
if (this._internallyPendingBytes() == 0 && this._destroyAfterPush) {
this._done();
}
return;
}
var chunk = pool.slice(start, start + bytesRead);
if (this === this.pair.cleartext) {
debug('cleartext emit "data" with ' + bytesRead + ' bytes');
} else {
debug('encrypted emit "data" with ' + bytesRead + ' bytes');
}
if (this._decoder) {
var string = this._decoder.write(chunk);
if (string.length) this.emit('data', string);
} else {
this.emit('data', chunk);
}
// Optimization: emit the original buffer with end points
if (this.ondata) this.ondata(pool, start, start + bytesRead);
}
};
// Push in any clear data coming from the application.
// This arrives via some code like this:
//
// pair.cleartext.write("hello world");
//
// also
//
// Push in incoming encrypted data from the socket.
// This arrives via some code like this:
//
// socket.on('data', function (d) {
// pair.encrypted.write(d)
// });
//
CryptoStream.prototype._pull = function() {
var havePending = this._pending.length > 0;
assert(havePending || this._pendingBytes == 0);
while (this._pending.length > 0) {
if (!this.pair.ssl) break;
var tmp = this._pending.shift();
var cb = this._pendingCallbacks.shift();
assert(this._pending.length === this._pendingCallbacks.length);
if (tmp === END_OF_FILE) {
// Sending EOF
if (this === this.pair.encrypted) {
debug('end encrypted ' + this.pair.fd);
this.pair.cleartext._destroyAfterPush = true;
} else {
// CleartextStream
assert(this === this.pair.cleartext);
debug('end cleartext');
this.pair.ssl.shutdown();
// TODO check if we get EAGAIN From shutdown, would have to do it
// again. should unshift END_OF_FILE back onto pending and wait for
// next cycle.
this.pair.encrypted._destroyAfterPush = true;
}
this.pair.cycle();
this._done();
return;
}
if (tmp.length == 0) continue;
var rv = this._puller(tmp);
if (this.pair.ssl && this.pair.ssl.error) {
this.pair.error();
return;
}
this.pair.maybeInitFinished();
if (rv === 0 || rv < 0) {
this._pending.unshift(tmp);
this._pendingCallbacks.unshift(cb);
break;
}
this._pendingBytes -= tmp.length;
assert(this._pendingBytes >= 0);
if (cb) cb();
assert(rv === tmp.length);
}
// If pending data has cleared, 'drain' event should be emitted
// after write() returns a false.
// Except when a forward stream shown below is paused.
// A) EncryptedStream for CleartextStream._pull().
// B) CleartextStream for EncryptedStream._pull().
//
if (this._needDrain && this._pending.length === 0) {
var paused;
if (this === this.pair.cleartext) {
paused = this.pair.encrypted._paused;
} else {
paused = this.pair.cleartext._paused;
}
if (!paused) {
debug('drain ' + (this === this.pair.cleartext ? 'clear' : 'encrypted'));
var self = this;
process.nextTick(function() {
self.emit('drain');
});
this._needDrain = false;
if (this.__destroyOnDrain) this.end();
}
}
};
function CleartextStream(pair) {
CryptoStream.call(this, pair);
function CleartextStream(pair, options) {
CryptoStream.call(this, pair, options);
}
util.inherits(CleartextStream, CryptoStream);
@ -667,22 +650,11 @@ CleartextStream.prototype._internallyPendingBytes = function() {
};
CleartextStream.prototype._puller = function(b) {
debug('clearIn ' + b.length + ' bytes');
return this.pair.ssl.clearIn(b, 0, b.length);
};
CleartextStream.prototype._pusher = function(pool, offset, length) {
debug('reading from clearOut');
if (!this.pair.ssl) return -1;
return this.pair.ssl.clearOut(pool, offset, length);
};
CleartextStream.prototype.address = function() {
return this.socket && this.socket.address();
};
CleartextStream.prototype.__defineGetter__('remoteAddress', function() {
return this.socket && this.socket.remoteAddress;
});
@ -692,8 +664,8 @@ CleartextStream.prototype.__defineGetter__('remotePort', function() {
return this.socket && this.socket.remotePort;
});
function EncryptedStream(pair) {
CryptoStream.call(this, pair);
function EncryptedStream(pair, options) {
CryptoStream.call(this, pair, options);
}
util.inherits(EncryptedStream, CryptoStream);
@ -707,19 +679,6 @@ EncryptedStream.prototype._internallyPendingBytes = function() {
};
EncryptedStream.prototype._puller = function(b) {
debug('writing from encIn');
return this.pair.ssl.encIn(b, 0, b.length);
};
EncryptedStream.prototype._pusher = function(pool, offset, length) {
debug('reading from encOut');
if (!this.pair.ssl) return -1;
return this.pair.ssl.encOut(pool, offset, length);
};
function onhandshakestart() {
debug('onhandshakestart');
@ -754,12 +713,12 @@ function onhandshakedone() {
debug('onhandshakedone');
}
function onclienthello(hello) {
var self = this,
once = false;
this.encrypted.pause();
this.cleartext.pause();
this._resumingSession = true;
function callback(err, session) {
if (once) return;
once = true;
@ -768,8 +727,10 @@ function onclienthello(hello) {
self.ssl.loadSession(session);
self.encrypted.resume();
self.cleartext.resume();
// Cycle data
self._resumingSession = false;
self.cleartext.read(0);
self.encrypted.read(0);
}
if (hello.sessionId.length <= 0 ||
@ -812,6 +773,7 @@ function SecurePair(credentials, isServer, requestCert, rejectUnauthorized,
this._encWriteState = true;
this._clearWriteState = true;
this._doneFlag = false;
this._destroying = false;
if (!credentials) {
this.credentials = crypto.createCredentials();
@ -856,17 +818,20 @@ function SecurePair(credentials, isServer, requestCert, rejectUnauthorized,
}
/* Acts as a r/w stream to the cleartext side of the stream. */
this.cleartext = new CleartextStream(this);
this.cleartext = new CleartextStream(this, options.cleartext);
/* Acts as a r/w stream to the encrypted side of the stream. */
this.encrypted = new EncryptedStream(this);
this.encrypted = new EncryptedStream(this, options.encrypted);
/* Let streams know about each other */
this.cleartext._opposite = this.encrypted;
this.encrypted._opposite = this.cleartext;
process.nextTick(function() {
/* The Connection may be destroyed by an abort call */
if (self.ssl) {
self.ssl.start();
}
self.cycle();
});
}
@ -885,81 +850,6 @@ exports.createSecurePair = function(credentials,
};
/* Attempt to cycle OpenSSLs buffers in various directions.
*
* An SSL Connection can be viewed as four separate piplines,
* interacting with one has no connection to the behavoir of
* any of the other 3 -- This might not sound reasonable,
* but consider things like mid-stream renegotiation of
* the ciphers.
*
* The four pipelines, using terminology of the client (server is just
* reversed):
* (1) Encrypted Output stream (Writing encrypted data to peer)
* (2) Encrypted Input stream (Reading encrypted data from peer)
* (3) Cleartext Output stream (Decrypted content from the peer)
* (4) Cleartext Input stream (Cleartext content to send to the peer)
*
* This function attempts to pull any available data out of the Cleartext
* input stream (4), and the Encrypted input stream (2). Then it pushes any
* data available from the cleartext output stream (3), and finally from the
* Encrypted output stream (1)
*
* It is called whenever we do something with OpenSSL -- post reciving
* content, trying to flush, trying to change ciphers, or shutting down the
* connection.
*
* Because it is also called everywhere, we also check if the connection has
* completed negotiation and emit 'secure' from here if it has.
*/
SecurePair.prototype.cycle = function(depth) {
if (this._doneFlag) return;
depth = depth ? depth : 0;
if (depth == 0) this._writeCalled = false;
var established = this._secureEstablished;
if (!this.cycleEncryptedPullLock) {
this.cycleEncryptedPullLock = true;
debug('encrypted._pull');
this.encrypted._pull();
this.cycleEncryptedPullLock = false;
}
if (!this.cycleCleartextPullLock) {
this.cycleCleartextPullLock = true;
debug('cleartext._pull');
this.cleartext._pull();
this.cycleCleartextPullLock = false;
}
if (!this.cycleCleartextPushLock) {
this.cycleCleartextPushLock = true;
debug('cleartext._push');
this.cleartext._push();
this.cycleCleartextPushLock = false;
}
if (!this.cycleEncryptedPushLock) {
this.cycleEncryptedPushLock = true;
debug('encrypted._push');
this.encrypted._push();
this.cycleEncryptedPushLock = false;
}
if ((!established && this._secureEstablished) ||
(depth == 0 && this._writeCalled)) {
// If we were not established but now we are, let's cycle again.
// Or if there is some data to write...
this.cycle(depth + 1);
}
};
SecurePair.prototype.maybeInitFinished = function() {
if (this.ssl && !this._secureEstablished && this.ssl.isInitFinished()) {
if (process.features.tls_npn) {
@ -978,27 +868,20 @@ SecurePair.prototype.maybeInitFinished = function() {
SecurePair.prototype.destroy = function() {
var self = this;
if (this._destroying) return;
if (!this._doneFlag) {
debug('SecurePair.destroy');
this._destroying = true;
// SecurePair should be destroyed only after it's streams
this.cleartext.destroy();
this.encrypted.destroy();
this._doneFlag = true;
this.ssl.error = null;
this.ssl.close();
this.ssl = null;
self.encrypted.writable = self.encrypted.readable = false;
self.cleartext.writable = self.cleartext.readable = false;
process.nextTick(function() {
if (self.cleartext._decoder) {
var ret = self.cleartext._decoder.end();
if (ret)
self.cleartext.emit('data', ret);
}
self.cleartext.emit('end');
self.encrypted.emit('close');
self.cleartext.emit('close');
});
}
};
@ -1012,6 +895,7 @@ SecurePair.prototype.error = function() {
}
this.destroy();
this.emit('error', error);
return error;
} else {
var err = this.ssl.error;
this.ssl.error = null;
@ -1024,6 +908,8 @@ SecurePair.prototype.error = function() {
} else {
this.cleartext.emit('error', err);
}
return err;
}
};
@ -1155,7 +1041,11 @@ function Server(/* [options], listener */) {
{
server: self,
NPNProtocols: self.NPNProtocols,
SNICallback: self.SNICallback
SNICallback: self.SNICallback,
// Stream options
cleartext: self._cleartext,
encrypted: self._encrypted
});
var cleartext = pipe(pair, socket);
@ -1254,6 +1144,8 @@ Server.prototype.setOptions = function(options) {
.update(process.argv.join(' '))
.digest('hex');
}
if (options.cleartext) this.cleartext = options.cleartext;
if (options.encrypted) this.encrypted = options.encrypted;
};
// SNI Contexts High-Level API
@ -1331,7 +1223,9 @@ exports.connect = function(/* [port, host], options, cb */) {
options.rejectUnauthorized === true ? true : false,
{
NPNProtocols: this.NPNProtocols,
servername: hostname
servername: hostname,
cleartext: options.cleartext,
encrypted: options.encrypted
});
if (options.session) {

Loading…
Cancel
Save