Browse Source

domain: use AsyncListener API

The domain module has been switched over to use the domain module API as
much as currently possible. There are still some hooks in the
EventEmitter, but hopefully we can remove those in the future.
Trevor Norris 11 years ago
parent
commit
bc39bdd995
  1. 63
      lib/_http_client.js
  2. 320
      lib/domain.js
  3. 36
      lib/events.js
  4. 11
      lib/net.js
  5. 22
      lib/timers.js
  6. 89
      src/async-wrap-inl.h
  7. 7
      src/async-wrap.h
  8. 35
      src/env-inl.h
  9. 29
      src/env.h
  10. 180
      src/node.cc
  11. 39
      src/node.js
  12. 6
      src/node_crypto.cc
  13. 5
      src/req_wrap.h
  14. 18
      src/tls_wrap.cc
  15. 4
      test/simple/test-event-emitter-no-error-provided-to-error-event.js

63
lib/_http_client.js

@ -417,38 +417,45 @@ function responseOnEnd() {
}
}
function tickOnSocket(req, socket) {
var parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.reinitialize(HTTPParser.RESPONSE);
parser.socket = socket;
parser.incoming = null;
req.parser = parser;
socket.parser = parser;
socket._httpMessage = req;
// Setup "drain" propogation.
httpSocketSetup(socket);
// Propagate headers limit from request object to parser
if (util.isNumber(req.maxHeadersCount)) {
parser.maxHeaderPairs = req.maxHeadersCount << 1;
} else {
// Set default value because parser may be reused from FreeList
parser.maxHeaderPairs = 2000;
}
parser.onIncoming = parserOnIncomingClient;
socket.on('error', socketErrorListener);
socket.on('data', socketOnData);
socket.on('end', socketOnEnd);
socket.on('close', socketCloseListener);
req.emit('socket', socket);
}
ClientRequest.prototype.onSocket = function(socket) {
var req = this;
process.nextTick(function() {
var parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.reinitialize(HTTPParser.RESPONSE);
parser.socket = socket;
parser.incoming = null;
req.parser = parser;
socket.parser = parser;
socket._httpMessage = req;
// Setup "drain" propogation.
httpSocketSetup(socket);
// Propagate headers limit from request object to parser
if (util.isNumber(req.maxHeadersCount)) {
parser.maxHeaderPairs = req.maxHeadersCount << 1;
} else {
// Set default value because parser may be reused from FreeList
parser.maxHeaderPairs = 2000;
}
parser.onIncoming = parserOnIncomingClient;
socket.on('error', socketErrorListener);
socket.on('data', socketOnData);
socket.on('end', socketOnEnd);
socket.on('close', socketCloseListener);
req.emit('socket', socket);
// If a domain was added to the request, attach it to the socket.
if (req.domain)
socket._handle.addAsyncListener(req.domain._listener);
tickOnSocket(req, socket);
});
};

320
lib/domain.js

@ -20,34 +20,13 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var util = require('util');
var events = require('events');
var EventEmitter = events.EventEmitter;
var EventEmitter = require('events');
var inherits = util.inherits;
// communicate with events module, but don't require that
// module to have to load this one, since this module has
// a few side effects.
events.usingDomains = true;
// overwrite process.domain with a getter/setter that will allow for more
// effective optimizations
var _domain = [null];
Object.defineProperty(process, 'domain', {
enumerable: true,
get: function() {
return _domain[0];
},
set: function(arg) {
return _domain[0] = arg;
}
});
// objects with external array data are excellent ways to communicate state
// between js and c++ w/o much overhead
var _domain_flag = {};
// let the process know we're using domains
process._setupDomainUse(_domain, _domain_flag);
EventEmitter.usingDomains = true;
exports.Domain = Domain;
@ -63,65 +42,75 @@ exports._stack = stack;
exports.active = null;
function noop() { }
var listenerObj = {
error: function errorHandler(domain, er) {
var caught = false;
// ignore errors on disposed domains.
//
// XXX This is a bit stupid. We should probably get rid of
// domain.dispose() altogether. It's almost always a terrible
// idea. --isaacs
if (domain._disposed)
return true;
er.domain = domain;
er.domainThrown = true;
// wrap this in a try/catch so we don't get infinite throwing
try {
// One of three things will happen here.
//
// 1. There is a handler, caught = true
// 2. There is no handler, caught = false
// 3. It throws, caught = false
//
// If caught is false after this, then there's no need to exit()
// the domain, because we're going to crash the process anyway.
caught = domain.emit('error', er);
if (stack.length === 0)
process.removeAsyncListener(domain._listener);
// Exit all domains on the stack. Uncaught exceptions end the
// current tick and no domains should be left on the stack
// between ticks.
stack.length = 0;
exports.active = process.domain = null;
} catch (er2) {
// The domain error handler threw! oh no!
// See if another domain can catch THIS error,
// or else crash on the original one.
// If the user already exited it, then don't double-exit.
if (domain === exports.active) {
stack.pop();
}
if (stack.length) {
exports.active = process.domain = stack[stack.length - 1];
caught = process._fatalException(er2);
} else {
caught = false;
}
return caught;
}
return caught;
}
};
inherits(Domain, EventEmitter);
function Domain() {
EventEmitter.call(this);
this.members = [];
this._listener = process.createAsyncListener(noop, listenerObj, this);
}
Domain.prototype.members = undefined;
Domain.prototype._disposed = undefined;
Domain.prototype._listener = undefined;
// Called by process._fatalException in case an error was thrown.
Domain.prototype._errorHandler = function errorHandler(er) {
var caught = false;
// ignore errors on disposed domains.
//
// XXX This is a bit stupid. We should probably get rid of
// domain.dispose() altogether. It's almost always a terrible
// idea. --isaacs
if (this._disposed)
return true;
er.domain = this;
er.domainThrown = true;
// wrap this in a try/catch so we don't get infinite throwing
try {
// One of three things will happen here.
//
// 1. There is a handler, caught = true
// 2. There is no handler, caught = false
// 3. It throws, caught = false
//
// If caught is false after this, then there's no need to exit()
// the domain, because we're going to crash the process anyway.
caught = this.emit('error', er);
// Exit all domains on the stack. Uncaught exceptions end the
// current tick and no domains should be left on the stack
// between ticks.
stack.length = 0;
exports.active = process.domain = null;
} catch (er2) {
// The domain error handler threw! oh no!
// See if another domain can catch THIS error,
// or else crash on the original one.
// If the user already exited it, then don't double-exit.
if (this === exports.active) {
stack.pop();
}
if (stack.length) {
exports.active = process.domain = stack[stack.length - 1];
caught = process._fatalException(er2);
} else {
caught = false;
}
return caught;
}
return caught;
};
Domain.prototype.enter = function() {
if (this._disposed) return;
@ -130,35 +119,37 @@ Domain.prototype.enter = function() {
// to push it onto the stack so that we can pop it later.
exports.active = process.domain = this;
stack.push(this);
_domain_flag[0] = stack.length;
process.addAsyncListener(this._listener);
};
Domain.prototype.exit = function() {
if (this._disposed) return;
process.removeAsyncListener(this._listener);
// exit all domains until this one.
var d;
do {
d = stack.pop();
} while (d && d !== this);
_domain_flag[0] = stack.length;
var index = stack.lastIndexOf(this);
if (index !== -1)
stack.splice(index + 1);
else
stack.length = 0;
exports.active = stack[stack.length - 1];
process.domain = exports.active;
};
// note: this works for timers as well.
Domain.prototype.add = function(ee) {
// disposed domains can't be used for new things.
if (this._disposed) return;
// already added to this domain.
if (ee.domain === this) return;
// If the domain is disposed or already added, then nothing left to do.
if (this._disposed || ee.domain === this)
return;
// has a domain already - remove it first.
if (ee.domain) {
if (ee.domain)
ee.domain.remove(ee);
}
// check for circular Domain->Domain links.
// This causes bad insanity!
@ -177,85 +168,128 @@ Domain.prototype.add = function(ee) {
ee.domain = this;
this.members.push(ee);
// Adding the domain._listener to the Wrap associated with the event
// emitter instance will be done automatically either on class
// instantiation or manually, like in cases of net listen().
// The reason it cannot be done here is because in specific cases the
// _handle is not created on EE instantiation, so there's no place to
// add the listener.
};
Domain.prototype.remove = function(ee) {
ee.domain = null;
var index = this.members.indexOf(ee);
if (index !== -1) {
if (index !== -1)
this.members.splice(index, 1);
// First check if the ee is a handle itself.
if (ee.removeAsyncListener)
ee.removeAsyncListener(this._listener);
// Manually remove the asyncListener from the handle, if possible.
if (ee._handle && ee._handle.removeAsyncListener)
ee._handle.removeAsyncListener(this._listener);
// TODO(trevnorris): Are there cases where the handle doesn't live on
// the ee or the _handle.
// TODO(trevnorris): For debugging that we've missed adding AsyncWrap's
// methods to a handle somewhere on the native side.
if (ee._handle && !ee._handle.removeAsyncListener) {
process._rawDebug('Wrap handle is missing AsyncWrap methods');
process.abort();
}
};
Domain.prototype.run = function(fn) {
return this.bind(fn)();
if (this._disposed)
return;
this.enter();
var ret = fn.call(this);
this.exit();
return ret;
};
function intercepted(_this, self, cb, fnargs) {
if (self._disposed)
return;
if (fnargs[0] && fnargs[0] instanceof Error) {
var er = fnargs[0];
util._extend(er, {
domainBound: cb,
domainThrown: false,
domain: self
});
self.emit('error', er);
return;
}
var len = fnargs.length;
var args = [];
var i, ret;
self.enter();
if (fnargs.length > 1) {
for (i = 1; i < fnargs.length; i++)
args.push(fnargs[i]);
ret = cb.apply(_this, args);
} else {
ret = cb.call(_this);
}
self.exit();
return ret;
}
Domain.prototype.intercept = function(cb) {
return this.bind(cb, true);
var self = this;
function runIntercepted() {
return intercepted(this, self, cb, arguments);
}
return runIntercepted;
};
Domain.prototype.bind = function(cb, interceptError) {
// if cb throws, catch it here.
var self = this;
var b = function() {
// disposing turns functions into no-ops
if (self._disposed) return;
if (this instanceof Domain) {
return cb.apply(this, arguments);
}
function bound(_this, self, cb, fnargs) {
if (self._disposed)
return;
// only intercept first-arg errors if explicitly requested.
if (interceptError && arguments[0] &&
(arguments[0] instanceof Error)) {
var er = arguments[0];
util._extend(er, {
domainBound: cb,
domainThrown: false,
domain: self
});
self.emit('error', er);
return;
}
var len = fnargs.length;
var args = [];
var i, ret;
// remove first-arg if intercept as assumed to be the error-arg
if (interceptError) {
var len = arguments.length;
var args;
switch (len) {
case 0:
case 1:
// no args that we care about.
args = [];
break;
case 2:
// optimization for most common case: cb(er, data)
args = [arguments[1]];
break;
default:
// slower for less common case: cb(er, foo, bar, baz, ...)
args = new Array(len - 1);
for (var i = 1; i < len; i++) {
args[i - 1] = arguments[i];
}
break;
}
self.enter();
var ret = cb.apply(this, args);
self.exit();
return ret;
}
self.enter();
if (fnargs.length > 0)
ret = cb.apply(_this, fnargs);
else
ret = cb.call(_this);
self.exit();
self.enter();
var ret = cb.apply(this, arguments);
self.exit();
return ret;
};
b.domain = this;
return b;
return ret;
}
Domain.prototype.bind = function(cb) {
var self = this;
function runBound() {
return bound(this, self, cb, arguments);
}
runBound.domain = this;
return runBound;
};
Domain.prototype.dispose = util.deprecate(function() {
if (this._disposed) return;

36
lib/events.js

@ -66,23 +66,21 @@ EventEmitter.prototype.emit = function(type) {
this._events = {};
// If there is no 'error' event listener then throw.
if (type === 'error') {
if (!this._events.error ||
(util.isObject(this._events.error) && !this._events.error.length)) {
er = arguments[1];
if (this.domain) {
if (!er) er = new TypeError('Uncaught, unspecified "error" event.');
er.domainEmitter = this;
er.domain = this.domain;
er.domainThrown = false;
this.domain.emit('error', er);
} else if (er instanceof Error) {
throw er; // Unhandled 'error' event
} else {
throw TypeError('Uncaught, unspecified "error" event.');
}
return false;
if (type === 'error' && !this._events.error) {
er = arguments[1];
if (this.domain) {
if (!er)
er = new Error('Uncaught, unspecified "error" event.');
er.domainEmitter = this;
er.domain = this.domain;
er.domainThrown = false;
this.domain.emit('error', er);
} else if (er instanceof Error) {
throw er; // Unhandled 'error' event
} else {
throw Error('Uncaught, unspecified "error" event.');
}
return false;
}
handler = this._events[type];
@ -90,9 +88,6 @@ EventEmitter.prototype.emit = function(type) {
if (util.isUndefined(handler))
return false;
if (this.domain && this !== process)
this.domain.enter();
if (util.isFunction(handler)) {
switch (arguments.length) {
// fast cases
@ -125,9 +120,6 @@ EventEmitter.prototype.emit = function(type) {
listeners[i].apply(this, args);
}
if (this.domain && this !== process)
this.domain.exit();
return true;
};

11
lib/net.js

@ -1089,9 +1089,20 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
// generate connection key, this should be unique to the connection
this._connectionKey = addressType + ':' + address + ':' + port;
// If a domain is attached to the event emitter then we need to add
// the listener to the handle.
if (this.domain) {
this._handle.addAsyncListener(this.domain._listener);
process.addAsyncListener(this.domain._listener);
}
process.nextTick(function() {
self.emit('listening');
});
if (this.domain) {
process.removeAsyncListener(this.domain._listener);
}
};

22
lib/timers.js

@ -112,8 +112,7 @@ function listOnTimeout() {
// other timers that expire on this tick should still run.
//
// https://github.com/joyent/node/issues/2631
var domain = first.domain;
if (domain && domain._disposed)
if (first.domain && first.domain._disposed)
continue;
hasQueue = !!first._asyncQueue;
@ -121,14 +120,10 @@ function listOnTimeout() {
try {
if (hasQueue)
loadAsyncQueue(first);
if (domain)
domain.enter();
threw = true;
first._onTimeout();
if (hasQueue)
unloadAsyncQueue(first);
if (domain)
domain.exit();
threw = false;
} finally {
if (threw) {
@ -373,7 +368,7 @@ L.init(immediateQueue);
function processImmediate() {
var queue = immediateQueue;
var domain, hasQueue, immediate;
var hasQueue, immediate;
immediateQueue = {};
L.init(immediateQueue);
@ -381,12 +376,9 @@ function processImmediate() {
while (L.isEmpty(queue) === false) {
immediate = L.shift(queue);
hasQueue = !!immediate._asyncQueue;
domain = immediate.domain;
if (hasQueue)
loadAsyncQueue(immediate);
if (domain)
domain.enter();
var threw = true;
try {
@ -408,8 +400,6 @@ function processImmediate() {
if (hasQueue)
unloadAsyncQueue(immediate);
if (domain)
domain.exit();
}
// Only round-trip to C++ land if we have to. Calling clearImmediate() on an
@ -489,7 +479,7 @@ function unrefTimeout() {
debug('unrefTimer fired');
var diff, domain, first, hasQueue, threw;
var diff, first, hasQueue, threw;
while (first = L.peek(unrefList)) {
diff = now - first._idleStart;
hasQueue = !!first._asyncQueue;
@ -504,22 +494,18 @@ function unrefTimeout() {
L.remove(first);
domain = first.domain;
if (!first._onTimeout) continue;
if (domain && domain._disposed) continue;
if (first.domain && first.domain._disposed) continue;
try {
if (hasQueue)
loadAsyncQueue(first);
if (domain) domain.enter();
threw = true;
debug('unreftimer firing timeout');
first._onTimeout();
threw = false;
if (hasQueue)
unloadAsyncQueue(first);
if (domain) domain.exit();
} finally {
if (threw) process.nextTick(unrefTimeout);
}

89
src/async-wrap-inl.h

@ -104,99 +104,10 @@ inline v8::Persistent<v8::Object>& AsyncWrap::persistent() {
}
// I hate you domains.
inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
const v8::Handle<v8::Function> cb,
int argc,
v8::Handle<v8::Value>* argv) {
assert(env()->context() == env()->isolate()->GetCurrentContext());
v8::Local<v8::Object> context = object();
v8::Local<v8::Object> process = env()->process_object();
v8::Local<v8::Value> domain_v = context->Get(env()->domain_string());
v8::Local<v8::Object> domain;
v8::TryCatch try_catch;
try_catch.SetVerbose(true);
if (has_async_queue()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_load_function()->Call(process, 1, &val);
if (try_catch.HasCaught())
return v8::Undefined(env()->isolate());
}
bool has_domain = domain_v->IsObject();
if (has_domain) {
domain = domain_v.As<v8::Object>();
if (domain->Get(env()->disposed_string())->IsTrue())
return Undefined(env()->isolate());
v8::Local<v8::Function> enter =
domain->Get(env()->enter_string()).As<v8::Function>();
assert(enter->IsFunction());
enter->Call(domain, 0, NULL);
if (try_catch.HasCaught())
return Undefined(env()->isolate());
}
v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
if (try_catch.HasCaught()) {
return Undefined(env()->isolate());
}
if (has_async_queue()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_unload_function()->Call(process, 1, &val);
}
if (has_domain) {
v8::Local<v8::Function> exit =
domain->Get(env()->exit_string()).As<v8::Function>();
assert(exit->IsFunction());
exit->Call(domain, 0, NULL);
if (try_catch.HasCaught())
return Undefined(env()->isolate());
}
Environment::TickInfo* tick_info = env()->tick_info();
if (tick_info->in_tick()) {
return ret;
}
if (tick_info->length() == 0) {
tick_info->set_index(0);
return ret;
}
tick_info->set_in_tick(true);
env()->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
if (try_catch.HasCaught()) {
tick_info->set_last_threw(true);
return Undefined(env()->isolate());
}
return ret;
}
inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
const v8::Handle<v8::Function> cb,
int argc,
v8::Handle<v8::Value>* argv) {
if (env()->using_domains())
return MakeDomainCallback(cb, argc, argv);
assert(env()->context() == env()->isolate()->GetCurrentContext());
v8::Local<v8::Object> context = object();

7
src/async-wrap.h

@ -69,13 +69,6 @@ class AsyncWrap {
v8::Handle<v8::Value>* argv);
private:
// TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
// replacement is committed.
inline v8::Handle<v8::Value> MakeDomainCallback(
const v8::Handle<v8::Function> cb,
int argc,
v8::Handle<v8::Value>* argv);
// Add an async listener to an existing handle.
template <typename Type>
static inline void AddAsyncListener(

35
src/env-inl.h

@ -86,22 +86,6 @@ inline uint32_t Environment::AsyncListener::count() const {
return fields_[kCount];
}
inline Environment::DomainFlag::DomainFlag() {
for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
}
inline uint32_t* Environment::DomainFlag::fields() {
return fields_;
}
inline int Environment::DomainFlag::fields_count() const {
return kFieldsCount;
}
inline uint32_t Environment::DomainFlag::count() const {
return fields_[kCount];
}
inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) {
for (int i = 0; i < kFieldsCount; ++i)
fields_[i] = 0;
@ -179,7 +163,6 @@ inline Environment::Environment(v8::Local<v8::Context> context)
: isolate_(context->GetIsolate()),
isolate_data_(IsolateData::GetOrCreate(context->GetIsolate())),
using_smalloc_alloc_cb_(false),
using_domains_(false),
context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context.
v8::Context::Scope context_scope(context);
@ -210,12 +193,6 @@ inline bool Environment::has_async_listeners() const {
return const_cast<Environment*>(this)->async_listener()->count() > 0;
}
inline bool Environment::in_domain() const {
// The const_cast is okay, it doesn't violate conceptual const-ness.
return using_domains() &&
const_cast<Environment*>(this)->domain_flag()->count() > 0;
}
inline Environment* Environment::from_immediate_check_handle(
uv_check_t* handle) {
return CONTAINER_OF(handle, Environment, immediate_check_handle_);
@ -254,10 +231,6 @@ inline Environment::AsyncListener* Environment::async_listener() {
return &async_listener_count_;
}
inline Environment::DomainFlag* Environment::domain_flag() {
return &domain_flag_;
}
inline Environment::TickInfo* Environment::tick_info() {
return &tick_info_;
}
@ -270,14 +243,6 @@ inline void Environment::set_using_smalloc_alloc_cb(bool value) {
using_smalloc_alloc_cb_ = value;
}
inline bool Environment::using_domains() const {
return using_domains_;
}
inline void Environment::set_using_domains(bool value) {
using_domains_ = value;
}
inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
return CONTAINER_OF(handle, Environment, cares_timer_handle_);
}

29
src/env.h

@ -66,7 +66,6 @@ namespace node {
V(ctime_string, "ctime") \
V(dev_string, "dev") \
V(disposed_string, "_disposed") \
V(domain_string, "domain") \
V(enter_string, "enter") \
V(errno_string, "errno") \
V(exit_string, "exit") \
@ -142,7 +141,6 @@ namespace node {
V(binding_cache_object, v8::Object) \
V(buffer_constructor_function, v8::Function) \
V(context, v8::Context) \
V(domain_array, v8::Array) \
V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \
@ -191,26 +189,6 @@ class Environment {
DISALLOW_COPY_AND_ASSIGN(AsyncListener);
};
class DomainFlag {
public:
inline uint32_t* fields();
inline int fields_count() const;
inline uint32_t count() const;
private:
friend class Environment; // So we can call the constructor.
inline DomainFlag();
enum Fields {
kCount,
kFieldsCount
};
uint32_t fields_[kFieldsCount];
DISALLOW_COPY_AND_ASSIGN(DomainFlag);
};
class TickInfo {
public:
inline uint32_t* fields();
@ -252,7 +230,6 @@ class Environment {
inline v8::Isolate* isolate() const;
inline uv_loop_t* event_loop() const;
inline bool has_async_listeners() const;
inline bool in_domain() const;
static inline Environment* from_immediate_check_handle(uv_check_t* handle);
inline uv_check_t* immediate_check_handle();
@ -265,7 +242,6 @@ class Environment {
inline uv_check_t* idle_check_handle();
inline AsyncListener* async_listener();
inline DomainFlag* domain_flag();
inline TickInfo* tick_info();
static inline Environment* from_cares_timer_handle(uv_timer_t* handle);
@ -277,9 +253,6 @@ class Environment {
inline bool using_smalloc_alloc_cb() const;
inline void set_using_smalloc_alloc_cb(bool value);
inline bool using_domains() const;
inline void set_using_domains(bool value);
// Strings are shared across shared contexts. The getters simply proxy to
// the per-isolate primitive.
#define V(PropertyName, StringValue) \
@ -310,13 +283,11 @@ class Environment {
uv_prepare_t idle_prepare_handle_;
uv_check_t idle_check_handle_;
AsyncListener async_listener_count_;
DomainFlag domain_flag_;
TickInfo tick_info_;
uv_timer_t cares_timer_handle_;
ares_channel cares_channel_;
ares_task_list cares_task_list_;
bool using_smalloc_alloc_cb_;
bool using_domains_;
#define V(PropertyName, TypeName) \
v8::Persistent<TypeName> PropertyName ## _;

180
src/node.cc

@ -847,12 +847,12 @@ void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args.GetIsolate());
HandleScope handle_scope(args.GetIsolate());
assert(args[0]->IsObject() &&
args[1]->IsFunction() &&
args[2]->IsFunction() &&
args[3]->IsFunction() &&
args[4]->IsFunction() &&
args[5]->IsFunction());
assert(args[0]->IsObject());
assert(args[1]->IsFunction());
assert(args[2]->IsFunction());
assert(args[3]->IsFunction());
assert(args[4]->IsFunction());
assert(args[5]->IsFunction());
env->set_async_listener_run_function(args[1].As<Function>());
env->set_async_listener_load_function(args[2].As<Function>());
@ -873,60 +873,11 @@ void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
}
void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args.GetIsolate());
if (env->using_domains())
return;
env->set_using_domains(true);
HandleScope scope(node_isolate);
Local<Object> process_object = env->process_object();
Local<String> tick_callback_function_key =
FIXED_ONE_BYTE_STRING(node_isolate, "_tickDomainCallback");
Local<Function> tick_callback_function =
process_object->Get(tick_callback_function_key).As<Function>();
if (!tick_callback_function->IsFunction()) {
fprintf(stderr, "process._tickDomainCallback assigned to non-function\n");
abort();
}
process_object->Set(FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback"),
tick_callback_function);
env->set_tick_callback_function(tick_callback_function);
if (!args[0]->IsArray()) {
fprintf(stderr, "_setupDomainUse first argument must be an array\n");
abort();
}
env->set_domain_array(args[0].As<Array>());
if (!args[1]->IsObject()) {
fprintf(stderr, "_setupDomainUse second argument must be an object\n");
abort();
}
Local<Object> domain_flag_obj = args[1].As<Object>();
Environment::DomainFlag* domain_flag = env->domain_flag();
domain_flag_obj->SetIndexedPropertiesToExternalArrayData(
domain_flag->fields(),
kExternalUnsignedIntArray,
domain_flag->fields_count());
// Do a little housekeeping.
env->process_object()->Delete(
FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse"));
}
void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args.GetIsolate());
HandleScope handle_scope(args.GetIsolate());
if (!args[0]->IsObject() || !args[1]->IsFunction())
abort();
assert(args[0]->IsObject() && args[1]->IsFunction());
// Values use to cross communicate with processNextTick.
Local<Object> tick_info_obj = args[0].As<Object>();
@ -943,114 +894,11 @@ void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
}
Handle<Value> MakeDomainCallback(Environment* env,
Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
// If you hit this assertion, you forgot to enter the v8::Context first.
assert(env->context() == env->isolate()->GetCurrentContext());
Local<Object> process = env->process_object();
Local<Value> domain_v = object->Get(env->domain_string());
Local<Object> domain;
TryCatch try_catch;
try_catch.SetVerbose(true);
// TODO(trevnorris): This is sucky for performance. Fix it.
bool has_async_queue = object->Has(env->async_queue_string());
if (has_async_queue) {
Local<Value> argv[] = { object };
env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
if (try_catch.HasCaught())
return Undefined(node_isolate);
}
bool has_domain = domain_v->IsObject();
if (has_domain) {
domain = domain_v.As<Object>();
if (domain->Get(env->disposed_string())->IsTrue()) {
// domain has been disposed of.
return Undefined(node_isolate);
}
Local<Function> enter =
domain->Get(env->enter_string()).As<Function>();
assert(enter->IsFunction());
enter->Call(domain, 0, NULL);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
}
Local<Value> ret = callback->Call(object, argc, argv);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
if (has_async_queue) {
Local<Value> val = object.As<Value>();
env->async_listener_unload_function()->Call(process, 1, &val);
if (try_catch.HasCaught())
return Undefined(node_isolate);
}
if (has_domain) {
Local<Function> exit =
domain->Get(env->exit_string()).As<Function>();
assert(exit->IsFunction());
exit->Call(domain, 0, NULL);
if (try_catch.HasCaught()) {
return Undefined(node_isolate);
}
}
Environment::TickInfo* tick_info = env->tick_info();
if (tick_info->last_threw() == 1) {
tick_info->set_last_threw(0);
return ret;
}
if (tick_info->in_tick()) {
return ret;
}
if (tick_info->length() == 0) {
tick_info->set_index(0);
return ret;
}
tick_info->set_in_tick(true);
env->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
if (try_catch.HasCaught()) {
tick_info->set_last_threw(true);
return Undefined(node_isolate);
}
return ret;
}
Handle<Value> MakeCallback(Environment* env,
Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
if (env->using_domains())
return MakeDomainCallback(env, object, callback, argc, argv);
// If you hit this assertion, you forgot to enter the v8::Context first.
assert(env->context() == env->isolate()->GetCurrentContext());
@ -1180,19 +1028,6 @@ Handle<Value> MakeCallback(const Handle<Object> object,
}
Handle<Value> MakeDomainCallback(const Handle<Object> object,
const Handle<Function> callback,
int argc,
Handle<Value> argv[]) {
Local<Context> context = object->CreationContext();
Environment* env = Environment::GetCurrent(context);
Context::Scope context_scope(context);
HandleScope handle_scope(env->isolate());
return handle_scope.Close(
MakeDomainCallback(env, object, callback, argc, argv));
}
enum encoding ParseEncoding(Handle<Value> encoding_v, enum encoding _default) {
HandleScope scope(node_isolate);
@ -2630,7 +2465,6 @@ void SetupProcessObject(Environment* env,
NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener);
NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick);
NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse);
// values use to cross communicate with processNextTick
Local<Object> tick_info_obj = Object::New();

39
src/node.js

@ -224,14 +224,11 @@
// First run through error handlers from asyncListener.
var caught = _errorHandler(er);
if (process.domain && process.domain._errorHandler)
caught = process.domain._errorHandler(er) || caught;
if (!caught)
caught = process.emit('uncaughtException', er);
// If someone handled it, then great. otherwise, die in C++ land
// since that means that we'll exit the process, emit the 'exit' event
// If someone handled it, then great. Otherwise die in C++ since
// that means we'll exit the process, emit the 'exit' event.
if (!caught) {
try {
if (!process._exiting) {
@ -570,7 +567,6 @@
process.nextTick = nextTick;
// Needs to be accessible from beyond this scope.
process._tickCallback = _tickCallback;
process._tickDomainCallback = _tickDomainCallback;
process._setupNextTick(tickInfo, _tickCallback);
@ -588,7 +584,6 @@
}
// Run callbacks that have no domain.
// Using domains will cause this to be overridden.
function _tickCallback() {
var callback, hasQueue, threw, tock;
@ -613,35 +608,6 @@
tickDone();
}
function _tickDomainCallback() {
var callback, domain, hasQueue, threw, tock;
while (tickInfo[kIndex] < tickInfo[kLength]) {
tock = nextTickQueue[tickInfo[kIndex]++];
callback = tock.callback;
domain = tock.domain;
hasQueue = !!tock._asyncQueue;
if (hasQueue)
_loadAsyncQueue(tock);
if (domain)
domain.enter();
threw = true;
try {
callback();
threw = false;
} finally {
if (threw)
tickDone();
}
if (hasQueue)
_unloadAsyncQueue(tock);
if (domain)
domain.exit();
}
tickDone();
}
function nextTick(callback) {
// on the way out, don't bother. it won't get fired anyway.
if (process._exiting)
@ -649,7 +615,6 @@
var obj = {
callback: callback,
domain: process.domain || null,
_asyncQueue: undefined
};

6
src/node_crypto.cc

@ -3489,9 +3489,6 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
if (args[4]->IsFunction()) {
obj->Set(env->ondone_string(), args[4]);
// XXX(trevnorris): This will need to go with the rest of domains.
if (env->in_domain())
obj->Set(env->domain_string(), env->domain_array()->Get(0));
uv_queue_work(env->event_loop(),
req->work_req(),
EIO_PBKDF2,
@ -3653,9 +3650,6 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
if (args[1]->IsFunction()) {
obj->Set(FIXED_ONE_BYTE_STRING(node_isolate, "ondone"), args[1]);
// XXX(trevnorris): This will need to go with the rest of domains.
if (env->in_domain())
obj->Set(env->domain_string(), env->domain_array()->Get(0));
uv_queue_work(env->event_loop(),
req->work_req(),
RandomBytesWork<pseudoRandom>,

5
src/req_wrap.h

@ -39,11 +39,6 @@ class ReqWrap : public AsyncWrap {
public:
ReqWrap(Environment* env, v8::Handle<v8::Object> object)
: AsyncWrap(env, object) {
assert(!object.IsEmpty());
if (env->in_domain())
object->Set(env->domain_string(), env->domain_array()->Get(0));
QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_);
}

18
src/tls_wrap.cc

@ -388,10 +388,10 @@ void TLSCallbacks::ClearOut() {
if (read == -1) {
int err;
Handle<Value> argv = GetSSLError(read, &err);
Handle<Value> arg = GetSSLError(read, &err);
if (!argv.IsEmpty()) {
MakeCallback(env()->onerror_string(), 1, &argv);
if (!arg.IsEmpty()) {
MakeCallback(env()->onerror_string(), 1, &arg);
}
}
}
@ -424,9 +424,9 @@ bool TLSCallbacks::ClearIn() {
// Error or partial write
int err;
Handle<Value> argv = GetSSLError(written, &err);
if (!argv.IsEmpty()) {
MakeCallback(env()->onerror_string(), 1, &argv);
Handle<Value> arg = GetSSLError(written, &err);
if (!arg.IsEmpty()) {
MakeCallback(env()->onerror_string(), 1, &arg);
}
return false;
@ -487,9 +487,9 @@ int TLSCallbacks::DoWrite(WriteWrap* w,
int err;
Context::Scope context_scope(env()->context());
HandleScope handle_scope(env()->isolate());
Handle<Value> argv = GetSSLError(written, &err);
if (!argv.IsEmpty()) {
MakeCallback(env()->onerror_string(), 1, &argv);
Handle<Value> arg = GetSSLError(written, &err);
if (!arg.IsEmpty()) {
MakeCallback(env()->onerror_string(), 1, &arg);
return -1;
}

4
test/simple/test-event-emitter-no-error-provided-to-error-event.js

@ -31,12 +31,12 @@ var e = new events.EventEmitter();
var d = domain.create();
d.add(e);
d.on('error', function (er) {
assert(er instanceof TypeError, 'type error created');
assert(er instanceof Error, 'error created');
errorCatched = true;
});
e.emit('error');
process.on('exit', function () {
assert(errorCatched, 'error got catched');
assert(errorCatched, 'error got caught');
});

Loading…
Cancel
Save