diff --git a/lib/timers.js b/lib/timers.js index ff5843066e..d268479cf5 100644 --- a/lib/timers.js +++ b/lib/timers.js @@ -35,6 +35,9 @@ var runAsyncQueue = process._runAsyncQueue; var loadAsyncQueue = process._loadAsyncQueue; var unloadAsyncQueue = process._unloadAsyncQueue; +// Same as in AsyncListener in env.h +var kHasListener = 0; + // Do a little housekeeping. delete process._asyncFlags; delete process._runAsyncQueue; @@ -57,6 +60,8 @@ var lists = {}; // Make Timer as monomorphic as possible. Timer.prototype._asyncQueue = undefined; +Timer.prototype._asyncData = undefined; +Timer.prototype._asyncFlags = 0; // the main function - creates lists on demand and the watchers associated // with them. @@ -195,21 +200,38 @@ exports.active = function(item) { // Whether or not a new TimerWrap needed to be created, this should run // for each item. This way each "item" (i.e. timer) can properly have // their own domain assigned. - if (asyncFlags[0] > 0) + if (asyncFlags[kHasListener] > 0) runAsyncQueue(item); }; -function timerAddAsyncListener(obj) { +// new Array() is used here because it is more efficient for sparse +// arrays. Please *do not* change these to simple bracket notation. +function timerAddAsyncListener(obj, data) { + obj = process.createAsyncListener(obj, data); + if (!this._asyncQueue) - this._asyncQueue = []; + this._asyncQueue = new Array(); + if (!this._asyncData) + this._asyncData = new Array(); + + var inQueue = false; var queue = this._asyncQueue; // This queue will be small. Probably always <= 3 items. for (var i = 0; i < queue.length; i++) { - if (queue[i].uid === obj.uid) - return; + if (queue[i].uid === obj.uid) { + inQueue = true; + break; + } } - this._asyncQueue.push(obj); + + if (!inQueue) { + queue.push(obj); + this._asyncData[obj.uid] = obj.data; + this._asyncFlags |= obj.flags; + } + + return obj; } @@ -217,13 +239,20 @@ function timerRemoveAsyncListener(obj) { if (!this._asyncQueue) return; var queue = this._asyncQueue; + var i; // This queue will be small. Probably always <= 3 items. - for (var i = 0; i < queue.length; i++) { + for (i = 0; i < queue.length; i++) { if (queue[i].uid === obj.uid) { queue.splice(i, 1); + this._asyncData[obj.uid] = undefined; return; } } + // Rebuild flags + this._asyncFlags = 0; + for (i = 0; i < queue.length; i++) { + this._asyncFlags |= queue[i].flags; + } } @@ -428,8 +457,10 @@ Immediate.prototype.removeAsyncListener = timerRemoveAsyncListener; Immediate.prototype.domain = undefined; Immediate.prototype._onImmediate = undefined; Immediate.prototype._asyncQueue = undefined; +Immediate.prototype._asyncData = undefined; Immediate.prototype._idleNext = undefined; Immediate.prototype._idlePrev = undefined; +Immediate.prototype._asyncFlags = 0; exports.setImmediate = function(callback) { @@ -456,7 +487,7 @@ exports.setImmediate = function(callback) { } // setImmediates are handled more like nextTicks. - if (asyncFlags[0] > 0) + if (asyncFlags[kHasListener] > 0) runAsyncQueue(immediate); if (process.domain) immediate.domain = process.domain; @@ -494,7 +525,6 @@ function unrefTimeout() { var diff, domain, first, hasQueue, threw; while (first = L.peek(unrefList)) { diff = now - first._idleStart; - hasQueue = !!first._asyncQueue; if (diff < first._idleTimeout) { diff = first._idleTimeout - diff; @@ -510,6 +540,7 @@ function unrefTimeout() { if (!first._onTimeout) continue; if (domain && domain._disposed) continue; + hasQueue = !!first._asyncQueue; try { if (hasQueue) diff --git a/src/async-wrap-inl.h b/src/async-wrap-inl.h index b32ead8030..3223d835ec 100644 --- a/src/async-wrap-inl.h +++ b/src/async-wrap-inl.h @@ -38,7 +38,7 @@ namespace node { inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle object) : BaseObject(env, object), async_flags_(NO_OPTIONS) { - if (!env->has_async_listeners()) + if (!env->has_async_listener()) return; // TODO(trevnorris): Do we really need to TryCatch this call? @@ -49,7 +49,7 @@ inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle object) env->async_listener_run_function()->Call(env->process_object(), 1, &val); if (!try_catch.HasCaught()) - async_flags_ |= ASYNC_LISTENERS; + async_flags_ |= HAS_ASYNC_LISTENER; } @@ -83,8 +83,8 @@ inline void AsyncWrap::remove_flag(unsigned int flag) { } -inline bool AsyncWrap::has_async_queue() { - return async_flags() & ASYNC_LISTENERS; +inline bool AsyncWrap::has_async_listener() { + return async_flags() & HAS_ASYNC_LISTENER; } @@ -103,7 +103,7 @@ inline v8::Handle AsyncWrap::MakeDomainCallback( v8::TryCatch try_catch; try_catch.SetVerbose(true); - if (has_async_queue()) { + if (has_async_listener()) { v8::Local val = context.As(); env()->async_listener_load_function()->Call(process, 1, &val); @@ -143,7 +143,7 @@ inline v8::Handle AsyncWrap::MakeDomainCallback( return Undefined(env()->isolate()); } - if (has_async_queue()) { + if (has_async_listener()) { v8::Local val = context.As(); env()->async_listener_unload_function()->Call(process, 1, &val); @@ -192,7 +192,7 @@ inline v8::Handle AsyncWrap::MakeCallback( v8::TryCatch try_catch; try_catch.SetVerbose(true); - if (has_async_queue()) { + if (has_async_listener()) { v8::Local val = context.As(); env()->async_listener_load_function()->Call(process, 1, &val); @@ -206,7 +206,7 @@ inline v8::Handle AsyncWrap::MakeCallback( return Undefined(env()->isolate()); } - if (has_async_queue()) { + if (has_async_listener()) { v8::Local val = context.As(); env()->async_listener_unload_function()->Call(process, 1, &val); @@ -227,9 +227,6 @@ inline v8::Handle AsyncWrap::MakeCallback( tick_info->set_in_tick(true); - // TODO(trevnorris): Consider passing "context" to _tickCallback so it - // can then be passed as the first argument to the nextTick callback. - // That should greatly help needing to create closures. env()->tick_callback_function()->Call(process, 0, NULL); tick_info->set_in_tick(false); @@ -283,7 +280,7 @@ inline void AsyncWrap::AddAsyncListener( Type* wrap = static_cast( handle->GetAlignedPointerFromInternalField(0)); assert(wrap != NULL); - wrap->set_flag(ASYNC_LISTENERS); + wrap->set_flag(HAS_ASYNC_LISTENER); } @@ -305,7 +302,7 @@ inline void AsyncWrap::RemoveAsyncListener( Type* wrap = static_cast( handle->GetAlignedPointerFromInternalField(0)); assert(wrap != NULL); - wrap->remove_flag(ASYNC_LISTENERS); + wrap->remove_flag(HAS_ASYNC_LISTENER); } } diff --git a/src/async-wrap.h b/src/async-wrap.h index 25ee2eccd2..068dfa6915 100644 --- a/src/async-wrap.h +++ b/src/async-wrap.h @@ -32,7 +32,7 @@ class AsyncWrap : public BaseObject { public: enum AsyncFlags { NO_OPTIONS = 0, - ASYNC_LISTENERS = 1 + HAS_ASYNC_LISTENER = 1 }; inline AsyncWrap(Environment* env, v8::Handle object); @@ -48,7 +48,7 @@ class AsyncWrap : public BaseObject { inline void remove_flag(unsigned int flag); - inline bool has_async_queue(); + inline bool has_async_listener(); // Only call these within a valid HandleScope. inline v8::Handle MakeCallback(const v8::Handle cb, @@ -62,6 +62,8 @@ class AsyncWrap : public BaseObject { v8::Handle* argv); private: + inline AsyncWrap(); + // TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable // replacement is committed. inline v8::Handle MakeDomainCallback( diff --git a/src/env-inl.h b/src/env-inl.h index c7baff4052..21075a68c2 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -82,8 +82,8 @@ inline int Environment::AsyncListener::fields_count() const { return kFieldsCount; } -inline uint32_t Environment::AsyncListener::count() const { - return fields_[kCount]; +inline bool Environment::AsyncListener::has_listener() const { + return fields_[kHasListener] > 0; } inline Environment::DomainFlag::DomainFlag() { @@ -205,9 +205,9 @@ inline v8::Isolate* Environment::isolate() const { return isolate_; } -inline bool Environment::has_async_listeners() const { +inline bool Environment::has_async_listener() const { // The const_cast is okay, it doesn't violate conceptual const-ness. - return const_cast(this)->async_listener()->count() > 0; + return const_cast(this)->async_listener()->has_listener(); } inline bool Environment::in_domain() const { diff --git a/src/env.h b/src/env.h index 71936dd5c2..b654ea2d0a 100644 --- a/src/env.h +++ b/src/env.h @@ -177,14 +177,14 @@ class Environment { public: inline uint32_t* fields(); inline int fields_count() const; - inline uint32_t count() const; + inline bool has_listener() const; private: friend class Environment; // So we can call the constructor. inline AsyncListener(); enum Fields { - kCount, + kHasListener, kFieldsCount }; @@ -253,7 +253,7 @@ class Environment { inline v8::Isolate* isolate() const; inline uv_loop_t* event_loop() const; - inline bool has_async_listeners() const; + inline bool has_async_listener() const; inline bool in_domain() const; static inline Environment* from_immediate_check_handle(uv_check_t* handle); diff --git a/src/node.js b/src/node.js index 2c61592737..fd0d98a333 100644 --- a/src/node.js +++ b/src/node.js @@ -256,9 +256,20 @@ }; startup.processAsyncListener = function() { - var asyncStack = []; - var asyncQueue = []; - var uid = 0; + // new Array() is used here because it is more efficient for sparse + // arrays. Please *do not* change these to simple bracket notation. + + // Track the active queue of AsyncListeners that have been added. + var asyncStack = new Array(); + var asyncQueue = undefined; + + // Keep the stack of all contexts that have been loaded in the + // execution chain of asynchronous events. + var contextStack = new Array(); + var currentContext = undefined; + + // Incremental uid for new AsyncListener instances. + var alUid = 0; // Stateful flags shared with Environment for quick JS/C++ // communication. @@ -272,7 +283,13 @@ var inErrorTick = false; // Needs to be the same as src/env.h - var kCount = 0; + var kHasListener = 0; + + // Flags to determine what async listeners are available. + var HAS_CREATE_AL = 1 << 0; + var HAS_BEFORE_AL = 1 << 1; + var HAS_AFTER_AL = 1 << 2; + var HAS_ERROR_AL = 1 << 3; // _errorHandler is scoped so it's also accessible by _fatalException. _errorHandler = errorHandler; @@ -298,115 +315,249 @@ pushListener, stripListener); - function popQueue() { - if (asyncStack.length > 0) - asyncQueue = asyncStack.pop(); - else - asyncQueue = []; + // Load the currently executing context as the current context, and + // create a new asyncQueue that can receive any added queue items + // during the executing of the callback. + function loadContext(ctx) { + contextStack.push(currentContext); + currentContext = ctx; + + asyncStack.push(asyncQueue); + asyncQueue = new Array(); + + asyncFlags[kHasListener] = 1; + } + + function unloadContext() { + currentContext = contextStack.pop(); + asyncQueue = asyncStack.pop(); + + if (typeof currentContext === 'undefined' && + typeof asyncQueue === 'undefined') + asyncFlags[kHasListener] = 0; } // Run all the async listeners attached when an asynchronous event is // instantiated. function runAsyncQueue(context) { - var queue = []; - var queueItem, item, i, value; + var queue = new Array(); + var data = new Array(); + var ccQueue, i, item, queueItem, value; + + context._asyncQueue = queue; + context._asyncData = data; + context._asyncFlags = 0; inAsyncTick = true; - for (i = 0; i < asyncQueue.length; i++) { - queueItem = asyncQueue[i]; - if (!queueItem.callbacks.create) { - queue[i] = queueItem; - continue; + + // First run through all callbacks in the currentContext. These may + // add new AsyncListeners to the asyncQueue during execution. Hence + // why they need to be evaluated first. + if (currentContext) { + ccQueue = currentContext._asyncQueue; + context._asyncFlags |= currentContext._asyncFlags; + for (i = 0; i < ccQueue.length; i++) { + queueItem = ccQueue[i]; + queue[queue.length] = queueItem; + if ((queueItem.flags & HAS_CREATE_AL) === 0) { + data[queueItem.uid] = queueItem.data; + continue; + } + value = queueItem.create(queueItem.data); + data[queueItem.uid] = (value === undefined) ? queueItem.data : value; } - // Not passing "this" context because it hasn't actually been - // instantiated yet, so accessing some of the object properties - // can cause a segfault. - // Passing the original value will allow users to manipulate the - // original value object, while also allowing them to return a - // new value for current async call tracking. - value = queueItem.callbacks.create(queueItem.value); - if (typeof value !== 'undefined') { - item = { - callbacks: queueItem.callbacks, - value: value, - uid: queueItem.uid - }; - } else { - item = queueItem; + } + + + // Then run through all items in the asyncQueue + if (asyncQueue) { + for (i = 0; i < asyncQueue.length; i++) { + queueItem = asyncQueue[i]; + queue[queue.length] = queueItem; + context._asyncFlags |= queueItem.flags; + if ((queueItem.flags & HAS_CREATE_AL) === 0) { + data[queueItem.uid] = queueItem.data; + continue; + } + value = queueItem.create(queueItem.data); + data[queueItem.uid] = (value === undefined) ? queueItem.data : value; } - queue[i] = item; } - inAsyncTick = false; - context._asyncQueue = queue; + inAsyncTick = false; } - // Uses the _asyncQueue object attached by runAsyncQueue. + // Load the AsyncListener queue attached to context and run all + // "before" callbacks, if they exist. function loadAsyncQueue(context) { - var queue = context._asyncQueue; - var item, before, i; + loadContext(context); - asyncStack.push(asyncQueue); - asyncQueue = queue.slice(); - // Since the async listener callback is required, the number of - // objects in the asyncQueue implies the number of async listeners - // there are to be processed. - asyncFlags[kCount] = queue.length; + if ((context._asyncFlags & HAS_BEFORE_AL) === 0) + return; + + var queue = context._asyncQueue; + var data = context._asyncData; + var i, queueItem; - // Run "before" callbacks. inAsyncTick = true; for (i = 0; i < queue.length; i++) { - item = queue[i]; - if (!item.callbacks) - continue; - before = item.callbacks.before; - if (typeof before === 'function') - before(context, item.value); + queueItem = queue[i]; + if ((queueItem.flags & HAS_BEFORE_AL) > 0) + queueItem.before(context, data[queueItem.uid]); } inAsyncTick = false; } - // Unload one level of the async stack. Returns true if there are - // still listeners somewhere in the stack. + // Unload the AsyncListener queue attached to context and run all + // "after" callbacks, if they exist. function unloadAsyncQueue(context) { + if ((context._asyncFlags & HAS_AFTER_AL) === 0) { + unloadContext(); + return; + } + var queue = context._asyncQueue; - var item, after, i; + var data = context._asyncData; + var i, queueItem; - // Run "after" callbacks. inAsyncTick = true; for (i = 0; i < queue.length; i++) { - item = queue[i]; - if (!item.callbacks) - continue; - after = item.callbacks.after; - if (typeof after === 'function') - after(context, item.value); + queueItem = queue[i]; + if ((queueItem.flags & HAS_AFTER_AL) > 0) + queueItem.after(context, data[queueItem.uid]); } inAsyncTick = false; - // Unload the current queue from the stack. - popQueue(); + unloadContext(); + } + + // Handle errors that are thrown while in the context of an + // AsyncListener. If an error is thrown from an AsyncListener + // callback error handlers will be called once more to report + // the error, then the application will die forcefully. + function errorHandler(er) { + if (inErrorTick) + return false; + + var handled = false; + var i, queueItem, threw; + + inErrorTick = true; + + // First process error callbacks from the current context. + if (currentContext && (currentContext._asyncFlags & HAS_ERROR_AL) > 0) { + var queue = currentContext._asyncQueue; + var data = currentContext._asyncData; + for (i = 0; i < queue.length; i++) { + queueItem = queue[i]; + if ((queueItem.flags & HAS_ERROR_AL) === 0) + continue; + try { + threw = true; + // While it would be possible to pass in currentContext, if + // the error is thrown from the "create" callback then there's + // a chance the object hasn't been fully constructed. + handled = queueItem.error(data[queueItem.uid], er) || handled; + threw = false; + } finally { + // If the error callback thew then die quickly. Only allow the + // exit events to be processed. + if (threw) { + process._exiting = true; + process.emit('exit', 1); + } + } + } + } + + // Now process callbacks from any existing queue. + if (asyncQueue) { + for (i = 0; i < asyncQueue.length; i++) { + queueItem = asyncQueue[i]; + if ((queueItem.flags & HAS_ERROR_AL) === 0) + continue; + try { + threw = true; + handled = queueItem.error(queueItem.data, er) || handled; + threw = false; + } finally { + // If the error callback thew then die quickly. Only allow the + // exit events to be processed. + if (threw) { + process._exiting = true; + process.emit('exit', 1); + } + } + } + } + + inErrorTick = false; + + unloadContext(); + + // TODO(trevnorris): If the error was handled, should the after callbacks + // be fired anyways? - asyncFlags[kCount] = asyncQueue.length; + return handled && !inAsyncTick; + } + + // Instance function of an AsyncListener object. + function AsyncListenerInst(callbacks, data) { + if (typeof callbacks.create === 'function') { + this.create = callbacks.create; + this.flags |= HAS_CREATE_AL; + } + if (typeof callbacks.before === 'function') { + this.before = callbacks.before; + this.flags |= HAS_BEFORE_AL; + } + if (typeof callbacks.after === 'function') { + this.after = callbacks.after; + this.flags |= HAS_AFTER_AL; + } + if (typeof callbacks.error === 'function') { + this.error = callbacks.error; + this.flags |= HAS_ERROR_AL; + } - return asyncQueue.length > 0 || asyncStack.length > 0; + this.uid = ++alUid; + this.data = data; } + AsyncListenerInst.prototype.create = undefined; + AsyncListenerInst.prototype.before = undefined; + AsyncListenerInst.prototype.after = undefined; + AsyncListenerInst.prototype.error = undefined; + AsyncListenerInst.prototype.uid = 0; + AsyncListenerInst.prototype.flags = 0; // Create new async listener object. Useful when instantiating a new // object and want the listener instance, but not add it to the stack. - function createAsyncListener(callbacks, value) { - return { - callbacks: callbacks, - value: value, - uid: uid++ - }; + // If an existing AsyncListenerInst is passed then any new "data" is + // ignored. + function createAsyncListener(callbacks, data) { + if (typeof callbacks !== 'object' || callbacks == null) + throw new TypeError('callbacks argument must be an object'); + + if (callbacks instanceof AsyncListenerInst) + return callbacks; + else + return new AsyncListenerInst(callbacks, data); } // Add a listener to the current queue. - function addAsyncListener(callbacks, value) { - // Accept new listeners or previous created listeners. - if (typeof callbacks.uid !== 'number') - callbacks = createAsyncListener(callbacks, value); + function addAsyncListener(callbacks, data) { + if (!asyncQueue) { + asyncStack.push(asyncQueue); + asyncQueue = new Array(); + } + + // Fast track if a new AsyncListenerInst has to be created. + if (!(callbacks instanceof AsyncListenerInst)) { + callbacks = createAsyncListener(callbacks, data); + asyncQueue.push(callbacks); + asyncFlags[kHasListener] = 1; + return callbacks; + } var inQueue = false; // The asyncQueue will be small. Probably always <= 3 items. @@ -418,10 +569,11 @@ } // Make sure the callback doesn't already exist in the queue. - if (!inQueue) + if (!inQueue) { asyncQueue.push(callbacks); + asyncFlags[kHasListener] = 1; + } - asyncFlags[kCount] = asyncQueue.length; return callbacks; } @@ -429,14 +581,19 @@ function removeAsyncListener(obj) { var i, j; - for (i = 0; i < asyncQueue.length; i++) { - if (obj.uid === asyncQueue[i].uid) { - asyncQueue.splice(i, 1); - break; + if (asyncQueue) { + for (i = 0; i < asyncQueue.length; i++) { + if (obj.uid === asyncQueue[i].uid) { + asyncQueue.splice(i, 1); + break; + } } } + // TODO(trevnorris): Why remove the AL from the entire stack? for (i = 0; i < asyncStack.length; i++) { + if (asyncStack[i] === undefined) + continue; for (j = 0; j < asyncStack[i].length; j++) { if (obj.uid === asyncStack[i][j].uid) { asyncStack[i].splice(j, 1); @@ -445,54 +602,25 @@ } } - asyncFlags[kCount] = asyncQueue.length; - } - - // Error handler used by _fatalException to run through all error - // callbacks in the current asyncQueue. - function errorHandler(er) { - var handled = false; - var error, item, i; - - if (inErrorTick) - return false; - - inErrorTick = true; - for (i = 0; i < asyncQueue.length; i++) { - item = asyncQueue[i]; - if (!item.callbacks) - continue; - error = item.callbacks.error; - if (typeof error === 'function') { - try { - var threw = true; - handled = error(item.value, er) || handled; - threw = false; - } finally { - // If the error callback throws then we're going to die - // quickly with no chance of recovery. Only thing we're going - // to allow is execution of process exit event callbacks. - if (threw) { - process._exiting = true; - process.emit('exit', 1); - } - } - } - } - inErrorTick = false; - - // Unload the current queue from the stack. - popQueue(); - - return handled && !inAsyncTick; + if ((asyncQueue && asyncQueue.length > 0) || + (currentContext && currentContext._asyncQueue.length)) + asyncFlags[kHasListener] = 1; } // Used by AsyncWrap::AddAsyncListener() to add an individual listener // to the async queue. It will check the uid of the listener and only // allow it to be added once. function pushListener(obj) { - if (!this._asyncQueue) - this._asyncQueue = []; + if (!this._asyncQueue) { + this._asyncQueue = [obj]; + this._asyncData = new Array(); + this._asyncData[obj.uid] = obj.data; + this._asyncFlags = obj.flags; + return; + } + + if (!this._asyncData) + this._asyncData = new Array(); var queue = this._asyncQueue; var inQueue = false; @@ -504,26 +632,32 @@ } } - if (!inQueue) + // Not in the queue so push it on and set the default storage. + if (!inQueue) { queue.push(obj); + this._asyncData[obj.uid] = obj.data; + this._asyncFlags |= obj.flags; + } } // Used by AsyncWrap::RemoveAsyncListener() to remove an individual // listener from the async queue, and return whether there are still // listeners in the queue. function stripListener(obj) { - if (!this._asyncQueue || this._asyncQueue.length === 0) + // No queue exists, so nothing to do. + if (!this._asyncQueue) return false; + var queue = this._asyncQueue; + // The asyncQueue will be small. Probably always <= 3 items. - for (var i = 0; i < this._asyncQueue.length; i++) { - if (obj.uid === this._asyncQueue[i].uid) { - this._asyncQueue.splice(i, 1); - break; + for (var i = 0; i < queue.length; i++) { + if (obj.uid === queue[i].uid) { + this._asyncData[queue[i].uid] = undefined; + queue.splice(i, 1); + return queue.length > 0; } } - - return this._asyncQueue.length > 0; } };