Browse Source

src: revert domain using AsyncListeners

This is a slightly modified revert of bc39bdd.

Getting domains to use AsyncListeners became too much of a challenge
with many edge cases. While this is still a goal, it will have to be
deferred for now until more test coverage can be provided.
v0.11.11-release
Trevor Norris 11 years ago
parent
commit
828f14556e
  1. 4
      lib/_http_client.js
  2. 158
      lib/domain.js
  3. 6
      lib/events.js
  4. 11
      lib/net.js
  5. 23
      lib/timers.js
  6. 92
      src/async-wrap-inl.h
  7. 7
      src/async-wrap.h
  8. 35
      src/env-inl.h
  9. 29
      src/env.h
  10. 162
      src/node.cc
  11. 39
      src/node.js
  12. 6
      src/node_crypto.cc
  13. 3
      src/req_wrap.h

4
lib/_http_client.js

@ -457,12 +457,8 @@ ClientRequest.prototype.onSocket = function(socket) {
var req = this; var req = this;
process.nextTick(function() { process.nextTick(function() {
// If a domain was added to the request, attach it to the socket.
if (req.domain)
socket._handle.addAsyncListener(req.domain._listener);
tickOnSocket(req, socket); tickOnSocket(req, socket);
}); });
}; };
ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) { ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) {

158
lib/domain.js

@ -28,6 +28,26 @@ var inherits = util.inherits;
// a few side effects. // a few side effects.
EventEmitter.usingDomains = true; EventEmitter.usingDomains = true;
// overwrite process.domain with a getter/setter that will allow for more
// effective optimizations
var _domain = [null];
Object.defineProperty(process, 'domain', {
enumerable: true,
get: function() {
return _domain[0];
},
set: function(arg) {
return _domain[0] = arg;
}
});
// objects with external array data are excellent ways to communicate state
// between js and c++ w/o much overhead
var _domain_flag = {};
// let the process know we're using domains
process._setupDomainUse(_domain, _domain_flag);
exports.Domain = Domain; exports.Domain = Domain;
exports.create = exports.createDomain = function() { exports.create = exports.createDomain = function() {
@ -42,71 +62,66 @@ exports._stack = stack;
exports.active = null; exports.active = null;
var listenerObj = {
error: function errorHandler(domain, er) {
var caught = false;
// ignore errors on disposed domains.
//
// XXX This is a bit stupid. We should probably get rid of
// domain.dispose() altogether. It's almost always a terrible
// idea. --isaacs
if (domain._disposed)
return true;
er.domain = domain;
er.domainThrown = true;
// wrap this in a try/catch so we don't get infinite throwing
try {
// One of three things will happen here.
//
// 1. There is a handler, caught = true
// 2. There is no handler, caught = false
// 3. It throws, caught = false
//
// If caught is false after this, then there's no need to exit()
// the domain, because we're going to crash the process anyway.
caught = domain.emit('error', er);
if (stack.length === 0)
process.removeAsyncListener(domain._listener);
// Exit all domains on the stack. Uncaught exceptions end the
// current tick and no domains should be left on the stack
// between ticks.
stack.length = 0;
exports.active = process.domain = null;
} catch (er2) {
// The domain error handler threw! oh no!
// See if another domain can catch THIS error,
// or else crash on the original one.
// If the user already exited it, then don't double-exit.
if (domain === exports.active) {
stack.pop();
}
if (stack.length) {
exports.active = process.domain = stack[stack.length - 1];
caught = process._fatalException(er2);
} else {
caught = false;
}
return caught;
}
return caught;
}
};
inherits(Domain, EventEmitter); inherits(Domain, EventEmitter);
function Domain() { function Domain() {
EventEmitter.call(this); EventEmitter.call(this);
this.members = []; this.members = [];
this._listener = process.createAsyncListener(listenerObj, this);
} }
Domain.prototype.members = undefined; Domain.prototype.members = undefined;
Domain.prototype._disposed = undefined; Domain.prototype._disposed = undefined;
Domain.prototype._listener = undefined;
// Called by process._fatalException in case an error was thrown.
Domain.prototype._errorHandler = function errorHandler(er) {
var caught = false;
// ignore errors on disposed domains.
//
// XXX This is a bit stupid. We should probably get rid of
// domain.dispose() altogether. It's almost always a terrible
// idea. --isaacs
if (this._disposed)
return true;
er.domain = this;
er.domainThrown = true;
// wrap this in a try/catch so we don't get infinite throwing
try {
// One of three things will happen here.
//
// 1. There is a handler, caught = true
// 2. There is no handler, caught = false
// 3. It throws, caught = false
//
// If caught is false after this, then there's no need to exit()
// the domain, because we're going to crash the process anyway.
caught = this.emit('error', er);
// Exit all domains on the stack. Uncaught exceptions end the
// current tick and no domains should be left on the stack
// between ticks.
stack.length = 0;
exports.active = process.domain = null;
} catch (er2) {
// The domain error handler threw! oh no!
// See if another domain can catch THIS error,
// or else crash on the original one.
// If the user already exited it, then don't double-exit.
if (this === exports.active) {
stack.pop();
}
if (stack.length) {
exports.active = process.domain = stack[stack.length - 1];
caught = process._fatalException(er2);
} else {
caught = false;
}
return caught;
}
return caught;
};
Domain.prototype.enter = function() { Domain.prototype.enter = function() {
@ -116,22 +131,20 @@ Domain.prototype.enter = function() {
// to push it onto the stack so that we can pop it later. // to push it onto the stack so that we can pop it later.
exports.active = process.domain = this; exports.active = process.domain = this;
stack.push(this); stack.push(this);
_domain_flag[0] = stack.length;
process.addAsyncListener(this._listener);
}; };
Domain.prototype.exit = function() { Domain.prototype.exit = function() {
if (this._disposed) return; if (this._disposed) return;
process.removeAsyncListener(this._listener);
// exit all domains until this one. // exit all domains until this one.
var index = stack.lastIndexOf(this); var index = stack.lastIndexOf(this);
if (index !== -1) if (index !== -1)
stack.splice(index + 1); stack.splice(index + 1);
else else
stack.length = 0; stack.length = 0;
_domain_flag[0] = stack.length;
exports.active = stack[stack.length - 1]; exports.active = stack[stack.length - 1];
process.domain = exports.active; process.domain = exports.active;
@ -165,13 +178,6 @@ Domain.prototype.add = function(ee) {
ee.domain = this; ee.domain = this;
this.members.push(ee); this.members.push(ee);
// Adding the domain._listener to the Wrap associated with the event
// emitter instance will be done automatically either on class
// instantiation or manually, like in cases of net listen().
// The reason it cannot be done here is because in specific cases the
// _handle is not created on EE instantiation, so there's no place to
// add the listener.
}; };
@ -180,24 +186,6 @@ Domain.prototype.remove = function(ee) {
var index = this.members.indexOf(ee); var index = this.members.indexOf(ee);
if (index !== -1) if (index !== -1)
this.members.splice(index, 1); this.members.splice(index, 1);
// First check if the ee is a handle itself.
if (ee.removeAsyncListener)
ee.removeAsyncListener(this._listener);
// Manually remove the asyncListener from the handle, if possible.
if (ee._handle && ee._handle.removeAsyncListener)
ee._handle.removeAsyncListener(this._listener);
// TODO(trevnorris): Are there cases where the handle doesn't live on
// the ee or the _handle.
// TODO(trevnorris): For debugging that we've missed adding AsyncWrap's
// methods to a handle somewhere on the native side.
if (ee._handle && !ee._handle.removeAsyncListener) {
process._rawDebug('Wrap handle is missing AsyncWrap methods');
process.abort();
}
}; };

6
lib/events.js

@ -91,6 +91,9 @@ EventEmitter.prototype.emit = function(type) {
if (util.isUndefined(handler)) if (util.isUndefined(handler))
return false; return false;
if (this.domain && this !== process)
this.domain.enter();
if (util.isFunction(handler)) { if (util.isFunction(handler)) {
switch (arguments.length) { switch (arguments.length) {
// fast cases // fast cases
@ -123,6 +126,9 @@ EventEmitter.prototype.emit = function(type) {
listeners[i].apply(this, args); listeners[i].apply(this, args);
} }
if (this.domain && this !== process)
this.domain.exit();
return true; return true;
}; };

11
lib/net.js

@ -1089,20 +1089,9 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
// generate connection key, this should be unique to the connection // generate connection key, this should be unique to the connection
this._connectionKey = addressType + ':' + address + ':' + port; this._connectionKey = addressType + ':' + address + ':' + port;
// If a domain is attached to the event emitter then we need to add
// the listener to the handle.
if (this.domain) {
this._handle.addAsyncListener(this.domain._listener);
process.addAsyncListener(this.domain._listener);
}
process.nextTick(function() { process.nextTick(function() {
self.emit('listening'); self.emit('listening');
}); });
if (this.domain) {
process.removeAsyncListener(this.domain._listener);
}
}; };

23
lib/timers.js

@ -112,7 +112,8 @@ function listOnTimeout() {
// other timers that expire on this tick should still run. // other timers that expire on this tick should still run.
// //
// https://github.com/joyent/node/issues/2631 // https://github.com/joyent/node/issues/2631
if (first.domain && first.domain._disposed) var domain = first.domain;
if (domain && domain._disposed)
continue; continue;
hasQueue = !!first._asyncQueue; hasQueue = !!first._asyncQueue;
@ -120,8 +121,12 @@ function listOnTimeout() {
try { try {
if (hasQueue) if (hasQueue)
loadAsyncQueue(first); loadAsyncQueue(first);
if (domain)
domain.enter();
threw = true; threw = true;
first._onTimeout(); first._onTimeout();
if (domain)
domain.exit();
if (hasQueue) if (hasQueue)
unloadAsyncQueue(first); unloadAsyncQueue(first);
threw = false; threw = false;
@ -368,7 +373,7 @@ L.init(immediateQueue);
function processImmediate() { function processImmediate() {
var queue = immediateQueue; var queue = immediateQueue;
var hasQueue, immediate; var domain, hasQueue, immediate;
immediateQueue = {}; immediateQueue = {};
L.init(immediateQueue); L.init(immediateQueue);
@ -376,9 +381,12 @@ function processImmediate() {
while (L.isEmpty(queue) === false) { while (L.isEmpty(queue) === false) {
immediate = L.shift(queue); immediate = L.shift(queue);
hasQueue = !!immediate._asyncQueue; hasQueue = !!immediate._asyncQueue;
domain = immediate.domain;
if (hasQueue) if (hasQueue)
loadAsyncQueue(immediate); loadAsyncQueue(immediate);
if (domain)
domain.enter();
var threw = true; var threw = true;
try { try {
@ -398,6 +406,8 @@ function processImmediate() {
} }
} }
if (domain)
domain.exit();
if (hasQueue) if (hasQueue)
unloadAsyncQueue(immediate); unloadAsyncQueue(immediate);
} }
@ -481,7 +491,7 @@ function unrefTimeout() {
debug('unrefTimer fired'); debug('unrefTimer fired');
var diff, first, hasQueue, threw; var diff, domain, first, hasQueue, threw;
while (first = L.peek(unrefList)) { while (first = L.peek(unrefList)) {
diff = now - first._idleStart; diff = now - first._idleStart;
hasQueue = !!first._asyncQueue; hasQueue = !!first._asyncQueue;
@ -496,16 +506,21 @@ function unrefTimeout() {
L.remove(first); L.remove(first);
domain = first.domain;
if (!first._onTimeout) continue; if (!first._onTimeout) continue;
if (first.domain && first.domain._disposed) continue; if (domain && domain._disposed) continue;
try { try {
if (hasQueue) if (hasQueue)
loadAsyncQueue(first); loadAsyncQueue(first);
if (domain) domain.enter();
threw = true; threw = true;
debug('unreftimer firing timeout'); debug('unreftimer firing timeout');
first._onTimeout(); first._onTimeout();
threw = false; threw = false;
if (domain)
domain.exit();
if (hasQueue) if (hasQueue)
unloadAsyncQueue(first); unloadAsyncQueue(first);
} finally { } finally {

92
src/async-wrap-inl.h

@ -88,10 +88,102 @@ inline bool AsyncWrap::has_async_queue() {
} }
// I hate you domains.
inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
const v8::Handle<v8::Function> cb,
int argc,
v8::Handle<v8::Value>* argv) {
assert(env()->context() == env()->isolate()->GetCurrentContext());
v8::Local<v8::Object> context = object();
v8::Local<v8::Object> process = env()->process_object();
v8::Local<v8::Value> domain_v = context->Get(env()->domain_string());
v8::Local<v8::Object> domain;
v8::TryCatch try_catch;
try_catch.SetVerbose(true);
if (has_async_queue()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_load_function()->Call(process, 1, &val);
if (try_catch.HasCaught())
return v8::Undefined(env()->isolate());
}
bool has_domain = domain_v->IsObject();
if (has_domain) {
domain = domain_v.As<v8::Object>();
if (domain->Get(env()->disposed_string())->IsTrue())
return Undefined(env()->isolate());
v8::Local<v8::Function> enter =
domain->Get(env()->enter_string()).As<v8::Function>();
assert(enter->IsFunction());
enter->Call(domain, 0, NULL);
if (try_catch.HasCaught())
return Undefined(env()->isolate());
}
v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
if (try_catch.HasCaught()) {
return Undefined(env()->isolate());
}
if (has_domain) {
v8::Local<v8::Function> exit =
domain->Get(env()->exit_string()).As<v8::Function>();
assert(exit->IsFunction());
exit->Call(domain, 0, NULL);
if (try_catch.HasCaught())
return Undefined(env()->isolate());
}
if (has_async_queue()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_unload_function()->Call(process, 1, &val);
if (try_catch.HasCaught())
return Undefined(env()->isolate());
}
Environment::TickInfo* tick_info = env()->tick_info();
if (tick_info->in_tick()) {
return ret;
}
if (tick_info->length() == 0) {
tick_info->set_index(0);
return ret;
}
tick_info->set_in_tick(true);
env()->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
if (try_catch.HasCaught()) {
tick_info->set_last_threw(true);
return Undefined(env()->isolate());
}
return ret;
}
inline v8::Handle<v8::Value> AsyncWrap::MakeCallback( inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
const v8::Handle<v8::Function> cb, const v8::Handle<v8::Function> cb,
int argc, int argc,
v8::Handle<v8::Value>* argv) { v8::Handle<v8::Value>* argv) {
if (env()->using_domains())
return MakeDomainCallback(cb, argc, argv);
assert(env()->context() == env()->isolate()->GetCurrentContext()); assert(env()->context() == env()->isolate()->GetCurrentContext());
v8::Local<v8::Object> context = object(); v8::Local<v8::Object> context = object();

7
src/async-wrap.h

@ -62,6 +62,13 @@ class AsyncWrap : public BaseObject {
v8::Handle<v8::Value>* argv); v8::Handle<v8::Value>* argv);
private: private:
// TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
// replacement is committed.
inline v8::Handle<v8::Value> MakeDomainCallback(
const v8::Handle<v8::Function> cb,
int argc,
v8::Handle<v8::Value>* argv);
// Add an async listener to an existing handle. // Add an async listener to an existing handle.
template <typename Type> template <typename Type>
static inline void AddAsyncListener( static inline void AddAsyncListener(

35
src/env-inl.h

@ -86,6 +86,22 @@ inline uint32_t Environment::AsyncListener::count() const {
return fields_[kCount]; return fields_[kCount];
} }
inline Environment::DomainFlag::DomainFlag() {
for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
}
inline uint32_t* Environment::DomainFlag::fields() {
return fields_;
}
inline int Environment::DomainFlag::fields_count() const {
return kFieldsCount;
}
inline uint32_t Environment::DomainFlag::count() const {
return fields_[kCount];
}
inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) { inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) {
for (int i = 0; i < kFieldsCount; ++i) for (int i = 0; i < kFieldsCount; ++i)
fields_[i] = 0; fields_[i] = 0;
@ -163,6 +179,7 @@ inline Environment::Environment(v8::Local<v8::Context> context)
: isolate_(context->GetIsolate()), : isolate_(context->GetIsolate()),
isolate_data_(IsolateData::GetOrCreate(context->GetIsolate())), isolate_data_(IsolateData::GetOrCreate(context->GetIsolate())),
using_smalloc_alloc_cb_(false), using_smalloc_alloc_cb_(false),
using_domains_(false),
context_(context->GetIsolate(), context) { context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context. // We'll be creating new objects so make sure we've entered the context.
v8::HandleScope handle_scope(isolate()); v8::HandleScope handle_scope(isolate());
@ -193,6 +210,12 @@ inline bool Environment::has_async_listeners() const {
return const_cast<Environment*>(this)->async_listener()->count() > 0; return const_cast<Environment*>(this)->async_listener()->count() > 0;
} }
inline bool Environment::in_domain() const {
// The const_cast is okay, it doesn't violate conceptual const-ness.
return using_domains() &&
const_cast<Environment*>(this)->domain_flag()->count() > 0;
}
inline Environment* Environment::from_immediate_check_handle( inline Environment* Environment::from_immediate_check_handle(
uv_check_t* handle) { uv_check_t* handle) {
return CONTAINER_OF(handle, Environment, immediate_check_handle_); return CONTAINER_OF(handle, Environment, immediate_check_handle_);
@ -231,6 +254,10 @@ inline Environment::AsyncListener* Environment::async_listener() {
return &async_listener_count_; return &async_listener_count_;
} }
inline Environment::DomainFlag* Environment::domain_flag() {
return &domain_flag_;
}
inline Environment::TickInfo* Environment::tick_info() { inline Environment::TickInfo* Environment::tick_info() {
return &tick_info_; return &tick_info_;
} }
@ -243,6 +270,14 @@ inline void Environment::set_using_smalloc_alloc_cb(bool value) {
using_smalloc_alloc_cb_ = value; using_smalloc_alloc_cb_ = value;
} }
inline bool Environment::using_domains() const {
return using_domains_;
}
inline void Environment::set_using_domains(bool value) {
using_domains_ = value;
}
inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) { inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
return CONTAINER_OF(handle, Environment, cares_timer_handle_); return CONTAINER_OF(handle, Environment, cares_timer_handle_);
} }

29
src/env.h

@ -66,6 +66,7 @@ namespace node {
V(ctime_string, "ctime") \ V(ctime_string, "ctime") \
V(dev_string, "dev") \ V(dev_string, "dev") \
V(disposed_string, "_disposed") \ V(disposed_string, "_disposed") \
V(domain_string, "domain") \
V(enter_string, "enter") \ V(enter_string, "enter") \
V(errno_string, "errno") \ V(errno_string, "errno") \
V(exit_string, "exit") \ V(exit_string, "exit") \
@ -143,6 +144,7 @@ namespace node {
V(binding_cache_object, v8::Object) \ V(binding_cache_object, v8::Object) \
V(buffer_constructor_function, v8::Function) \ V(buffer_constructor_function, v8::Function) \
V(context, v8::Context) \ V(context, v8::Context) \
V(domain_array, v8::Array) \
V(module_load_list_array, v8::Array) \ V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \ V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \ V(process_object, v8::Object) \
@ -191,6 +193,26 @@ class Environment {
DISALLOW_COPY_AND_ASSIGN(AsyncListener); DISALLOW_COPY_AND_ASSIGN(AsyncListener);
}; };
class DomainFlag {
public:
inline uint32_t* fields();
inline int fields_count() const;
inline uint32_t count() const;
private:
friend class Environment; // So we can call the constructor.
inline DomainFlag();
enum Fields {
kCount,
kFieldsCount
};
uint32_t fields_[kFieldsCount];
DISALLOW_COPY_AND_ASSIGN(DomainFlag);
};
class TickInfo { class TickInfo {
public: public:
inline uint32_t* fields(); inline uint32_t* fields();
@ -232,6 +254,7 @@ class Environment {
inline v8::Isolate* isolate() const; inline v8::Isolate* isolate() const;
inline uv_loop_t* event_loop() const; inline uv_loop_t* event_loop() const;
inline bool has_async_listeners() const; inline bool has_async_listeners() const;
inline bool in_domain() const;
static inline Environment* from_immediate_check_handle(uv_check_t* handle); static inline Environment* from_immediate_check_handle(uv_check_t* handle);
inline uv_check_t* immediate_check_handle(); inline uv_check_t* immediate_check_handle();
@ -244,6 +267,7 @@ class Environment {
inline uv_check_t* idle_check_handle(); inline uv_check_t* idle_check_handle();
inline AsyncListener* async_listener(); inline AsyncListener* async_listener();
inline DomainFlag* domain_flag();
inline TickInfo* tick_info(); inline TickInfo* tick_info();
static inline Environment* from_cares_timer_handle(uv_timer_t* handle); static inline Environment* from_cares_timer_handle(uv_timer_t* handle);
@ -255,6 +279,9 @@ class Environment {
inline bool using_smalloc_alloc_cb() const; inline bool using_smalloc_alloc_cb() const;
inline void set_using_smalloc_alloc_cb(bool value); inline void set_using_smalloc_alloc_cb(bool value);
inline bool using_domains() const;
inline void set_using_domains(bool value);
// Strings are shared across shared contexts. The getters simply proxy to // Strings are shared across shared contexts. The getters simply proxy to
// the per-isolate primitive. // the per-isolate primitive.
#define V(PropertyName, StringValue) \ #define V(PropertyName, StringValue) \
@ -285,11 +312,13 @@ class Environment {
uv_prepare_t idle_prepare_handle_; uv_prepare_t idle_prepare_handle_;
uv_check_t idle_check_handle_; uv_check_t idle_check_handle_;
AsyncListener async_listener_count_; AsyncListener async_listener_count_;
DomainFlag domain_flag_;
TickInfo tick_info_; TickInfo tick_info_;
uv_timer_t cares_timer_handle_; uv_timer_t cares_timer_handle_;
ares_channel cares_channel_; ares_channel cares_channel_;
ares_task_list cares_task_list_; ares_task_list cares_task_list_;
bool using_smalloc_alloc_cb_; bool using_smalloc_alloc_cb_;
bool using_domains_;
#define V(PropertyName, TypeName) \ #define V(PropertyName, TypeName) \
v8::Persistent<TypeName> PropertyName ## _; v8::Persistent<TypeName> PropertyName ## _;

162
src/node.cc

@ -876,11 +876,54 @@ void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
} }
void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args.GetIsolate());
if (env->using_domains())
return;
env->set_using_domains(true);
HandleScope scope(node_isolate);
Local<Object> process_object = env->process_object();
Local<String> tick_callback_function_key =
FIXED_ONE_BYTE_STRING(node_isolate, "_tickDomainCallback");
Local<Function> tick_callback_function =
process_object->Get(tick_callback_function_key).As<Function>();
if (!tick_callback_function->IsFunction()) {
fprintf(stderr, "process._tickDomainCallback assigned to non-function\n");
abort();
}
process_object->Set(FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback"),
tick_callback_function);
env->set_tick_callback_function(tick_callback_function);
assert(args[0]->IsArray());
assert(args[1]->IsObject());
env->set_domain_array(args[0].As<Array>());
Local<Object> domain_flag_obj = args[1].As<Object>();
Environment::DomainFlag* domain_flag = env->domain_flag();
domain_flag_obj->SetIndexedPropertiesToExternalArrayData(
domain_flag->fields(),
kExternalUnsignedIntArray,
domain_flag->fields_count());
// Do a little housekeeping.
env->process_object()->Delete(
FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse"));
}
void SetupNextTick(const FunctionCallbackInfo<Value>& args) { void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
HandleScope handle_scope(args.GetIsolate()); HandleScope handle_scope(args.GetIsolate());
Environment* env = Environment::GetCurrent(args.GetIsolate()); Environment* env = Environment::GetCurrent(args.GetIsolate());
assert(args[0]->IsObject() && args[1]->IsFunction()); assert(args[0]->IsObject());
assert(args[1]->IsFunction());
// Values use to cross communicate with processNextTick. // Values use to cross communicate with processNextTick.
Local<Object> tick_info_obj = args[0].As<Object>(); Local<Object> tick_info_obj = args[0].As<Object>();
@ -897,11 +940,114 @@ void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
} }
Handle<Value> MakeDomainCallback(Environment* env,
Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
// If you hit this assertion, you forgot to enter the v8::Context first.
assert(env->context() == env->isolate()->GetCurrentContext());
Local<Object> process = env->process_object();
Local<Value> domain_v = object->Get(env->domain_string());
Local<Object> domain;
TryCatch try_catch;
try_catch.SetVerbose(true);
// TODO(trevnorris): This is sucky for performance. Fix it.
bool has_async_queue = object->Has(env->async_queue_string());
if (has_async_queue) {
Local<Value> argv[] = { object };
env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
if (try_catch.HasCaught())
return Undefined(node_isolate);
}
bool has_domain = domain_v->IsObject();
if (has_domain) {
domain = domain_v.As<Object>();
if (domain->Get(env->disposed_string())->IsTrue()) {
// domain has been disposed of.
return Undefined(node_isolate);
}
Local<Function> enter =
domain->Get(env->enter_string()).As<Function>();
assert(enter->IsFunction());
enter->Call(domain, 0, NULL);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
}
Local<Value> ret = callback->Call(object, argc, argv);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
if (has_domain) {
Local<Function> exit =
domain->Get(env->exit_string()).As<Function>();
assert(exit->IsFunction());
exit->Call(domain, 0, NULL);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
}
if (has_async_queue) {
Local<Value> val = object.As<Value>();
env->async_listener_unload_function()->Call(process, 1, &val);
if (try_catch.HasCaught())
return Undefined(node_isolate);
}
Environment::TickInfo* tick_info = env->tick_info();
if (tick_info->last_threw() == 1) {
tick_info->set_last_threw(0);
return ret;
}
if (tick_info->in_tick()) {
return ret;
}
if (tick_info->length() == 0) {
tick_info->set_index(0);
return ret;
}
tick_info->set_in_tick(true);
env->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
if (try_catch.HasCaught()) {
tick_info->set_last_threw(true);
return Undefined(node_isolate);
}
return ret;
}
Handle<Value> MakeCallback(Environment* env, Handle<Value> MakeCallback(Environment* env,
Handle<Object> object, Handle<Object> object,
const Handle<Function> callback, const Handle<Function> callback,
int argc, int argc,
Handle<Value> argv[]) { Handle<Value> argv[]) {
if (env->using_domains())
return MakeDomainCallback(env, object, callback, argc, argv);
// If you hit this assertion, you forgot to enter the v8::Context first. // If you hit this assertion, you forgot to enter the v8::Context first.
assert(env->context() == env->isolate()->GetCurrentContext()); assert(env->context() == env->isolate()->GetCurrentContext());
@ -1031,6 +1177,19 @@ Handle<Value> MakeCallback(const Handle<Object> object,
} }
Handle<Value> MakeDomainCallback(const Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
Local<Context> context = object->CreationContext();
Environment* env = Environment::GetCurrent(context);
Context::Scope context_scope(context);
HandleScope handle_scope(env->isolate());
return handle_scope.Close(
MakeDomainCallback(env, object, callback, argc, argv));
}
enum encoding ParseEncoding(Handle<Value> encoding_v, enum encoding _default) { enum encoding ParseEncoding(Handle<Value> encoding_v, enum encoding _default) {
HandleScope scope(node_isolate); HandleScope scope(node_isolate);
@ -2482,6 +2641,7 @@ void SetupProcessObject(Environment* env,
NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener); NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener);
NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick); NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick);
NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse);
// values use to cross communicate with processNextTick // values use to cross communicate with processNextTick
Local<Object> tick_info_obj = Object::New(); Local<Object> tick_info_obj = Object::New();

39
src/node.js

@ -224,11 +224,14 @@
// First run through error handlers from asyncListener. // First run through error handlers from asyncListener.
var caught = _errorHandler(er); var caught = _errorHandler(er);
if (process.domain && process.domain._errorHandler)
caught = process.domain._errorHandler(er) || caught;
if (!caught) if (!caught)
caught = process.emit('uncaughtException', er); caught = process.emit('uncaughtException', er);
// If someone handled it, then great. Otherwise die in C++ since // If someone handled it, then great. otherwise, die in C++ land
// that means we'll exit the process, emit the 'exit' event. // since that means that we'll exit the process, emit the 'exit' event
if (!caught) { if (!caught) {
try { try {
if (!process._exiting) { if (!process._exiting) {
@ -568,6 +571,7 @@
process.nextTick = nextTick; process.nextTick = nextTick;
// Needs to be accessible from beyond this scope. // Needs to be accessible from beyond this scope.
process._tickCallback = _tickCallback; process._tickCallback = _tickCallback;
process._tickDomainCallback = _tickDomainCallback;
process._setupNextTick(tickInfo, _tickCallback); process._setupNextTick(tickInfo, _tickCallback);
@ -585,6 +589,7 @@
} }
// Run callbacks that have no domain. // Run callbacks that have no domain.
// Using domains will cause this to be overridden.
function _tickCallback() { function _tickCallback() {
var callback, hasQueue, threw, tock; var callback, hasQueue, threw, tock;
@ -611,6 +616,35 @@
tickDone(); tickDone();
} }
function _tickDomainCallback() {
var callback, domain, hasQueue, threw, tock;
while (tickInfo[kIndex] < tickInfo[kLength]) {
tock = nextTickQueue[tickInfo[kIndex]++];
callback = tock.callback;
domain = tock.domain;
hasQueue = !!tock._asyncQueue;
if (hasQueue)
_loadAsyncQueue(tock);
if (domain)
domain.enter();
threw = true;
try {
callback();
threw = false;
} finally {
if (threw)
tickDone();
}
if (hasQueue)
_unloadAsyncQueue(tock);
if (domain)
domain.exit();
}
tickDone();
}
function nextTick(callback) { function nextTick(callback) {
// on the way out, don't bother. it won't get fired anyway. // on the way out, don't bother. it won't get fired anyway.
if (process._exiting) if (process._exiting)
@ -618,6 +652,7 @@
var obj = { var obj = {
callback: callback, callback: callback,
domain: process.domain || null,
_asyncQueue: undefined _asyncQueue: undefined
}; };

6
src/node_crypto.cc

@ -3628,6 +3628,9 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
if (args[4]->IsFunction()) { if (args[4]->IsFunction()) {
obj->Set(env->ondone_string(), args[4]); obj->Set(env->ondone_string(), args[4]);
// XXX(trevnorris): This will need to go with the rest of domains.
if (env->in_domain())
obj->Set(env->domain_string(), env->domain_array()->Get(0));
uv_queue_work(env->event_loop(), uv_queue_work(env->event_loop(),
req->work_req(), req->work_req(),
EIO_PBKDF2, EIO_PBKDF2,
@ -3789,6 +3792,9 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
if (args[1]->IsFunction()) { if (args[1]->IsFunction()) {
obj->Set(FIXED_ONE_BYTE_STRING(node_isolate, "ondone"), args[1]); obj->Set(FIXED_ONE_BYTE_STRING(node_isolate, "ondone"), args[1]);
// XXX(trevnorris): This will need to go with the rest of domains.
if (env->in_domain())
obj->Set(env->domain_string(), env->domain_array()->Get(0));
uv_queue_work(env->event_loop(), uv_queue_work(env->event_loop(),
req->work_req(), req->work_req(),
RandomBytesWork<pseudoRandom>, RandomBytesWork<pseudoRandom>,

3
src/req_wrap.h

@ -39,6 +39,9 @@ class ReqWrap : public AsyncWrap {
public: public:
ReqWrap(Environment* env, v8::Handle<v8::Object> object) ReqWrap(Environment* env, v8::Handle<v8::Object> object)
: AsyncWrap(env, object) { : AsyncWrap(env, object) {
if (env->in_domain())
object->Set(env->domain_string(), env->domain_array()->Get(0));
QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_); QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_);
} }

Loading…
Cancel
Save