Browse Source

stream_base: introduce StreamBase

StreamBase is an improved way to write C++ streams. The class itself is
for separting `StreamWrap` (with the methods like `.writeAsciiString`,
`.writeBuffer`, `.writev`, etc) from the `HandleWrap` class, making
possible to write abstract C++ streams that are not bound to any uv
socket.

The following methods are important part of the abstraction (which
mimics libuv's stream API):

* Events:
  * `OnAlloc(size_t size, uv_buf_t*)`
  * `OnRead(ssize_t nread, const uv_buf_t*, uv_handle_type pending)`
  * `OnAfterWrite(WriteWrap*)`
* Wrappers:
  * `DoShutdown(ShutdownWrap*)`
  * `DoTryWrite(uv_buf_t** bufs, size_t* count)`
  * `DoWrite(WriteWrap*, uv_buf_t*, size_t count, uv_stream_t* handle)`
  * `Error()`
  * `ClearError()`

The implementation should provide all of these methods, thus providing
the access to the underlying resource (be it uv handle, TLS socket, or
anything else).

A C++ stream may consume the input of another stream by replacing the
event callbacks and proxying the writes. This kind of API is actually
used now for the TLSWrap implementation, making it possible to wrap TLS
stream into another TLS stream. Thus legacy API calls are no longer
required in `_tls_wrap.js`.

PR-URL: https://github.com/iojs/io.js/pull/840
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
v1.8.0-commit
Fedor Indutny 10 years ago
parent
commit
b9686233fc
  1. 4
      lib/_tls_legacy.js
  2. 295
      lib/_tls_wrap.js
  3. 4
      lib/net.js
  4. 2
      node.gyp
  5. 2
      src/env.h
  6. 21
      src/js_stream.cc
  7. 20
      src/js_stream.h
  8. 22
      src/node_crypto.cc
  9. 19
      src/node_wrap.h
  10. 21
      src/pipe_wrap.cc
  11. 495
      src/stream_base.cc
  12. 223
      src/stream_base.h
  13. 656
      src/stream_wrap.cc
  14. 164
      src/stream_wrap.h
  15. 20
      src/tcp_wrap.cc
  16. 290
      src/tls_wrap.cc
  17. 69
      src/tls_wrap.h
  18. 18
      src/tty_wrap.cc
  19. 12
      test/parallel/test-tls-client-default-ciphers.js
  20. 4
      test/parallel/test-tls-close-notify.js
  21. 5
      test/parallel/test-tls-multi-key.js

4
lib/_tls_legacy.js

@ -92,11 +92,11 @@ function onCryptoStreamFinish() {
// Generate close notify
// NOTE: first call checks if client has sent us shutdown,
// second call enqueues shutdown into the BIO.
if (this.pair.ssl.shutdown() !== 1) {
if (this.pair.ssl.shutdownSSL() !== 1) {
if (this.pair.ssl && this.pair.ssl.error)
return this.pair.error();
this.pair.ssl.shutdown();
this.pair.ssl.shutdownSSL();
}
if (this.pair.ssl && this.pair.ssl.error)

295
lib/_tls_wrap.js

@ -11,14 +11,23 @@ const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
// Lazy load
var tls_legacy;
// constructor for lazy loading
function createTCP() {
var TCP = process.binding('tcp_wrap').TCP;
return new TCP();
}
// constructor for lazy loading
function createPipe() {
var Pipe = process.binding('pipe_wrap').Pipe;
return new Pipe();
}
function onhandshakestart() {
debug('onhandshakestart');
var self = this;
var ssl = self.ssl;
var ssl = self._handle;
var now = Timer.now();
assert(now >= ssl.lastHandshakeTime);
@ -63,7 +72,7 @@ function loadSession(self, hello, cb) {
// NOTE: That we have disabled OpenSSL's internal session storage in
// `node_crypto.cc` and hence its safe to rely on getting servername only
// from clienthello or this place.
var ret = self.ssl.loadSession(session);
var ret = self._handle.loadSession(session);
cb(null, ret);
}
@ -92,9 +101,9 @@ function loadSNI(self, servername, cb) {
// TODO(indutny): eventually disallow raw `SecureContext`
if (context)
self.ssl.sni_context = context.context || context;
self._handle.sni_context = context.context || context;
cb(null, self.ssl.sni_context);
cb(null, self._handle.sni_context);
});
}
@ -127,7 +136,7 @@ function requestOCSP(self, hello, ctx, cb) {
return cb(err);
if (response)
self.ssl.setOCSPResponse(response);
self._handle.setOCSPResponse(response);
cb(null);
}
}
@ -161,7 +170,7 @@ function onclienthello(hello) {
if (err)
return self.destroy(err);
self.ssl.endParser();
self._handle.endParser();
});
});
});
@ -184,7 +193,7 @@ function onnewsession(key, session) {
return;
once = true;
self.ssl.newSessionDone();
self._handle.newSessionDone();
self._newSessionPending = false;
if (self._securePending)
@ -204,29 +213,12 @@ function onocspresponse(resp) {
*/
function TLSSocket(socket, options) {
// Disallow wrapping TLSSocket in TLSSocket
assert(!(socket instanceof TLSSocket));
net.Socket.call(this, {
handle: socket && socket._handle,
allowHalfOpen: socket && socket.allowHalfOpen,
readable: false,
writable: false
});
if (socket) {
this._parent = socket;
// To prevent assertion in afterConnect()
this._connecting = socket._connecting;
}
this._tlsOptions = options;
this._secureEstablished = false;
this._securePending = false;
this._newSessionPending = false;
this._controlReleased = false;
this._SNICallback = null;
this.ssl = null;
this.servername = null;
this.npnProtocol = null;
this.authorized = false;
@ -236,15 +228,19 @@ function TLSSocket(socket, options) {
// distinguishable from regular ones.
this.encrypted = true;
net.Socket.call(this, {
handle: this._wrapHandle(socket && socket._handle),
allowHalfOpen: socket && socket.allowHalfOpen,
readable: false,
writable: false
});
// Proxy for API compatibility
this.ssl = this._handle;
this.on('error', this._tlsError);
if (!this._handle) {
this.once('connect', function() {
this._init(null);
});
} else {
this._init(socket);
}
// Make sure to setup all required properties like: `_connecting` before
// starting the flow of the data
@ -255,23 +251,53 @@ function TLSSocket(socket, options) {
util.inherits(TLSSocket, net.Socket);
exports.TLSSocket = TLSSocket;
TLSSocket.prototype._init = function(socket) {
assert(this._handle);
var proxiedMethods = [
'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6',
'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive',
'setSimultaneousAccepts', 'setBlocking',
// lib/net.js expect this value to be non-zero if write hasn't been flushed
// immediately
// TODO(indutny): rewise this solution, it might be 1 before handshake and
// represent real writeQueueSize during regular writes.
this._handle.writeQueueSize = 1;
// PipeWrap
'setPendingInstances'
];
TLSSocket.prototype._wrapHandle = function(handle) {
var res;
var self = this;
var options = this._tlsOptions;
if (!handle) {
handle = options.pipe ? createPipe() : createTCP();
handle.owner = this;
}
// Wrap socket's handle
var context = options.secureContext ||
options.credentials ||
tls.createSecureContext();
this.ssl = tls_wrap.wrap(this._handle, context.context, options.isServer);
res = tls_wrap.wrap(handle, context.context, options.isServer);
res._parent = handle;
res._reading = handle._reading;
// Proxy HandleWrap, PipeWrap and TCPWrap methods
proxiedMethods.forEach(function(name) {
res[name] = function methodProxy() {
return handle[name].apply(handle, arguments);
};
});
return res;
};
TLSSocket.prototype._init = function(socket) {
var self = this;
var options = this._tlsOptions;
var ssl = this._handle;
// lib/net.js expect this value to be non-zero if write hasn't been flushed
// immediately
// TODO(indutny): rewise this solution, it might be 1 before handshake and
// represent real writeQueueSize during regular writes.
ssl.writeQueueSize = 1;
this.server = options.server || null;
// For clients, we will always have either a given ca list or be using
@ -282,32 +308,32 @@ TLSSocket.prototype._init = function(socket) {
this._requestCert = requestCert;
this._rejectUnauthorized = rejectUnauthorized;
if (requestCert || rejectUnauthorized)
this.ssl.setVerifyMode(requestCert, rejectUnauthorized);
ssl.setVerifyMode(requestCert, rejectUnauthorized);
if (options.isServer) {
this.ssl.onhandshakestart = onhandshakestart.bind(this);
this.ssl.onhandshakedone = onhandshakedone.bind(this);
this.ssl.onclienthello = onclienthello.bind(this);
this.ssl.onnewsession = onnewsession.bind(this);
this.ssl.lastHandshakeTime = 0;
this.ssl.handshakes = 0;
ssl.onhandshakestart = onhandshakestart.bind(this);
ssl.onhandshakedone = onhandshakedone.bind(this);
ssl.onclienthello = onclienthello.bind(this);
ssl.onnewsession = onnewsession.bind(this);
ssl.lastHandshakeTime = 0;
ssl.handshakes = 0;
if (this.server &&
(listenerCount(this.server, 'resumeSession') > 0 ||
listenerCount(this.server, 'newSession') > 0 ||
listenerCount(this.server, 'OCSPRequest') > 0)) {
this.ssl.enableSessionCallbacks();
ssl.enableSessionCallbacks();
}
} else {
this.ssl.onhandshakestart = function() {};
this.ssl.onhandshakedone = this._finishInit.bind(this);
this.ssl.onocspresponse = onocspresponse.bind(this);
ssl.onhandshakestart = function() {};
ssl.onhandshakedone = this._finishInit.bind(this);
ssl.onocspresponse = onocspresponse.bind(this);
if (options.session)
this.ssl.setSession(options.session);
ssl.setSession(options.session);
}
this.ssl.onerror = function(err) {
ssl.onerror = function(err) {
if (self._writableState.errorEmitted)
return;
self._writableState.errorEmitted = true;
@ -337,11 +363,11 @@ TLSSocket.prototype._init = function(socket) {
options.server._contexts.length)) {
assert(typeof options.SNICallback === 'function');
this._SNICallback = options.SNICallback;
this.ssl.enableHelloParser();
ssl.enableHelloParser();
}
if (process.features.tls_npn && options.NPNProtocols)
this.ssl.setNPNProtocols(options.NPNProtocols);
ssl.setNPNProtocols(options.NPNProtocols);
if (options.handshakeTimeout > 0)
this.setTimeout(options.handshakeTimeout, this._handleTimeout);
@ -350,8 +376,23 @@ TLSSocket.prototype._init = function(socket) {
if (socket && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
this.ssl.receive(buf);
ssl.receive(buf);
}
if (socket) {
this._parent = socket;
// To prevent assertion in afterConnect() and properly kick off readStart
this._connecting = socket._connecting;
socket.once('connect', function() {
self._connecting = false;
self.emit('connect');
});
}
// Assume `tls.connect()`
if (!socket)
this._connecting = true;
};
TLSSocket.prototype.renegotiate = function(options, callback) {
@ -365,11 +406,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) {
if (requestCert !== this._requestCert ||
rejectUnauthorized !== this._rejectUnauthorized) {
this.ssl.setVerifyMode(requestCert, rejectUnauthorized);
this._handle.setVerifyMode(requestCert, rejectUnauthorized);
this._requestCert = requestCert;
this._rejectUnauthorized = rejectUnauthorized;
}
if (!this.ssl.renegotiate()) {
if (!this._handle.renegotiate()) {
if (callback) {
process.nextTick(function() {
callback(new Error('Failed to renegotiate'));
@ -391,11 +432,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) {
};
TLSSocket.prototype.setMaxSendFragment = function setMaxSendFragment(size) {
return this.ssl.setMaxSendFragment(size) == 1;
return this._handle.setMaxSendFragment(size) == 1;
};
TLSSocket.prototype.getTLSTicket = function getTLSTicket() {
return this.ssl.getTLSTicket();
return this._handle.getTLSTicket();
};
TLSSocket.prototype._handleTimeout = function() {
@ -424,11 +465,11 @@ TLSSocket.prototype._finishInit = function() {
}
if (process.features.tls_npn) {
this.npnProtocol = this.ssl.getNegotiatedProtocol();
this.npnProtocol = this._handle.getNegotiatedProtocol();
}
if (process.features.tls_sni && this._tlsOptions.isServer) {
this.servername = this.ssl.getServername();
this.servername = this._handle.getServername();
}
debug('secure established');
@ -439,49 +480,56 @@ TLSSocket.prototype._finishInit = function() {
};
TLSSocket.prototype._start = function() {
if (this._connecting) {
this.once('connect', function() {
this._start();
});
return;
}
if (this._tlsOptions.requestOCSP)
this.ssl.requestOCSP();
this.ssl.start();
this._handle.requestOCSP();
this._handle.start();
};
TLSSocket.prototype.setServername = function(name) {
this.ssl.setServername(name);
this._handle.setServername(name);
};
TLSSocket.prototype.setSession = function(session) {
if (typeof session === 'string')
session = new Buffer(session, 'binary');
this.ssl.setSession(session);
this._handle.setSession(session);
};
TLSSocket.prototype.getPeerCertificate = function(detailed) {
if (this.ssl) {
if (this._handle) {
return common.translatePeerCertificate(
this.ssl.getPeerCertificate(detailed));
this._handle.getPeerCertificate(detailed));
}
return null;
};
TLSSocket.prototype.getSession = function() {
if (this.ssl) {
return this.ssl.getSession();
if (this._handle) {
return this._handle.getSession();
}
return null;
};
TLSSocket.prototype.isSessionReused = function() {
if (this.ssl) {
return this.ssl.isSessionReused();
if (this._handle) {
return this._handle.isSessionReused();
}
return null;
};
TLSSocket.prototype.getCipher = function(err) {
if (this.ssl) {
return this.ssl.getCurrentCipher();
if (this._handle) {
return this._handle.getCurrentCipher();
} else {
return null;
}
@ -620,7 +668,7 @@ function Server(/* [options], listener */) {
socket.on('secure', function() {
if (socket._requestCert) {
var verifyError = socket.ssl.verifyError();
var verifyError = socket._handle.verifyError();
if (verifyError) {
socket.authorizationError = verifyError.code;
@ -775,28 +823,6 @@ function normalizeConnectArgs(listArgs) {
return (cb) ? [options, cb] : [options];
}
function legacyConnect(hostname, options, NPN, context) {
assert(options.socket);
if (!tls_legacy)
tls_legacy = require('_tls_legacy');
var pair = tls_legacy.createSecurePair(context,
false,
true,
!!options.rejectUnauthorized,
{
NPNProtocols: NPN.NPNProtocols,
servername: hostname
});
tls_legacy.pipe(pair, options.socket);
pair.cleartext._controlReleased = true;
pair.on('error', function(err) {
pair.cleartext.emit('error', err);
});
return pair;
}
exports.connect = function(/* [port, host], options, cb */) {
var args = normalizeConnectArgs(arguments);
var options = args[0];
@ -819,19 +845,8 @@ exports.connect = function(/* [port, host], options, cb */) {
context = tls.createSecureContext(options);
tls.convertNPNProtocols(options.NPNProtocols, NPN);
// Wrapping TLS socket inside another TLS socket was requested -
// create legacy secure pair
var socket;
var legacy;
var result;
if (options.socket instanceof TLSSocket) {
debug('legacy connect');
legacy = true;
socket = legacyConnect(hostname, options, NPN, context);
result = socket.cleartext;
} else {
legacy = false;
socket = new TLSSocket(options.socket, {
var socket = new TLSSocket(options.socket, {
pipe: options.path && !options.port,
secureContext: context,
isServer: false,
requestCert: true,
@ -840,30 +855,11 @@ exports.connect = function(/* [port, host], options, cb */) {
NPNProtocols: NPN.NPNProtocols,
requestOCSP: options.requestOCSP
});
result = socket;
}
if (socket._handle && !socket._connecting) {
onHandle();
} else {
// Not even started connecting yet (or probably resolving dns address),
// catch socket errors and assign handle.
if (!legacy && options.socket) {
options.socket.once('connect', function() {
assert(options.socket._handle);
socket._handle = options.socket._handle;
socket._handle.owner = socket;
socket.emit('connect');
});
}
socket.once('connect', onHandle);
}
if (cb)
result.once('secureConnect', cb);
socket.once('secureConnect', cb);
if (!options.socket) {
assert(!legacy);
var connect_opt;
if (options.path && !options.port) {
connect_opt = { path: options.path };
@ -874,51 +870,49 @@ exports.connect = function(/* [port, host], options, cb */) {
localAddress: options.localAddress
};
}
socket.connect(connect_opt);
socket.connect(connect_opt, function() {
socket._start();
});
}
return result;
function onHandle() {
if (!legacy)
socket._releaseControl();
if (options.session)
socket.setSession(options.session);
if (!legacy) {
if (options.servername)
socket.setServername(options.servername);
if (options.socket)
socket._start();
}
socket.on('secure', function() {
var verifyError = socket.ssl.verifyError();
var verifyError = socket._handle.verifyError();
// Verify that server's identity matches it's certificate's names
if (!verifyError) {
var cert = result.getPeerCertificate();
var cert = socket.getPeerCertificate();
verifyError = options.checkServerIdentity(hostname, cert);
}
if (verifyError) {
result.authorized = false;
result.authorizationError = verifyError.code || verifyError.message;
socket.authorized = false;
socket.authorizationError = verifyError.code || verifyError.message;
if (options.rejectUnauthorized) {
result.emit('error', verifyError);
result.destroy();
socket.emit('error', verifyError);
socket.destroy();
return;
} else {
result.emit('secureConnect');
socket.emit('secureConnect');
}
} else {
result.authorized = true;
result.emit('secureConnect');
socket.authorized = true;
socket.emit('secureConnect');
}
// Uncork incoming data
result.removeListener('end', onHangUp);
socket.removeListener('end', onHangUp);
});
function onHangUp() {
@ -931,6 +925,7 @@ exports.connect = function(/* [port, host], options, cb */) {
socket.emit('error', error);
}
}
result.once('end', onHangUp);
}
socket.once('end', onHangUp);
return socket;
};

4
lib/net.js

@ -961,7 +961,9 @@ function afterConnect(status, handle, req, readable, writable) {
return;
}
assert(handle === self._handle, 'handle != self._handle');
// Update handle if it was wrapped
// TODO(indutny): assert that the handle is actually an ancestor of old one
handle = self._handle;
debug('afterConnect');

2
node.gyp

@ -115,6 +115,7 @@
'src/smalloc.cc',
'src/spawn_sync.cc',
'src/string_bytes.cc',
'src/stream_base.cc',
'src/stream_wrap.cc',
'src/tcp_wrap.cc',
'src/timer_wrap.cc',
@ -151,6 +152,7 @@
'src/req-wrap.h',
'src/req-wrap-inl.h',
'src/string_bytes.h',
'src/stream_base.h',
'src/stream_wrap.h',
'src/tree.h',
'src/util.h',

2
src/env.h

@ -234,8 +234,10 @@ namespace node {
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \
V(tls_wrap_constructor_function, v8::Function) \
V(tls_wrap_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(udp_constructor_function, v8::Function) \
V(write_wrap_constructor_function, v8::Function) \
class Environment;

21
src/js_stream.cc

@ -0,0 +1,21 @@
#include "js_stream.h"
#include "async-wrap.h"
#include "env.h"
#include "env-inl.h"
#include "stream_base.h"
#include "v8.h"
namespace node {
using v8::Context;
using v8::Handle;
using v8::Object;
using v8::Value;
void JSStream::Initialize(Handle<Object> target,
Handle<Value> unused,
Handle<Context> context) {
}
} // namespace node

20
src/js_stream.h

@ -0,0 +1,20 @@
#ifndef SRC_JS_STREAM_H_
#define SRC_JS_STREAM_H_
#include "async-wrap.h"
#include "env.h"
#include "stream_base.h"
#include "v8.h"
namespace node {
class JSStream : public StreamBase {
public:
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
};
} // namespace node
#endif // SRC_JS_STREAM_H_

22
src/node_crypto.cc

@ -3,7 +3,7 @@
#include "node_crypto.h"
#include "node_crypto_bio.h"
#include "node_crypto_groups.h"
#include "tls_wrap.h" // TLSCallbacks
#include "tls_wrap.h" // TLSWrap
#include "async-wrap.h"
#include "async-wrap-inl.h"
@ -98,28 +98,28 @@ const char* const root_certs[] = {
X509_STORE* root_cert_store;
// Just to generate static methods
template class SSLWrap<TLSCallbacks>;
template void SSLWrap<TLSCallbacks>::AddMethods(Environment* env,
template class SSLWrap<TLSWrap>;
template void SSLWrap<TLSWrap>::AddMethods(Environment* env,
Handle<FunctionTemplate> t);
template void SSLWrap<TLSCallbacks>::InitNPN(SecureContext* sc);
template SSL_SESSION* SSLWrap<TLSCallbacks>::GetSessionCallback(
template void SSLWrap<TLSWrap>::InitNPN(SecureContext* sc);
template SSL_SESSION* SSLWrap<TLSWrap>::GetSessionCallback(
SSL* s,
unsigned char* key,
int len,
int* copy);
template int SSLWrap<TLSCallbacks>::NewSessionCallback(SSL* s,
template int SSLWrap<TLSWrap>::NewSessionCallback(SSL* s,
SSL_SESSION* sess);
template void SSLWrap<TLSCallbacks>::OnClientHello(
template void SSLWrap<TLSWrap>::OnClientHello(
void* arg,
const ClientHelloParser::ClientHello& hello);
#ifdef OPENSSL_NPN_NEGOTIATED
template int SSLWrap<TLSCallbacks>::AdvertiseNextProtoCallback(
template int SSLWrap<TLSWrap>::AdvertiseNextProtoCallback(
SSL* s,
const unsigned char** data,
unsigned int* len,
void* arg);
template int SSLWrap<TLSCallbacks>::SelectNextProtoCallback(
template int SSLWrap<TLSWrap>::SelectNextProtoCallback(
SSL* s,
unsigned char** out,
unsigned char* outlen,
@ -127,7 +127,7 @@ template int SSLWrap<TLSCallbacks>::SelectNextProtoCallback(
unsigned int inlen,
void* arg);
#endif
template int SSLWrap<TLSCallbacks>::TLSExtStatusCallback(SSL* s, void* arg);
template int SSLWrap<TLSWrap>::TLSExtStatusCallback(SSL* s, void* arg);
static void crypto_threadid_cb(CRYPTO_THREADID* tid) {
@ -973,7 +973,7 @@ void SSLWrap<Base>::AddMethods(Environment* env, Handle<FunctionTemplate> t) {
env->SetProtoMethod(t, "getCurrentCipher", GetCurrentCipher);
env->SetProtoMethod(t, "endParser", EndParser);
env->SetProtoMethod(t, "renegotiate", Renegotiate);
env->SetProtoMethod(t, "shutdown", Shutdown);
env->SetProtoMethod(t, "shutdownSSL", Shutdown);
env->SetProtoMethod(t, "getTLSTicket", GetTLSTicket);
env->SetProtoMethod(t, "newSessionDone", NewSessionDone);
env->SetProtoMethod(t, "setOCSPResponse", SetOCSPResponse);

19
src/node_wrap.h

@ -14,7 +14,7 @@
namespace node {
#define WITH_GENERIC_STREAM(env, obj, BODY) \
#define WITH_GENERIC_UV_STREAM(env, obj, BODY, ELSE) \
do { \
if (env->tcp_constructor_template().IsEmpty() == false && \
env->tcp_constructor_template()->HasInstance(obj)) { \
@ -28,16 +28,29 @@ namespace node {
env->pipe_constructor_template()->HasInstance(obj)) { \
PipeWrap* const wrap = Unwrap<PipeWrap>(obj); \
BODY \
} else { \
ELSE \
} \
} while (0)
#define WITH_GENERIC_STREAM(env, obj, BODY) \
do { \
WITH_GENERIC_UV_STREAM(env, obj, BODY, { \
if (env->tls_wrap_constructor_template().IsEmpty() == false && \
env->tls_wrap_constructor_template()->HasInstance(obj)) { \
TLSWrap* const wrap = Unwrap<TLSWrap>(obj); \
BODY \
} \
}); \
} while (0)
inline uv_stream_t* HandleToStream(Environment* env,
v8::Local<v8::Object> obj) {
v8::HandleScope scope(env->isolate());
WITH_GENERIC_STREAM(env, obj, {
WITH_GENERIC_UV_STREAM(env, obj, {
return reinterpret_cast<uv_stream_t*>(wrap->UVHandle());
});
}, {});
return nullptr;
}

21
src/pipe_wrap.cc

@ -77,30 +77,11 @@ void PipeWrap::Initialize(Handle<Object> target,
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "Pipe"));
t->InstanceTemplate()->SetInternalFieldCount(1);
enum PropertyAttribute attributes =
static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
t->InstanceTemplate()->SetAccessor(env->fd_string(),
StreamWrap::GetFD,
nullptr,
Handle<Value>(),
v8::DEFAULT,
attributes);
env->SetProtoMethod(t, "close", HandleWrap::Close);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
env->SetProtoMethod(t, "setBlocking", StreamWrap::SetBlocking);
env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart);
env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop);
env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown);
env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer);
env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString);
env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String);
env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String);
env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString);
StreamWrap::AddMethods(env, t);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "listen", Listen);

495
src/stream_base.cc

@ -0,0 +1,495 @@
#include "stream_base.h"
#include "stream_wrap.h"
#include "node.h"
#include "node_buffer.h"
#include "env.h"
#include "env-inl.h"
#include "string_bytes.h"
#include "tls_wrap.h"
#include "util.h"
#include "util-inl.h"
#include "v8.h"
#include <limits.h> // INT_MAX
namespace node {
using v8::Array;
using v8::Context;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::PropertyAttribute;
using v8::PropertyCallbackInfo;
using v8::String;
using v8::Value;
template void StreamBase::AddMethods<StreamWrap>(Environment* env,
Handle<FunctionTemplate> t);
template void StreamBase::AddMethods<TLSWrap>(Environment* env,
Handle<FunctionTemplate> t);
template <class Base>
void StreamBase::AddMethods(Environment* env, Handle<FunctionTemplate> t) {
HandleScope scope(env->isolate());
enum PropertyAttribute attributes =
static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
t->InstanceTemplate()->SetAccessor(env->fd_string(),
GetFD<Base>,
nullptr,
Handle<Value>(),
v8::DEFAULT,
attributes);
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
env->SetProtoMethod(t, "writev", JSMethod<Base, &StreamBase::Writev>);
env->SetProtoMethod(t,
"writeBuffer",
JSMethod<Base, &StreamBase::WriteBuffer>);
env->SetProtoMethod(t,
"writeAsciiString",
JSMethod<Base, &StreamBase::WriteString<ASCII> >);
env->SetProtoMethod(t,
"writeUtf8String",
JSMethod<Base, &StreamBase::WriteString<UTF8> >);
env->SetProtoMethod(t,
"writeUcs2String",
JSMethod<Base, &StreamBase::WriteString<UCS2> >);
env->SetProtoMethod(t,
"writeBinaryString",
JSMethod<Base, &StreamBase::WriteString<BINARY> >);
}
template <class Base>
void StreamBase::GetFD(Local<String> key,
const PropertyCallbackInfo<Value>& args) {
StreamBase* wrap = Unwrap<Base>(args.Holder());
if (!wrap->IsAlive())
return args.GetReturnValue().Set(UV_EINVAL);
args.GetReturnValue().Set(wrap->GetFD());
}
template <class Base,
int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
StreamBase* wrap = Unwrap<Base>(args.Holder());
if (!wrap->IsAlive())
return args.GetReturnValue().Set(UV_EINVAL);
args.GetReturnValue().Set((wrap->*Method)(args));
}
int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
return ReadStart();
}
int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
return ReadStop();
}
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>();
ShutdownWrap* req_wrap = new ShutdownWrap(env,
req_wrap_obj,
this,
AfterShutdown);
int err = DoShutdown(req_wrap);
req_wrap->Dispatched();
if (err)
delete req_wrap;
return err;
}
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();
// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false);
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
wrap->GetAsyncWrap()->object(),
req_wrap_obj
};
if (req_wrap->object()->Has(env->oncomplete_string()))
req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
delete req_wrap;
}
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsArray());
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
size_t count = chunks->Length() >> 1;
uv_buf_t bufs_[16];
uv_buf_t* bufs = bufs_;
// Determine storage size first
size_t storage_size = 0;
for (size_t i = 0; i < count; i++) {
Handle<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required
// String chunk
Handle<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535)
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
else
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
storage_size += chunk_size + 15;
}
if (storage_size > INT_MAX)
return UV_ENOBUFS;
if (ARRAY_SIZE(bufs_) < count)
bufs = new uv_buf_t[count];
storage_size += sizeof(WriteWrap);
char* storage = new char[storage_size];
WriteWrap* req_wrap =
new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite);
uint32_t bytes = 0;
size_t offset = sizeof(WriteWrap);
for (size_t i = 0; i < count; i++) {
Handle<Value> chunk = chunks->Get(i * 2);
// Write buffer
if (Buffer::HasInstance(chunk)) {
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}
// Write string
offset = ROUND_UP(offset, 16);
CHECK_LT(offset, storage_size);
char* str_storage = storage + offset;
size_t str_size = storage_size - offset;
Handle<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
str_size = StringBytes::Write(env->isolate(),
str_storage,
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
}
int err = DoWrite(req_wrap, bufs, count, nullptr);
// Deallocate space
if (bufs != bufs_)
delete[] bufs;
req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
req_wrap->object()->Set(env->bytes_string(),
Number::New(env->isolate(), bytes));
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
return err;
}
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
CHECK(Buffer::HasInstance(args[1]));
Environment* env = Environment::GetCurrent(args);
Local<Object> req_wrap_obj = args[0].As<Object>();
const char* data = Buffer::Data(args[1]);
size_t length = Buffer::Length(args[1]);
char* storage;
WriteWrap* req_wrap;
uv_buf_t buf;
buf.base = const_cast<char*>(data);
buf.len = length;
// Try writing immediately without allocation
uv_buf_t* bufs = &buf;
size_t count = 1;
int err = DoTryWrite(&bufs, &count);
if (err != 0)
goto done;
if (count == 0)
goto done;
CHECK_EQ(count, 1);
// Allocate, or write rest
storage = new char[sizeof(WriteWrap)];
req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite);
err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap->Dispatched();
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
done:
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), length));
return err;
}
template <enum encoding enc>
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsString());
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<String> string = args[1].As<String>();
Local<Object> send_handle_obj;
if (args[2]->IsObject())
send_handle_obj = args[2].As<Object>();
int err;
// Compute the size of the storage that the string will be flattened into.
// For UTF8 strings that are very long, go ahead and take the hit for
// computing their actual size, rather than tripling the storage.
size_t storage_size;
if (enc == UTF8 && string->Length() > 65535)
storage_size = StringBytes::Size(env->isolate(), string, enc);
else
storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
if (storage_size > INT_MAX)
return UV_ENOBUFS;
// Try writing immediately if write size isn't too big
char* storage;
WriteWrap* req_wrap;
char* data;
char stack_storage[16384]; // 16kb
size_t data_size;
uv_buf_t buf;
bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
(!IsIPCPipe() || send_handle_obj.IsEmpty());
if (try_write) {
data_size = StringBytes::Write(env->isolate(),
stack_storage,
storage_size,
string,
enc);
buf = uv_buf_init(stack_storage, data_size);
uv_buf_t* bufs = &buf;
size_t count = 1;
err = DoTryWrite(&bufs, &count);
// Failure
if (err != 0)
goto done;
// Success
if (count == 0)
goto done;
// Partial write
CHECK_EQ(count, 1);
}
storage = new char[sizeof(WriteWrap) + storage_size + 15];
req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite);
data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
if (try_write) {
// Copy partial data
memcpy(data, buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
data_size = StringBytes::Write(env->isolate(),
data,
storage_size,
string,
enc);
}
CHECK_LE(data_size, storage_size);
buf = uv_buf_init(data, data_size);
if (!IsIPCPipe()) {
err = DoWrite(req_wrap, &buf, 1, nullptr);
} else {
uv_handle_t* send_handle = nullptr;
if (!send_handle_obj.IsEmpty()) {
HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
send_handle = wrap->GetHandle();
// Reference StreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
CHECK_EQ(false, req_wrap->persistent().IsEmpty());
req_wrap->object()->Set(env->handle_string(), send_handle_obj);
}
err = DoWrite(
req_wrap,
&buf,
1,
reinterpret_cast<uv_stream_t*>(send_handle));
}
req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
done:
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), data_size));
return err;
}
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false);
// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->handle_string());
wrap->OnAfterWrite(req_wrap);
Local<Value> argv[] = {
Integer::New(env->isolate(), status),
wrap->GetAsyncWrap()->object(),
req_wrap_obj,
Undefined(env->isolate())
};
const char* msg = wrap->Error();
if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg);
wrap->ClearError();
}
if (req_wrap->object()->Has(env->oncomplete_string()))
req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
req_wrap->~WriteWrap();
delete[] reinterpret_cast<char*>(req_wrap);
}
void StreamBase::EmitData(ssize_t nread,
Local<Object> buf,
Local<Object> handle) {
Environment* env = env_;
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
buf,
handle
};
if (argv[1].IsEmpty())
argv[1] = Undefined(env->isolate());
if (argv[2].IsEmpty())
argv[2] = Undefined(env->isolate());
GetAsyncWrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
}
AsyncWrap* StreamBase::GetAsyncWrap() {
return nullptr;
}
} // namespace node

223
src/stream_base.h

@ -0,0 +1,223 @@
#ifndef SRC_STREAM_BASE_H_
#define SRC_STREAM_BASE_H_
#include "env.h"
#include "async-wrap.h"
#include "req-wrap.h"
#include "req-wrap-inl.h"
#include "node.h"
#include "v8.h"
namespace node {
// Forward declarations
class StreamBase;
template <class Req>
class StreamReq {
public:
typedef void (*DoneCb)(Req* req, int status);
explicit StreamReq(DoneCb cb) : cb_(cb) {
}
inline void Done(int status) {
cb_(static_cast<Req*>(this), status);
}
private:
DoneCb cb_;
};
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
public StreamReq<ShutdownWrap> {
public:
ShutdownWrap(Environment* env,
v8::Local<v8::Object> req_wrap_obj,
StreamBase* wrap,
DoneCb cb)
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
StreamReq<ShutdownWrap>(cb),
wrap_(wrap) {
Wrap(req_wrap_obj, this);
}
static void NewShutdownWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args.IsConstructCall());
}
inline StreamBase* wrap() const { return wrap_; }
private:
StreamBase* const wrap_;
};
class WriteWrap: public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
public:
WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap) {
Wrap(obj, this);
}
void* operator new(size_t size, char* storage) { return storage; }
// This is just to keep the compiler happy. It should never be called, since
// we don't use exceptions in node.
void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
inline StreamBase* wrap() const {
return wrap_;
}
static void NewWriteWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args.IsConstructCall());
}
private:
// People should not be using the non-placement new and delete operator on a
// WriteWrap. Ensure this never happens.
void* operator new(size_t size) { UNREACHABLE(); }
void operator delete(void* ptr) { UNREACHABLE(); }
StreamBase* const wrap_;
};
class StreamResource {
public:
typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
StreamResource() : after_write_cb_(nullptr),
alloc_cb_(nullptr),
read_cb_(nullptr) {
}
virtual ~StreamResource() = default;
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0;
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
virtual const char* Error() const = 0;
virtual void ClearError() = 0;
// Events
inline void OnAfterWrite(WriteWrap* w) {
if (after_write_cb_ != nullptr)
after_write_cb_(w, after_write_ctx_);
}
inline void OnAlloc(size_t size, uv_buf_t* buf) {
if (alloc_cb_ != nullptr)
alloc_cb_(size, buf, alloc_ctx_);
}
inline void OnRead(size_t nread,
const uv_buf_t* buf,
uv_handle_type pending) {
if (read_cb_ != nullptr)
read_cb_(nread, buf, pending, read_ctx_);
}
inline void set_after_write_cb(AfterWriteCb cb, void* ctx) {
after_write_ctx_ = ctx;
after_write_cb_ = cb;
}
inline void set_alloc_cb(AllocCb cb, void* ctx) {
alloc_cb_ = cb;
alloc_ctx_ = ctx;
}
inline void set_read_cb(ReadCb cb, void* ctx) {
read_cb_ = cb;
read_ctx_ = ctx;
}
private:
AfterWriteCb after_write_cb_;
void* after_write_ctx_;
AllocCb alloc_cb_;
void* alloc_ctx_;
ReadCb read_cb_;
void* read_ctx_;
};
class StreamBase : public StreamResource {
public:
template <class Base>
static void AddMethods(Environment* env,
v8::Handle<v8::FunctionTemplate> target);
virtual void* Cast() = 0;
virtual bool IsAlive() const = 0;
virtual bool IsClosing() const = 0;
virtual bool IsIPCPipe() const = 0;
virtual int GetFD() const = 0;
virtual int ReadStart() = 0;
virtual int ReadStop() = 0;
inline void Consume() {
CHECK_EQ(consumed_, false);
consumed_ = true;
}
template <class Outer>
inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
void EmitData(ssize_t nread,
v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle);
protected:
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
}
virtual ~StreamBase() = default;
virtual AsyncWrap* GetAsyncWrap() = 0;
// Libuv callbacks
static void AfterShutdown(ShutdownWrap* req, int status);
static void AfterWrite(WriteWrap* req, int status);
// JS Methods
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
template <enum encoding enc>
int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
template <class Base>
static void GetFD(v8::Local<v8::String>,
const v8::PropertyCallbackInfo<v8::Value>&);
template <class Base,
int (StreamBase::*Method)( // NOLINT(whitespace/parens)
const v8::FunctionCallbackInfo<v8::Value>& args)>
static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
private:
Environment* env_;
bool consumed_;
};
} // namespace node
#endif // SRC_STREAM_BASE_H_

656
src/stream_wrap.cc

@ -55,6 +55,7 @@ void StreamWrap::Initialize(Handle<Object> target,
ww->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"));
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"),
ww->GetFunction());
env->set_write_wrap_constructor_function(ww->GetFunction());
}
@ -68,23 +69,53 @@ StreamWrap::StreamWrap(Environment* env,
reinterpret_cast<uv_handle_t*>(stream),
provider,
parent),
stream_(stream),
default_callbacks_(this),
callbacks_(&default_callbacks_),
callbacks_gc_(false) {
StreamBase(env),
stream_(stream) {
set_after_write_cb(OnAfterWriteImpl, this);
set_alloc_cb(OnAllocImpl, this);
set_read_cb(OnReadImpl, this);
}
void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
#if !defined(_WIN32)
HandleScope scope(args.GetIsolate());
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
int fd = -1;
if (wrap != nullptr && wrap->stream() != nullptr) {
fd = wrap->stream()->io_watcher.fd;
void StreamWrap::AddMethods(Environment* env,
v8::Handle<v8::FunctionTemplate> target) {
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<StreamWrap>(env, target);
}
args.GetReturnValue().Set(fd);
int StreamWrap::GetFD() const {
int fd = -1;
#if !defined(_WIN32)
if (stream() != nullptr)
fd = stream()->io_watcher.fd;
#endif
return fd;
}
bool StreamWrap::IsAlive() const {
return HandleWrap::IsAlive(this);
}
bool StreamWrap::IsClosing() const {
return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
}
void* StreamWrap::Cast() {
return reinterpret_cast<void*>(this);
}
AsyncWrap* StreamWrap::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
bool StreamWrap::IsIPCPipe() const {
return is_named_pipe_ipc();
}
@ -96,22 +127,13 @@ void StreamWrap::UpdateWriteQueueSize() {
}
void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
return args.GetReturnValue().Set(UV_EINVAL);
int err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
args.GetReturnValue().Set(err);
int StreamWrap::ReadStart() {
return uv_read_start(stream(), OnAlloc, OnRead);
}
void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
return args.GetReturnValue().Set(UV_EINVAL);
int err = uv_read_stop(wrap->stream());
args.GetReturnValue().Set(err);
int StreamWrap::ReadStop() {
return uv_read_stop(stream());
}
@ -120,14 +142,25 @@ void StreamWrap::OnAlloc(uv_handle_t* handle,
uv_buf_t* buf) {
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
return static_cast<StreamBase*>(wrap)->OnAlloc(suggested_size, buf);
}
void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
buf->base = static_cast<char*>(malloc(size));
buf->len = size;
if (buf->base == nullptr && size > 0) {
FatalError(
"node::StreamWrap::DoAlloc(size_t, uv_buf_t*, void*)",
"Out Of Memory");
}
}
template <class WrapType, class UVType>
static Local<Object> AcceptHandle(Environment* env,
uv_stream_t* pipe,
AsyncWrap* parent) {
static Local<Object> AcceptHandle(Environment* env, StreamWrap* parent) {
EscapableHandleScope scope(env->isolate());
Local<Object> wrap_obj;
UVType* handle;
@ -139,13 +172,54 @@ static Local<Object> AcceptHandle(Environment* env,
WrapType* wrap = Unwrap<WrapType>(wrap_obj);
handle = wrap->UVHandle();
if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
if (uv_accept(parent->stream(), reinterpret_cast<uv_stream_t*>(handle)))
abort();
return scope.Escape(wrap_obj);
}
void StreamWrap::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Object> pending_obj;
if (nread < 0) {
if (buf->base != nullptr)
free(buf->base);
wrap->EmitData(nread, Local<Object>(), pending_obj);
return;
}
if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
}
char* base = static_cast<char*>(realloc(buf->base, nread));
CHECK_LE(static_cast<size_t>(nread), buf->len);
if (pending == UV_TCP) {
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
} else if (pending == UV_NAMED_PIPE) {
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
} else if (pending == UV_UDP) {
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
} else {
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
}
wrap->EmitData(nread, Buffer::Use(env, base, nread), pending_obj);
}
void StreamWrap::OnReadCommon(uv_stream_t* handle,
ssize_t nread,
const uv_buf_t* buf,
@ -164,7 +238,7 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle,
}
}
wrap->callbacks()->DoRead(handle, nread, buf, pending);
static_cast<StreamBase*>(wrap)->OnRead(nread, buf, pending);
}
@ -183,437 +257,26 @@ void StreamWrap::OnRead(uv_stream_t* handle,
}
size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
CHECK(Buffer::HasInstance(val));
// Simple non-writev case
buf->base = Buffer::Data(val);
buf->len = Buffer::Length(val);
return buf->len;
}
void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
return args.GetReturnValue().Set(UV_EINVAL);
CHECK(args[0]->IsObject());
CHECK(Buffer::HasInstance(args[1]));
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Object> buf_obj = args[1].As<Object>();
size_t length = Buffer::Length(buf_obj);
char* storage;
WriteWrap* req_wrap;
uv_buf_t buf;
WriteBuffer(buf_obj, &buf);
// Try writing immediately without allocation
uv_buf_t* bufs = &buf;
size_t count = 1;
int err = wrap->callbacks()->TryWrite(&bufs, &count);
if (err != 0)
goto done;
if (count == 0)
goto done;
CHECK_EQ(count, 1);
// Allocate, or write rest
storage = new char[sizeof(WriteWrap)];
req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
err = wrap->callbacks()->DoWrite(req_wrap,
bufs,
count,
nullptr,
StreamWrap::AfterWrite);
req_wrap->Dispatched();
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
done:
const char* msg = wrap->callbacks()->Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
wrap->callbacks()->ClearError();
}
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), length));
args.GetReturnValue().Set(err);
}
template <enum encoding encoding>
void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
int err;
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
return args.GetReturnValue().Set(UV_EINVAL);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsString());
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<String> string = args[1].As<String>();
// Compute the size of the storage that the string will be flattened into.
// For UTF8 strings that are very long, go ahead and take the hit for
// computing their actual size, rather than tripling the storage.
size_t storage_size;
if (encoding == UTF8 && string->Length() > 65535)
storage_size = StringBytes::Size(env->isolate(), string, encoding);
else
storage_size = StringBytes::StorageSize(env->isolate(), string, encoding);
if (storage_size > INT_MAX) {
args.GetReturnValue().Set(UV_ENOBUFS);
return;
}
// Try writing immediately if write size isn't too big
char* storage;
WriteWrap* req_wrap;
char* data;
char stack_storage[16384]; // 16kb
size_t data_size;
uv_buf_t buf;
bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
(!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
if (try_write) {
data_size = StringBytes::Write(env->isolate(),
stack_storage,
storage_size,
string,
encoding);
buf = uv_buf_init(stack_storage, data_size);
uv_buf_t* bufs = &buf;
size_t count = 1;
err = wrap->callbacks()->TryWrite(&bufs, &count);
// Failure
if (err != 0)
goto done;
// Success
if (count == 0)
goto done;
// Partial write
CHECK_EQ(count, 1);
}
storage = new char[sizeof(WriteWrap) + storage_size + 15];
req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
if (try_write) {
// Copy partial data
memcpy(data, buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
data_size = StringBytes::Write(env->isolate(),
data,
storage_size,
string,
encoding);
}
CHECK_LE(data_size, storage_size);
buf = uv_buf_init(data, data_size);
if (!wrap->is_named_pipe_ipc()) {
err = wrap->callbacks()->DoWrite(req_wrap,
&buf,
1,
nullptr,
StreamWrap::AfterWrite);
} else {
uv_handle_t* send_handle = nullptr;
if (args[2]->IsObject()) {
Local<Object> send_handle_obj = args[2].As<Object>();
HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
send_handle = wrap->GetHandle();
// Reference StreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
CHECK_EQ(false, req_wrap->persistent().IsEmpty());
req_wrap->object()->Set(env->handle_string(), send_handle_obj);
}
err = wrap->callbacks()->DoWrite(
req_wrap,
&buf,
1,
reinterpret_cast<uv_stream_t*>(send_handle),
StreamWrap::AfterWrite);
}
req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
done:
const char* msg = wrap->callbacks()->Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
wrap->callbacks()->ClearError();
}
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), data_size));
args.GetReturnValue().Set(err);
}
void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
return args.GetReturnValue().Set(UV_EINVAL);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsArray());
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
size_t count = chunks->Length() >> 1;
uv_buf_t bufs_[16];
uv_buf_t* bufs = bufs_;
// Determine storage size first
size_t storage_size = 0;
for (size_t i = 0; i < count; i++) {
Handle<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required
// String chunk
Handle<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535)
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
else
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
storage_size += chunk_size + 15;
}
if (storage_size > INT_MAX) {
args.GetReturnValue().Set(UV_ENOBUFS);
return;
}
if (ARRAY_SIZE(bufs_) < count)
bufs = new uv_buf_t[count];
storage_size += sizeof(WriteWrap);
char* storage = new char[storage_size];
WriteWrap* req_wrap =
new(storage) WriteWrap(env, req_wrap_obj, wrap);
uint32_t bytes = 0;
size_t offset = sizeof(WriteWrap);
for (size_t i = 0; i < count; i++) {
Handle<Value> chunk = chunks->Get(i * 2);
// Write buffer
if (Buffer::HasInstance(chunk)) {
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}
// Write string
offset = ROUND_UP(offset, 16);
CHECK_LT(offset, storage_size);
char* str_storage = storage + offset;
size_t str_size = storage_size - offset;
Handle<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
str_size = StringBytes::Write(env->isolate(),
str_storage,
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
}
int err = wrap->callbacks()->DoWrite(req_wrap,
bufs,
count,
nullptr,
StreamWrap::AfterWrite);
// Deallocate space
if (bufs != bufs_)
delete[] bufs;
req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
req_wrap->object()->Set(env->bytes_string(),
Number::New(env->isolate(), bytes));
const char* msg = wrap->callbacks()->Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
wrap->callbacks()->ClearError();
}
if (err) {
req_wrap->~WriteWrap();
delete[] storage;
}
args.GetReturnValue().Set(err);
}
void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
WriteStringImpl<ASCII>(args);
}
void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
WriteStringImpl<UTF8>(args);
}
void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
WriteStringImpl<UCS2>(args);
}
void StreamWrap::WriteBinaryString(const FunctionCallbackInfo<Value>& args) {
WriteStringImpl<BINARY>(args);
}
void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
return args.GetReturnValue().Set(UV_EINVAL);
CHECK_GT(args.Length(), 0);
int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue());
args.GetReturnValue().Set(err);
}
void StreamWrap::AfterWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req);
StreamWrap* wrap = req_wrap->wrap();
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
CHECK_EQ(wrap->persistent().IsEmpty(), false);
// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->handle_string());
wrap->callbacks()->AfterWrite(req_wrap);
Local<Value> argv[] = {
Integer::New(env->isolate(), status),
wrap->object(),
req_wrap_obj,
Undefined(env->isolate())
};
const char* msg = wrap->callbacks()->Error();
if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg);
wrap->callbacks()->ClearError();
}
req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
req_wrap->~WriteWrap();
delete[] reinterpret_cast<char*>(req_wrap);
}
void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
StreamWrap* wrap = Unwrap<StreamWrap>(args.Holder());
if (!IsAlive(wrap))
CHECK_GT(args.Length(), 0);
if (!wrap->IsAlive())
return args.GetReturnValue().Set(UV_EINVAL);
CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>();
ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj);
int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
req_wrap->Dispatched();
if (err)
delete req_wrap;
args.GetReturnValue().Set(err);
bool enable = args[0]->IsTrue();
args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
}
void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
Environment* env = wrap->env();
// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
CHECK_EQ(wrap->persistent().IsEmpty(), false);
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
wrap->object(),
req_wrap_obj
};
req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
delete req_wrap;
int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
}
const char* StreamWrapCallbacks::Error() const {
return nullptr;
}
void StreamWrapCallbacks::ClearError() {
void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
ShutdownWrap* req_wrap = ContainerOf(&ShutdownWrap::req_, req);
req_wrap->Done(status);
}
@ -621,13 +284,13 @@ void StreamWrapCallbacks::ClearError() {
// values, shifting their base and decrementing their length. This is
// required in order to skip the data that was successfully written via
// uv_try_write().
int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
int err;
size_t written;
uv_buf_t* vbufs = *bufs;
size_t vcount = *count;
err = uv_try_write(wrap()->stream(), vbufs, vcount);
err = uv_try_write(stream(), vbufs, vcount);
if (err == UV_ENOSYS || err == UV_EAGAIN)
return 0;
if (err < 0)
@ -657,106 +320,53 @@ int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
}
int StreamWrapCallbacks::DoWrite(WriteWrap* w,
int StreamWrap::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
uv_write_cb cb) {
uv_stream_t* send_handle) {
int r;
if (send_handle == nullptr) {
r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
r = uv_write(&w->req_, stream(), bufs, count, AfterWrite);
} else {
r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
r = uv_write2(&w->req_, stream(), bufs, count, send_handle, AfterWrite);
}
if (!r) {
size_t bytes = 0;
for (size_t i = 0; i < count; i++)
bytes += bufs[i].len;
if (wrap()->stream()->type == UV_TCP) {
if (stream()->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(bytes);
} else if (wrap()->stream()->type == UV_NAMED_PIPE) {
} else if (stream()->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(bytes);
}
}
wrap()->UpdateWriteQueueSize();
UpdateWriteQueueSize();
return r;
}
void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
wrap()->UpdateWriteQueueSize();
}
void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
buf->base = static_cast<char*>(malloc(suggested_size));
buf->len = suggested_size;
if (buf->base == nullptr && suggested_size > 0) {
FatalError(
"node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
"Out Of Memory");
}
}
void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending) {
Environment* env = wrap()->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
Undefined(env->isolate()),
Undefined(env->isolate())
};
if (nread < 0) {
if (buf->base != nullptr)
free(buf->base);
wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
return;
}
if (nread == 0) {
if (buf->base != nullptr)
free(buf->base);
return;
void StreamWrap::AfterWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req);
req_wrap->Done(status);
}
char* base = static_cast<char*>(realloc(buf->base, nread));
CHECK_LE(static_cast<size_t>(nread), buf->len);
argv[1] = Buffer::Use(env, base, nread);
Local<Object> pending_obj;
if (pending == UV_TCP) {
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle, wrap());
} else if (pending == UV_NAMED_PIPE) {
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle, wrap());
} else if (pending == UV_UDP) {
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle, wrap());
} else {
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
wrap->UpdateWriteQueueSize();
}
if (!pending_obj.IsEmpty()) {
argv[2] = pending_obj;
}
wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
const char* StreamWrap::Error() const {
return nullptr;
}
int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);
void StreamWrap::ClearError() {
// No-op
}
} // namespace node

164
src/stream_wrap.h

@ -1,10 +1,10 @@
#ifndef SRC_STREAM_WRAP_H_
#define SRC_STREAM_WRAP_H_
#include "stream_base.h"
#include "env.h"
#include "handle_wrap.h"
#include "req-wrap.h"
#include "req-wrap-inl.h"
#include "string_bytes.h"
#include "v8.h"
@ -13,126 +13,31 @@ namespace node {
// Forward declaration
class StreamWrap;
class ShutdownWrap : public ReqWrap<uv_shutdown_t> {
public:
ShutdownWrap(Environment* env, v8::Local<v8::Object> req_wrap_obj)
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
Wrap(req_wrap_obj, this);
}
static void NewShutdownWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args.IsConstructCall());
}
};
class WriteWrap: public ReqWrap<uv_write_t> {
public:
// TODO(trevnorris): WrapWrap inherits from ReqWrap, which I've globbed
// into the same provider. How should these be broken apart?
WriteWrap(Environment* env, v8::Local<v8::Object> obj, StreamWrap* wrap)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
wrap_(wrap) {
Wrap(obj, this);
}
void* operator new(size_t size, char* storage) { return storage; }
// This is just to keep the compiler happy. It should never be called, since
// we don't use exceptions in node.
void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
inline StreamWrap* wrap() const {
return wrap_;
}
static void NewWriteWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args.IsConstructCall());
}
private:
// People should not be using the non-placement new and delete operator on a
// WriteWrap. Ensure this never happens.
void* operator new(size_t size) { UNREACHABLE(); }
void operator delete(void* ptr) { UNREACHABLE(); }
StreamWrap* const wrap_;
};
// Overridable callbacks' types
class StreamWrapCallbacks {
public:
explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) {
}
explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap()) {
}
virtual ~StreamWrapCallbacks() = default;
virtual const char* Error() const;
virtual void ClearError();
virtual int TryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
uv_write_cb cb);
virtual void AfterWrite(WriteWrap* w);
virtual void DoAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf);
virtual void DoRead(uv_stream_t* handle,
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending);
virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb);
protected:
inline StreamWrap* wrap() const {
return wrap_;
}
private:
StreamWrap* const wrap_;
};
class StreamWrap : public HandleWrap {
class StreamWrap : public HandleWrap, public StreamBase {
public:
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
void OverrideCallbacks(StreamWrapCallbacks* callbacks, bool gc) {
StreamWrapCallbacks* old = callbacks_;
callbacks_ = callbacks;
callbacks_gc_ = gc;
if (old != &default_callbacks_)
delete old;
}
static void GetFD(v8::Local<v8::String>,
const v8::PropertyCallbackInfo<v8::Value>&);
int GetFD() const override;
void* Cast() override;
bool IsAlive() const override;
bool IsClosing() const override;
bool IsIPCPipe() const override;
// JavaScript functions
static void ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
static void WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void WriteAsciiString(const v8::FunctionCallbackInfo<v8::Value>& args);
static void WriteUtf8String(const v8::FunctionCallbackInfo<v8::Value>& args);
static void WriteUcs2String(const v8::FunctionCallbackInfo<v8::Value>& args);
static void WriteBinaryString(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStart() override;
int ReadStop() override;
inline StreamWrapCallbacks* callbacks() const {
return callbacks_;
}
// Resource implementation
int DoShutdown(ShutdownWrap* req_wrap) override;
int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
const char* Error() const override;
void ClearError() override;
inline uv_stream_t* stream() const {
return stream_;
@ -152,8 +57,6 @@ class StreamWrap : public HandleWrap {
}
protected:
static size_t WriteBuffer(v8::Handle<v8::Value> val, uv_buf_t* buf);
StreamWrap(Environment* env,
v8::Local<v8::Object> object,
uv_stream_t* stream,
@ -161,22 +64,21 @@ class StreamWrap : public HandleWrap {
AsyncWrap* parent = nullptr);
~StreamWrap() {
if (!callbacks_gc_ && callbacks_ != &default_callbacks_) {
delete callbacks_;
}
callbacks_ = nullptr;
}
void StateChange() { }
AsyncWrap* GetAsyncWrap() override;
void UpdateWriteQueueSize();
static void AddMethods(Environment* env,
v8::Handle<v8::FunctionTemplate> target);
private:
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
// Callbacks for libuv
static void AfterWrite(uv_write_t* req, int status);
static void OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf);
static void AfterShutdown(uv_shutdown_t* req, int status);
static void OnRead(uv_stream_t* handle,
ssize_t nread,
@ -185,16 +87,18 @@ class StreamWrap : public HandleWrap {
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending);
static void AfterWrite(uv_write_t* req, int status);
static void AfterShutdown(uv_shutdown_t* req, int status);
template <enum encoding encoding>
static void WriteStringImpl(const v8::FunctionCallbackInfo<v8::Value>& args);
// Resource interface implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
uv_stream_t* const stream_;
StreamWrapCallbacks default_callbacks_;
StreamWrapCallbacks* callbacks_; // Overridable callbacks
bool callbacks_gc_;
friend class StreamWrapCallbacks;
};

20
src/tcp_wrap.cc

@ -72,15 +72,6 @@ void TCPWrap::Initialize(Handle<Object> target,
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TCP"));
t->InstanceTemplate()->SetInternalFieldCount(1);
enum PropertyAttribute attributes =
static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
t->InstanceTemplate()->SetAccessor(env->fd_string(),
StreamWrap::GetFD,
nullptr,
Handle<Value>(),
v8::DEFAULT,
attributes);
// Init properties
t->InstanceTemplate()->Set(String::NewFromUtf8(env->isolate(), "reading"),
Boolean::New(env->isolate(), false));
@ -98,16 +89,7 @@ void TCPWrap::Initialize(Handle<Object> target,
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart);
env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop);
env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown);
env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer);
env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString);
env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String);
env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String);
env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString);
env->SetProtoMethod(t, "writev", StreamWrap::Writev);
StreamWrap::AddMethods(env, t);
env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);

290
src/tls_wrap.cc

@ -33,17 +33,20 @@ using v8::String;
using v8::Value;
TLSCallbacks::TLSCallbacks(Environment* env,
TLSWrap::TLSWrap(Environment* env,
Kind kind,
Handle<Object> sc,
StreamWrapCallbacks* old)
: SSLWrap<TLSCallbacks>(env, Unwrap<SecureContext>(sc), kind),
StreamWrapCallbacks(old),
StreamBase* stream,
Handle<Object> stream_obj,
Handle<Object> sc)
: SSLWrap<TLSWrap>(env, Unwrap<SecureContext>(sc), kind),
StreamBase(env),
AsyncWrap(env,
env->tls_wrap_constructor_function()->NewInstance(),
AsyncWrap::PROVIDER_TLSWRAP),
sc_(Unwrap<SecureContext>(sc)),
sc_handle_(env->isolate(), sc),
stream_(stream),
stream_handle_(env->isolate(), stream_obj),
enc_in_(nullptr),
enc_out_(nullptr),
clear_in_(nullptr),
@ -58,14 +61,22 @@ TLSCallbacks::TLSCallbacks(Environment* env,
MakeWeak(this);
// We've our own session callbacks
SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSCallbacks>::GetSessionCallback);
SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSCallbacks>::NewSessionCallback);
SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap<TLSWrap>::GetSessionCallback);
SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSWrap>::NewSessionCallback);
stream_->Consume();
stream_->set_after_write_cb(OnAfterWriteImpl, this);
stream_->set_alloc_cb(OnAllocImpl, this);
stream_->set_read_cb(OnReadImpl, this);
set_alloc_cb(OnAllocSelf, this);
set_read_cb(OnReadSelf, this);
InitSSL();
}
TLSCallbacks::~TLSCallbacks() {
TLSWrap::~TLSWrap() {
enc_in_ = nullptr;
enc_out_ = nullptr;
delete clear_in_;
@ -73,6 +84,7 @@ TLSCallbacks::~TLSCallbacks() {
sc_ = nullptr;
sc_handle_.Reset();
stream_handle_.Reset();
persistent().Reset();
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
@ -90,12 +102,12 @@ TLSCallbacks::~TLSCallbacks() {
}
void TLSCallbacks::MakePending() {
void TLSWrap::MakePending() {
write_item_queue_.MoveBack(&pending_write_items_);
}
bool TLSCallbacks::InvokeQueued(int status) {
bool TLSWrap::InvokeQueued(int status) {
if (pending_write_items_.IsEmpty())
return false;
@ -103,7 +115,7 @@ bool TLSCallbacks::InvokeQueued(int status) {
WriteItemList queue;
pending_write_items_.MoveBack(&queue);
while (WriteItem* wi = queue.PopFront()) {
wi->cb_(&wi->w_->req_, status);
wi->w_->Done(status);
delete wi;
}
@ -111,12 +123,12 @@ bool TLSCallbacks::InvokeQueued(int status) {
}
void TLSCallbacks::NewSessionDoneCb() {
void TLSWrap::NewSessionDoneCb() {
Cycle();
}
void TLSCallbacks::InitSSL() {
void TLSWrap::InitSSL() {
// Initialize SSL
enc_in_ = NodeBIO::New();
enc_out_ = NodeBIO::New();
@ -158,7 +170,7 @@ void TLSCallbacks::InitSSL() {
}
void TLSCallbacks::Wrap(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::Wrap(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (args.Length() < 1 || !args[0]->IsObject()) {
@ -172,42 +184,39 @@ void TLSCallbacks::Wrap(const FunctionCallbackInfo<Value>& args) {
if (args.Length() < 3 || !args[2]->IsBoolean())
return env->ThrowTypeError("Third argument should be boolean");
Local<Object> stream = args[0].As<Object>();
Local<Object> stream_obj = args[0].As<Object>();
Local<Object> sc = args[1].As<Object>();
Kind kind = args[2]->IsTrue() ? SSLWrap<TLSCallbacks>::kServer :
SSLWrap<TLSCallbacks>::kClient;
Kind kind = args[2]->IsTrue() ? SSLWrap<TLSWrap>::kServer :
SSLWrap<TLSWrap>::kClient;
TLSCallbacks* callbacks = nullptr;
WITH_GENERIC_STREAM(env, stream, {
callbacks = new TLSCallbacks(env, kind, sc, wrap->callbacks());
wrap->OverrideCallbacks(callbacks, true);
StreamBase* stream = nullptr;
WITH_GENERIC_STREAM(env, stream_obj, {
stream = wrap;
});
CHECK_NE(stream, nullptr);
if (callbacks == nullptr) {
return args.GetReturnValue().SetNull();
}
TLSWrap* res = new TLSWrap(env, kind, stream, stream_obj, sc);
args.GetReturnValue().Set(callbacks->persistent());
args.GetReturnValue().Set(res->persistent());
}
void TLSCallbacks::Receive(const FunctionCallbackInfo<Value>& args) {
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
CHECK(Buffer::HasInstance(args[0]));
char* data = Buffer::Data(args[0]);
size_t len = Buffer::Length(args[0]);
uv_buf_t buf;
uv_stream_t* stream = wrap->wrap()->stream();
// Copy given buffer entirely or partiall if handle becomes closed
while (len > 0 && !uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))) {
wrap->DoAlloc(reinterpret_cast<uv_handle_t*>(stream), len, &buf);
while (len > 0 && !wrap->IsClosing()) {
wrap->stream_->OnAlloc(len, &buf);
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
wrap->DoRead(stream, buf.len, &buf, UV_UNKNOWN_HANDLE);
wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE);
data += copy;
len -= copy;
@ -215,10 +224,10 @@ void TLSCallbacks::Receive(const FunctionCallbackInfo<Value>& args) {
}
void TLSCallbacks::Start(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
if (wrap->started_)
return env->ThrowError("Already started.");
@ -231,14 +240,14 @@ void TLSCallbacks::Start(const FunctionCallbackInfo<Value>& args) {
}
void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) {
void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) {
if (!(where & (SSL_CB_HANDSHAKE_START | SSL_CB_HANDSHAKE_DONE)))
return;
// Be compatible with older versions of OpenSSL. SSL_get_app_data() wants
// a non-const SSL* in OpenSSL <= 0.9.7e.
SSL* ssl = const_cast<SSL*>(ssl_);
TLSCallbacks* c = static_cast<TLSCallbacks*>(SSL_get_app_data(ssl));
TLSWrap* c = static_cast<TLSWrap*>(SSL_get_app_data(ssl));
Environment* env = c->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
@ -261,7 +270,7 @@ void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) {
}
void TLSCallbacks::EncOut() {
void TLSWrap::EncOut() {
// Ignore cycling data if ClientHello wasn't yet parsed
if (!hello_parser_.IsEnded())
return;
@ -291,47 +300,49 @@ void TLSCallbacks::EncOut() {
write_size_ = NodeBIO::FromBIO(enc_out_)->PeekMultiple(data, size, &count);
CHECK(write_size_ != 0 && count != 0);
write_req_.data = this;
Local<Object> req_wrap_obj =
env()->write_wrap_constructor_function()->NewInstance();
char* storage = new char[sizeof(WriteWrap)];
WriteWrap* write_req = new(storage) WriteWrap(env(),
req_wrap_obj,
this,
EncOutCb);
uv_buf_t buf[ARRAY_SIZE(data)];
for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]);
int r = uv_write(&write_req_, wrap()->stream(), buf, count, EncOutCb);
int r = stream_->DoWrite(write_req, buf, count, nullptr);
// Ignore errors, this should be already handled in js
if (!r) {
if (wrap()->is_tcp()) {
if (!r)
NODE_COUNT_NET_BYTES_SENT(write_size_);
} else if (wrap()->is_named_pipe()) {
NODE_COUNT_PIPE_BYTES_SENT(write_size_);
}
}
}
void TLSCallbacks::EncOutCb(uv_write_t* req, int status) {
TLSCallbacks* callbacks = static_cast<TLSCallbacks*>(req->data);
void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) {
TLSWrap* wrap = req_wrap->wrap()->Cast<TLSWrap>();
// Handle error
if (status) {
// Ignore errors after shutdown
if (callbacks->shutdown_)
if (wrap->shutdown_)
return;
// Notify about error
callbacks->InvokeQueued(status);
wrap->InvokeQueued(status);
return;
}
// Commit
NodeBIO::FromBIO(callbacks->enc_out_)->Read(nullptr, callbacks->write_size_);
NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_);
// Try writing more data
callbacks->write_size_ = 0;
callbacks->EncOut();
wrap->write_size_ = 0;
wrap->EncOut();
}
Local<Value> TLSCallbacks::GetSSLError(int status, int* err, const char** msg) {
Local<Value> TLSWrap::GetSSLError(int status, int* err, const char** msg) {
EscapableHandleScope scope(env()->isolate());
*err = SSL_get_error(ssl_, status);
@ -373,7 +384,7 @@ Local<Value> TLSCallbacks::GetSSLError(int status, int* err, const char** msg) {
}
void TLSCallbacks::ClearOut() {
void TLSWrap::ClearOut() {
// Ignore cycling data if ClientHello wasn't yet parsed
if (!hello_parser_.IsEnded())
return;
@ -389,22 +400,30 @@ void TLSCallbacks::ClearOut() {
char out[kClearOutChunkSize];
int read;
do {
for (;;) {
read = SSL_read(ssl_, out, sizeof(out));
if (read > 0) {
Local<Value> argv[] = {
Integer::New(env()->isolate(), read),
Buffer::New(env(), out, read)
};
wrap()->MakeCallback(env()->onread_string(), ARRAY_SIZE(argv), argv);
if (read <= 0)
break;
while (read > 0) {
int avail = read;
uv_buf_t buf;
OnAlloc(avail, &buf);
if (static_cast<int>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, out, avail);
OnRead(avail, &buf, UV_UNKNOWN_HANDLE);
read -= avail;
}
}
} while (read > 0);
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
Local<Value> arg = Integer::New(env()->isolate(), UV_EOF);
wrap()->MakeCallback(env()->onread_string(), 1, &arg);
OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE);
}
if (read == -1) {
@ -427,7 +446,7 @@ void TLSCallbacks::ClearOut() {
}
bool TLSCallbacks::ClearIn() {
bool TLSWrap::ClearIn() {
// Ignore cycling data if ClientHello wasn't yet parsed
if (!hello_parser_.IsEnded())
return false;
@ -466,28 +485,67 @@ bool TLSCallbacks::ClearIn() {
}
const char* TLSCallbacks::Error() const {
void* TLSWrap::Cast() {
return reinterpret_cast<void*>(this);
}
AsyncWrap* TLSWrap::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
bool TLSWrap::IsIPCPipe() const {
return stream_->IsIPCPipe();
}
int TLSWrap::GetFD() const {
return stream_->GetFD();
}
bool TLSWrap::IsAlive() const {
return stream_->IsAlive();
}
bool TLSWrap::IsClosing() const {
return stream_->IsClosing();
}
int TLSWrap::ReadStart() {
return stream_->ReadStart();
}
int TLSWrap::ReadStop() {
return stream_->ReadStop();
}
const char* TLSWrap::Error() const {
return error_;
}
void TLSCallbacks::ClearError() {
void TLSWrap::ClearError() {
delete[] error_;
error_ = nullptr;
}
int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
// TODO(indutny): Support it
return 0;
}
int TLSCallbacks::DoWrite(WriteWrap* w,
int TLSWrap::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
uv_write_cb cb) {
uv_stream_t* send_handle) {
CHECK_EQ(send_handle, nullptr);
bool empty = true;
@ -504,11 +562,11 @@ int TLSCallbacks::DoWrite(WriteWrap* w,
// However if there any data that should be written to socket,
// callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0)
return uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
return stream_->DoWrite(w, bufs, count, send_handle);
}
// Queue callback to execute it on next tick
write_item_queue_.PushBack(new WriteItem(w, cb));
write_item_queue_.PushBack(new WriteItem(w));
// Write queued data
if (empty) {
@ -552,22 +610,49 @@ int TLSCallbacks::DoWrite(WriteWrap* w,
}
void TLSCallbacks::AfterWrite(WriteWrap* w) {
void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
// Intentionally empty
}
void TLSCallbacks::DoAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
size_t size = 0;
buf->base = NodeBIO::FromBIO(enc_in_)->PeekWritable(&size);
buf->base = NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size);
buf->len = size;
}
void TLSCallbacks::DoRead(uv_stream_t* handle,
ssize_t nread,
void TLSWrap::OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->DoRead(nread, buf, pending);
}
void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) {
buf->base = static_cast<char*>(malloc(suggested_size));
CHECK_NE(buf->base, nullptr);
buf->len = suggested_size;
}
void TLSWrap::OnReadSelf(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
Local<Object> buf_obj;
if (buf != nullptr)
buf_obj = Buffer::Use(wrap->env(), buf->base, buf->len);
wrap->EmitData(nread, buf_obj, Local<Object>());
}
void TLSWrap::DoRead(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending) {
if (nread < 0) {
@ -583,8 +668,7 @@ void TLSCallbacks::DoRead(uv_stream_t* handle,
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
Local<Value> arg = Integer::New(env()->isolate(), nread);
wrap()->MakeCallback(env()->onread_string(), 1, &arg);
OnRead(nread, nullptr, UV_UNKNOWN_HANDLE);
return;
}
@ -608,19 +692,19 @@ void TLSCallbacks::DoRead(uv_stream_t* handle,
}
int TLSCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) {
if (SSL_shutdown(ssl_) == 0)
SSL_shutdown(ssl_);
shutdown_ = true;
EncOut();
return StreamWrapCallbacks::DoShutdown(req_wrap, cb);
return stream_->DoShutdown(req_wrap);
}
void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::SetVerifyMode(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
if (args.Length() < 2 || !args[0]->IsBoolean() || !args[1]->IsBoolean())
return env->ThrowTypeError("Bad arguments, expected two booleans");
@ -647,34 +731,34 @@ void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo<Value>& args) {
}
void TLSCallbacks::EnableSessionCallbacks(
void TLSWrap::EnableSessionCallbacks(
const FunctionCallbackInfo<Value>& args) {
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
wrap->enable_session_callbacks();
EnableHelloParser(args);
}
void TLSCallbacks::EnableHelloParser(const FunctionCallbackInfo<Value>& args) {
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
void TLSWrap::EnableHelloParser(const FunctionCallbackInfo<Value>& args) {
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
NodeBIO::FromBIO(wrap->enc_in_)->set_initial(kMaxHelloLength);
wrap->hello_parser_.Start(SSLWrap<TLSCallbacks>::OnClientHello,
wrap->hello_parser_.Start(SSLWrap<TLSWrap>::OnClientHello,
OnClientHelloParseEnd,
wrap);
}
void TLSCallbacks::OnClientHelloParseEnd(void* arg) {
TLSCallbacks* c = static_cast<TLSCallbacks*>(arg);
void TLSWrap::OnClientHelloParseEnd(void* arg) {
TLSWrap* c = static_cast<TLSWrap*>(arg);
c->Cycle();
}
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
void TLSCallbacks::GetServername(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::GetServername(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
const char* servername = SSL_get_servername(wrap->ssl_,
TLSEXT_NAMETYPE_host_name);
@ -686,10 +770,10 @@ void TLSCallbacks::GetServername(const FunctionCallbackInfo<Value>& args) {
}
void TLSCallbacks::SetServername(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::SetServername(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
TLSCallbacks* wrap = Unwrap<TLSCallbacks>(args.Holder());
TLSWrap* wrap = Unwrap<TLSWrap>(args.Holder());
if (args.Length() < 1 || !args[0]->IsString())
return env->ThrowTypeError("First argument should be a string");
@ -707,8 +791,8 @@ void TLSCallbacks::SetServername(const FunctionCallbackInfo<Value>& args) {
}
int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
TLSCallbacks* p = static_cast<TLSCallbacks*>(SSL_get_app_data(s));
int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
TLSWrap* p = static_cast<TLSWrap*>(SSL_get_app_data(s));
Environment* env = p->env();
const char* servername = SSL_get_servername(s, TLSEXT_NAMETYPE_host_name);
@ -744,12 +828,12 @@ int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
void TLSCallbacks::Initialize(Handle<Object> target,
void TLSWrap::Initialize(Handle<Object> target,
Handle<Value> unused,
Handle<Context> context) {
Environment* env = Environment::GetCurrent(context);
env->SetMethod(target, "wrap", TLSCallbacks::Wrap);
env->SetMethod(target, "wrap", TLSWrap::Wrap);
Local<FunctionTemplate> t = FunctionTemplate::New(env->isolate());
t->InstanceTemplate()->SetInternalFieldCount(1);
@ -761,16 +845,18 @@ void TLSCallbacks::Initialize(Handle<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "enableHelloParser", EnableHelloParser);
SSLWrap<TLSCallbacks>::AddMethods(env, t);
StreamBase::AddMethods<TLSWrap>(env, t);
SSLWrap<TLSWrap>::AddMethods(env, t);
#ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
env->SetProtoMethod(t, "getServername", GetServername);
env->SetProtoMethod(t, "setServername", SetServername);
#endif // SSL_CRT_SET_TLSEXT_SERVERNAME_CB
env->set_tls_wrap_constructor_template(t);
env->set_tls_wrap_constructor_function(t->GetFunction());
}
} // namespace node
NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSCallbacks::Initialize)
NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSWrap::Initialize)

69
src/tls_wrap.h

@ -21,33 +21,33 @@ namespace crypto {
class SecureContext;
}
class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
public StreamWrapCallbacks,
class TLSWrap : public crypto::SSLWrap<TLSWrap>,
public StreamBase,
public AsyncWrap {
public:
~TLSCallbacks() override;
~TLSWrap() override;
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
const char* Error() const override;
void ClearError() override;
int TryWrite(uv_buf_t** bufs, size_t* count) override;
void* Cast() override;
int GetFD() const override;
bool IsAlive() const override;
bool IsClosing() const override;
// JavaScript functions
int ReadStart() override;
int ReadStop() override;
int DoShutdown(ShutdownWrap* req_wrap) override;
int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
uv_write_cb cb) override;
void AfterWrite(WriteWrap* w) override;
void DoAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) override;
void DoRead(uv_stream_t* handle,
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending) override;
int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) override;
uv_stream_t* send_handle) override;
const char* Error() const override;
void ClearError() override;
void NewSessionDoneCb();
@ -66,27 +66,26 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
// Write callback queue's item
class WriteItem {
public:
WriteItem(WriteWrap* w, uv_write_cb cb) : w_(w), cb_(cb) {
explicit WriteItem(WriteWrap* w) : w_(w) {
}
~WriteItem() {
w_ = nullptr;
cb_ = nullptr;
}
WriteWrap* w_;
uv_write_cb cb_;
ListNode<WriteItem> member_;
};
TLSCallbacks(Environment* env,
TLSWrap(Environment* env,
Kind kind,
v8::Handle<v8::Object> sc,
StreamWrapCallbacks* old);
StreamBase* steram,
v8::Handle<v8::Object> stream_obj,
v8::Handle<v8::Object> sc);
static void SSLInfoCallback(const SSL* ssl_, int where, int ret);
void InitSSL();
void EncOut();
static void EncOutCb(uv_write_t* req, int status);
static void EncOutCb(WriteWrap* req_wrap, int status);
bool ClearIn();
void ClearOut();
void MakePending();
@ -104,6 +103,25 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
}
}
AsyncWrap* GetAsyncWrap() override;
bool IsIPCPipe() const override;
// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
static void OnAfterWriteSelf(WriteWrap* w, void* ctx);
static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx);
static void OnReadSelf(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);
void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending);
// If |msg| is not nullptr, caller is responsible for calling `delete[] *msg`.
v8::Local<v8::Value> GetSSLError(int status, int* err, const char** msg);
@ -125,10 +143,11 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
crypto::SecureContext* sc_;
v8::Persistent<v8::Object> sc_handle_;
StreamBase* stream_;
v8::Persistent<v8::Object> stream_handle_;
BIO* enc_in_;
BIO* enc_out_;
NodeBIO* clear_in_;
uv_write_t write_req_;
size_t write_size_;
size_t write_queue_size_;
typedef ListHead<WriteItem, &WriteItem::member_> WriteItemList;

18
src/tty_wrap.cc

@ -36,26 +36,10 @@ void TTYWrap::Initialize(Handle<Object> target,
t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TTY"));
t->InstanceTemplate()->SetInternalFieldCount(1);
enum PropertyAttribute attributes =
static_cast<PropertyAttribute>(v8::ReadOnly | v8::DontDelete);
t->InstanceTemplate()->SetAccessor(env->fd_string(),
StreamWrap::GetFD,
nullptr,
Handle<Value>(),
v8::DEFAULT,
attributes);
env->SetProtoMethod(t, "close", HandleWrap::Close);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart);
env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop);
env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer);
env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString);
env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String);
env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String);
env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString);
StreamWrap::AddMethods(env, t);
env->SetProtoMethod(t, "getWindowSize", TTYWrap::GetWindowSize);
env->SetProtoMethod(t, "setRawMode", SetRawMode);

12
test/parallel/test-tls-client-default-ciphers.js

@ -2,13 +2,21 @@ var assert = require('assert');
var common = require('../common');
var tls = require('tls');
function Done() {}
function test1() {
var ciphers = '';
tls.createSecureContext = function(options) {
ciphers = options.ciphers
ciphers = options.ciphers;
throw new Done();
}
try {
var s = tls.connect(common.PORT);
s.destroy();
} catch (e) {
assert(e instanceof Done);
}
assert.equal(ciphers, tls.DEFAULT_CIPHERS);
}
test1();

4
test/parallel/test-tls-close-notify.js

@ -17,8 +17,8 @@ var server = tls.createServer({
cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem')
}, function(c) {
// Send close-notify without shutting down TCP socket
if (c.ssl.shutdown() !== 1)
c.ssl.shutdown();
if (c._handle.shutdownSSL() !== 1)
c._handle.shutdownSSL();
}).listen(common.PORT, function() {
var c = tls.connect(common.PORT, {
rejectUnauthorized: false

5
test/parallel/test-tls-multi-key.js

@ -28,15 +28,14 @@ var server = tls.createServer(options, function(conn) {
ciphers: 'ECDHE-ECDSA-AES256-GCM-SHA384',
rejectUnauthorized: false
}, function() {
ciphers.push(ecdsa.getCipher());
var rsa = tls.connect(common.PORT, {
ciphers: 'ECDHE-RSA-AES256-GCM-SHA384',
rejectUnauthorized: false
}, function() {
ciphers.push(rsa.getCipher());
ecdsa.destroy();
rsa.destroy();
ciphers.push(ecdsa.getCipher());
ciphers.push(rsa.getCipher());
server.close();
});
});

Loading…
Cancel
Save