Browse Source

node: AsyncListener use separate storage mechanism

Before when an AsyncListener object was created and the "create"
callback returned a value, it was necessary to construct a new Object
with the same callbacks but add a place for the new storage value.

Now, instead, a separate storage array is kept on the context which is
used for any return value of the "create" callback. This significantly
reduces the number of Objects that need to be created.

Also added a flags property to the context to quickly check if a
specific callback was available either on the context or on the
AsyncListener instance itself.

Few other minor changes for readability that were difficult to separate
into their own commit.

This has not been optimized yet.
v0.11.11-release
Trevor Norris 11 years ago
parent
commit
646ac18d79
  1. 49
      lib/timers.js
  2. 23
      src/async-wrap-inl.h
  3. 6
      src/async-wrap.h
  4. 8
      src/env-inl.h
  5. 6
      src/env.h
  6. 394
      src/node.js

49
lib/timers.js

@ -35,6 +35,9 @@ var runAsyncQueue = process._runAsyncQueue;
var loadAsyncQueue = process._loadAsyncQueue;
var unloadAsyncQueue = process._unloadAsyncQueue;
// Same as in AsyncListener in env.h
var kHasListener = 0;
// Do a little housekeeping.
delete process._asyncFlags;
delete process._runAsyncQueue;
@ -57,6 +60,8 @@ var lists = {};
// Make Timer as monomorphic as possible.
Timer.prototype._asyncQueue = undefined;
Timer.prototype._asyncData = undefined;
Timer.prototype._asyncFlags = 0;
// the main function - creates lists on demand and the watchers associated
// with them.
@ -195,21 +200,38 @@ exports.active = function(item) {
// Whether or not a new TimerWrap needed to be created, this should run
// for each item. This way each "item" (i.e. timer) can properly have
// their own domain assigned.
if (asyncFlags[0] > 0)
if (asyncFlags[kHasListener] > 0)
runAsyncQueue(item);
};
function timerAddAsyncListener(obj) {
// new Array() is used here because it is more efficient for sparse
// arrays. Please *do not* change these to simple bracket notation.
function timerAddAsyncListener(obj, data) {
obj = process.createAsyncListener(obj, data);
if (!this._asyncQueue)
this._asyncQueue = [];
this._asyncQueue = new Array();
if (!this._asyncData)
this._asyncData = new Array();
var inQueue = false;
var queue = this._asyncQueue;
// This queue will be small. Probably always <= 3 items.
for (var i = 0; i < queue.length; i++) {
if (queue[i].uid === obj.uid)
return;
if (queue[i].uid === obj.uid) {
inQueue = true;
break;
}
}
this._asyncQueue.push(obj);
if (!inQueue) {
queue.push(obj);
this._asyncData[obj.uid] = obj.data;
this._asyncFlags |= obj.flags;
}
return obj;
}
@ -217,13 +239,20 @@ function timerRemoveAsyncListener(obj) {
if (!this._asyncQueue)
return;
var queue = this._asyncQueue;
var i;
// This queue will be small. Probably always <= 3 items.
for (var i = 0; i < queue.length; i++) {
for (i = 0; i < queue.length; i++) {
if (queue[i].uid === obj.uid) {
queue.splice(i, 1);
this._asyncData[obj.uid] = undefined;
return;
}
}
// Rebuild flags
this._asyncFlags = 0;
for (i = 0; i < queue.length; i++) {
this._asyncFlags |= queue[i].flags;
}
}
@ -428,8 +457,10 @@ Immediate.prototype.removeAsyncListener = timerRemoveAsyncListener;
Immediate.prototype.domain = undefined;
Immediate.prototype._onImmediate = undefined;
Immediate.prototype._asyncQueue = undefined;
Immediate.prototype._asyncData = undefined;
Immediate.prototype._idleNext = undefined;
Immediate.prototype._idlePrev = undefined;
Immediate.prototype._asyncFlags = 0;
exports.setImmediate = function(callback) {
@ -456,7 +487,7 @@ exports.setImmediate = function(callback) {
}
// setImmediates are handled more like nextTicks.
if (asyncFlags[0] > 0)
if (asyncFlags[kHasListener] > 0)
runAsyncQueue(immediate);
if (process.domain)
immediate.domain = process.domain;
@ -494,7 +525,6 @@ function unrefTimeout() {
var diff, domain, first, hasQueue, threw;
while (first = L.peek(unrefList)) {
diff = now - first._idleStart;
hasQueue = !!first._asyncQueue;
if (diff < first._idleTimeout) {
diff = first._idleTimeout - diff;
@ -510,6 +540,7 @@ function unrefTimeout() {
if (!first._onTimeout) continue;
if (domain && domain._disposed) continue;
hasQueue = !!first._asyncQueue;
try {
if (hasQueue)

23
src/async-wrap-inl.h

@ -38,7 +38,7 @@ namespace node {
inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle<v8::Object> object)
: BaseObject(env, object),
async_flags_(NO_OPTIONS) {
if (!env->has_async_listeners())
if (!env->has_async_listener())
return;
// TODO(trevnorris): Do we really need to TryCatch this call?
@ -49,7 +49,7 @@ inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle<v8::Object> object)
env->async_listener_run_function()->Call(env->process_object(), 1, &val);
if (!try_catch.HasCaught())
async_flags_ |= ASYNC_LISTENERS;
async_flags_ |= HAS_ASYNC_LISTENER;
}
@ -83,8 +83,8 @@ inline void AsyncWrap::remove_flag(unsigned int flag) {
}
inline bool AsyncWrap::has_async_queue() {
return async_flags() & ASYNC_LISTENERS;
inline bool AsyncWrap::has_async_listener() {
return async_flags() & HAS_ASYNC_LISTENER;
}
@ -103,7 +103,7 @@ inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
v8::TryCatch try_catch;
try_catch.SetVerbose(true);
if (has_async_queue()) {
if (has_async_listener()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_load_function()->Call(process, 1, &val);
@ -143,7 +143,7 @@ inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
return Undefined(env()->isolate());
}
if (has_async_queue()) {
if (has_async_listener()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_unload_function()->Call(process, 1, &val);
@ -192,7 +192,7 @@ inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
v8::TryCatch try_catch;
try_catch.SetVerbose(true);
if (has_async_queue()) {
if (has_async_listener()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_load_function()->Call(process, 1, &val);
@ -206,7 +206,7 @@ inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
return Undefined(env()->isolate());
}
if (has_async_queue()) {
if (has_async_listener()) {
v8::Local<v8::Value> val = context.As<v8::Value>();
env()->async_listener_unload_function()->Call(process, 1, &val);
@ -227,9 +227,6 @@ inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
tick_info->set_in_tick(true);
// TODO(trevnorris): Consider passing "context" to _tickCallback so it
// can then be passed as the first argument to the nextTick callback.
// That should greatly help needing to create closures.
env()->tick_callback_function()->Call(process, 0, NULL);
tick_info->set_in_tick(false);
@ -283,7 +280,7 @@ inline void AsyncWrap::AddAsyncListener(
Type* wrap = static_cast<Type*>(
handle->GetAlignedPointerFromInternalField(0));
assert(wrap != NULL);
wrap->set_flag(ASYNC_LISTENERS);
wrap->set_flag(HAS_ASYNC_LISTENER);
}
@ -305,7 +302,7 @@ inline void AsyncWrap::RemoveAsyncListener(
Type* wrap = static_cast<Type*>(
handle->GetAlignedPointerFromInternalField(0));
assert(wrap != NULL);
wrap->remove_flag(ASYNC_LISTENERS);
wrap->remove_flag(HAS_ASYNC_LISTENER);
}
}

6
src/async-wrap.h

@ -32,7 +32,7 @@ class AsyncWrap : public BaseObject {
public:
enum AsyncFlags {
NO_OPTIONS = 0,
ASYNC_LISTENERS = 1
HAS_ASYNC_LISTENER = 1
};
inline AsyncWrap(Environment* env, v8::Handle<v8::Object> object);
@ -48,7 +48,7 @@ class AsyncWrap : public BaseObject {
inline void remove_flag(unsigned int flag);
inline bool has_async_queue();
inline bool has_async_listener();
// Only call these within a valid HandleScope.
inline v8::Handle<v8::Value> MakeCallback(const v8::Handle<v8::Function> cb,
@ -62,6 +62,8 @@ class AsyncWrap : public BaseObject {
v8::Handle<v8::Value>* argv);
private:
inline AsyncWrap();
// TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
// replacement is committed.
inline v8::Handle<v8::Value> MakeDomainCallback(

8
src/env-inl.h

@ -82,8 +82,8 @@ inline int Environment::AsyncListener::fields_count() const {
return kFieldsCount;
}
inline uint32_t Environment::AsyncListener::count() const {
return fields_[kCount];
inline bool Environment::AsyncListener::has_listener() const {
return fields_[kHasListener] > 0;
}
inline Environment::DomainFlag::DomainFlag() {
@ -205,9 +205,9 @@ inline v8::Isolate* Environment::isolate() const {
return isolate_;
}
inline bool Environment::has_async_listeners() const {
inline bool Environment::has_async_listener() const {
// The const_cast is okay, it doesn't violate conceptual const-ness.
return const_cast<Environment*>(this)->async_listener()->count() > 0;
return const_cast<Environment*>(this)->async_listener()->has_listener();
}
inline bool Environment::in_domain() const {

6
src/env.h

@ -177,14 +177,14 @@ class Environment {
public:
inline uint32_t* fields();
inline int fields_count() const;
inline uint32_t count() const;
inline bool has_listener() const;
private:
friend class Environment; // So we can call the constructor.
inline AsyncListener();
enum Fields {
kCount,
kHasListener,
kFieldsCount
};
@ -253,7 +253,7 @@ class Environment {
inline v8::Isolate* isolate() const;
inline uv_loop_t* event_loop() const;
inline bool has_async_listeners() const;
inline bool has_async_listener() const;
inline bool in_domain() const;
static inline Environment* from_immediate_check_handle(uv_check_t* handle);

394
src/node.js

@ -256,9 +256,20 @@
};
startup.processAsyncListener = function() {
var asyncStack = [];
var asyncQueue = [];
var uid = 0;
// new Array() is used here because it is more efficient for sparse
// arrays. Please *do not* change these to simple bracket notation.
// Track the active queue of AsyncListeners that have been added.
var asyncStack = new Array();
var asyncQueue = undefined;
// Keep the stack of all contexts that have been loaded in the
// execution chain of asynchronous events.
var contextStack = new Array();
var currentContext = undefined;
// Incremental uid for new AsyncListener instances.
var alUid = 0;
// Stateful flags shared with Environment for quick JS/C++
// communication.
@ -272,7 +283,13 @@
var inErrorTick = false;
// Needs to be the same as src/env.h
var kCount = 0;
var kHasListener = 0;
// Flags to determine what async listeners are available.
var HAS_CREATE_AL = 1 << 0;
var HAS_BEFORE_AL = 1 << 1;
var HAS_AFTER_AL = 1 << 2;
var HAS_ERROR_AL = 1 << 3;
// _errorHandler is scoped so it's also accessible by _fatalException.
_errorHandler = errorHandler;
@ -298,115 +315,249 @@
pushListener,
stripListener);
function popQueue() {
if (asyncStack.length > 0)
asyncQueue = asyncStack.pop();
else
asyncQueue = [];
// Load the currently executing context as the current context, and
// create a new asyncQueue that can receive any added queue items
// during the executing of the callback.
function loadContext(ctx) {
contextStack.push(currentContext);
currentContext = ctx;
asyncStack.push(asyncQueue);
asyncQueue = new Array();
asyncFlags[kHasListener] = 1;
}
function unloadContext() {
currentContext = contextStack.pop();
asyncQueue = asyncStack.pop();
if (typeof currentContext === 'undefined' &&
typeof asyncQueue === 'undefined')
asyncFlags[kHasListener] = 0;
}
// Run all the async listeners attached when an asynchronous event is
// instantiated.
function runAsyncQueue(context) {
var queue = [];
var queueItem, item, i, value;
var queue = new Array();
var data = new Array();
var ccQueue, i, item, queueItem, value;
context._asyncQueue = queue;
context._asyncData = data;
context._asyncFlags = 0;
inAsyncTick = true;
for (i = 0; i < asyncQueue.length; i++) {
queueItem = asyncQueue[i];
if (!queueItem.callbacks.create) {
queue[i] = queueItem;
continue;
// First run through all callbacks in the currentContext. These may
// add new AsyncListeners to the asyncQueue during execution. Hence
// why they need to be evaluated first.
if (currentContext) {
ccQueue = currentContext._asyncQueue;
context._asyncFlags |= currentContext._asyncFlags;
for (i = 0; i < ccQueue.length; i++) {
queueItem = ccQueue[i];
queue[queue.length] = queueItem;
if ((queueItem.flags & HAS_CREATE_AL) === 0) {
data[queueItem.uid] = queueItem.data;
continue;
}
value = queueItem.create(queueItem.data);
data[queueItem.uid] = (value === undefined) ? queueItem.data : value;
}
// Not passing "this" context because it hasn't actually been
// instantiated yet, so accessing some of the object properties
// can cause a segfault.
// Passing the original value will allow users to manipulate the
// original value object, while also allowing them to return a
// new value for current async call tracking.
value = queueItem.callbacks.create(queueItem.value);
if (typeof value !== 'undefined') {
item = {
callbacks: queueItem.callbacks,
value: value,
uid: queueItem.uid
};
} else {
item = queueItem;
}
// Then run through all items in the asyncQueue
if (asyncQueue) {
for (i = 0; i < asyncQueue.length; i++) {
queueItem = asyncQueue[i];
queue[queue.length] = queueItem;
context._asyncFlags |= queueItem.flags;
if ((queueItem.flags & HAS_CREATE_AL) === 0) {
data[queueItem.uid] = queueItem.data;
continue;
}
value = queueItem.create(queueItem.data);
data[queueItem.uid] = (value === undefined) ? queueItem.data : value;
}
queue[i] = item;
}
inAsyncTick = false;
context._asyncQueue = queue;
inAsyncTick = false;
}
// Uses the _asyncQueue object attached by runAsyncQueue.
// Load the AsyncListener queue attached to context and run all
// "before" callbacks, if they exist.
function loadAsyncQueue(context) {
var queue = context._asyncQueue;
var item, before, i;
loadContext(context);
asyncStack.push(asyncQueue);
asyncQueue = queue.slice();
// Since the async listener callback is required, the number of
// objects in the asyncQueue implies the number of async listeners
// there are to be processed.
asyncFlags[kCount] = queue.length;
if ((context._asyncFlags & HAS_BEFORE_AL) === 0)
return;
var queue = context._asyncQueue;
var data = context._asyncData;
var i, queueItem;
// Run "before" callbacks.
inAsyncTick = true;
for (i = 0; i < queue.length; i++) {
item = queue[i];
if (!item.callbacks)
continue;
before = item.callbacks.before;
if (typeof before === 'function')
before(context, item.value);
queueItem = queue[i];
if ((queueItem.flags & HAS_BEFORE_AL) > 0)
queueItem.before(context, data[queueItem.uid]);
}
inAsyncTick = false;
}
// Unload one level of the async stack. Returns true if there are
// still listeners somewhere in the stack.
// Unload the AsyncListener queue attached to context and run all
// "after" callbacks, if they exist.
function unloadAsyncQueue(context) {
if ((context._asyncFlags & HAS_AFTER_AL) === 0) {
unloadContext();
return;
}
var queue = context._asyncQueue;
var item, after, i;
var data = context._asyncData;
var i, queueItem;
// Run "after" callbacks.
inAsyncTick = true;
for (i = 0; i < queue.length; i++) {
item = queue[i];
if (!item.callbacks)
continue;
after = item.callbacks.after;
if (typeof after === 'function')
after(context, item.value);
queueItem = queue[i];
if ((queueItem.flags & HAS_AFTER_AL) > 0)
queueItem.after(context, data[queueItem.uid]);
}
inAsyncTick = false;
// Unload the current queue from the stack.
popQueue();
unloadContext();
}
// Handle errors that are thrown while in the context of an
// AsyncListener. If an error is thrown from an AsyncListener
// callback error handlers will be called once more to report
// the error, then the application will die forcefully.
function errorHandler(er) {
if (inErrorTick)
return false;
var handled = false;
var i, queueItem, threw;
inErrorTick = true;
// First process error callbacks from the current context.
if (currentContext && (currentContext._asyncFlags & HAS_ERROR_AL) > 0) {
var queue = currentContext._asyncQueue;
var data = currentContext._asyncData;
for (i = 0; i < queue.length; i++) {
queueItem = queue[i];
if ((queueItem.flags & HAS_ERROR_AL) === 0)
continue;
try {
threw = true;
// While it would be possible to pass in currentContext, if
// the error is thrown from the "create" callback then there's
// a chance the object hasn't been fully constructed.
handled = queueItem.error(data[queueItem.uid], er) || handled;
threw = false;
} finally {
// If the error callback thew then die quickly. Only allow the
// exit events to be processed.
if (threw) {
process._exiting = true;
process.emit('exit', 1);
}
}
}
}
// Now process callbacks from any existing queue.
if (asyncQueue) {
for (i = 0; i < asyncQueue.length; i++) {
queueItem = asyncQueue[i];
if ((queueItem.flags & HAS_ERROR_AL) === 0)
continue;
try {
threw = true;
handled = queueItem.error(queueItem.data, er) || handled;
threw = false;
} finally {
// If the error callback thew then die quickly. Only allow the
// exit events to be processed.
if (threw) {
process._exiting = true;
process.emit('exit', 1);
}
}
}
}
inErrorTick = false;
unloadContext();
// TODO(trevnorris): If the error was handled, should the after callbacks
// be fired anyways?
asyncFlags[kCount] = asyncQueue.length;
return handled && !inAsyncTick;
}
// Instance function of an AsyncListener object.
function AsyncListenerInst(callbacks, data) {
if (typeof callbacks.create === 'function') {
this.create = callbacks.create;
this.flags |= HAS_CREATE_AL;
}
if (typeof callbacks.before === 'function') {
this.before = callbacks.before;
this.flags |= HAS_BEFORE_AL;
}
if (typeof callbacks.after === 'function') {
this.after = callbacks.after;
this.flags |= HAS_AFTER_AL;
}
if (typeof callbacks.error === 'function') {
this.error = callbacks.error;
this.flags |= HAS_ERROR_AL;
}
return asyncQueue.length > 0 || asyncStack.length > 0;
this.uid = ++alUid;
this.data = data;
}
AsyncListenerInst.prototype.create = undefined;
AsyncListenerInst.prototype.before = undefined;
AsyncListenerInst.prototype.after = undefined;
AsyncListenerInst.prototype.error = undefined;
AsyncListenerInst.prototype.uid = 0;
AsyncListenerInst.prototype.flags = 0;
// Create new async listener object. Useful when instantiating a new
// object and want the listener instance, but not add it to the stack.
function createAsyncListener(callbacks, value) {
return {
callbacks: callbacks,
value: value,
uid: uid++
};
// If an existing AsyncListenerInst is passed then any new "data" is
// ignored.
function createAsyncListener(callbacks, data) {
if (typeof callbacks !== 'object' || callbacks == null)
throw new TypeError('callbacks argument must be an object');
if (callbacks instanceof AsyncListenerInst)
return callbacks;
else
return new AsyncListenerInst(callbacks, data);
}
// Add a listener to the current queue.
function addAsyncListener(callbacks, value) {
// Accept new listeners or previous created listeners.
if (typeof callbacks.uid !== 'number')
callbacks = createAsyncListener(callbacks, value);
function addAsyncListener(callbacks, data) {
if (!asyncQueue) {
asyncStack.push(asyncQueue);
asyncQueue = new Array();
}
// Fast track if a new AsyncListenerInst has to be created.
if (!(callbacks instanceof AsyncListenerInst)) {
callbacks = createAsyncListener(callbacks, data);
asyncQueue.push(callbacks);
asyncFlags[kHasListener] = 1;
return callbacks;
}
var inQueue = false;
// The asyncQueue will be small. Probably always <= 3 items.
@ -418,10 +569,11 @@
}
// Make sure the callback doesn't already exist in the queue.
if (!inQueue)
if (!inQueue) {
asyncQueue.push(callbacks);
asyncFlags[kHasListener] = 1;
}
asyncFlags[kCount] = asyncQueue.length;
return callbacks;
}
@ -429,14 +581,19 @@
function removeAsyncListener(obj) {
var i, j;
for (i = 0; i < asyncQueue.length; i++) {
if (obj.uid === asyncQueue[i].uid) {
asyncQueue.splice(i, 1);
break;
if (asyncQueue) {
for (i = 0; i < asyncQueue.length; i++) {
if (obj.uid === asyncQueue[i].uid) {
asyncQueue.splice(i, 1);
break;
}
}
}
// TODO(trevnorris): Why remove the AL from the entire stack?
for (i = 0; i < asyncStack.length; i++) {
if (asyncStack[i] === undefined)
continue;
for (j = 0; j < asyncStack[i].length; j++) {
if (obj.uid === asyncStack[i][j].uid) {
asyncStack[i].splice(j, 1);
@ -445,54 +602,25 @@
}
}
asyncFlags[kCount] = asyncQueue.length;
}
// Error handler used by _fatalException to run through all error
// callbacks in the current asyncQueue.
function errorHandler(er) {
var handled = false;
var error, item, i;
if (inErrorTick)
return false;
inErrorTick = true;
for (i = 0; i < asyncQueue.length; i++) {
item = asyncQueue[i];
if (!item.callbacks)
continue;
error = item.callbacks.error;
if (typeof error === 'function') {
try {
var threw = true;
handled = error(item.value, er) || handled;
threw = false;
} finally {
// If the error callback throws then we're going to die
// quickly with no chance of recovery. Only thing we're going
// to allow is execution of process exit event callbacks.
if (threw) {
process._exiting = true;
process.emit('exit', 1);
}
}
}
}
inErrorTick = false;
// Unload the current queue from the stack.
popQueue();
return handled && !inAsyncTick;
if ((asyncQueue && asyncQueue.length > 0) ||
(currentContext && currentContext._asyncQueue.length))
asyncFlags[kHasListener] = 1;
}
// Used by AsyncWrap::AddAsyncListener() to add an individual listener
// to the async queue. It will check the uid of the listener and only
// allow it to be added once.
function pushListener(obj) {
if (!this._asyncQueue)
this._asyncQueue = [];
if (!this._asyncQueue) {
this._asyncQueue = [obj];
this._asyncData = new Array();
this._asyncData[obj.uid] = obj.data;
this._asyncFlags = obj.flags;
return;
}
if (!this._asyncData)
this._asyncData = new Array();
var queue = this._asyncQueue;
var inQueue = false;
@ -504,26 +632,32 @@
}
}
if (!inQueue)
// Not in the queue so push it on and set the default storage.
if (!inQueue) {
queue.push(obj);
this._asyncData[obj.uid] = obj.data;
this._asyncFlags |= obj.flags;
}
}
// Used by AsyncWrap::RemoveAsyncListener() to remove an individual
// listener from the async queue, and return whether there are still
// listeners in the queue.
function stripListener(obj) {
if (!this._asyncQueue || this._asyncQueue.length === 0)
// No queue exists, so nothing to do.
if (!this._asyncQueue)
return false;
var queue = this._asyncQueue;
// The asyncQueue will be small. Probably always <= 3 items.
for (var i = 0; i < this._asyncQueue.length; i++) {
if (obj.uid === this._asyncQueue[i].uid) {
this._asyncQueue.splice(i, 1);
break;
for (var i = 0; i < queue.length; i++) {
if (obj.uid === queue[i].uid) {
this._asyncData[queue[i].uid] = undefined;
queue.splice(i, 1);
return queue.length > 0;
}
}
return this._asyncQueue.length > 0;
}
};

Loading…
Cancel
Save