From b9686233fc0be679d7ba1262b611711629ee334e Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sun, 22 Feb 2015 21:59:07 +0300 Subject: [PATCH] stream_base: introduce StreamBase StreamBase is an improved way to write C++ streams. The class itself is for separting `StreamWrap` (with the methods like `.writeAsciiString`, `.writeBuffer`, `.writev`, etc) from the `HandleWrap` class, making possible to write abstract C++ streams that are not bound to any uv socket. The following methods are important part of the abstraction (which mimics libuv's stream API): * Events: * `OnAlloc(size_t size, uv_buf_t*)` * `OnRead(ssize_t nread, const uv_buf_t*, uv_handle_type pending)` * `OnAfterWrite(WriteWrap*)` * Wrappers: * `DoShutdown(ShutdownWrap*)` * `DoTryWrite(uv_buf_t** bufs, size_t* count)` * `DoWrite(WriteWrap*, uv_buf_t*, size_t count, uv_stream_t* handle)` * `Error()` * `ClearError()` The implementation should provide all of these methods, thus providing the access to the underlying resource (be it uv handle, TLS socket, or anything else). A C++ stream may consume the input of another stream by replacing the event callbacks and proxying the writes. This kind of API is actually used now for the TLSWrap implementation, making it possible to wrap TLS stream into another TLS stream. Thus legacy API calls are no longer required in `_tls_wrap.js`. PR-URL: https://github.com/iojs/io.js/pull/840 Reviewed-By: Trevor Norris Reviewed-By: Chris Dickinson --- lib/_tls_legacy.js | 4 +- lib/_tls_wrap.js | 363 +++++----- lib/net.js | 4 +- node.gyp | 2 + src/env.h | 2 + src/js_stream.cc | 21 + src/js_stream.h | 20 + src/node_crypto.cc | 26 +- src/node_wrap.h | 19 +- src/pipe_wrap.cc | 21 +- src/stream_base.cc | 495 +++++++++++++ src/stream_base.h | 223 ++++++ src/stream_wrap.cc | 664 ++++-------------- src/stream_wrap.h | 166 +---- src/tcp_wrap.cc | 20 +- src/tls_wrap.cc | 302 +++++--- src/tls_wrap.h | 73 +- src/tty_wrap.cc | 18 +- .../test-tls-client-default-ciphers.js | 14 +- test/parallel/test-tls-close-notify.js | 4 +- test/parallel/test-tls-multi-key.js | 5 +- 21 files changed, 1406 insertions(+), 1060 deletions(-) create mode 100644 src/js_stream.cc create mode 100644 src/js_stream.h create mode 100644 src/stream_base.cc create mode 100644 src/stream_base.h diff --git a/lib/_tls_legacy.js b/lib/_tls_legacy.js index 4148085503..fc0d115aee 100644 --- a/lib/_tls_legacy.js +++ b/lib/_tls_legacy.js @@ -92,11 +92,11 @@ function onCryptoStreamFinish() { // Generate close notify // NOTE: first call checks if client has sent us shutdown, // second call enqueues shutdown into the BIO. - if (this.pair.ssl.shutdown() !== 1) { + if (this.pair.ssl.shutdownSSL() !== 1) { if (this.pair.ssl && this.pair.ssl.error) return this.pair.error(); - this.pair.ssl.shutdown(); + this.pair.ssl.shutdownSSL(); } if (this.pair.ssl && this.pair.ssl.error) diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index fb63667581..10221b99c3 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -11,14 +11,23 @@ const debug = util.debuglog('tls'); const Timer = process.binding('timer_wrap').Timer; const tls_wrap = process.binding('tls_wrap'); -// Lazy load -var tls_legacy; +// constructor for lazy loading +function createTCP() { + var TCP = process.binding('tcp_wrap').TCP; + return new TCP(); +} + +// constructor for lazy loading +function createPipe() { + var Pipe = process.binding('pipe_wrap').Pipe; + return new Pipe(); +} function onhandshakestart() { debug('onhandshakestart'); var self = this; - var ssl = self.ssl; + var ssl = self._handle; var now = Timer.now(); assert(now >= ssl.lastHandshakeTime); @@ -63,7 +72,7 @@ function loadSession(self, hello, cb) { // NOTE: That we have disabled OpenSSL's internal session storage in // `node_crypto.cc` and hence its safe to rely on getting servername only // from clienthello or this place. - var ret = self.ssl.loadSession(session); + var ret = self._handle.loadSession(session); cb(null, ret); } @@ -92,9 +101,9 @@ function loadSNI(self, servername, cb) { // TODO(indutny): eventually disallow raw `SecureContext` if (context) - self.ssl.sni_context = context.context || context; + self._handle.sni_context = context.context || context; - cb(null, self.ssl.sni_context); + cb(null, self._handle.sni_context); }); } @@ -127,7 +136,7 @@ function requestOCSP(self, hello, ctx, cb) { return cb(err); if (response) - self.ssl.setOCSPResponse(response); + self._handle.setOCSPResponse(response); cb(null); } } @@ -161,7 +170,7 @@ function onclienthello(hello) { if (err) return self.destroy(err); - self.ssl.endParser(); + self._handle.endParser(); }); }); }); @@ -184,7 +193,7 @@ function onnewsession(key, session) { return; once = true; - self.ssl.newSessionDone(); + self._handle.newSessionDone(); self._newSessionPending = false; if (self._securePending) @@ -204,29 +213,12 @@ function onocspresponse(resp) { */ function TLSSocket(socket, options) { - // Disallow wrapping TLSSocket in TLSSocket - assert(!(socket instanceof TLSSocket)); - - net.Socket.call(this, { - handle: socket && socket._handle, - allowHalfOpen: socket && socket.allowHalfOpen, - readable: false, - writable: false - }); - if (socket) { - this._parent = socket; - - // To prevent assertion in afterConnect() - this._connecting = socket._connecting; - } - this._tlsOptions = options; this._secureEstablished = false; this._securePending = false; this._newSessionPending = false; this._controlReleased = false; this._SNICallback = null; - this.ssl = null; this.servername = null; this.npnProtocol = null; this.authorized = false; @@ -236,15 +228,19 @@ function TLSSocket(socket, options) { // distinguishable from regular ones. this.encrypted = true; + net.Socket.call(this, { + handle: this._wrapHandle(socket && socket._handle), + allowHalfOpen: socket && socket.allowHalfOpen, + readable: false, + writable: false + }); + + // Proxy for API compatibility + this.ssl = this._handle; + this.on('error', this._tlsError); - if (!this._handle) { - this.once('connect', function() { - this._init(null); - }); - } else { - this._init(socket); - } + this._init(socket); // Make sure to setup all required properties like: `_connecting` before // starting the flow of the data @@ -255,23 +251,53 @@ function TLSSocket(socket, options) { util.inherits(TLSSocket, net.Socket); exports.TLSSocket = TLSSocket; -TLSSocket.prototype._init = function(socket) { - assert(this._handle); +var proxiedMethods = [ + 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', + 'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive', + 'setSimultaneousAccepts', 'setBlocking', - // lib/net.js expect this value to be non-zero if write hasn't been flushed - // immediately - // TODO(indutny): rewise this solution, it might be 1 before handshake and - // represent real writeQueueSize during regular writes. - this._handle.writeQueueSize = 1; + // PipeWrap + 'setPendingInstances' +]; + +TLSSocket.prototype._wrapHandle = function(handle) { + var res; - var self = this; var options = this._tlsOptions; + if (!handle) { + handle = options.pipe ? createPipe() : createTCP(); + handle.owner = this; + } // Wrap socket's handle var context = options.secureContext || options.credentials || tls.createSecureContext(); - this.ssl = tls_wrap.wrap(this._handle, context.context, options.isServer); + res = tls_wrap.wrap(handle, context.context, options.isServer); + res._parent = handle; + res._reading = handle._reading; + + // Proxy HandleWrap, PipeWrap and TCPWrap methods + proxiedMethods.forEach(function(name) { + res[name] = function methodProxy() { + return handle[name].apply(handle, arguments); + }; + }); + + return res; +}; + +TLSSocket.prototype._init = function(socket) { + var self = this; + var options = this._tlsOptions; + var ssl = this._handle; + + // lib/net.js expect this value to be non-zero if write hasn't been flushed + // immediately + // TODO(indutny): rewise this solution, it might be 1 before handshake and + // represent real writeQueueSize during regular writes. + ssl.writeQueueSize = 1; + this.server = options.server || null; // For clients, we will always have either a given ca list or be using @@ -282,32 +308,32 @@ TLSSocket.prototype._init = function(socket) { this._requestCert = requestCert; this._rejectUnauthorized = rejectUnauthorized; if (requestCert || rejectUnauthorized) - this.ssl.setVerifyMode(requestCert, rejectUnauthorized); + ssl.setVerifyMode(requestCert, rejectUnauthorized); if (options.isServer) { - this.ssl.onhandshakestart = onhandshakestart.bind(this); - this.ssl.onhandshakedone = onhandshakedone.bind(this); - this.ssl.onclienthello = onclienthello.bind(this); - this.ssl.onnewsession = onnewsession.bind(this); - this.ssl.lastHandshakeTime = 0; - this.ssl.handshakes = 0; + ssl.onhandshakestart = onhandshakestart.bind(this); + ssl.onhandshakedone = onhandshakedone.bind(this); + ssl.onclienthello = onclienthello.bind(this); + ssl.onnewsession = onnewsession.bind(this); + ssl.lastHandshakeTime = 0; + ssl.handshakes = 0; if (this.server && (listenerCount(this.server, 'resumeSession') > 0 || listenerCount(this.server, 'newSession') > 0 || listenerCount(this.server, 'OCSPRequest') > 0)) { - this.ssl.enableSessionCallbacks(); + ssl.enableSessionCallbacks(); } } else { - this.ssl.onhandshakestart = function() {}; - this.ssl.onhandshakedone = this._finishInit.bind(this); - this.ssl.onocspresponse = onocspresponse.bind(this); + ssl.onhandshakestart = function() {}; + ssl.onhandshakedone = this._finishInit.bind(this); + ssl.onocspresponse = onocspresponse.bind(this); if (options.session) - this.ssl.setSession(options.session); + ssl.setSession(options.session); } - this.ssl.onerror = function(err) { + ssl.onerror = function(err) { if (self._writableState.errorEmitted) return; self._writableState.errorEmitted = true; @@ -337,11 +363,11 @@ TLSSocket.prototype._init = function(socket) { options.server._contexts.length)) { assert(typeof options.SNICallback === 'function'); this._SNICallback = options.SNICallback; - this.ssl.enableHelloParser(); + ssl.enableHelloParser(); } if (process.features.tls_npn && options.NPNProtocols) - this.ssl.setNPNProtocols(options.NPNProtocols); + ssl.setNPNProtocols(options.NPNProtocols); if (options.handshakeTimeout > 0) this.setTimeout(options.handshakeTimeout, this._handleTimeout); @@ -350,8 +376,23 @@ TLSSocket.prototype._init = function(socket) { if (socket && socket._readableState.length) { var buf; while ((buf = socket.read()) !== null) - this.ssl.receive(buf); + ssl.receive(buf); + } + + if (socket) { + this._parent = socket; + + // To prevent assertion in afterConnect() and properly kick off readStart + this._connecting = socket._connecting; + socket.once('connect', function() { + self._connecting = false; + self.emit('connect'); + }); } + + // Assume `tls.connect()` + if (!socket) + this._connecting = true; }; TLSSocket.prototype.renegotiate = function(options, callback) { @@ -365,11 +406,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) { if (requestCert !== this._requestCert || rejectUnauthorized !== this._rejectUnauthorized) { - this.ssl.setVerifyMode(requestCert, rejectUnauthorized); + this._handle.setVerifyMode(requestCert, rejectUnauthorized); this._requestCert = requestCert; this._rejectUnauthorized = rejectUnauthorized; } - if (!this.ssl.renegotiate()) { + if (!this._handle.renegotiate()) { if (callback) { process.nextTick(function() { callback(new Error('Failed to renegotiate')); @@ -391,11 +432,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) { }; TLSSocket.prototype.setMaxSendFragment = function setMaxSendFragment(size) { - return this.ssl.setMaxSendFragment(size) == 1; + return this._handle.setMaxSendFragment(size) == 1; }; TLSSocket.prototype.getTLSTicket = function getTLSTicket() { - return this.ssl.getTLSTicket(); + return this._handle.getTLSTicket(); }; TLSSocket.prototype._handleTimeout = function() { @@ -424,11 +465,11 @@ TLSSocket.prototype._finishInit = function() { } if (process.features.tls_npn) { - this.npnProtocol = this.ssl.getNegotiatedProtocol(); + this.npnProtocol = this._handle.getNegotiatedProtocol(); } if (process.features.tls_sni && this._tlsOptions.isServer) { - this.servername = this.ssl.getServername(); + this.servername = this._handle.getServername(); } debug('secure established'); @@ -439,49 +480,56 @@ TLSSocket.prototype._finishInit = function() { }; TLSSocket.prototype._start = function() { + if (this._connecting) { + this.once('connect', function() { + this._start(); + }); + return; + } + if (this._tlsOptions.requestOCSP) - this.ssl.requestOCSP(); - this.ssl.start(); + this._handle.requestOCSP(); + this._handle.start(); }; TLSSocket.prototype.setServername = function(name) { - this.ssl.setServername(name); + this._handle.setServername(name); }; TLSSocket.prototype.setSession = function(session) { if (typeof session === 'string') session = new Buffer(session, 'binary'); - this.ssl.setSession(session); + this._handle.setSession(session); }; TLSSocket.prototype.getPeerCertificate = function(detailed) { - if (this.ssl) { + if (this._handle) { return common.translatePeerCertificate( - this.ssl.getPeerCertificate(detailed)); + this._handle.getPeerCertificate(detailed)); } return null; }; TLSSocket.prototype.getSession = function() { - if (this.ssl) { - return this.ssl.getSession(); + if (this._handle) { + return this._handle.getSession(); } return null; }; TLSSocket.prototype.isSessionReused = function() { - if (this.ssl) { - return this.ssl.isSessionReused(); + if (this._handle) { + return this._handle.isSessionReused(); } return null; }; TLSSocket.prototype.getCipher = function(err) { - if (this.ssl) { - return this.ssl.getCurrentCipher(); + if (this._handle) { + return this._handle.getCurrentCipher(); } else { return null; } @@ -620,7 +668,7 @@ function Server(/* [options], listener */) { socket.on('secure', function() { if (socket._requestCert) { - var verifyError = socket.ssl.verifyError(); + var verifyError = socket._handle.verifyError(); if (verifyError) { socket.authorizationError = verifyError.code; @@ -775,28 +823,6 @@ function normalizeConnectArgs(listArgs) { return (cb) ? [options, cb] : [options]; } -function legacyConnect(hostname, options, NPN, context) { - assert(options.socket); - if (!tls_legacy) - tls_legacy = require('_tls_legacy'); - - var pair = tls_legacy.createSecurePair(context, - false, - true, - !!options.rejectUnauthorized, - { - NPNProtocols: NPN.NPNProtocols, - servername: hostname - }); - tls_legacy.pipe(pair, options.socket); - pair.cleartext._controlReleased = true; - pair.on('error', function(err) { - pair.cleartext.emit('error', err); - }); - - return pair; -} - exports.connect = function(/* [port, host], options, cb */) { var args = normalizeConnectArgs(arguments); var options = args[0]; @@ -819,51 +845,21 @@ exports.connect = function(/* [port, host], options, cb */) { context = tls.createSecureContext(options); tls.convertNPNProtocols(options.NPNProtocols, NPN); - // Wrapping TLS socket inside another TLS socket was requested - - // create legacy secure pair - var socket; - var legacy; - var result; - if (options.socket instanceof TLSSocket) { - debug('legacy connect'); - legacy = true; - socket = legacyConnect(hostname, options, NPN, context); - result = socket.cleartext; - } else { - legacy = false; - socket = new TLSSocket(options.socket, { - secureContext: context, - isServer: false, - requestCert: true, - rejectUnauthorized: options.rejectUnauthorized, - session: options.session, - NPNProtocols: NPN.NPNProtocols, - requestOCSP: options.requestOCSP - }); - result = socket; - } - - if (socket._handle && !socket._connecting) { - onHandle(); - } else { - // Not even started connecting yet (or probably resolving dns address), - // catch socket errors and assign handle. - if (!legacy && options.socket) { - options.socket.once('connect', function() { - assert(options.socket._handle); - socket._handle = options.socket._handle; - socket._handle.owner = socket; - socket.emit('connect'); - }); - } - socket.once('connect', onHandle); - } + var socket = new TLSSocket(options.socket, { + pipe: options.path && !options.port, + secureContext: context, + isServer: false, + requestCert: true, + rejectUnauthorized: options.rejectUnauthorized, + session: options.session, + NPNProtocols: NPN.NPNProtocols, + requestOCSP: options.requestOCSP + }); if (cb) - result.once('secureConnect', cb); + socket.once('secureConnect', cb); if (!options.socket) { - assert(!legacy); var connect_opt; if (options.path && !options.port) { connect_opt = { path: options.path }; @@ -874,63 +870,62 @@ exports.connect = function(/* [port, host], options, cb */) { localAddress: options.localAddress }; } - socket.connect(connect_opt); + socket.connect(connect_opt, function() { + socket._start(); + }); } - return result; + socket._releaseControl(); - function onHandle() { - if (!legacy) - socket._releaseControl(); + if (options.session) + socket.setSession(options.session); - if (options.session) - socket.setSession(options.session); + if (options.servername) + socket.setServername(options.servername); - if (!legacy) { - if (options.servername) - socket.setServername(options.servername); + if (options.socket) + socket._start(); - socket._start(); - } - socket.on('secure', function() { - var verifyError = socket.ssl.verifyError(); + socket.on('secure', function() { + var verifyError = socket._handle.verifyError(); - // Verify that server's identity matches it's certificate's names - if (!verifyError) { - var cert = result.getPeerCertificate(); - verifyError = options.checkServerIdentity(hostname, cert); - } + // Verify that server's identity matches it's certificate's names + if (!verifyError) { + var cert = socket.getPeerCertificate(); + verifyError = options.checkServerIdentity(hostname, cert); + } - if (verifyError) { - result.authorized = false; - result.authorizationError = verifyError.code || verifyError.message; + if (verifyError) { + socket.authorized = false; + socket.authorizationError = verifyError.code || verifyError.message; - if (options.rejectUnauthorized) { - result.emit('error', verifyError); - result.destroy(); - return; - } else { - result.emit('secureConnect'); - } + if (options.rejectUnauthorized) { + socket.emit('error', verifyError); + socket.destroy(); + return; } else { - result.authorized = true; - result.emit('secureConnect'); + socket.emit('secureConnect'); } + } else { + socket.authorized = true; + socket.emit('secureConnect'); + } - // Uncork incoming data - result.removeListener('end', onHangUp); - }); + // Uncork incoming data + socket.removeListener('end', onHangUp); + }); - function onHangUp() { - // NOTE: This logic is shared with _http_client.js - if (!socket._hadError) { - socket._hadError = true; - var error = new Error('socket hang up'); - error.code = 'ECONNRESET'; - socket.destroy(); - socket.emit('error', error); - } + function onHangUp() { + // NOTE: This logic is shared with _http_client.js + if (!socket._hadError) { + socket._hadError = true; + var error = new Error('socket hang up'); + error.code = 'ECONNRESET'; + socket.destroy(); + socket.emit('error', error); } - result.once('end', onHangUp); } + socket.once('end', onHangUp); + + return socket; }; diff --git a/lib/net.js b/lib/net.js index 5bf2f292fb..cdabe6e798 100644 --- a/lib/net.js +++ b/lib/net.js @@ -961,7 +961,9 @@ function afterConnect(status, handle, req, readable, writable) { return; } - assert(handle === self._handle, 'handle != self._handle'); + // Update handle if it was wrapped + // TODO(indutny): assert that the handle is actually an ancestor of old one + handle = self._handle; debug('afterConnect'); diff --git a/node.gyp b/node.gyp index 01a67a08c8..996121ee45 100644 --- a/node.gyp +++ b/node.gyp @@ -115,6 +115,7 @@ 'src/smalloc.cc', 'src/spawn_sync.cc', 'src/string_bytes.cc', + 'src/stream_base.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', @@ -151,6 +152,7 @@ 'src/req-wrap.h', 'src/req-wrap-inl.h', 'src/string_bytes.h', + 'src/stream_base.h', 'src/stream_wrap.h', 'src/tree.h', 'src/util.h', diff --git a/src/env.h b/src/env.h index ccacbb09f5..c9b4cc0736 100644 --- a/src/env.h +++ b/src/env.h @@ -234,8 +234,10 @@ namespace node { V(tcp_constructor_template, v8::FunctionTemplate) \ V(tick_callback_function, v8::Function) \ V(tls_wrap_constructor_function, v8::Function) \ + V(tls_wrap_constructor_template, v8::FunctionTemplate) \ V(tty_constructor_template, v8::FunctionTemplate) \ V(udp_constructor_function, v8::Function) \ + V(write_wrap_constructor_function, v8::Function) \ class Environment; diff --git a/src/js_stream.cc b/src/js_stream.cc new file mode 100644 index 0000000000..3cc3a895fc --- /dev/null +++ b/src/js_stream.cc @@ -0,0 +1,21 @@ +#include "js_stream.h" + +#include "async-wrap.h" +#include "env.h" +#include "env-inl.h" +#include "stream_base.h" +#include "v8.h" + +namespace node { + +using v8::Context; +using v8::Handle; +using v8::Object; +using v8::Value; + +void JSStream::Initialize(Handle target, + Handle unused, + Handle context) { +} + +} // namespace node diff --git a/src/js_stream.h b/src/js_stream.h new file mode 100644 index 0000000000..6a2d3bfb4f --- /dev/null +++ b/src/js_stream.h @@ -0,0 +1,20 @@ +#ifndef SRC_JS_STREAM_H_ +#define SRC_JS_STREAM_H_ + +#include "async-wrap.h" +#include "env.h" +#include "stream_base.h" +#include "v8.h" + +namespace node { + +class JSStream : public StreamBase { + public: + static void Initialize(v8::Handle target, + v8::Handle unused, + v8::Handle context); +}; + +} // namespace node + +#endif // SRC_JS_STREAM_H_ diff --git a/src/node_crypto.cc b/src/node_crypto.cc index 230231080b..912320771e 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -3,7 +3,7 @@ #include "node_crypto.h" #include "node_crypto_bio.h" #include "node_crypto_groups.h" -#include "tls_wrap.h" // TLSCallbacks +#include "tls_wrap.h" // TLSWrap #include "async-wrap.h" #include "async-wrap-inl.h" @@ -98,28 +98,28 @@ const char* const root_certs[] = { X509_STORE* root_cert_store; // Just to generate static methods -template class SSLWrap; -template void SSLWrap::AddMethods(Environment* env, - Handle t); -template void SSLWrap::InitNPN(SecureContext* sc); -template SSL_SESSION* SSLWrap::GetSessionCallback( +template class SSLWrap; +template void SSLWrap::AddMethods(Environment* env, + Handle t); +template void SSLWrap::InitNPN(SecureContext* sc); +template SSL_SESSION* SSLWrap::GetSessionCallback( SSL* s, unsigned char* key, int len, int* copy); -template int SSLWrap::NewSessionCallback(SSL* s, - SSL_SESSION* sess); -template void SSLWrap::OnClientHello( +template int SSLWrap::NewSessionCallback(SSL* s, + SSL_SESSION* sess); +template void SSLWrap::OnClientHello( void* arg, const ClientHelloParser::ClientHello& hello); #ifdef OPENSSL_NPN_NEGOTIATED -template int SSLWrap::AdvertiseNextProtoCallback( +template int SSLWrap::AdvertiseNextProtoCallback( SSL* s, const unsigned char** data, unsigned int* len, void* arg); -template int SSLWrap::SelectNextProtoCallback( +template int SSLWrap::SelectNextProtoCallback( SSL* s, unsigned char** out, unsigned char* outlen, @@ -127,7 +127,7 @@ template int SSLWrap::SelectNextProtoCallback( unsigned int inlen, void* arg); #endif -template int SSLWrap::TLSExtStatusCallback(SSL* s, void* arg); +template int SSLWrap::TLSExtStatusCallback(SSL* s, void* arg); static void crypto_threadid_cb(CRYPTO_THREADID* tid) { @@ -973,7 +973,7 @@ void SSLWrap::AddMethods(Environment* env, Handle t) { env->SetProtoMethod(t, "getCurrentCipher", GetCurrentCipher); env->SetProtoMethod(t, "endParser", EndParser); env->SetProtoMethod(t, "renegotiate", Renegotiate); - env->SetProtoMethod(t, "shutdown", Shutdown); + env->SetProtoMethod(t, "shutdownSSL", Shutdown); env->SetProtoMethod(t, "getTLSTicket", GetTLSTicket); env->SetProtoMethod(t, "newSessionDone", NewSessionDone); env->SetProtoMethod(t, "setOCSPResponse", SetOCSPResponse); diff --git a/src/node_wrap.h b/src/node_wrap.h index 80d679606e..ddd7bd16e0 100644 --- a/src/node_wrap.h +++ b/src/node_wrap.h @@ -14,7 +14,7 @@ namespace node { -#define WITH_GENERIC_STREAM(env, obj, BODY) \ +#define WITH_GENERIC_UV_STREAM(env, obj, BODY, ELSE) \ do { \ if (env->tcp_constructor_template().IsEmpty() == false && \ env->tcp_constructor_template()->HasInstance(obj)) { \ @@ -28,16 +28,29 @@ namespace node { env->pipe_constructor_template()->HasInstance(obj)) { \ PipeWrap* const wrap = Unwrap(obj); \ BODY \ + } else { \ + ELSE \ } \ } while (0) +#define WITH_GENERIC_STREAM(env, obj, BODY) \ + do { \ + WITH_GENERIC_UV_STREAM(env, obj, BODY, { \ + if (env->tls_wrap_constructor_template().IsEmpty() == false && \ + env->tls_wrap_constructor_template()->HasInstance(obj)) { \ + TLSWrap* const wrap = Unwrap(obj); \ + BODY \ + } \ + }); \ + } while (0) + inline uv_stream_t* HandleToStream(Environment* env, v8::Local obj) { v8::HandleScope scope(env->isolate()); - WITH_GENERIC_STREAM(env, obj, { + WITH_GENERIC_UV_STREAM(env, obj, { return reinterpret_cast(wrap->UVHandle()); - }); + }, {}); return nullptr; } diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 55d5f84ff4..08fed68741 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -77,30 +77,11 @@ void PipeWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "Pipe")); t->InstanceTemplate()->SetInternalFieldCount(1); - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - StreamWrap::GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - env->SetProtoMethod(t, "close", HandleWrap::Close); env->SetProtoMethod(t, "unref", HandleWrap::Unref); env->SetProtoMethod(t, "ref", HandleWrap::Ref); - env->SetProtoMethod(t, "setBlocking", StreamWrap::SetBlocking); - - env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); - env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); - env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown); - - env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); - env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); - env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); - env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); - env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); + StreamWrap::AddMethods(env, t); env->SetProtoMethod(t, "bind", Bind); env->SetProtoMethod(t, "listen", Listen); diff --git a/src/stream_base.cc b/src/stream_base.cc new file mode 100644 index 0000000000..0a1324bb58 --- /dev/null +++ b/src/stream_base.cc @@ -0,0 +1,495 @@ +#include "stream_base.h" +#include "stream_wrap.h" + +#include "node.h" +#include "node_buffer.h" +#include "env.h" +#include "env-inl.h" +#include "string_bytes.h" +#include "tls_wrap.h" +#include "util.h" +#include "util-inl.h" +#include "v8.h" + +#include // INT_MAX + +namespace node { + +using v8::Array; +using v8::Context; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Number; +using v8::Object; +using v8::PropertyAttribute; +using v8::PropertyCallbackInfo; +using v8::String; +using v8::Value; + +template void StreamBase::AddMethods(Environment* env, + Handle t); +template void StreamBase::AddMethods(Environment* env, + Handle t); + + +template +void StreamBase::AddMethods(Environment* env, Handle t) { + HandleScope scope(env->isolate()); + + enum PropertyAttribute attributes = + static_cast(v8::ReadOnly | v8::DontDelete); + t->InstanceTemplate()->SetAccessor(env->fd_string(), + GetFD, + nullptr, + Handle(), + v8::DEFAULT, + attributes); + + env->SetProtoMethod(t, "readStart", JSMethod); + env->SetProtoMethod(t, "readStop", JSMethod); + env->SetProtoMethod(t, "shutdown", JSMethod); + env->SetProtoMethod(t, "writev", JSMethod); + env->SetProtoMethod(t, + "writeBuffer", + JSMethod); + env->SetProtoMethod(t, + "writeAsciiString", + JSMethod >); + env->SetProtoMethod(t, + "writeUtf8String", + JSMethod >); + env->SetProtoMethod(t, + "writeUcs2String", + JSMethod >); + env->SetProtoMethod(t, + "writeBinaryString", + JSMethod >); +} + + +template +void StreamBase::GetFD(Local key, + const PropertyCallbackInfo& args) { + StreamBase* wrap = Unwrap(args.Holder()); + + if (!wrap->IsAlive()) + return args.GetReturnValue().Set(UV_EINVAL); + + args.GetReturnValue().Set(wrap->GetFD()); +} + + +template & args)> +void StreamBase::JSMethod(const FunctionCallbackInfo& args) { + StreamBase* wrap = Unwrap(args.Holder()); + + if (!wrap->IsAlive()) + return args.GetReturnValue().Set(UV_EINVAL); + + args.GetReturnValue().Set((wrap->*Method)(args)); +} + + +int StreamBase::ReadStart(const FunctionCallbackInfo& args) { + return ReadStart(); +} + + +int StreamBase::ReadStop(const FunctionCallbackInfo& args) { + return ReadStop(); +} + + +int StreamBase::Shutdown(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + CHECK(args[0]->IsObject()); + Local req_wrap_obj = args[0].As(); + + ShutdownWrap* req_wrap = new ShutdownWrap(env, + req_wrap_obj, + this, + AfterShutdown); + + int err = DoShutdown(req_wrap); + req_wrap->Dispatched(); + if (err) + delete req_wrap; + return err; +} + + +void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { + StreamBase* wrap = req_wrap->wrap(); + Environment* env = req_wrap->env(); + + // The wrap and request objects should still be there. + CHECK_EQ(req_wrap->persistent().IsEmpty(), false); + CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + Local req_wrap_obj = req_wrap->object(); + Local argv[3] = { + Integer::New(env->isolate(), status), + wrap->GetAsyncWrap()->object(), + req_wrap_obj + }; + + if (req_wrap->object()->Has(env->oncomplete_string())) + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); + + delete req_wrap; +} + + +int StreamBase::Writev(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsArray()); + + Local req_wrap_obj = args[0].As(); + Local chunks = args[1].As(); + + size_t count = chunks->Length() >> 1; + + uv_buf_t bufs_[16]; + uv_buf_t* bufs = bufs_; + + // Determine storage size first + size_t storage_size = 0; + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + if (Buffer::HasInstance(chunk)) + continue; + // Buffer chunk, no additional storage required + + // String chunk + Handle string = chunk->ToString(env->isolate()); + enum encoding encoding = ParseEncoding(env->isolate(), + chunks->Get(i * 2 + 1)); + size_t chunk_size; + if (encoding == UTF8 && string->Length() > 65535) + chunk_size = StringBytes::Size(env->isolate(), string, encoding); + else + chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding); + + storage_size += chunk_size + 15; + } + + if (storage_size > INT_MAX) + return UV_ENOBUFS; + + if (ARRAY_SIZE(bufs_) < count) + bufs = new uv_buf_t[count]; + + storage_size += sizeof(WriteWrap); + char* storage = new char[storage_size]; + WriteWrap* req_wrap = + new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite); + + uint32_t bytes = 0; + size_t offset = sizeof(WriteWrap); + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + // Write buffer + if (Buffer::HasInstance(chunk)) { + bufs[i].base = Buffer::Data(chunk); + bufs[i].len = Buffer::Length(chunk); + bytes += bufs[i].len; + continue; + } + + // Write string + offset = ROUND_UP(offset, 16); + CHECK_LT(offset, storage_size); + char* str_storage = storage + offset; + size_t str_size = storage_size - offset; + + Handle string = chunk->ToString(env->isolate()); + enum encoding encoding = ParseEncoding(env->isolate(), + chunks->Get(i * 2 + 1)); + str_size = StringBytes::Write(env->isolate(), + str_storage, + str_size, + string, + encoding); + bufs[i].base = str_storage; + bufs[i].len = str_size; + offset += str_size; + bytes += str_size; + } + + int err = DoWrite(req_wrap, bufs, count, nullptr); + + // Deallocate space + if (bufs != bufs_) + delete[] bufs; + + req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(env->isolate())); + req_wrap->object()->Set(env->bytes_string(), + Number::New(env->isolate(), bytes)); + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + return err; +} + + + + +int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { + CHECK(args[0]->IsObject()); + CHECK(Buffer::HasInstance(args[1])); + Environment* env = Environment::GetCurrent(args); + + Local req_wrap_obj = args[0].As(); + const char* data = Buffer::Data(args[1]); + size_t length = Buffer::Length(args[1]); + + char* storage; + WriteWrap* req_wrap; + uv_buf_t buf; + buf.base = const_cast(data); + buf.len = length; + + // Try writing immediately without allocation + uv_buf_t* bufs = &buf; + size_t count = 1; + int err = DoTryWrite(&bufs, &count); + if (err != 0) + goto done; + if (count == 0) + goto done; + CHECK_EQ(count, 1); + + // Allocate, or write rest + storage = new char[sizeof(WriteWrap)]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite); + + err = DoWrite(req_wrap, bufs, count, nullptr); + req_wrap->Dispatched(); + req_wrap_obj->Set(env->async(), True(env->isolate())); + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + done: + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), length)); + return err; +} + + +template +int StreamBase::WriteString(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsString()); + + Local req_wrap_obj = args[0].As(); + Local string = args[1].As(); + Local send_handle_obj; + if (args[2]->IsObject()) + send_handle_obj = args[2].As(); + + int err; + + // Compute the size of the storage that the string will be flattened into. + // For UTF8 strings that are very long, go ahead and take the hit for + // computing their actual size, rather than tripling the storage. + size_t storage_size; + if (enc == UTF8 && string->Length() > 65535) + storage_size = StringBytes::Size(env->isolate(), string, enc); + else + storage_size = StringBytes::StorageSize(env->isolate(), string, enc); + + if (storage_size > INT_MAX) + return UV_ENOBUFS; + + // Try writing immediately if write size isn't too big + char* storage; + WriteWrap* req_wrap; + char* data; + char stack_storage[16384]; // 16kb + size_t data_size; + uv_buf_t buf; + + bool try_write = storage_size + 15 <= sizeof(stack_storage) && + (!IsIPCPipe() || send_handle_obj.IsEmpty()); + if (try_write) { + data_size = StringBytes::Write(env->isolate(), + stack_storage, + storage_size, + string, + enc); + buf = uv_buf_init(stack_storage, data_size); + + uv_buf_t* bufs = &buf; + size_t count = 1; + err = DoTryWrite(&bufs, &count); + + // Failure + if (err != 0) + goto done; + + // Success + if (count == 0) + goto done; + + // Partial write + CHECK_EQ(count, 1); + } + + storage = new char[sizeof(WriteWrap) + storage_size + 15]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite); + + data = reinterpret_cast(ROUND_UP( + reinterpret_cast(storage) + sizeof(WriteWrap), 16)); + + if (try_write) { + // Copy partial data + memcpy(data, buf.base, buf.len); + data_size = buf.len; + } else { + // Write it + data_size = StringBytes::Write(env->isolate(), + data, + storage_size, + string, + enc); + } + + CHECK_LE(data_size, storage_size); + + buf = uv_buf_init(data, data_size); + + if (!IsIPCPipe()) { + err = DoWrite(req_wrap, &buf, 1, nullptr); + } else { + uv_handle_t* send_handle = nullptr; + + if (!send_handle_obj.IsEmpty()) { + HandleWrap* wrap = Unwrap(send_handle_obj); + send_handle = wrap->GetHandle(); + // Reference StreamWrap instance to prevent it from being garbage + // collected before `AfterWrite` is called. + CHECK_EQ(false, req_wrap->persistent().IsEmpty()); + req_wrap->object()->Set(env->handle_string(), send_handle_obj); + } + + err = DoWrite( + req_wrap, + &buf, + 1, + reinterpret_cast(send_handle)); + } + + req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(env->isolate())); + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + done: + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)); + return err; +} + + +void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { + StreamBase* wrap = req_wrap->wrap(); + Environment* env = req_wrap->env(); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + // The wrap and request objects should still be there. + CHECK_EQ(req_wrap->persistent().IsEmpty(), false); + CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false); + + // Unref handle property + Local req_wrap_obj = req_wrap->object(); + req_wrap_obj->Delete(env->handle_string()); + wrap->OnAfterWrite(req_wrap); + + Local argv[] = { + Integer::New(env->isolate(), status), + wrap->GetAsyncWrap()->object(), + req_wrap_obj, + Undefined(env->isolate()) + }; + + const char* msg = wrap->Error(); + if (msg != nullptr) { + argv[3] = OneByteString(env->isolate(), msg); + wrap->ClearError(); + } + + if (req_wrap->object()->Has(env->oncomplete_string())) + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); + + req_wrap->~WriteWrap(); + delete[] reinterpret_cast(req_wrap); +} + + +void StreamBase::EmitData(ssize_t nread, + Local buf, + Local handle) { + Environment* env = env_; + + Local argv[] = { + Integer::New(env->isolate(), nread), + buf, + handle + }; + + if (argv[1].IsEmpty()) + argv[1] = Undefined(env->isolate()); + + if (argv[2].IsEmpty()) + argv[2] = Undefined(env->isolate()); + + GetAsyncWrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); +} + + +AsyncWrap* StreamBase::GetAsyncWrap() { + return nullptr; +} + +} // namespace node diff --git a/src/stream_base.h b/src/stream_base.h new file mode 100644 index 0000000000..d6b3a555b0 --- /dev/null +++ b/src/stream_base.h @@ -0,0 +1,223 @@ +#ifndef SRC_STREAM_BASE_H_ +#define SRC_STREAM_BASE_H_ + +#include "env.h" +#include "async-wrap.h" +#include "req-wrap.h" +#include "req-wrap-inl.h" +#include "node.h" + +#include "v8.h" + +namespace node { + +// Forward declarations +class StreamBase; + +template +class StreamReq { + public: + typedef void (*DoneCb)(Req* req, int status); + + explicit StreamReq(DoneCb cb) : cb_(cb) { + } + + inline void Done(int status) { + cb_(static_cast(this), status); + } + + private: + DoneCb cb_; +}; + +class ShutdownWrap : public ReqWrap, + public StreamReq { + public: + ShutdownWrap(Environment* env, + v8::Local req_wrap_obj, + StreamBase* wrap, + DoneCb cb) + : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), + StreamReq(cb), + wrap_(wrap) { + Wrap(req_wrap_obj, this); + } + + static void NewShutdownWrap(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + } + + inline StreamBase* wrap() const { return wrap_; } + + private: + StreamBase* const wrap_; +}; + +class WriteWrap: public ReqWrap, + public StreamReq { + public: + WriteWrap(Environment* env, + v8::Local obj, + StreamBase* wrap, + DoneCb cb) + : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), + StreamReq(cb), + wrap_(wrap) { + Wrap(obj, this); + } + + void* operator new(size_t size, char* storage) { return storage; } + + // This is just to keep the compiler happy. It should never be called, since + // we don't use exceptions in node. + void operator delete(void* ptr, char* storage) { UNREACHABLE(); } + + inline StreamBase* wrap() const { + return wrap_; + } + + static void NewWriteWrap(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + } + + private: + // People should not be using the non-placement new and delete operator on a + // WriteWrap. Ensure this never happens. + void* operator new(size_t size) { UNREACHABLE(); } + void operator delete(void* ptr) { UNREACHABLE(); } + + StreamBase* const wrap_; +}; + +class StreamResource { + public: + typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx); + typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); + typedef void (*ReadCb)(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + + StreamResource() : after_write_cb_(nullptr), + alloc_cb_(nullptr), + read_cb_(nullptr) { + } + + virtual ~StreamResource() = default; + + virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; + virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0; + virtual int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) = 0; + virtual const char* Error() const = 0; + virtual void ClearError() = 0; + + // Events + inline void OnAfterWrite(WriteWrap* w) { + if (after_write_cb_ != nullptr) + after_write_cb_(w, after_write_ctx_); + } + + inline void OnAlloc(size_t size, uv_buf_t* buf) { + if (alloc_cb_ != nullptr) + alloc_cb_(size, buf, alloc_ctx_); + } + + inline void OnRead(size_t nread, + const uv_buf_t* buf, + uv_handle_type pending) { + if (read_cb_ != nullptr) + read_cb_(nread, buf, pending, read_ctx_); + } + + inline void set_after_write_cb(AfterWriteCb cb, void* ctx) { + after_write_ctx_ = ctx; + after_write_cb_ = cb; + } + + inline void set_alloc_cb(AllocCb cb, void* ctx) { + alloc_cb_ = cb; + alloc_ctx_ = ctx; + } + + inline void set_read_cb(ReadCb cb, void* ctx) { + read_cb_ = cb; + read_ctx_ = ctx; + } + + private: + AfterWriteCb after_write_cb_; + void* after_write_ctx_; + AllocCb alloc_cb_; + void* alloc_ctx_; + ReadCb read_cb_; + void* read_ctx_; +}; + +class StreamBase : public StreamResource { + public: + template + static void AddMethods(Environment* env, + v8::Handle target); + + virtual void* Cast() = 0; + virtual bool IsAlive() const = 0; + virtual bool IsClosing() const = 0; + virtual bool IsIPCPipe() const = 0; + virtual int GetFD() const = 0; + + virtual int ReadStart() = 0; + virtual int ReadStop() = 0; + + inline void Consume() { + CHECK_EQ(consumed_, false); + consumed_ = true; + } + + template + inline Outer* Cast() { return static_cast(Cast()); } + + void EmitData(ssize_t nread, + v8::Local buf, + v8::Local handle); + + protected: + explicit StreamBase(Environment* env) : env_(env), consumed_(false) { + } + + virtual ~StreamBase() = default; + + virtual AsyncWrap* GetAsyncWrap() = 0; + + // Libuv callbacks + static void AfterShutdown(ShutdownWrap* req, int status); + static void AfterWrite(WriteWrap* req, int status); + + // JS Methods + int ReadStart(const v8::FunctionCallbackInfo& args); + int ReadStop(const v8::FunctionCallbackInfo& args); + int Shutdown(const v8::FunctionCallbackInfo& args); + int Writev(const v8::FunctionCallbackInfo& args); + int WriteBuffer(const v8::FunctionCallbackInfo& args); + template + int WriteString(const v8::FunctionCallbackInfo& args); + + template + static void GetFD(v8::Local, + const v8::PropertyCallbackInfo&); + + template & args)> + static void JSMethod(const v8::FunctionCallbackInfo& args); + + private: + Environment* env_; + bool consumed_; +}; + +} // namespace node + +#endif // SRC_STREAM_BASE_H_ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index a9f89e47bb..3b50f638eb 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -38,8 +38,8 @@ using v8::Value; void StreamWrap::Initialize(Handle target, - Handle unused, - Handle context) { + Handle unused, + Handle context) { Environment* env = Environment::GetCurrent(context); Local sw = @@ -55,6 +55,7 @@ void StreamWrap::Initialize(Handle target, ww->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap")); target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"), ww->GetFunction()); + env->set_write_wrap_constructor_function(ww->GetFunction()); } @@ -68,23 +69,53 @@ StreamWrap::StreamWrap(Environment* env, reinterpret_cast(stream), provider, parent), - stream_(stream), - default_callbacks_(this), - callbacks_(&default_callbacks_), - callbacks_gc_(false) { + StreamBase(env), + stream_(stream) { + set_after_write_cb(OnAfterWriteImpl, this); + set_alloc_cb(OnAllocImpl, this); + set_read_cb(OnReadImpl, this); } -void StreamWrap::GetFD(Local, const PropertyCallbackInfo& args) { -#if !defined(_WIN32) - HandleScope scope(args.GetIsolate()); - StreamWrap* wrap = Unwrap(args.Holder()); +void StreamWrap::AddMethods(Environment* env, + v8::Handle target) { + env->SetProtoMethod(target, "setBlocking", SetBlocking); + StreamBase::AddMethods(env, target); +} + + +int StreamWrap::GetFD() const { int fd = -1; - if (wrap != nullptr && wrap->stream() != nullptr) { - fd = wrap->stream()->io_watcher.fd; - } - args.GetReturnValue().Set(fd); +#if !defined(_WIN32) + if (stream() != nullptr) + fd = stream()->io_watcher.fd; #endif + return fd; +} + + +bool StreamWrap::IsAlive() const { + return HandleWrap::IsAlive(this); +} + + +bool StreamWrap::IsClosing() const { + return uv_is_closing(reinterpret_cast(stream())); +} + + +void* StreamWrap::Cast() { + return reinterpret_cast(this); +} + + +AsyncWrap* StreamWrap::GetAsyncWrap() { + return static_cast(this); +} + + +bool StreamWrap::IsIPCPipe() const { + return is_named_pipe_ipc(); } @@ -96,22 +127,13 @@ void StreamWrap::UpdateWriteQueueSize() { } -void StreamWrap::ReadStart(const FunctionCallbackInfo& args) { - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - int err = uv_read_start(wrap->stream(), OnAlloc, OnRead); - args.GetReturnValue().Set(err); +int StreamWrap::ReadStart() { + return uv_read_start(stream(), OnAlloc, OnRead); } -void StreamWrap::ReadStop(const FunctionCallbackInfo& args) { - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - int err = uv_read_stop(wrap->stream()); - args.GetReturnValue().Set(err); +int StreamWrap::ReadStop() { + return uv_read_stop(stream()); } @@ -120,14 +142,25 @@ void StreamWrap::OnAlloc(uv_handle_t* handle, uv_buf_t* buf) { StreamWrap* wrap = static_cast(handle->data); CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); - wrap->callbacks()->DoAlloc(handle, suggested_size, buf); + + return static_cast(wrap)->OnAlloc(suggested_size, buf); +} + + +void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { + buf->base = static_cast(malloc(size)); + buf->len = size; + + if (buf->base == nullptr && size > 0) { + FatalError( + "node::StreamWrap::DoAlloc(size_t, uv_buf_t*, void*)", + "Out Of Memory"); + } } template -static Local AcceptHandle(Environment* env, - uv_stream_t* pipe, - AsyncWrap* parent) { +static Local AcceptHandle(Environment* env, StreamWrap* parent) { EscapableHandleScope scope(env->isolate()); Local wrap_obj; UVType* handle; @@ -139,13 +172,54 @@ static Local AcceptHandle(Environment* env, WrapType* wrap = Unwrap(wrap_obj); handle = wrap->UVHandle(); - if (uv_accept(pipe, reinterpret_cast(handle))) + if (uv_accept(parent->stream(), reinterpret_cast(handle))) abort(); return scope.Escape(wrap_obj); } +void StreamWrap::OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + StreamWrap* wrap = static_cast(ctx); + Environment* env = wrap->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + Local pending_obj; + + if (nread < 0) { + if (buf->base != nullptr) + free(buf->base); + wrap->EmitData(nread, Local(), pending_obj); + return; + } + + if (nread == 0) { + if (buf->base != nullptr) + free(buf->base); + return; + } + + char* base = static_cast(realloc(buf->base, nread)); + CHECK_LE(static_cast(nread), buf->len); + + if (pending == UV_TCP) { + pending_obj = AcceptHandle(env, wrap); + } else if (pending == UV_NAMED_PIPE) { + pending_obj = AcceptHandle(env, wrap); + } else if (pending == UV_UDP) { + pending_obj = AcceptHandle(env, wrap); + } else { + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + } + + wrap->EmitData(nread, Buffer::Use(env, base, nread), pending_obj); +} + + void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf, @@ -164,7 +238,7 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, } } - wrap->callbacks()->DoRead(handle, nread, buf, pending); + static_cast(wrap)->OnRead(nread, buf, pending); } @@ -183,437 +257,26 @@ void StreamWrap::OnRead(uv_stream_t* handle, } -size_t StreamWrap::WriteBuffer(Handle val, uv_buf_t* buf) { - CHECK(Buffer::HasInstance(val)); - - // Simple non-writev case - buf->base = Buffer::Data(val); - buf->len = Buffer::Length(val); - - return buf->len; -} - - -void StreamWrap::WriteBuffer(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - CHECK(args[0]->IsObject()); - CHECK(Buffer::HasInstance(args[1])); - - Local req_wrap_obj = args[0].As(); - Local buf_obj = args[1].As(); - - size_t length = Buffer::Length(buf_obj); - - char* storage; - WriteWrap* req_wrap; - uv_buf_t buf; - WriteBuffer(buf_obj, &buf); - - // Try writing immediately without allocation - uv_buf_t* bufs = &buf; - size_t count = 1; - int err = wrap->callbacks()->TryWrite(&bufs, &count); - if (err != 0) - goto done; - if (count == 0) - goto done; - CHECK_EQ(count, 1); - - // Allocate, or write rest - storage = new char[sizeof(WriteWrap)]; - req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - - err = wrap->callbacks()->DoWrite(req_wrap, - bufs, - count, - nullptr, - StreamWrap::AfterWrite); - req_wrap->Dispatched(); - req_wrap_obj->Set(env->async(), True(env->isolate())); - - if (err) { - req_wrap->~WriteWrap(); - delete[] storage; - } - - done: - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - wrap->callbacks()->ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), length)); - args.GetReturnValue().Set(err); -} - - -template -void StreamWrap::WriteStringImpl(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - int err; - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - CHECK(args[0]->IsObject()); - CHECK(args[1]->IsString()); - - Local req_wrap_obj = args[0].As(); - Local string = args[1].As(); - - // Compute the size of the storage that the string will be flattened into. - // For UTF8 strings that are very long, go ahead and take the hit for - // computing their actual size, rather than tripling the storage. - size_t storage_size; - if (encoding == UTF8 && string->Length() > 65535) - storage_size = StringBytes::Size(env->isolate(), string, encoding); - else - storage_size = StringBytes::StorageSize(env->isolate(), string, encoding); - - if (storage_size > INT_MAX) { - args.GetReturnValue().Set(UV_ENOBUFS); - return; - } - - // Try writing immediately if write size isn't too big - char* storage; - WriteWrap* req_wrap; - char* data; - char stack_storage[16384]; // 16kb - size_t data_size; - uv_buf_t buf; - - bool try_write = storage_size + 15 <= sizeof(stack_storage) && - (!wrap->is_named_pipe_ipc() || !args[2]->IsObject()); - if (try_write) { - data_size = StringBytes::Write(env->isolate(), - stack_storage, - storage_size, - string, - encoding); - buf = uv_buf_init(stack_storage, data_size); - - uv_buf_t* bufs = &buf; - size_t count = 1; - err = wrap->callbacks()->TryWrite(&bufs, &count); - - // Failure - if (err != 0) - goto done; - - // Success - if (count == 0) - goto done; - - // Partial write - CHECK_EQ(count, 1); - } - - storage = new char[sizeof(WriteWrap) + storage_size + 15]; - req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - - data = reinterpret_cast(ROUND_UP( - reinterpret_cast(storage) + sizeof(WriteWrap), 16)); - - if (try_write) { - // Copy partial data - memcpy(data, buf.base, buf.len); - data_size = buf.len; - } else { - // Write it - data_size = StringBytes::Write(env->isolate(), - data, - storage_size, - string, - encoding); - } - - CHECK_LE(data_size, storage_size); - - buf = uv_buf_init(data, data_size); - - if (!wrap->is_named_pipe_ipc()) { - err = wrap->callbacks()->DoWrite(req_wrap, - &buf, - 1, - nullptr, - StreamWrap::AfterWrite); - } else { - uv_handle_t* send_handle = nullptr; - - if (args[2]->IsObject()) { - Local send_handle_obj = args[2].As(); - HandleWrap* wrap = Unwrap(send_handle_obj); - send_handle = wrap->GetHandle(); - // Reference StreamWrap instance to prevent it from being garbage - // collected before `AfterWrite` is called. - CHECK_EQ(false, req_wrap->persistent().IsEmpty()); - req_wrap->object()->Set(env->handle_string(), send_handle_obj); - } - - err = wrap->callbacks()->DoWrite( - req_wrap, - &buf, - 1, - reinterpret_cast(send_handle), - StreamWrap::AfterWrite); - } - - req_wrap->Dispatched(); - req_wrap->object()->Set(env->async(), True(env->isolate())); - - if (err) { - req_wrap->~WriteWrap(); - delete[] storage; - } - - done: - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - wrap->callbacks()->ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), data_size)); - args.GetReturnValue().Set(err); -} - - -void StreamWrap::Writev(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - CHECK(args[0]->IsObject()); - CHECK(args[1]->IsArray()); - - Local req_wrap_obj = args[0].As(); - Local chunks = args[1].As(); - size_t count = chunks->Length() >> 1; - - uv_buf_t bufs_[16]; - uv_buf_t* bufs = bufs_; - - // Determine storage size first - size_t storage_size = 0; - for (size_t i = 0; i < count; i++) { - Handle chunk = chunks->Get(i * 2); - - if (Buffer::HasInstance(chunk)) - continue; - // Buffer chunk, no additional storage required - - // String chunk - Handle string = chunk->ToString(env->isolate()); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(i * 2 + 1)); - size_t chunk_size; - if (encoding == UTF8 && string->Length() > 65535) - chunk_size = StringBytes::Size(env->isolate(), string, encoding); - else - chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding); - - storage_size += chunk_size + 15; - } - - if (storage_size > INT_MAX) { - args.GetReturnValue().Set(UV_ENOBUFS); - return; - } - - if (ARRAY_SIZE(bufs_) < count) - bufs = new uv_buf_t[count]; - - storage_size += sizeof(WriteWrap); - char* storage = new char[storage_size]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); - - uint32_t bytes = 0; - size_t offset = sizeof(WriteWrap); - for (size_t i = 0; i < count; i++) { - Handle chunk = chunks->Get(i * 2); - - // Write buffer - if (Buffer::HasInstance(chunk)) { - bufs[i].base = Buffer::Data(chunk); - bufs[i].len = Buffer::Length(chunk); - bytes += bufs[i].len; - continue; - } - - // Write string - offset = ROUND_UP(offset, 16); - CHECK_LT(offset, storage_size); - char* str_storage = storage + offset; - size_t str_size = storage_size - offset; - - Handle string = chunk->ToString(env->isolate()); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(i * 2 + 1)); - str_size = StringBytes::Write(env->isolate(), - str_storage, - str_size, - string, - encoding); - bufs[i].base = str_storage; - bufs[i].len = str_size; - offset += str_size; - bytes += str_size; - } - - int err = wrap->callbacks()->DoWrite(req_wrap, - bufs, - count, - nullptr, - StreamWrap::AfterWrite); - - // Deallocate space - if (bufs != bufs_) - delete[] bufs; - - req_wrap->Dispatched(); - req_wrap->object()->Set(env->async(), True(env->isolate())); - req_wrap->object()->Set(env->bytes_string(), - Number::New(env->isolate(), bytes)); - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - wrap->callbacks()->ClearError(); - } - - if (err) { - req_wrap->~WriteWrap(); - delete[] storage; - } - - args.GetReturnValue().Set(err); -} - - -void StreamWrap::WriteAsciiString(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - - -void StreamWrap::WriteUtf8String(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - - -void StreamWrap::WriteUcs2String(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - -void StreamWrap::WriteBinaryString(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - void StreamWrap::SetBlocking(const FunctionCallbackInfo& args) { StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - CHECK_GT(args.Length(), 0); - int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue()); - args.GetReturnValue().Set(err); -} - -void StreamWrap::AfterWrite(uv_write_t* req, int status) { - WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req); - StreamWrap* wrap = req_wrap->wrap(); - Environment* env = wrap->env(); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - CHECK_EQ(wrap->persistent().IsEmpty(), false); - - // Unref handle property - Local req_wrap_obj = req_wrap->object(); - req_wrap_obj->Delete(env->handle_string()); - wrap->callbacks()->AfterWrite(req_wrap); - - Local argv[] = { - Integer::New(env->isolate(), status), - wrap->object(), - req_wrap_obj, - Undefined(env->isolate()) - }; - - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - argv[3] = OneByteString(env->isolate(), msg); - wrap->callbacks()->ClearError(); - } - - req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); - - req_wrap->~WriteWrap(); - delete[] reinterpret_cast(req_wrap); -} - -void StreamWrap::Shutdown(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) + CHECK_GT(args.Length(), 0); + if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); - CHECK(args[0]->IsObject()); - Local req_wrap_obj = args[0].As(); - - ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj); - int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown); - req_wrap->Dispatched(); - if (err) - delete req_wrap; - args.GetReturnValue().Set(err); -} - - -void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { - ShutdownWrap* req_wrap = static_cast(req->data); - StreamWrap* wrap = static_cast(req->handle->data); - Environment* env = wrap->env(); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - CHECK_EQ(wrap->persistent().IsEmpty(), false); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local req_wrap_obj = req_wrap->object(); - Local argv[3] = { - Integer::New(env->isolate(), status), - wrap->object(), - req_wrap_obj - }; - - req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); - - delete req_wrap; + bool enable = args[0]->IsTrue(); + args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable)); } -const char* StreamWrapCallbacks::Error() const { - return nullptr; +int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) { + return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown); } -void StreamWrapCallbacks::ClearError() { +void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { + ShutdownWrap* req_wrap = ContainerOf(&ShutdownWrap::req_, req); + req_wrap->Done(status); } @@ -621,13 +284,13 @@ void StreamWrapCallbacks::ClearError() { // values, shifting their base and decrementing their length. This is // required in order to skip the data that was successfully written via // uv_try_write(). -int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { +int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { int err; size_t written; uv_buf_t* vbufs = *bufs; size_t vcount = *count; - err = uv_try_write(wrap()->stream(), vbufs, vcount); + err = uv_try_write(stream(), vbufs, vcount); if (err == UV_ENOSYS || err == UV_EAGAIN) return 0; if (err < 0) @@ -657,106 +320,53 @@ int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { } -int StreamWrapCallbacks::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - uv_write_cb cb) { +int StreamWrap::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) { int r; if (send_handle == nullptr) { - r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb); + r = uv_write(&w->req_, stream(), bufs, count, AfterWrite); } else { - r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb); + r = uv_write2(&w->req_, stream(), bufs, count, send_handle, AfterWrite); } if (!r) { size_t bytes = 0; for (size_t i = 0; i < count; i++) bytes += bufs[i].len; - if (wrap()->stream()->type == UV_TCP) { + if (stream()->type == UV_TCP) { NODE_COUNT_NET_BYTES_SENT(bytes); - } else if (wrap()->stream()->type == UV_NAMED_PIPE) { + } else if (stream()->type == UV_NAMED_PIPE) { NODE_COUNT_PIPE_BYTES_SENT(bytes); } } - wrap()->UpdateWriteQueueSize(); + UpdateWriteQueueSize(); return r; } -void StreamWrapCallbacks::AfterWrite(WriteWrap* w) { - wrap()->UpdateWriteQueueSize(); +void StreamWrap::AfterWrite(uv_write_t* req, int status) { + WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req); + req_wrap->Done(status); } -void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { - buf->base = static_cast(malloc(suggested_size)); - buf->len = suggested_size; - - if (buf->base == nullptr && suggested_size > 0) { - FatalError( - "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)", - "Out Of Memory"); - } +void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { + StreamWrap* wrap = static_cast(ctx); + wrap->UpdateWriteQueueSize(); } -void StreamWrapCallbacks::DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { - Environment* env = wrap()->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local argv[] = { - Integer::New(env->isolate(), nread), - Undefined(env->isolate()), - Undefined(env->isolate()) - }; - - if (nread < 0) { - if (buf->base != nullptr) - free(buf->base); - wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); - return; - } - - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } - - char* base = static_cast(realloc(buf->base, nread)); - CHECK_LE(static_cast(nread), buf->len); - argv[1] = Buffer::Use(env, base, nread); - - Local pending_obj; - if (pending == UV_TCP) { - pending_obj = AcceptHandle(env, handle, wrap()); - } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env, handle, wrap()); - } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env, handle, wrap()); - } else { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - } - - if (!pending_obj.IsEmpty()) { - argv[2] = pending_obj; - } - - wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); +const char* StreamWrap::Error() const { + return nullptr; } -int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { - return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb); +void StreamWrap::ClearError() { + // No-op } } // namespace node diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 5148228112..ca673b4ef1 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -1,10 +1,10 @@ #ifndef SRC_STREAM_WRAP_H_ #define SRC_STREAM_WRAP_H_ +#include "stream_base.h" + #include "env.h" #include "handle_wrap.h" -#include "req-wrap.h" -#include "req-wrap-inl.h" #include "string_bytes.h" #include "v8.h" @@ -13,126 +13,31 @@ namespace node { // Forward declaration class StreamWrap; -class ShutdownWrap : public ReqWrap { - public: - ShutdownWrap(Environment* env, v8::Local req_wrap_obj) - : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP) { - Wrap(req_wrap_obj, this); - } - - static void NewShutdownWrap(const v8::FunctionCallbackInfo& args) { - CHECK(args.IsConstructCall()); - } -}; - -class WriteWrap: public ReqWrap { - public: - // TODO(trevnorris): WrapWrap inherits from ReqWrap, which I've globbed - // into the same provider. How should these be broken apart? - WriteWrap(Environment* env, v8::Local obj, StreamWrap* wrap) - : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - wrap_(wrap) { - Wrap(obj, this); - } - - void* operator new(size_t size, char* storage) { return storage; } - - // This is just to keep the compiler happy. It should never be called, since - // we don't use exceptions in node. - void operator delete(void* ptr, char* storage) { UNREACHABLE(); } - - inline StreamWrap* wrap() const { - return wrap_; - } - - static void NewWriteWrap(const v8::FunctionCallbackInfo& args) { - CHECK(args.IsConstructCall()); - } - - private: - // People should not be using the non-placement new and delete operator on a - // WriteWrap. Ensure this never happens. - void* operator new(size_t size) { UNREACHABLE(); } - void operator delete(void* ptr) { UNREACHABLE(); } - - StreamWrap* const wrap_; -}; - -// Overridable callbacks' types -class StreamWrapCallbacks { - public: - explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) { - } - - explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap()) { - } - - virtual ~StreamWrapCallbacks() = default; - - virtual const char* Error() const; - virtual void ClearError(); - - virtual int TryWrite(uv_buf_t** bufs, size_t* count); - - virtual int DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - uv_write_cb cb); - virtual void AfterWrite(WriteWrap* w); - virtual void DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf); - virtual void DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending); - virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb); - - protected: - inline StreamWrap* wrap() const { - return wrap_; - } - - private: - StreamWrap* const wrap_; -}; - -class StreamWrap : public HandleWrap { +class StreamWrap : public HandleWrap, public StreamBase { public: static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); - void OverrideCallbacks(StreamWrapCallbacks* callbacks, bool gc) { - StreamWrapCallbacks* old = callbacks_; - callbacks_ = callbacks; - callbacks_gc_ = gc; - if (old != &default_callbacks_) - delete old; - } - - static void GetFD(v8::Local, - const v8::PropertyCallbackInfo&); + int GetFD() const override; + void* Cast() override; + bool IsAlive() const override; + bool IsClosing() const override; + bool IsIPCPipe() const override; // JavaScript functions - static void ReadStart(const v8::FunctionCallbackInfo& args); - static void ReadStop(const v8::FunctionCallbackInfo& args); - static void Shutdown(const v8::FunctionCallbackInfo& args); - - static void Writev(const v8::FunctionCallbackInfo& args); - static void WriteBuffer(const v8::FunctionCallbackInfo& args); - static void WriteAsciiString(const v8::FunctionCallbackInfo& args); - static void WriteUtf8String(const v8::FunctionCallbackInfo& args); - static void WriteUcs2String(const v8::FunctionCallbackInfo& args); - static void WriteBinaryString( - const v8::FunctionCallbackInfo& args); - - static void SetBlocking(const v8::FunctionCallbackInfo& args); - - inline StreamWrapCallbacks* callbacks() const { - return callbacks_; - } + int ReadStart() override; + int ReadStop() override; + + // Resource implementation + int DoShutdown(ShutdownWrap* req_wrap) override; + int DoTryWrite(uv_buf_t** bufs, size_t* count) override; + int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) override; + const char* Error() const override; + void ClearError() override; inline uv_stream_t* stream() const { return stream_; @@ -152,8 +57,6 @@ class StreamWrap : public HandleWrap { } protected: - static size_t WriteBuffer(v8::Handle val, uv_buf_t* buf); - StreamWrap(Environment* env, v8::Local object, uv_stream_t* stream, @@ -161,22 +64,21 @@ class StreamWrap : public HandleWrap { AsyncWrap* parent = nullptr); ~StreamWrap() { - if (!callbacks_gc_ && callbacks_ != &default_callbacks_) { - delete callbacks_; - } - callbacks_ = nullptr; } - void StateChange() { } + AsyncWrap* GetAsyncWrap() override; void UpdateWriteQueueSize(); + static void AddMethods(Environment* env, + v8::Handle target); + private: + static void SetBlocking(const v8::FunctionCallbackInfo& args); + // Callbacks for libuv - static void AfterWrite(uv_write_t* req, int status); static void OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); - static void AfterShutdown(uv_shutdown_t* req, int status); static void OnRead(uv_stream_t* handle, ssize_t nread, @@ -185,16 +87,18 @@ class StreamWrap : public HandleWrap { ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); + static void AfterWrite(uv_write_t* req, int status); + static void AfterShutdown(uv_shutdown_t* req, int status); - template - static void WriteStringImpl(const v8::FunctionCallbackInfo& args); + // Resource interface implementation + static void OnAfterWriteImpl(WriteWrap* w, void* ctx); + static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); uv_stream_t* const stream_; - StreamWrapCallbacks default_callbacks_; - StreamWrapCallbacks* callbacks_; // Overridable callbacks - bool callbacks_gc_; - - friend class StreamWrapCallbacks; }; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 3f011422ef..a823e758ee 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -72,15 +72,6 @@ void TCPWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TCP")); t->InstanceTemplate()->SetInternalFieldCount(1); - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - StreamWrap::GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - // Init properties t->InstanceTemplate()->Set(String::NewFromUtf8(env->isolate(), "reading"), Boolean::New(env->isolate(), false)); @@ -98,16 +89,7 @@ void TCPWrap::Initialize(Handle target, env->SetProtoMethod(t, "ref", HandleWrap::Ref); env->SetProtoMethod(t, "unref", HandleWrap::Unref); - env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); - env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); - env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown); - - env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); - env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); - env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); - env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); - env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); - env->SetProtoMethod(t, "writev", StreamWrap::Writev); + StreamWrap::AddMethods(env, t); env->SetProtoMethod(t, "open", Open); env->SetProtoMethod(t, "bind", Bind); diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 9aafe3925d..ab8db6951b 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -33,17 +33,20 @@ using v8::String; using v8::Value; -TLSCallbacks::TLSCallbacks(Environment* env, - Kind kind, - Handle sc, - StreamWrapCallbacks* old) - : SSLWrap(env, Unwrap(sc), kind), - StreamWrapCallbacks(old), +TLSWrap::TLSWrap(Environment* env, + Kind kind, + StreamBase* stream, + Handle stream_obj, + Handle sc) + : SSLWrap(env, Unwrap(sc), kind), + StreamBase(env), AsyncWrap(env, env->tls_wrap_constructor_function()->NewInstance(), AsyncWrap::PROVIDER_TLSWRAP), sc_(Unwrap(sc)), sc_handle_(env->isolate(), sc), + stream_(stream), + stream_handle_(env->isolate(), stream_obj), enc_in_(nullptr), enc_out_(nullptr), clear_in_(nullptr), @@ -58,14 +61,22 @@ TLSCallbacks::TLSCallbacks(Environment* env, MakeWeak(this); // We've our own session callbacks - SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); - SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); + SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); + SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); + + stream_->Consume(); + stream_->set_after_write_cb(OnAfterWriteImpl, this); + stream_->set_alloc_cb(OnAllocImpl, this); + stream_->set_read_cb(OnReadImpl, this); + + set_alloc_cb(OnAllocSelf, this); + set_read_cb(OnReadSelf, this); InitSSL(); } -TLSCallbacks::~TLSCallbacks() { +TLSWrap::~TLSWrap() { enc_in_ = nullptr; enc_out_ = nullptr; delete clear_in_; @@ -73,6 +84,7 @@ TLSCallbacks::~TLSCallbacks() { sc_ = nullptr; sc_handle_.Reset(); + stream_handle_.Reset(); persistent().Reset(); #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB @@ -90,12 +102,12 @@ TLSCallbacks::~TLSCallbacks() { } -void TLSCallbacks::MakePending() { +void TLSWrap::MakePending() { write_item_queue_.MoveBack(&pending_write_items_); } -bool TLSCallbacks::InvokeQueued(int status) { +bool TLSWrap::InvokeQueued(int status) { if (pending_write_items_.IsEmpty()) return false; @@ -103,7 +115,7 @@ bool TLSCallbacks::InvokeQueued(int status) { WriteItemList queue; pending_write_items_.MoveBack(&queue); while (WriteItem* wi = queue.PopFront()) { - wi->cb_(&wi->w_->req_, status); + wi->w_->Done(status); delete wi; } @@ -111,12 +123,12 @@ bool TLSCallbacks::InvokeQueued(int status) { } -void TLSCallbacks::NewSessionDoneCb() { +void TLSWrap::NewSessionDoneCb() { Cycle(); } -void TLSCallbacks::InitSSL() { +void TLSWrap::InitSSL() { // Initialize SSL enc_in_ = NodeBIO::New(); enc_out_ = NodeBIO::New(); @@ -158,7 +170,7 @@ void TLSCallbacks::InitSSL() { } -void TLSCallbacks::Wrap(const FunctionCallbackInfo& args) { +void TLSWrap::Wrap(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); if (args.Length() < 1 || !args[0]->IsObject()) { @@ -172,42 +184,39 @@ void TLSCallbacks::Wrap(const FunctionCallbackInfo& args) { if (args.Length() < 3 || !args[2]->IsBoolean()) return env->ThrowTypeError("Third argument should be boolean"); - Local stream = args[0].As(); + Local stream_obj = args[0].As(); Local sc = args[1].As(); - Kind kind = args[2]->IsTrue() ? SSLWrap::kServer : - SSLWrap::kClient; + Kind kind = args[2]->IsTrue() ? SSLWrap::kServer : + SSLWrap::kClient; - TLSCallbacks* callbacks = nullptr; - WITH_GENERIC_STREAM(env, stream, { - callbacks = new TLSCallbacks(env, kind, sc, wrap->callbacks()); - wrap->OverrideCallbacks(callbacks, true); + StreamBase* stream = nullptr; + WITH_GENERIC_STREAM(env, stream_obj, { + stream = wrap; }); + CHECK_NE(stream, nullptr); - if (callbacks == nullptr) { - return args.GetReturnValue().SetNull(); - } + TLSWrap* res = new TLSWrap(env, kind, stream, stream_obj, sc); - args.GetReturnValue().Set(callbacks->persistent()); + args.GetReturnValue().Set(res->persistent()); } -void TLSCallbacks::Receive(const FunctionCallbackInfo& args) { - TLSCallbacks* wrap = Unwrap(args.Holder()); +void TLSWrap::Receive(const FunctionCallbackInfo& args) { + TLSWrap* wrap = Unwrap(args.Holder()); CHECK(Buffer::HasInstance(args[0])); char* data = Buffer::Data(args[0]); size_t len = Buffer::Length(args[0]); uv_buf_t buf; - uv_stream_t* stream = wrap->wrap()->stream(); // Copy given buffer entirely or partiall if handle becomes closed - while (len > 0 && !uv_is_closing(reinterpret_cast(stream))) { - wrap->DoAlloc(reinterpret_cast(stream), len, &buf); + while (len > 0 && !wrap->IsClosing()) { + wrap->stream_->OnAlloc(len, &buf); size_t copy = buf.len > len ? len : buf.len; memcpy(buf.base, data, copy); buf.len = copy; - wrap->DoRead(stream, buf.len, &buf, UV_UNKNOWN_HANDLE); + wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE); data += copy; len -= copy; @@ -215,10 +224,10 @@ void TLSCallbacks::Receive(const FunctionCallbackInfo& args) { } -void TLSCallbacks::Start(const FunctionCallbackInfo& args) { +void TLSWrap::Start(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); if (wrap->started_) return env->ThrowError("Already started."); @@ -231,14 +240,14 @@ void TLSCallbacks::Start(const FunctionCallbackInfo& args) { } -void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) { +void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) { if (!(where & (SSL_CB_HANDSHAKE_START | SSL_CB_HANDSHAKE_DONE))) return; // Be compatible with older versions of OpenSSL. SSL_get_app_data() wants // a non-const SSL* in OpenSSL <= 0.9.7e. SSL* ssl = const_cast(ssl_); - TLSCallbacks* c = static_cast(SSL_get_app_data(ssl)); + TLSWrap* c = static_cast(SSL_get_app_data(ssl)); Environment* env = c->env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); @@ -261,7 +270,7 @@ void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) { } -void TLSCallbacks::EncOut() { +void TLSWrap::EncOut() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return; @@ -291,47 +300,49 @@ void TLSCallbacks::EncOut() { write_size_ = NodeBIO::FromBIO(enc_out_)->PeekMultiple(data, size, &count); CHECK(write_size_ != 0 && count != 0); - write_req_.data = this; + Local req_wrap_obj = + env()->write_wrap_constructor_function()->NewInstance(); + char* storage = new char[sizeof(WriteWrap)]; + WriteWrap* write_req = new(storage) WriteWrap(env(), + req_wrap_obj, + this, + EncOutCb); + uv_buf_t buf[ARRAY_SIZE(data)]; for (size_t i = 0; i < count; i++) buf[i] = uv_buf_init(data[i], size[i]); - int r = uv_write(&write_req_, wrap()->stream(), buf, count, EncOutCb); + int r = stream_->DoWrite(write_req, buf, count, nullptr); // Ignore errors, this should be already handled in js - if (!r) { - if (wrap()->is_tcp()) { - NODE_COUNT_NET_BYTES_SENT(write_size_); - } else if (wrap()->is_named_pipe()) { - NODE_COUNT_PIPE_BYTES_SENT(write_size_); - } - } + if (!r) + NODE_COUNT_NET_BYTES_SENT(write_size_); } -void TLSCallbacks::EncOutCb(uv_write_t* req, int status) { - TLSCallbacks* callbacks = static_cast(req->data); +void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { + TLSWrap* wrap = req_wrap->wrap()->Cast(); // Handle error if (status) { // Ignore errors after shutdown - if (callbacks->shutdown_) + if (wrap->shutdown_) return; // Notify about error - callbacks->InvokeQueued(status); + wrap->InvokeQueued(status); return; } // Commit - NodeBIO::FromBIO(callbacks->enc_out_)->Read(nullptr, callbacks->write_size_); + NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); // Try writing more data - callbacks->write_size_ = 0; - callbacks->EncOut(); + wrap->write_size_ = 0; + wrap->EncOut(); } -Local TLSCallbacks::GetSSLError(int status, int* err, const char** msg) { +Local TLSWrap::GetSSLError(int status, int* err, const char** msg) { EscapableHandleScope scope(env()->isolate()); *err = SSL_get_error(ssl_, status); @@ -373,7 +384,7 @@ Local TLSCallbacks::GetSSLError(int status, int* err, const char** msg) { } -void TLSCallbacks::ClearOut() { +void TLSWrap::ClearOut() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return; @@ -389,22 +400,30 @@ void TLSCallbacks::ClearOut() { char out[kClearOutChunkSize]; int read; - do { + for (;;) { read = SSL_read(ssl_, out, sizeof(out)); - if (read > 0) { - Local argv[] = { - Integer::New(env()->isolate(), read), - Buffer::New(env(), out, read) - }; - wrap()->MakeCallback(env()->onread_string(), ARRAY_SIZE(argv), argv); + + if (read <= 0) + break; + + while (read > 0) { + int avail = read; + + uv_buf_t buf; + OnAlloc(avail, &buf); + if (static_cast(buf.len) < avail) + avail = buf.len; + memcpy(buf.base, out, avail); + OnRead(avail, &buf, UV_UNKNOWN_HANDLE); + + read -= avail; } - } while (read > 0); + } int flags = SSL_get_shutdown(ssl_); if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) { eof_ = true; - Local arg = Integer::New(env()->isolate(), UV_EOF); - wrap()->MakeCallback(env()->onread_string(), 1, &arg); + OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE); } if (read == -1) { @@ -427,7 +446,7 @@ void TLSCallbacks::ClearOut() { } -bool TLSCallbacks::ClearIn() { +bool TLSWrap::ClearIn() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return false; @@ -466,28 +485,67 @@ bool TLSCallbacks::ClearIn() { } -const char* TLSCallbacks::Error() const { +void* TLSWrap::Cast() { + return reinterpret_cast(this); +} + + +AsyncWrap* TLSWrap::GetAsyncWrap() { + return static_cast(this); +} + + +bool TLSWrap::IsIPCPipe() const { + return stream_->IsIPCPipe(); +} + + +int TLSWrap::GetFD() const { + return stream_->GetFD(); +} + + +bool TLSWrap::IsAlive() const { + return stream_->IsAlive(); +} + + +bool TLSWrap::IsClosing() const { + return stream_->IsClosing(); +} + + +int TLSWrap::ReadStart() { + return stream_->ReadStart(); +} + + +int TLSWrap::ReadStop() { + return stream_->ReadStop(); +} + + +const char* TLSWrap::Error() const { return error_; } -void TLSCallbacks::ClearError() { +void TLSWrap::ClearError() { delete[] error_; error_ = nullptr; } -int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { +int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { // TODO(indutny): Support it return 0; } -int TLSCallbacks::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - uv_write_cb cb) { +int TLSWrap::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) { CHECK_EQ(send_handle, nullptr); bool empty = true; @@ -504,11 +562,11 @@ int TLSCallbacks::DoWrite(WriteWrap* w, // However if there any data that should be written to socket, // callback should not be invoked immediately if (BIO_pending(enc_out_) == 0) - return uv_write(&w->req_, wrap()->stream(), bufs, count, cb); + return stream_->DoWrite(w, bufs, count, send_handle); } // Queue callback to execute it on next tick - write_item_queue_.PushBack(new WriteItem(w, cb)); + write_item_queue_.PushBack(new WriteItem(w)); // Write queued data if (empty) { @@ -552,24 +610,51 @@ int TLSCallbacks::DoWrite(WriteWrap* w, } -void TLSCallbacks::AfterWrite(WriteWrap* w) { +void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { // Intentionally empty } -void TLSCallbacks::DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { +void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { + TLSWrap* wrap = static_cast(ctx); + size_t size = 0; - buf->base = NodeBIO::FromBIO(enc_in_)->PeekWritable(&size); + buf->base = NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size); buf->len = size; } -void TLSCallbacks::DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { +void TLSWrap::OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + TLSWrap* wrap = static_cast(ctx); + wrap->DoRead(nread, buf, pending); +} + + +void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) { + buf->base = static_cast(malloc(suggested_size)); + CHECK_NE(buf->base, nullptr); + buf->len = suggested_size; +} + + +void TLSWrap::OnReadSelf(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + TLSWrap* wrap = static_cast(ctx); + Local buf_obj; + if (buf != nullptr) + buf_obj = Buffer::Use(wrap->env(), buf->base, buf->len); + wrap->EmitData(nread, buf_obj, Local()); +} + + +void TLSWrap::DoRead(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending) { if (nread < 0) { // Error should be emitted only after all data was read ClearOut(); @@ -583,8 +668,7 @@ void TLSCallbacks::DoRead(uv_stream_t* handle, HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); - Local arg = Integer::New(env()->isolate(), nread); - wrap()->MakeCallback(env()->onread_string(), 1, &arg); + OnRead(nread, nullptr, UV_UNKNOWN_HANDLE); return; } @@ -608,19 +692,19 @@ void TLSCallbacks::DoRead(uv_stream_t* handle, } -int TLSCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { +int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) { if (SSL_shutdown(ssl_) == 0) SSL_shutdown(ssl_); shutdown_ = true; EncOut(); - return StreamWrapCallbacks::DoShutdown(req_wrap, cb); + return stream_->DoShutdown(req_wrap); } -void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo& args) { +void TLSWrap::SetVerifyMode(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); if (args.Length() < 2 || !args[0]->IsBoolean() || !args[1]->IsBoolean()) return env->ThrowTypeError("Bad arguments, expected two booleans"); @@ -647,34 +731,34 @@ void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo& args) { } -void TLSCallbacks::EnableSessionCallbacks( +void TLSWrap::EnableSessionCallbacks( const FunctionCallbackInfo& args) { - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); wrap->enable_session_callbacks(); EnableHelloParser(args); } -void TLSCallbacks::EnableHelloParser(const FunctionCallbackInfo& args) { - TLSCallbacks* wrap = Unwrap(args.Holder()); +void TLSWrap::EnableHelloParser(const FunctionCallbackInfo& args) { + TLSWrap* wrap = Unwrap(args.Holder()); NodeBIO::FromBIO(wrap->enc_in_)->set_initial(kMaxHelloLength); - wrap->hello_parser_.Start(SSLWrap::OnClientHello, + wrap->hello_parser_.Start(SSLWrap::OnClientHello, OnClientHelloParseEnd, wrap); } -void TLSCallbacks::OnClientHelloParseEnd(void* arg) { - TLSCallbacks* c = static_cast(arg); +void TLSWrap::OnClientHelloParseEnd(void* arg) { + TLSWrap* c = static_cast(arg); c->Cycle(); } #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSCallbacks::GetServername(const FunctionCallbackInfo& args) { +void TLSWrap::GetServername(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); const char* servername = SSL_get_servername(wrap->ssl_, TLSEXT_NAMETYPE_host_name); @@ -686,10 +770,10 @@ void TLSCallbacks::GetServername(const FunctionCallbackInfo& args) { } -void TLSCallbacks::SetServername(const FunctionCallbackInfo& args) { +void TLSWrap::SetServername(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); if (args.Length() < 1 || !args[0]->IsString()) return env->ThrowTypeError("First argument should be a string"); @@ -707,8 +791,8 @@ void TLSCallbacks::SetServername(const FunctionCallbackInfo& args) { } -int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { - TLSCallbacks* p = static_cast(SSL_get_app_data(s)); +int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { + TLSWrap* p = static_cast(SSL_get_app_data(s)); Environment* env = p->env(); const char* servername = SSL_get_servername(s, TLSEXT_NAMETYPE_host_name); @@ -744,12 +828,12 @@ int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSCallbacks::Initialize(Handle target, +void TLSWrap::Initialize(Handle target, Handle unused, Handle context) { Environment* env = Environment::GetCurrent(context); - env->SetMethod(target, "wrap", TLSCallbacks::Wrap); + env->SetMethod(target, "wrap", TLSWrap::Wrap); Local t = FunctionTemplate::New(env->isolate()); t->InstanceTemplate()->SetInternalFieldCount(1); @@ -761,16 +845,18 @@ void TLSCallbacks::Initialize(Handle target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "enableHelloParser", EnableHelloParser); - SSLWrap::AddMethods(env, t); + StreamBase::AddMethods(env, t); + SSLWrap::AddMethods(env, t); #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB env->SetProtoMethod(t, "getServername", GetServername); env->SetProtoMethod(t, "setServername", SetServername); #endif // SSL_CRT_SET_TLSEXT_SERVERNAME_CB + env->set_tls_wrap_constructor_template(t); env->set_tls_wrap_constructor_function(t->GetFunction()); } } // namespace node -NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSCallbacks::Initialize) +NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSWrap::Initialize) diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 3815878d58..42452055ce 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -21,33 +21,33 @@ namespace crypto { class SecureContext; } -class TLSCallbacks : public crypto::SSLWrap, - public StreamWrapCallbacks, - public AsyncWrap { +class TLSWrap : public crypto::SSLWrap, + public StreamBase, + public AsyncWrap { public: - ~TLSCallbacks() override; + ~TLSWrap() override; static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); - const char* Error() const override; - void ClearError() override; - int TryWrite(uv_buf_t** bufs, size_t* count) override; + void* Cast() override; + int GetFD() const override; + bool IsAlive() const override; + bool IsClosing() const override; + + // JavaScript functions + int ReadStart() override; + int ReadStop() override; + + int DoShutdown(ShutdownWrap* req_wrap) override; + int DoTryWrite(uv_buf_t** bufs, size_t* count) override; int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, - uv_stream_t* send_handle, - uv_write_cb cb) override; - void AfterWrite(WriteWrap* w) override; - void DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) override; - void DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) override; - int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) override; + uv_stream_t* send_handle) override; + const char* Error() const override; + void ClearError() override; void NewSessionDoneCb(); @@ -66,27 +66,26 @@ class TLSCallbacks : public crypto::SSLWrap, // Write callback queue's item class WriteItem { public: - WriteItem(WriteWrap* w, uv_write_cb cb) : w_(w), cb_(cb) { + explicit WriteItem(WriteWrap* w) : w_(w) { } ~WriteItem() { w_ = nullptr; - cb_ = nullptr; } WriteWrap* w_; - uv_write_cb cb_; ListNode member_; }; - TLSCallbacks(Environment* env, - Kind kind, - v8::Handle sc, - StreamWrapCallbacks* old); + TLSWrap(Environment* env, + Kind kind, + StreamBase* steram, + v8::Handle stream_obj, + v8::Handle sc); static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - static void EncOutCb(uv_write_t* req, int status); + static void EncOutCb(WriteWrap* req_wrap, int status); bool ClearIn(); void ClearOut(); void MakePending(); @@ -104,6 +103,25 @@ class TLSCallbacks : public crypto::SSLWrap, } } + AsyncWrap* GetAsyncWrap() override; + bool IsIPCPipe() const override; + + // Resource implementation + static void OnAfterWriteImpl(WriteWrap* w, void* ctx); + static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + static void OnAfterWriteSelf(WriteWrap* w, void* ctx); + static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadSelf(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + + void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); + // If |msg| is not nullptr, caller is responsible for calling `delete[] *msg`. v8::Local GetSSLError(int status, int* err, const char** msg); @@ -125,10 +143,11 @@ class TLSCallbacks : public crypto::SSLWrap, crypto::SecureContext* sc_; v8::Persistent sc_handle_; + StreamBase* stream_; + v8::Persistent stream_handle_; BIO* enc_in_; BIO* enc_out_; NodeBIO* clear_in_; - uv_write_t write_req_; size_t write_size_; size_t write_queue_size_; typedef ListHead WriteItemList; diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index 08c50d911f..186f2f0100 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -36,26 +36,10 @@ void TTYWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TTY")); t->InstanceTemplate()->SetInternalFieldCount(1); - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - StreamWrap::GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - env->SetProtoMethod(t, "close", HandleWrap::Close); env->SetProtoMethod(t, "unref", HandleWrap::Unref); - env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); - env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); - - env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); - env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); - env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); - env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); - env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); + StreamWrap::AddMethods(env, t); env->SetProtoMethod(t, "getWindowSize", TTYWrap::GetWindowSize); env->SetProtoMethod(t, "setRawMode", SetRawMode); diff --git a/test/parallel/test-tls-client-default-ciphers.js b/test/parallel/test-tls-client-default-ciphers.js index 1eb74e6981..dfae4a7bb9 100644 --- a/test/parallel/test-tls-client-default-ciphers.js +++ b/test/parallel/test-tls-client-default-ciphers.js @@ -2,13 +2,21 @@ var assert = require('assert'); var common = require('../common'); var tls = require('tls'); +function Done() {} + function test1() { var ciphers = ''; + tls.createSecureContext = function(options) { - ciphers = options.ciphers + ciphers = options.ciphers; + throw new Done(); + } + + try { + var s = tls.connect(common.PORT); + } catch (e) { + assert(e instanceof Done); } - var s = tls.connect(common.PORT); - s.destroy(); assert.equal(ciphers, tls.DEFAULT_CIPHERS); } test1(); diff --git a/test/parallel/test-tls-close-notify.js b/test/parallel/test-tls-close-notify.js index 54f7314e2f..c5decad5e5 100644 --- a/test/parallel/test-tls-close-notify.js +++ b/test/parallel/test-tls-close-notify.js @@ -17,8 +17,8 @@ var server = tls.createServer({ cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem') }, function(c) { // Send close-notify without shutting down TCP socket - if (c.ssl.shutdown() !== 1) - c.ssl.shutdown(); + if (c._handle.shutdownSSL() !== 1) + c._handle.shutdownSSL(); }).listen(common.PORT, function() { var c = tls.connect(common.PORT, { rejectUnauthorized: false diff --git a/test/parallel/test-tls-multi-key.js b/test/parallel/test-tls-multi-key.js index cdf8500874..657d9084d4 100644 --- a/test/parallel/test-tls-multi-key.js +++ b/test/parallel/test-tls-multi-key.js @@ -28,15 +28,14 @@ var server = tls.createServer(options, function(conn) { ciphers: 'ECDHE-ECDSA-AES256-GCM-SHA384', rejectUnauthorized: false }, function() { + ciphers.push(ecdsa.getCipher()); var rsa = tls.connect(common.PORT, { ciphers: 'ECDHE-RSA-AES256-GCM-SHA384', rejectUnauthorized: false }, function() { + ciphers.push(rsa.getCipher()); ecdsa.destroy(); rsa.destroy(); - - ciphers.push(ecdsa.getCipher()); - ciphers.push(rsa.getCipher()); server.close(); }); });