|
@ -48,7 +48,6 @@ var ENOENT = constants.ENOENT; |
|
|
var EMFILE = constants.EMFILE; |
|
|
var EMFILE = constants.EMFILE; |
|
|
|
|
|
|
|
|
var END_OF_FILE = 42; |
|
|
var END_OF_FILE = 42; |
|
|
var SecureContext, SecureStream; // lazy loaded
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var ioWatchers = new FreeList("iowatcher", 100, function () { |
|
|
var ioWatchers = new FreeList("iowatcher", 100, function () { |
|
@ -78,11 +77,6 @@ function allocNewPool () { |
|
|
pool.used = 0; |
|
|
pool.used = 0; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var securePool = null; |
|
|
|
|
|
function allocNewSecurePool () { |
|
|
|
|
|
securePool = new Buffer(40*1024); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var emptyBuffer = null; |
|
|
var emptyBuffer = null; |
|
|
function allocEmptyBuffer () { |
|
|
function allocEmptyBuffer () { |
|
|
emptyBuffer = new Buffer(1); |
|
|
emptyBuffer = new Buffer(1); |
|
@ -108,7 +102,7 @@ function setImplmentationMethods (self) { |
|
|
return sendMsg(self.fd, buf, off, len, fd, flags); |
|
|
return sendMsg(self.fd, buf, off, len, fd, flags); |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
self._readImpl = function (buf, off, len, calledByIOWatcher) { |
|
|
self._readImpl = function (buf, off, len) { |
|
|
var bytesRead = recvMsg(self.fd, buf, off, len); |
|
|
var bytesRead = recvMsg(self.fd, buf, off, len); |
|
|
|
|
|
|
|
|
// Do not emit this in the same stack, otherwise we risk corrupting our
|
|
|
// Do not emit this in the same stack, otherwise we risk corrupting our
|
|
@ -139,7 +133,7 @@ function setImplmentationMethods (self) { |
|
|
return write(self.fd, buf, off, len); |
|
|
return write(self.fd, buf, off, len); |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
self._readImpl = function (buf, off, len, calledByIOWatcher) { |
|
|
self._readImpl = function (buf, off, len) { |
|
|
return read(self.fd, buf, off, len); |
|
|
return read(self.fd, buf, off, len); |
|
|
}; |
|
|
}; |
|
|
} |
|
|
} |
|
@ -148,132 +142,13 @@ function setImplmentationMethods (self) { |
|
|
shutdown(self.fd, 'write'); |
|
|
shutdown(self.fd, 'write'); |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
if (self.secure) { |
|
|
|
|
|
var oldWrite = self._writeImpl; |
|
|
|
|
|
self._writeImpl = function (buf, off, len, fd, flags) { |
|
|
|
|
|
assert(buf); |
|
|
|
|
|
assert(self.secure); |
|
|
|
|
|
|
|
|
|
|
|
var bytesWritten = self.secureStream.clearIn(buf, off, len); |
|
|
|
|
|
|
|
|
|
|
|
if (!securePool) { |
|
|
|
|
|
allocNewSecurePool(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var secureLen = self.secureStream.encOut(securePool, |
|
|
|
|
|
0, |
|
|
|
|
|
securePool.length); |
|
|
|
|
|
|
|
|
|
|
|
if (secureLen == -1) { |
|
|
|
|
|
// Check our read again for secure handshake
|
|
|
|
|
|
self._onReadable(); |
|
|
|
|
|
} else { |
|
|
|
|
|
oldWrite(securePool, 0, secureLen, fd, flags); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (!self.secureEstablished && self.secureStream.isInitFinished()) { |
|
|
|
|
|
self.secureEstablished = true; |
|
|
|
|
|
|
|
|
|
|
|
if (self._events && self._events['secure']) { |
|
|
|
|
|
self.emit('secure'); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return bytesWritten; |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
var oldRead = self._readImpl; |
|
|
|
|
|
self._readImpl = function (buf, off, len, calledByIOWatcher) { |
|
|
|
|
|
assert(self.secure); |
|
|
|
|
|
|
|
|
|
|
|
var bytesRead = 0; |
|
|
|
|
|
var secureBytesRead = null; |
|
|
|
|
|
|
|
|
|
|
|
if (!securePool) { |
|
|
|
|
|
allocNewSecurePool(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (calledByIOWatcher) { |
|
|
|
|
|
secureBytesRead = oldRead(securePool, 0, securePool.length); |
|
|
|
|
|
self.secureStream.encIn(securePool, 0, secureBytesRead); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var chunkBytes; |
|
|
|
|
|
do { |
|
|
|
|
|
chunkBytes = |
|
|
|
|
|
self.secureStream.clearOut(pool, |
|
|
|
|
|
pool.used + bytesRead, |
|
|
|
|
|
pool.length - pool.used - bytesRead); |
|
|
|
|
|
bytesRead += chunkBytes; |
|
|
|
|
|
} while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length)); |
|
|
|
|
|
|
|
|
|
|
|
if (bytesRead == 0 && !calledByIOWatcher) { |
|
|
|
|
|
return -1; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (self.secureStream.clearPending()) { |
|
|
|
|
|
process.nextTick(function () { |
|
|
|
|
|
if (self.readable) self._onReadable(); |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (!self.secureEstablished) { |
|
|
|
|
|
if (self.secureStream.isInitFinished()) { |
|
|
|
|
|
self.secureEstablished = true; |
|
|
|
|
|
if (self._events && self._events['secure']) { |
|
|
|
|
|
self.emit('secure'); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (calledByIOWatcher && secureBytesRead === null && !self.server) { |
|
|
|
|
|
// Client needs to write as part of handshake
|
|
|
|
|
|
self._writeWatcher.start(); |
|
|
|
|
|
return -1; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (bytesRead == 0 && secureBytesRead > 0) { |
|
|
|
|
|
// Deal with SSL handshake
|
|
|
|
|
|
if (self.server) { |
|
|
|
|
|
self._checkForSecureHandshake(); |
|
|
|
|
|
} else { |
|
|
|
|
|
if (self.secureEstablised) { |
|
|
|
|
|
self.flush(); |
|
|
|
|
|
} else { |
|
|
|
|
|
self._checkForSecureHandshake(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return -1; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return bytesRead; |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
var oldShutdown = self._shutdownImpl; |
|
|
|
|
|
self._shutdownImpl = function () { |
|
|
|
|
|
self.secureStream.shutdown(); |
|
|
|
|
|
|
|
|
|
|
|
if (!securePool) { |
|
|
|
|
|
allocNewSecurePool(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var len = self.secureStream.encOut(securePool, 0, securePool.length); |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
oldWrite(securePool, 0, len); |
|
|
|
|
|
} catch (e) { } |
|
|
|
|
|
|
|
|
|
|
|
oldShutdown(); |
|
|
|
|
|
}; |
|
|
|
|
|
} |
|
|
|
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
function onReadable (readable, writeable) { |
|
|
function onReadable (readable, writeable) { |
|
|
assert(this.socket); |
|
|
assert(this.socket); |
|
|
var socket = this.socket; |
|
|
var socket = this.socket; |
|
|
socket._onReadable(true); |
|
|
socket._onReadable(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -312,13 +187,11 @@ function Stream (options) { |
|
|
|
|
|
|
|
|
this.fd = null; |
|
|
this.fd = null; |
|
|
this.type = null; |
|
|
this.type = null; |
|
|
this.secure = false; |
|
|
|
|
|
this.allowHalfOpen = false; |
|
|
this.allowHalfOpen = false; |
|
|
|
|
|
|
|
|
if (typeof options == "object") { |
|
|
if (typeof options == "object") { |
|
|
this.fd = options.fd !== undefined ? parseInt(options.fd, 10) : null; |
|
|
this.fd = options.fd !== undefined ? parseInt(options.fd, 10) : null; |
|
|
this.type = options.type || null; |
|
|
this.type = options.type || null; |
|
|
this.secure = options.secure || false; |
|
|
|
|
|
this.allowHalfOpen = options.allowHalfOpen || false; |
|
|
this.allowHalfOpen = options.allowHalfOpen || false; |
|
|
} else if (typeof options == "number") { |
|
|
} else if (typeof options == "number") { |
|
|
this.fd = arguments[0]; |
|
|
this.fd = arguments[0]; |
|
@ -340,76 +213,6 @@ Stream.prototype._onTimeout = function () { |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.setSecure = function (credentials) { |
|
|
|
|
|
// Do we have openssl crypto?
|
|
|
|
|
|
try { |
|
|
|
|
|
SecureContext = process.binding('crypto').SecureContext; |
|
|
|
|
|
SecureStream = process.binding('crypto').SecureStream; |
|
|
|
|
|
} catch (e) { |
|
|
|
|
|
throw new Error('node.js not compiled with openssl crypto support.'); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var crypto = require("crypto"); |
|
|
|
|
|
this.secure = true; |
|
|
|
|
|
this.secureEstablished = false; |
|
|
|
|
|
// If no credentials given, create a new one for just this Stream
|
|
|
|
|
|
if (!credentials) { |
|
|
|
|
|
this.credentials = crypto.createCredentials(); |
|
|
|
|
|
} else { |
|
|
|
|
|
this.credentials = credentials; |
|
|
|
|
|
} |
|
|
|
|
|
if (!this.server) { |
|
|
|
|
|
// For clients, we will always have either a given ca list or the default on
|
|
|
|
|
|
this.credentials.shouldVerify = true; |
|
|
|
|
|
} |
|
|
|
|
|
this.secureStream = new SecureStream(this.credentials.context, |
|
|
|
|
|
this.server ? true : false, |
|
|
|
|
|
this.credentials.shouldVerify); |
|
|
|
|
|
|
|
|
|
|
|
setImplmentationMethods(this); |
|
|
|
|
|
|
|
|
|
|
|
if (!this.server) { |
|
|
|
|
|
// If client, trigger handshake
|
|
|
|
|
|
this._checkForSecureHandshake(); |
|
|
|
|
|
} |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.verifyPeer = function () { |
|
|
|
|
|
if (!this.secure) { |
|
|
|
|
|
throw new Error('Stream is not a secure stream.'); |
|
|
|
|
|
} |
|
|
|
|
|
return this.secureStream.verifyPeer(this.credentials.context); |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype._checkForSecureHandshake = function () { |
|
|
|
|
|
if (!this.writable) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Do an empty write to see if we need to write out as part of handshake
|
|
|
|
|
|
if (!emptyBuffer) allocEmptyBuffer(); |
|
|
|
|
|
this.write(emptyBuffer); |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.getPeerCertificate = function (credentials) { |
|
|
|
|
|
if (!this.secure) { |
|
|
|
|
|
throw new Error('Stream is not a secure stream.'); |
|
|
|
|
|
} |
|
|
|
|
|
return this.secureStream.getPeerCertificate(); |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.getCipher = function () { |
|
|
|
|
|
if (!this.secure) { |
|
|
|
|
|
throw new Error('Stream is not a secure stream.'); |
|
|
|
|
|
} |
|
|
|
|
|
return this.secureStream.getCurrentCipher(); |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.open = function (fd, type) { |
|
|
Stream.prototype.open = function (fd, type) { |
|
|
initStream(this); |
|
|
initStream(this); |
|
|
|
|
|
|
|
@ -699,7 +502,7 @@ Stream.prototype._onWritable = function () { |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype._onReadable = function (calledByIOWatcher) { |
|
|
Stream.prototype._onReadable = function () { |
|
|
var self = this; |
|
|
var self = this; |
|
|
|
|
|
|
|
|
// If this is the first recv (pool doesn't exist) or we've used up
|
|
|
// If this is the first recv (pool doesn't exist) or we've used up
|
|
@ -717,8 +520,7 @@ Stream.prototype._onReadable = function (calledByIOWatcher) { |
|
|
try { |
|
|
try { |
|
|
bytesRead = self._readImpl(pool, |
|
|
bytesRead = self._readImpl(pool, |
|
|
pool.used, |
|
|
pool.used, |
|
|
pool.length - pool.used, |
|
|
pool.length - pool.used); |
|
|
calledByIOWatcher); |
|
|
|
|
|
} catch (e) { |
|
|
} catch (e) { |
|
|
self.destroy(e); |
|
|
self.destroy(e); |
|
|
return; |
|
|
return; |
|
@ -760,11 +562,6 @@ Stream.prototype._onReadable = function (calledByIOWatcher) { |
|
|
|
|
|
|
|
|
// Optimization: emit the original buffer with end points
|
|
|
// Optimization: emit the original buffer with end points
|
|
|
if (self.ondata) self.ondata(pool, start, end); |
|
|
if (self.ondata) self.ondata(pool, start, end); |
|
|
} else if (bytesRead == -2) { |
|
|
|
|
|
// Temporary fix - need SSL refactor.
|
|
|
|
|
|
// -2 originates from SecureStream::ReadExtract
|
|
|
|
|
|
self.destroy(new Error('openssl read error')); |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
@ -873,10 +670,6 @@ Stream.prototype.destroy = function (exception) { |
|
|
|
|
|
|
|
|
require('timers').unenroll(this); |
|
|
require('timers').unenroll(this); |
|
|
|
|
|
|
|
|
if (this.secure) { |
|
|
|
|
|
this.secureStream.close(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (this.server) { |
|
|
if (this.server) { |
|
|
this.server.connections--; |
|
|
this.server.connections--; |
|
|
} |
|
|
} |
|
|