diff --git a/lib/timers.js b/lib/timers.js index 93dd65e375..0fce78d4da 100644 --- a/lib/timers.js +++ b/lib/timers.js @@ -30,6 +30,17 @@ var TIMEOUT_MAX = 2147483647; // 2^31-1 var debug = require('util').debuglog('timer'); +var asyncFlags = process._asyncFlags; +var runAsyncQueue = process._runAsyncQueue; +var loadAsyncQueue = process._loadAsyncQueue; +var unloadAsyncQueue = process._unloadAsyncQueue; + +// Do a little housekeeping. +delete process._asyncFlags; +delete process._runAsyncQueue; +delete process._loadAsyncQueue; +delete process._unloadAsyncQueue; + // IDLE TIMEOUTS // @@ -44,6 +55,9 @@ var debug = require('util').debuglog('timer'); // value = list var lists = {}; +// Make Timer as monomorphic as possible. +Timer.prototype._asyncQueue = undefined; + // the main function - creates lists on demand and the watchers associated // with them. function insert(item, msecs) { @@ -80,9 +94,9 @@ function listOnTimeout() { var now = Timer.now(); debug('now: %s', now); - var first; + var diff, first, hasQueue, threw; while (first = L.peek(list)) { - var diff = now - first._idleStart; + diff = now - first._idleStart; if (diff < msecs) { list.start(msecs - diff, 0); debug('%d list wait because diff is %d', msecs, diff); @@ -99,12 +113,20 @@ function listOnTimeout() { // // https://github.com/joyent/node/issues/2631 var domain = first.domain; - if (domain && domain._disposed) continue; + if (domain && domain._disposed) + continue; + + hasQueue = !!first._asyncQueue; + try { + if (hasQueue) + loadAsyncQueue(first); if (domain) domain.enter(); - var threw = true; + threw = true; first._onTimeout(); + if (hasQueue) + unloadAsyncQueue(first); if (domain) domain.exit(); threw = false; @@ -162,7 +184,6 @@ exports.enroll = function(item, msecs) { exports.active = function(item) { var msecs = item._idleTimeout; if (msecs >= 0) { - var list = lists[msecs]; if (!list || L.isEmpty(list)) { insert(item, msecs); @@ -171,6 +192,11 @@ exports.active = function(item) { L.append(list, item); } } + // Whether or not a new TimerWrap needed to be created, this should run + // for each item. This way each "item" (i.e. timer) can properly have + // their own domain assigned. + if (asyncFlags[0] > 0) + runAsyncQueue(item); }; @@ -316,16 +342,43 @@ L.init(immediateQueue); function processImmediate() { var queue = immediateQueue; + var domain, hasQueue, immediate; immediateQueue = {}; L.init(immediateQueue); while (L.isEmpty(queue) === false) { - var immediate = L.shift(queue); - var domain = immediate.domain; - if (domain) domain.enter(); - immediate._onImmediate(); - if (domain) domain.exit(); + immediate = L.shift(queue); + hasQueue = !!immediate._asyncQueue; + domain = immediate.domain; + + if (hasQueue) + loadAsyncQueue(immediate); + if (domain) + domain.enter(); + + var threw = true; + try { + immediate._onImmediate(); + threw = false; + } finally { + if (threw) { + if (!L.isEmpty(queue)) { + // Handle any remaining on next tick, assuming we're still + // alive to do so. + while (!L.isEmpty(immediateQueue)) { + L.append(queue, L.shift(immediateQueue)); + } + immediateQueue = queue; + process.nextTick(processImmediate); + } + } + } + + if (hasQueue) + unloadAsyncQueue(immediate); + if (domain) + domain.exit(); } // Only round-trip to C++ land if we have to. Calling clearImmediate() on an @@ -357,7 +410,11 @@ exports.setImmediate = function(callback) { process._immediateCallback = processImmediate; } - if (process.domain) immediate.domain = process.domain; + // setImmediates are handled more like nextTicks. + if (asyncFlags[0] > 0) + runAsyncQueue(immediate); + if (process.domain) + immediate.domain = process.domain; L.append(immediateQueue, immediate); @@ -389,9 +446,10 @@ function unrefTimeout() { debug('unrefTimer fired'); - var first; + var diff, domain, first, hasQueue, threw; while (first = L.peek(unrefList)) { - var diff = now - first._idleStart; + diff = now - first._idleStart; + hasQueue = !!first._asyncQueue; if (diff < first._idleTimeout) { diff = first._idleTimeout - diff; @@ -403,17 +461,21 @@ function unrefTimeout() { L.remove(first); - var domain = first.domain; + domain = first.domain; if (!first._onTimeout) continue; if (domain && domain._disposed) continue; try { + if (hasQueue) + loadAsyncQueue(first); if (domain) domain.enter(); - var threw = true; + threw = true; debug('unreftimer firing timeout'); first._onTimeout(); threw = false; + if (hasQueue) + unloadAsyncQueue(first); if (domain) domain.exit(); } finally { if (threw) process.nextTick(unrefTimeout); diff --git a/node.gyp b/node.gyp index 50c1394719..4bb73980d7 100644 --- a/node.gyp +++ b/node.gyp @@ -115,6 +115,8 @@ 'src/udp_wrap.cc', 'src/uv.cc', # headers to make for a more pleasant IDE experience + 'src/async-wrap.h', + 'src/async-wrap-inl.h', 'src/env.h', 'src/env-inl.h', 'src/handle_wrap.h', diff --git a/src/async-wrap-inl.h b/src/async-wrap-inl.h new file mode 100644 index 0000000000..055e31ecf0 --- /dev/null +++ b/src/async-wrap-inl.h @@ -0,0 +1,324 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +#ifndef SRC_ASYNC_WRAP_INL_H_ +#define SRC_ASYNC_WRAP_INL_H_ + +#include "async-wrap.h" +#include "env.h" +#include "env-inl.h" +#include "util.h" +#include "util-inl.h" +#include "v8.h" +#include + +namespace node { + +inline AsyncWrap::AsyncWrap(Environment* env, v8::Handle object) + : object_(env->isolate(), object), + env_(env), + async_flags_(NO_OPTIONS) { + assert(!object.IsEmpty()); + + if (!env->has_async_listeners()) + return; + + // TODO(trevnorris): Do we really need to TryCatch this call? + v8::TryCatch try_catch; + try_catch.SetVerbose(true); + + v8::Local val = object.As(); + env->async_listener_run_function()->Call(env->process_object(), 1, &val); + + if (!try_catch.HasCaught()) + async_flags_ |= ASYNC_LISTENERS; +} + + +inline AsyncWrap::~AsyncWrap() { + assert(persistent().IsEmpty()); +} + + +template +inline void AsyncWrap::AddMethods(v8::Handle t) { + NODE_SET_PROTOTYPE_METHOD(t, + "addAsyncListener", + AddAsyncListener); + NODE_SET_PROTOTYPE_METHOD(t, + "removeAsyncListener", + RemoveAsyncListener); +} + + +inline uint32_t AsyncWrap::async_flags() const { + return async_flags_; +} + + +inline void AsyncWrap::set_flag(unsigned int flag) { + async_flags_ |= flag; +} + + +inline void AsyncWrap::remove_flag(unsigned int flag) { + async_flags_ &= ~flag; +} + + +inline bool AsyncWrap::has_async_queue() { + return async_flags() & ASYNC_LISTENERS; +} + + +inline Environment* AsyncWrap::env() const { + return env_; +} + + +inline v8::Local AsyncWrap::object() { + return PersistentToLocal(env()->isolate(), persistent()); +} + + +inline v8::Persistent& AsyncWrap::persistent() { + return object_; +} + + +// I hate you domains. +inline v8::Handle AsyncWrap::MakeDomainCallback( + const v8::Handle cb, + int argc, + v8::Handle* argv) { + assert(env()->context() == env()->isolate()->GetCurrentContext()); + + v8::Local context = object(); + v8::Local process = env()->process_object(); + v8::Local domain_v = context->Get(env()->domain_string()); + v8::Local domain; + + v8::TryCatch try_catch; + try_catch.SetVerbose(true); + + if (has_async_queue()) { + v8::Local val = context.As(); + env()->async_listener_load_function()->Call(process, 1, &val); + + if (try_catch.HasCaught()) + return v8::Undefined(env()->isolate()); + } + + bool has_domain = domain_v->IsObject(); + if (has_domain) { + domain = domain_v.As(); + + if (domain->Get(env()->disposed_string())->IsTrue()) + return Undefined(env()->isolate()); + + v8::Local enter = + domain->Get(env()->enter_string()).As(); + assert(enter->IsFunction()); + enter->Call(domain, 0, NULL); + + if (try_catch.HasCaught()) + return Undefined(env()->isolate()); + } + + v8::Local ret = cb->Call(context, argc, argv); + + if (try_catch.HasCaught()) { + return Undefined(env()->isolate()); + } + + if (has_async_queue()) { + v8::Local val = context.As(); + env()->async_listener_unload_function()->Call(process, 1, &val); + } + + if (has_domain) { + v8::Local exit = + domain->Get(env()->exit_string()).As(); + assert(exit->IsFunction()); + exit->Call(domain, 0, NULL); + + if (try_catch.HasCaught()) + return Undefined(env()->isolate()); + } + + Environment::TickInfo* tick_info = env()->tick_info(); + + if (tick_info->in_tick()) { + return ret; + } + + if (tick_info->length() == 0) { + tick_info->set_index(0); + return ret; + } + + tick_info->set_in_tick(true); + + env()->tick_callback_function()->Call(process, 0, NULL); + + tick_info->set_in_tick(false); + + if (try_catch.HasCaught()) { + tick_info->set_last_threw(true); + return Undefined(env()->isolate()); + } + + return ret; +} + + +inline v8::Handle AsyncWrap::MakeCallback( + const v8::Handle cb, + int argc, + v8::Handle* argv) { + if (env()->using_domains()) + return MakeDomainCallback(cb, argc, argv); + + assert(env()->context() == env()->isolate()->GetCurrentContext()); + + v8::Local context = object(); + v8::Local process = env()->process_object(); + + v8::TryCatch try_catch; + try_catch.SetVerbose(true); + + if (has_async_queue()) { + v8::Local val = context.As(); + env()->async_listener_load_function()->Call(process, 1, &val); + + if (try_catch.HasCaught()) + return v8::Undefined(env()->isolate()); + } + + v8::Local ret = cb->Call(context, argc, argv); + + if (try_catch.HasCaught()) { + return Undefined(env()->isolate()); + } + + if (has_async_queue()) { + v8::Local val = context.As(); + env()->async_listener_unload_function()->Call(process, 1, &val); + } + + Environment::TickInfo* tick_info = env()->tick_info(); + + if (tick_info->in_tick()) { + return ret; + } + + if (tick_info->length() == 0) { + tick_info->set_index(0); + return ret; + } + + tick_info->set_in_tick(true); + + // TODO(trevnorris): Consider passing "context" to _tickCallback so it + // can then be passed as the first argument to the nextTick callback. + // That should greatly help needing to create closures. + env()->tick_callback_function()->Call(process, 0, NULL); + + tick_info->set_in_tick(false); + + if (try_catch.HasCaught()) { + tick_info->set_last_threw(true); + return Undefined(env()->isolate()); + } + + return ret; +} + + +inline v8::Handle AsyncWrap::MakeCallback( + const v8::Handle symbol, + int argc, + v8::Handle* argv) { + v8::Local cb_v = object()->Get(symbol); + v8::Local cb = cb_v.As(); + assert(cb->IsFunction()); + + return MakeCallback(cb, argc, argv); +} + + +inline v8::Handle AsyncWrap::MakeCallback( + uint32_t index, + int argc, + v8::Handle* argv) { + v8::Local cb_v = object()->Get(index); + v8::Local cb = cb_v.As(); + assert(cb->IsFunction()); + + return MakeCallback(cb, argc, argv); +} + + +template +inline void AsyncWrap::AddAsyncListener( + const v8::FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args.GetIsolate()); + v8::HandleScope handle_scope(args.GetIsolate()); + + v8::Local handle = args.This(); + v8::Local listener = args[0]; + assert(listener->IsObject()); + assert(handle->InternalFieldCount() > 0); + + env->async_listener_push_function()->Call(handle, 1, &listener); + + TYPE* wrap = static_cast( + handle->GetAlignedPointerFromInternalField(0)); + assert(wrap != NULL); + wrap->set_flag(ASYNC_LISTENERS); +} + + +template +inline void AsyncWrap::RemoveAsyncListener( + const v8::FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args.GetIsolate()); + v8::HandleScope handle_scope(args.GetIsolate()); + + v8::Local handle = args.This(); + v8::Local listener = args[0]; + assert(listener->IsObject()); + assert(handle->InternalFieldCount() > 0); + + v8::Local ret = + env->async_listener_strip_function()->Call(handle, 1, &listener); + + if (ret->IsFalse()) { + TYPE* wrap = static_cast( + handle->GetAlignedPointerFromInternalField(0)); + assert(wrap != NULL); + wrap->remove_flag(ASYNC_LISTENERS); + } +} + +} // namespace node + +#endif // SRC_ASYNC_WRAP_INL_H_ diff --git a/src/async-wrap.h b/src/async-wrap.h new file mode 100644 index 0000000000..4797386ca9 --- /dev/null +++ b/src/async-wrap.h @@ -0,0 +1,97 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +#ifndef SRC_ASYNC_WRAP_H_ +#define SRC_ASYNC_WRAP_H_ + +#include "env.h" +#include "v8.h" + +namespace node { + +class AsyncWrap { + public: + enum AsyncFlags { + NO_OPTIONS = 0, + ASYNC_LISTENERS = 1 + }; + + inline AsyncWrap(Environment* env, v8::Handle object); + + inline ~AsyncWrap(); + + template + static inline void AddMethods(v8::Handle t); + + inline uint32_t async_flags() const; + + inline void set_flag(unsigned int flag); + + inline void remove_flag(unsigned int flag); + + inline bool has_async_queue(); + + inline Environment* env() const; + + // Returns the wrapped object. Illegal to call in your destructor. + inline v8::Local object(); + + // Parent class is responsible to Dispose. + inline v8::Persistent& persistent(); + + // Only call these within a valid HandleScope. + inline v8::Handle MakeCallback(const v8::Handle cb, + int argc, + v8::Handle* argv); + inline v8::Handle MakeCallback(const v8::Handle symbol, + int argc, + v8::Handle* argv); + inline v8::Handle MakeCallback(uint32_t index, + int argc, + v8::Handle* argv); + + private: + // TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable + // replacement is committed. + inline v8::Handle MakeDomainCallback( + const v8::Handle cb, + int argc, + v8::Handle* argv); + + // Add an async listener to an existing handle. + template + static inline void AddAsyncListener( + const v8::FunctionCallbackInfo& args); + + // Remove an async listener to an existing handle. + template + static inline void RemoveAsyncListener( + const v8::FunctionCallbackInfo& args); + + v8::Persistent object_; + Environment* const env_; + uint32_t async_flags_; +}; + +} // namespace node + + +#endif // SRC_ASYNC_WRAP_H_ diff --git a/src/env-inl.h b/src/env-inl.h index b520e07129..b57cf99af1 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -69,6 +69,23 @@ inline v8::Isolate* Environment::IsolateData::isolate() const { return isolate_; } +inline Environment::AsyncListener::AsyncListener() { + for (int i = 0; i < kFieldsCount; ++i) + fields_[i] = 0; +} + +inline uint32_t* Environment::AsyncListener::fields() { + return fields_; +} + +inline int Environment::AsyncListener::fields_count() const { + return kFieldsCount; +} + +inline uint32_t Environment::AsyncListener::count() const { + return fields_[kCount]; +} + inline Environment::DomainFlag::DomainFlag() { for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0; } @@ -86,7 +103,8 @@ inline uint32_t Environment::DomainFlag::count() const { } inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) { - for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0; + for (int i = 0; i < kFieldsCount; ++i) + fields_[i] = 0; } inline uint32_t* Environment::TickInfo::fields() { @@ -187,6 +205,11 @@ inline v8::Isolate* Environment::isolate() const { return isolate_; } +inline bool Environment::has_async_listeners() const { + // The const_cast is okay, it doesn't violate conceptual const-ness. + return const_cast(this)->async_listener()->count() > 0; +} + inline bool Environment::in_domain() const { // The const_cast is okay, it doesn't violate conceptual const-ness. return using_domains() && @@ -227,6 +250,10 @@ inline uv_loop_t* Environment::event_loop() const { return isolate_data()->event_loop(); } +inline Environment::AsyncListener* Environment::async_listener() { + return &async_listener_count_; +} + inline Environment::DomainFlag* Environment::domain_flag() { return &domain_flag_; } diff --git a/src/env.h b/src/env.h index b45d250f06..37e4ae6a66 100644 --- a/src/env.h +++ b/src/env.h @@ -53,6 +53,7 @@ namespace node { #define PER_ISOLATE_STRING_PROPERTIES(V) \ V(address_string, "address") \ V(atime_string, "atime") \ + V(async_queue_string, "_asyncQueue") \ V(birthtime_string, "birthtime") \ V(blksize_string, "blksize") \ V(blocks_string, "blocks") \ @@ -131,6 +132,11 @@ namespace node { V(write_queue_size_string, "writeQueueSize") \ #define ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V) \ + V(async_listener_load_function, v8::Function) \ + V(async_listener_push_function, v8::Function) \ + V(async_listener_run_function, v8::Function) \ + V(async_listener_strip_function, v8::Function) \ + V(async_listener_unload_function, v8::Function) \ V(binding_cache_object, v8::Object) \ V(buffer_constructor_function, v8::Function) \ V(context, v8::Context) \ @@ -163,6 +169,26 @@ RB_HEAD(ares_task_list, ares_task_t); class Environment { public: + class AsyncListener { + public: + inline uint32_t* fields(); + inline int fields_count() const; + inline uint32_t count() const; + + private: + friend class Environment; // So we can call the constructor. + inline AsyncListener(); + + enum Fields { + kCount, + kFieldsCount + }; + + uint32_t fields_[kFieldsCount]; + + DISALLOW_COPY_AND_ASSIGN(AsyncListener); + }; + class DomainFlag { public: inline uint32_t* fields(); @@ -223,6 +249,7 @@ class Environment { inline v8::Isolate* isolate() const; inline uv_loop_t* event_loop() const; + inline bool has_async_listeners() const; inline bool in_domain() const; static inline Environment* from_immediate_check_handle(uv_check_t* handle); @@ -235,6 +262,7 @@ class Environment { static inline Environment* from_idle_check_handle(uv_check_t* handle); inline uv_check_t* idle_check_handle(); + inline AsyncListener* async_listener(); inline DomainFlag* domain_flag(); inline TickInfo* tick_info(); @@ -279,6 +307,7 @@ class Environment { uv_idle_t immediate_idle_handle_; uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; + AsyncListener async_listener_count_; DomainFlag domain_flag_; TickInfo tick_info_; uv_timer_t cares_timer_handle_; diff --git a/src/fs_event_wrap.cc b/src/fs_event_wrap.cc index 1fc901e2fa..076a224b39 100644 --- a/src/fs_event_wrap.cc +++ b/src/fs_event_wrap.cc @@ -19,6 +19,8 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +#include "async-wrap.h" +#include "async-wrap-inl.h" #include "env.h" #include "env-inl.h" #include "util.h" @@ -175,11 +177,7 @@ void FSEventWrap::OnEvent(uv_fs_event_t* handle, const char* filename, argv[2] = OneByteString(node_isolate, filename); } - MakeCallback(env, - wrap->object(), - env->onchange_string(), - ARRAY_SIZE(argv), - argv); + wrap->MakeCallback(env->onchange_string(), ARRAY_SIZE(argv), argv); } diff --git a/src/handle_wrap.cc b/src/handle_wrap.cc index d930bd355b..be37d63b6d 100644 --- a/src/handle_wrap.cc +++ b/src/handle_wrap.cc @@ -20,6 +20,8 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. #include "handle_wrap.h" +#include "async-wrap.h" +#include "async-wrap-inl.h" #include "env.h" #include "env-inl.h" #include "util.h" @@ -89,12 +91,11 @@ void HandleWrap::Close(const FunctionCallbackInfo& args) { HandleWrap::HandleWrap(Environment* env, Handle object, uv_handle_t* handle) - : env_(env), + : AsyncWrap(env, object), flags_(0), handle__(handle) { handle__->data = this; HandleScope scope(node_isolate); - persistent().Reset(node_isolate, object); Wrap(object, this); QUEUE_INSERT_TAIL(&handle_wrap_queue, &handle_wrap_queue_); } @@ -121,7 +122,7 @@ void HandleWrap::OnClose(uv_handle_t* handle) { Local object = wrap->object(); if (wrap->flags_ & kCloseCallback) { - MakeCallback(env, object, env->close_string()); + wrap->MakeCallback(env->close_string(), 0, NULL); } object->SetAlignedPointerInInternalField(0, NULL); diff --git a/src/handle_wrap.h b/src/handle_wrap.h index 73a43c3a3f..47cc44f910 100644 --- a/src/handle_wrap.h +++ b/src/handle_wrap.h @@ -22,6 +22,7 @@ #ifndef SRC_HANDLE_WRAP_H_ #define SRC_HANDLE_WRAP_H_ +#include "async-wrap.h" #include "env.h" #include "node.h" #include "queue.h" @@ -50,7 +51,7 @@ namespace node { // js/c++ boundary crossing. At the javascript layer that should all be // taken care of. -class HandleWrap { +class HandleWrap : public AsyncWrap { public: static void Close(const v8::FunctionCallbackInfo& args); static void Ref(const v8::FunctionCallbackInfo& args); @@ -64,24 +65,10 @@ class HandleWrap { uv_handle_t* handle); virtual ~HandleWrap(); - inline Environment* env() const { - return env_; - } - - inline v8::Local object() { - return PersistentToLocal(env()->isolate(), persistent()); - } - - inline v8::Persistent& persistent() { - return object_; - } - private: friend void GetActiveHandles(const v8::FunctionCallbackInfo&); static void OnClose(uv_handle_t* handle); - v8::Persistent object_; QUEUE handle_wrap_queue_; - Environment* const env_; unsigned int flags_; // Using double underscore due to handle_ member in tcp_wrap. Probably // tcp_wrap should rename it's member to 'handle'. diff --git a/src/node.cc b/src/node.cc index 4e30269e0d..2e8a01394a 100644 --- a/src/node.cc +++ b/src/node.cc @@ -40,6 +40,8 @@ #endif #include "ares.h" +#include "async-wrap.h" +#include "async-wrap-inl.h" #include "env.h" #include "env-inl.h" #include "handle_wrap.h" @@ -841,6 +843,36 @@ Local WinapiErrnoException(int errorno, #endif +void SetupAsyncListener(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args.GetIsolate()); + HandleScope handle_scope(args.GetIsolate()); + + assert(args[0]->IsObject() && + args[1]->IsFunction() && + args[2]->IsFunction() && + args[3]->IsFunction() && + args[4]->IsFunction() && + args[5]->IsFunction()); + + env->set_async_listener_run_function(args[1].As()); + env->set_async_listener_load_function(args[2].As()); + env->set_async_listener_unload_function(args[3].As()); + env->set_async_listener_push_function(args[4].As()); + env->set_async_listener_strip_function(args[5].As()); + + Local async_listener_flag_obj = args[0].As(); + Environment::AsyncListener* async_listener = env->async_listener(); + async_listener_flag_obj->SetIndexedPropertiesToExternalArrayData( + async_listener->fields(), + kExternalUnsignedIntArray, + async_listener->fields_count()); + + // Do a little housekeeping. + env->process_object()->Delete( + FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupAsyncListener")); +} + + void SetupDomainUse(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args.GetIsolate()); @@ -882,25 +914,60 @@ void SetupDomainUse(const FunctionCallbackInfo& args) { domain_flag->fields(), kExternalUnsignedIntArray, domain_flag->fields_count()); + + // Do a little housekeeping. + env->process_object()->Delete( + FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse")); +} + + +void SetupNextTick(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args.GetIsolate()); + HandleScope handle_scope(args.GetIsolate()); + + if (!args[0]->IsObject() || !args[1]->IsFunction()) + abort(); + + // Values use to cross communicate with processNextTick. + Local tick_info_obj = args[0].As(); + tick_info_obj->SetIndexedPropertiesToExternalArrayData( + env->tick_info()->fields(), + kExternalUnsignedIntArray, + env->tick_info()->fields_count()); + + env->set_tick_callback_function(args[1].As()); + + // Do a little housekeeping. + env->process_object()->Delete( + FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupNextTick")); } Handle MakeDomainCallback(Environment* env, - const Handle object, + Handle object, const Handle callback, int argc, Handle argv[]) { // If you hit this assertion, you forgot to enter the v8::Context first. assert(env->context() == env->isolate()->GetCurrentContext()); - // TODO(trevnorris) Hook for long stack traces to be made here. - + Local process = env->process_object(); Local domain_v = object->Get(env->domain_string()); Local domain; TryCatch try_catch; try_catch.SetVerbose(true); + // TODO(trevnorris): This is sucky for performance. Fix it. + bool has_async_queue = object->Has(env->async_queue_string()); + if (has_async_queue) { + Local argv[] = { object }; + env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv); + + if (try_catch.HasCaught()) + return Undefined(node_isolate); + } + bool has_domain = domain_v->IsObject(); if (has_domain) { domain = domain_v.As(); @@ -926,6 +993,14 @@ Handle MakeDomainCallback(Environment* env, return Undefined(node_isolate); } + if (has_async_queue) { + Local val = object.As(); + env->async_listener_unload_function()->Call(process, 1, &val); + + if (try_catch.HasCaught()) + return Undefined(node_isolate); + } + if (has_domain) { Local exit = domain->Get(env->exit_string()).As(); @@ -955,10 +1030,7 @@ Handle MakeDomainCallback(Environment* env, tick_info->set_in_tick(true); - // process nextTicks after call - Local process_object = env->process_object(); - Local tick_callback_function = env->tick_callback_function(); - tick_callback_function->Call(process_object, 0, NULL); + env->tick_callback_function()->Call(process, 0, NULL); tick_info->set_in_tick(false); @@ -972,31 +1044,48 @@ Handle MakeDomainCallback(Environment* env, Handle MakeCallback(Environment* env, - const Handle object, + Handle object, const Handle callback, int argc, Handle argv[]) { + if (env->using_domains()) + return MakeDomainCallback(env, object, callback, argc, argv); + // If you hit this assertion, you forgot to enter the v8::Context first. assert(env->context() == env->isolate()->GetCurrentContext()); - // TODO(trevnorris) Hook for long stack traces to be made here. - Local process_object = env->process_object(); - - if (env->using_domains()) - return MakeDomainCallback(env, object, callback, argc, argv); + Local process = env->process_object(); TryCatch try_catch; try_catch.SetVerbose(true); + // TODO(trevnorris): This is sucky for performance. Fix it. + bool has_async_queue = object->Has(env->async_queue_string()); + if (has_async_queue) { + Local argv[] = { object }; + env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv); + + if (try_catch.HasCaught()) + return Undefined(node_isolate); + } + Local ret = callback->Call(object, argc, argv); if (try_catch.HasCaught()) { return Undefined(node_isolate); } + if (has_async_queue) { + Local val = object.As(); + env->async_listener_unload_function()->Call(process, 1, &val); + + if (try_catch.HasCaught()) + return Undefined(node_isolate); + } + Environment::TickInfo* tick_info = env->tick_info(); - if (tick_info->in_tick() == 1) { + if (tick_info->in_tick()) { return ret; } @@ -1007,22 +1096,8 @@ Handle MakeCallback(Environment* env, tick_info->set_in_tick(true); - // lazy load no domain next tick callbacks - Local tick_callback_function = env->tick_callback_function(); - if (tick_callback_function.IsEmpty()) { - Local tick_callback_function_key = - FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback"); - tick_callback_function = - process_object->Get(tick_callback_function_key).As(); - if (!tick_callback_function->IsFunction()) { - fprintf(stderr, "process._tickCallback assigned to non-function\n"); - abort(); - } - env->set_tick_callback_function(tick_callback_function); - } - // process nextTicks after call - tick_callback_function->Call(process_object, 0, NULL); + env->tick_callback_function()->Call(process, 0, NULL); tick_info->set_in_tick(false); @@ -1041,16 +1116,9 @@ Handle MakeCallback(Environment* env, uint32_t index, int argc, Handle argv[]) { - // If you hit this assertion, you forgot to enter the v8::Context first. - assert(env->context() == env->isolate()->GetCurrentContext()); - Local callback = object->Get(index).As(); assert(callback->IsFunction()); - if (env->using_domains()) { - return MakeDomainCallback(env, object, callback, argc, argv); - } - return MakeCallback(env, object, callback, argc, argv); } @@ -1060,16 +1128,8 @@ Handle MakeCallback(Environment* env, const Handle symbol, int argc, Handle argv[]) { - // If you hit this assertion, you forgot to enter the v8::Context first. - assert(env->context() == env->isolate()->GetCurrentContext()); - Local callback = object->Get(symbol).As(); assert(callback->IsFunction()); - - if (env->using_domains()) { - return MakeDomainCallback(env, object, callback, argc, argv); - } - return MakeCallback(env, object, callback, argc, argv); } @@ -1079,8 +1139,6 @@ Handle MakeCallback(Environment* env, const char* method, int argc, Handle argv[]) { - // If you hit this assertion, you forgot to enter the v8::Context first. - assert(env->context() == env->isolate()->GetCurrentContext()); Local method_string = OneByteString(node_isolate, method); return MakeCallback(env, object, method_string, argc, argv); } @@ -2570,6 +2628,8 @@ void SetupProcessObject(Environment* env, NODE_SET_METHOD(process, "binding", Binding); + NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener); + NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick); NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse); // values use to cross communicate with processNextTick diff --git a/src/node.js b/src/node.js index 8692a2c95e..d878c062fa 100644 --- a/src/node.js +++ b/src/node.js @@ -26,6 +26,7 @@ // of the startup process, so many dependencies are invoked lazily. (function(process) { this.global = this; + var _errorHandler; function startup() { var EventEmitter = NativeModule.require('events').EventEmitter; @@ -46,6 +47,7 @@ startup.globalTimeouts(); startup.globalConsole(); + startup.processAsyncListener(); startup.processAssert(); startup.processConfig(); startup.processNextTick(); @@ -219,15 +221,16 @@ startup.processFatal = function() { process._fatalException = function(er) { - var caught = false; + // First run through error handlers from asyncListener. + var caught = _errorHandler(er); - if (process.domain && process.domain._errorHandler) { - caught = process.domain._errorHandler(er); - } else { + if (process.domain && process.domain._errorHandler) + caught = process.domain._errorHandler(er) || caught; + + if (!caught) caught = process.emit('uncaughtException', er); - } - // if someone handled it, then great. otherwise, die in C++ land + // If someone handled it, then great. otherwise, die in C++ land // since that means that we'll exit the process, emit the 'exit' event if (!caught) { try { @@ -241,13 +244,288 @@ // if we handled an error, then make sure any ticks get processed } else { - setImmediate(process._tickCallback); + var t = setImmediate(process._tickCallback); + // Complete hack to make sure any errors thrown from async + // listeners don't cause an infinite loop. + if (t._asyncQueue) + t._asyncQueue = []; } return caught; }; }; + startup.processAsyncListener = function() { + var asyncStack = []; + var asyncQueue = []; + var uid = 0; + + // Stateful flags shared with Environment for quick JS/C++ + // communication. + var asyncFlags = {}; + + // Prevent accidentally suppressed thrown errors from before/after. + var inAsyncTick = false; + + // To prevent infinite recursion when an error handler also throws + // flag when an error is currenly being handled. + var inErrorTick = false; + + // Needs to be the same as src/env.h + var kCount = 0; + + // _errorHandler is scoped so it's also accessible by _fatalException. + _errorHandler = errorHandler; + + // Needs to be accessible from lib/timers.js so they know when async + // listeners are currently in queue. They'll be cleaned up once + // references there are made. + process._asyncFlags = asyncFlags; + process._runAsyncQueue = runAsyncQueue; + process._loadAsyncQueue = loadAsyncQueue; + process._unloadAsyncQueue = unloadAsyncQueue; + + // Public API. + process.createAsyncListener = createAsyncListener; + process.addAsyncListener = addAsyncListener; + process.removeAsyncListener = removeAsyncListener; + + // Setup shared objects/callbacks with native layer. + process._setupAsyncListener(asyncFlags, + runAsyncQueue, + loadAsyncQueue, + unloadAsyncQueue, + pushListener, + stripListener); + + function popQueue() { + if (asyncStack.length > 0) + asyncQueue = asyncStack.pop(); + else + asyncQueue = []; + } + + // Run all the async listeners attached when an asynchronous event is + // instantiated. + function runAsyncQueue(context) { + var queue = []; + var queueItem, item, i, value; + + inAsyncTick = true; + for (i = 0; i < asyncQueue.length; i++) { + queueItem = asyncQueue[i]; + // Not passing "this" context because it hasn't actually been + // instantiated yet, so accessing some of the object properties + // can cause a segfault. + // Passing the original value will allow users to manipulate the + // original value object, while also allowing them to return a + // new value for current async call tracking. + value = queueItem.listener(queueItem.value); + if (typeof value !== 'undefined') { + item = { + callbacks: queueItem.callbacks, + value: value, + listener: queueItem.listener, + uid: queueItem.uid + }; + } else { + item = queueItem; + } + queue[i] = item; + } + inAsyncTick = false; + + context._asyncQueue = queue; + } + + // Uses the _asyncQueue object attached by runAsyncQueue. + function loadAsyncQueue(context) { + var queue = context._asyncQueue; + var item, before, i; + + asyncStack.push(asyncQueue); + asyncQueue = queue; + // Since the async listener callback is required, the number of + // objects in the asyncQueue implies the number of async listeners + // there are to be processed. + asyncFlags[kCount] = queue.length; + + // Run "before" callbacks. + inAsyncTick = true; + for (i = 0; i < queue.length; i++) { + item = queue[i]; + if (!item.callbacks) + continue; + before = item.callbacks.before; + if (typeof before === 'function') + before(context, item.value); + } + inAsyncTick = false; + } + + // Unload one level of the async stack. Returns true if there are + // still listeners somewhere in the stack. + function unloadAsyncQueue(context) { + var item, after, i; + + // Run "after" callbacks. + inAsyncTick = true; + for (i = 0; i < asyncQueue.length; i++) { + item = asyncQueue[i]; + if (!item.callbacks) + continue; + after = item.callbacks.after; + if (typeof after === 'function') + after(context, item.value); + } + inAsyncTick = false; + + // Unload the current queue from the stack. + popQueue(); + + asyncFlags[kCount] = asyncQueue.length; + + return asyncQueue.length > 0 || asyncStack.length > 0; + } + + // Create new async listener object. Useful when instantiating a new + // object and want the listener instance, but not add it to the stack. + function createAsyncListener(listener, callbacks, value) { + return { + callbacks: callbacks, + value: value, + listener: listener, + uid: uid++ + }; + } + + // Add a listener to the current queue. + function addAsyncListener(listener, callbacks, value) { + // Accept new listeners or previous created listeners. + if (typeof listener === 'function') + callbacks = createAsyncListener(listener, callbacks, value); + else + callbacks = listener; + + var inQueue = false; + // The asyncQueue will be small. Probably always <= 3 items. + for (var i = 0; i < asyncQueue.length; i++) { + if (callbacks.uid === asyncQueue[i].uid) { + inQueue = true; + break; + } + } + + // Make sure the callback doesn't already exist in the queue. + if (!inQueue) + asyncQueue.push(callbacks); + + asyncFlags[kCount] = asyncQueue.length; + return callbacks; + } + + // Remove listener from the current queue and the entire stack. + function removeAsyncListener(obj) { + var i, j; + + for (i = 0; i < asyncQueue.length; i++) { + if (obj.uid === asyncQueue[i].uid) { + asyncQueue.splice(i, 1); + break; + } + } + + for (i = 0; i < asyncStack.length; i++) { + for (j = 0; j < asyncStack[i].length; j++) { + if (obj.uid === asyncStack[i][j].uid) { + asyncStack[i].splice(j, 1); + break; + } + } + } + + asyncFlags[kCount] = asyncQueue.length; + } + + // Error handler used by _fatalException to run through all error + // callbacks in the current asyncQueue. + function errorHandler(er) { + var handled = false; + var error, item, i; + + if (inErrorTick) + return false; + + inErrorTick = true; + for (i = 0; i < asyncQueue.length; i++) { + item = asyncQueue[i]; + if (!item.callbacks) + continue; + error = item.callbacks.error; + if (typeof error === 'function') { + try { + var threw = true; + handled = error(item.value, er) || handled; + threw = false; + } finally { + // If the error callback throws then we're going to die + // quickly with no chance of recovery. Only thing we're going + // to allow is execution of process exit event callbacks. + if (threw) { + process._exiting = true; + process.emit('exit', 1); + } + } + } + } + inErrorTick = false; + + // Unload the current queue from the stack. + popQueue(); + + return handled && !inAsyncTick; + } + + // Used by AsyncWrap::AddAsyncListener() to add an individual listener + // to the async queue. It will check the uid of the listener and only + // allow it to be added once. + function pushListener(obj) { + if (!this._asyncQueue) + this._asyncQueue = []; + + var queue = this._asyncQueue; + var inQueue = false; + // The asyncQueue will be small. Probably always <= 3 items. + for (var i = 0; i < queue.length; i++) { + if (obj.uid === queue.uid) { + inQueue = true; + break; + } + } + + if (!inQueue) + queue.push(obj); + } + + // Used by AsyncWrap::RemoveAsyncListener() to remove an individual + // listener from the async queue, and return whether there are still + // listeners in the queue. + function stripListener(obj) { + if (!this._asyncQueue || this._asyncQueue.length === 0) + return false; + + // The asyncQueue will be small. Probably always <= 3 items. + for (var i = 0; i < this._asyncQueue.length; i++) { + if (obj.uid === this._asyncQueue[i].uid) { + this._asyncQueue.splice(i, 1); + break; + } + } + + return this._asyncQueue.length > 0; + } + }; + var assert; startup.processAssert = function() { assert = process.assert = function(x, msg) { @@ -272,20 +550,30 @@ startup.processNextTick = function() { var nextTickQueue = []; + var asyncFlags = process._asyncFlags; + var _runAsyncQueue = process._runAsyncQueue; + var _loadAsyncQueue = process._loadAsyncQueue; + var _unloadAsyncQueue = process._unloadAsyncQueue; // This tickInfo thing is used so that the C++ code in src/node.cc // can have easy accesss to our nextTick state, and avoid unnecessary - var tickInfo = process._tickInfo; + var tickInfo = {}; // *Must* match Environment::TickInfo::Fields in src/env.h. var kIndex = 0; var kLength = 1; + // For asyncFlags. + // *Must* match Environment::AsyncListeners::Fields in src/env.h + var kCount = 0; + process.nextTick = nextTick; - // needs to be accessible from cc land + // Needs to be accessible from beyond this scope. process._tickCallback = _tickCallback; process._tickDomainCallback = _tickDomainCallback; + process._setupNextTick(tickInfo, _tickCallback); + function tickDone() { if (tickInfo[kLength] !== 0) { if (tickInfo[kLength] <= tickInfo[kIndex]) { @@ -299,43 +587,54 @@ tickInfo[kIndex] = 0; } - // run callbacks that have no domain - // using domains will cause this to be overridden + // Run callbacks that have no domain. + // Using domains will cause this to be overridden. function _tickCallback() { - var callback, threw; + var callback, hasQueue, threw, tock; while (tickInfo[kIndex] < tickInfo[kLength]) { - callback = nextTickQueue[tickInfo[kIndex]++].callback; + tock = nextTickQueue[tickInfo[kIndex]++]; + callback = tock.callback; threw = true; + hasQueue = !!tock._asyncQueue; + if (hasQueue) + _loadAsyncQueue(tock); try { callback(); threw = false; } finally { - if (threw) tickDone(); + if (threw) + tickDone(); } + if (hasQueue) + _unloadAsyncQueue(tock); } tickDone(); } function _tickDomainCallback() { - var tock, callback, threw, domain; + var callback, domain, hasQueue, threw, tock; while (tickInfo[kIndex] < tickInfo[kLength]) { tock = nextTickQueue[tickInfo[kIndex]++]; callback = tock.callback; domain = tock.domain; - if (domain) { - if (domain._disposed) continue; + hasQueue = !!tock._asyncQueue; + if (hasQueue) + _loadAsyncQueue(tock); + if (domain) domain.enter(); - } threw = true; try { callback(); threw = false; } finally { - if (threw) tickDone(); + if (threw) + tickDone(); } + if (hasQueue) + _unloadAsyncQueue(tock); if (domain) domain.exit(); } @@ -348,10 +647,16 @@ if (process._exiting) return; - nextTickQueue.push({ + var obj = { callback: callback, - domain: process.domain || null - }); + domain: process.domain || null, + _asyncQueue: undefined + }; + + if (asyncFlags[kCount] > 0) + _runAsyncQueue(obj); + + nextTickQueue.push(obj); tickInfo[kLength]++; } }; diff --git a/src/node_file.cc b/src/node_file.cc index cb37124c1a..5650e4cc2d 100644 --- a/src/node_file.cc +++ b/src/node_file.cc @@ -67,7 +67,7 @@ using v8::Value; class FSReqWrap: public ReqWrap { public: FSReqWrap(Environment* env, const char* syscall, char* data = NULL) - : ReqWrap(env), + : ReqWrap(env, Object::New()), syscall_(syscall), data_(data) { } @@ -214,7 +214,7 @@ static void After(uv_fs_t *req) { } } - MakeCallback(env, req_wrap->object(), env->oncomplete_string(), argc, argv); + req_wrap->MakeCallback(env->oncomplete_string(), argc, argv); uv_fs_req_cleanup(&req_wrap->req_); delete req_wrap; diff --git a/src/node_internals.h b/src/node_internals.h index a484779243..84c523b295 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -49,7 +49,7 @@ inline v8::Local PersistentToLocal( // Call with valid HandleScope and while inside Context scope. v8::Handle MakeCallback(Environment* env, - const v8::Handle object, + v8::Handle object, const char* method, int argc = 0, v8::Handle* argv = NULL); diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 08ab7190bd..a4e4ed1237 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -192,11 +192,7 @@ void PipeWrap::OnConnection(uv_stream_t* handle, int status) { }; if (status != 0) { - MakeCallback(env, - pipe_wrap->object(), - env->onconnection_string(), - ARRAY_SIZE(argv), - argv); + pipe_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv); return; } @@ -212,11 +208,7 @@ void PipeWrap::OnConnection(uv_stream_t* handle, int status) { // Successful accept. Call the onconnection callback in JavaScript land. argv[1] = client_obj; - MakeCallback(env, - pipe_wrap->object(), - env->onconnection_string(), - ARRAY_SIZE(argv), - argv); + pipe_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv); } // TODO(bnoordhuis) Maybe share this with TCPWrap? @@ -251,11 +243,7 @@ void PipeWrap::AfterConnect(uv_connect_t* req, int status) { Boolean::New(writable) }; - MakeCallback(env, - req_wrap_obj, - env->oncomplete_string(), - ARRAY_SIZE(argv), - argv); + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); delete req_wrap; } diff --git a/src/process_wrap.cc b/src/process_wrap.cc index ba09745eaa..7b8125ceba 100644 --- a/src/process_wrap.cc +++ b/src/process_wrap.cc @@ -284,11 +284,7 @@ class ProcessWrap : public HandleWrap { OneByteString(node_isolate, signo_string(term_signal)) }; - MakeCallback(env, - wrap->object(), - env->onexit_string(), - ARRAY_SIZE(argv), - argv); + wrap->MakeCallback(env->onexit_string(), ARRAY_SIZE(argv), argv); } uv_process_t process_; diff --git a/src/req_wrap.h b/src/req_wrap.h index 8a701e5158..da3abd8759 100644 --- a/src/req_wrap.h +++ b/src/req_wrap.h @@ -22,6 +22,8 @@ #ifndef SRC_REQ_WRAP_H_ #define SRC_REQ_WRAP_H_ +#include "async-wrap.h" +#include "async-wrap-inl.h" #include "env.h" #include "env-inl.h" #include "queue.h" @@ -33,21 +35,14 @@ namespace node { extern QUEUE req_wrap_queue; template -class ReqWrap { +class ReqWrap : public AsyncWrap { public: - ReqWrap(Environment* env, - v8::Handle object = v8::Handle()) - : env_(env) { - v8::HandleScope handle_scope(env->isolate()); + ReqWrap(Environment* env, v8::Handle object) + : AsyncWrap(env, object) { + assert(!object.IsEmpty()); - if (object.IsEmpty()) { - object = v8::Object::New(); - } - persistent().Reset(env->isolate(), object); - - if (env->in_domain()) { + if (env->in_domain()) object->Set(env->domain_string(), env->domain_array()->Get(0)); - } QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_); } @@ -66,25 +61,9 @@ class ReqWrap { req_.data = this; } - inline Environment* env() const { - return env_; - } - - inline v8::Local object() { - return PersistentToLocal(env()->isolate(), persistent()); - } - - inline v8::Persistent& persistent() { - return object_; - } - // TODO(bnoordhuis) Make these private. QUEUE req_wrap_queue_; T req_; // *must* be last, GetActiveRequests() in node.cc depends on it - - private: - v8::Persistent object_; - Environment* const env_; }; diff --git a/src/signal_wrap.cc b/src/signal_wrap.cc index 3a1e5bf863..b0f16196fa 100644 --- a/src/signal_wrap.cc +++ b/src/signal_wrap.cc @@ -19,6 +19,8 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +#include "async-wrap.h" +#include "async-wrap-inl.h" #include "env.h" #include "env-inl.h" #include "handle_wrap.h" @@ -100,8 +102,9 @@ class SignalWrap : public HandleWrap { Environment* env = wrap->env(); Context::Scope context_scope(env->context()); HandleScope handle_scope(env->isolate()); + Local arg = Integer::New(signum, env->isolate()); - MakeCallback(env, wrap->object(), env->onsignal_string(), 1, &arg); + wrap->MakeCallback(env->onsignal_string(), 1, &arg); } uv_signal_t handle_; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index f98fcded0e..76e667bd3f 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -451,11 +451,7 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) { req_wrap_obj }; - MakeCallback(env, - req_wrap_obj, - env->oncomplete_string(), - ARRAY_SIZE(argv), - argv); + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); req_wrap->~WriteWrap(); delete[] reinterpret_cast(req_wrap); @@ -499,11 +495,7 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { req_wrap_obj }; - MakeCallback(env, - req_wrap_obj, - env->oncomplete_string(), - ARRAY_SIZE(argv), - argv); + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); delete req_wrap; } @@ -574,7 +566,7 @@ void StreamWrapCallbacks::DoRead(uv_stream_t* handle, if (nread < 0) { if (buf->base != NULL) free(buf->base); - MakeCallback(env, Self(), env->onread_string(), ARRAY_SIZE(argv), argv); + wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); return; } @@ -603,11 +595,7 @@ void StreamWrapCallbacks::DoRead(uv_stream_t* handle, argv[2] = pending_obj; } - MakeCallback(env, - wrap()->object(), - env->onread_string(), - ARRAY_SIZE(argv), - argv); + wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); } diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 840dd92077..73058dbd4f 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -321,11 +321,7 @@ void TCPWrap::OnConnection(uv_stream_t* handle, int status) { argv[1] = client_obj; } - MakeCallback(env, - tcp_wrap->object(), - env->onconnection_string(), - ARRAY_SIZE(argv), - argv); + tcp_wrap->MakeCallback(env->onconnection_string(), ARRAY_SIZE(argv), argv); } @@ -350,11 +346,8 @@ void TCPWrap::AfterConnect(uv_connect_t* req, int status) { v8::True(node_isolate), v8::True(node_isolate) }; - MakeCallback(env, - req_wrap_obj, - env->oncomplete_string(), - ARRAY_SIZE(argv), - argv); + + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); delete req_wrap; } diff --git a/src/timer_wrap.cc b/src/timer_wrap.cc index d6e32bdedd..393def3d59 100644 --- a/src/timer_wrap.cc +++ b/src/timer_wrap.cc @@ -19,6 +19,8 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +#include "async-wrap.h" +#include "async-wrap-inl.h" #include "env.h" #include "env-inl.h" #include "handle_wrap.h" @@ -138,7 +140,7 @@ class TimerWrap : public HandleWrap { Context::Scope context_scope(env->context()); HandleScope handle_scope(env->isolate()); Local argv[1] = { Integer::New(status, node_isolate) }; - MakeCallback(env, wrap->object(), kOnTimeout, ARRAY_SIZE(argv), argv); + wrap->MakeCallback(kOnTimeout, ARRAY_SIZE(argv), argv); } static void Now(const FunctionCallbackInfo& args) { diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 3b5044968f..2af278d6e5 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -363,9 +363,8 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) { Environment* env = req_wrap->env(); Context::Scope context_scope(env->context()); HandleScope handle_scope(env->isolate()); - Local req_wrap_obj = req_wrap->object(); Local arg = Integer::New(status, node_isolate); - MakeCallback(env, req_wrap_obj, env->oncomplete_string(), 1, &arg); + req_wrap->MakeCallback(env->oncomplete_string(), 1, &arg); } delete req_wrap; } @@ -405,25 +404,21 @@ void UDPWrap::OnRecv(uv_udp_t* handle, Local argv[] = { Integer::New(nread, node_isolate), wrap_obj, - Undefined(), - Undefined() + Undefined(env->isolate()), + Undefined(env->isolate()) }; if (nread < 0) { if (buf->base != NULL) free(buf->base); - MakeCallback(env, - wrap_obj, - env->onmessage_string(), - ARRAY_SIZE(argv), - argv); + wrap->MakeCallback(env->onmessage_string(), ARRAY_SIZE(argv), argv); return; } char* base = static_cast(realloc(buf->base, nread)); argv[2] = Buffer::Use(env, base, nread); argv[3] = AddressToJS(env, addr); - MakeCallback(env, wrap_obj, env->onmessage_string(), ARRAY_SIZE(argv), argv); + wrap->MakeCallback(env->onmessage_string(), ARRAY_SIZE(argv), argv); } diff --git a/test/simple/test-asynclistener-error.js b/test/simple/test-asynclistener-error.js new file mode 100644 index 0000000000..c1525e46bb --- /dev/null +++ b/test/simple/test-asynclistener-error.js @@ -0,0 +1,258 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); +var dns = require('dns'); +var fs = require('fs'); +var net = require('net'); + +var addListener = process.addAsyncListener; +var removeListener = process.removeAsyncListener; +var errorMsgs = []; +var currentMsg = ''; +var caught = 0; +var expectCaught = 0; +var exitCbRan = false; + +function asyncL() { } + +var callbacksObj = { + error: function(value, er) { + var idx = errorMsgs.indexOf(er.message); + + caught++; + + if (-1 < idx) + errorMsgs.splice(idx, 1); + + return currentMsg === er.message; + } +}; + +var listener = process.createAsyncListener(asyncL, callbacksObj); + +process.on('exit', function(code) { + removeListener(listener); + + // Something else went wrong, no need to further check. + if (code > 0) + return; + + // Make sure the exit callback only runs once. + assert.ok(!exitCbRan); + exitCbRan = true; + + // Check if any error messages weren't removed from the msg queue. + if (errorMsgs.length > 0) + throw new Error('Errors not fired: ' + errorMsgs); + + assert.equal(caught, expectCaught, 'caught all expected errors'); + process._rawDebug('ok'); +}); + + +// Catch synchronous throws +errorMsgs.push('sync throw'); +process.nextTick(function() { + addListener(listener); + + expectCaught++; + currentMsg = 'sync throw'; + throw new Error(currentMsg); + + removeListener(listener); +}); + + +// Simple cases +errorMsgs.push('setTimeout - simple'); +errorMsgs.push('setImmediate - simple'); +errorMsgs.push('setInterval - simple'); +errorMsgs.push('process.nextTick - simple'); +process.nextTick(function() { + addListener(listener); + + setTimeout(function() { + currentMsg = 'setTimeout - simple'; + throw new Error(currentMsg); + }); + expectCaught++; + + setImmediate(function() { + currentMsg = 'setImmediate - simple'; + throw new Error(currentMsg); + }); + expectCaught++; + + var b = setInterval(function() { + clearInterval(b); + currentMsg = 'setInterval - simple'; + throw new Error(currentMsg); + }); + expectCaught++; + + process.nextTick(function() { + currentMsg = 'process.nextTick - simple'; + throw new Error(currentMsg); + }); + expectCaught++; + + removeListener(listener); +}); + + +// Deeply nested +errorMsgs.push('setInterval - nested'); +errorMsgs.push('setImmediate - nested'); +errorMsgs.push('process.nextTick - nested'); +errorMsgs.push('setTimeout2 - nested'); +errorMsgs.push('setTimeout - nested'); +process.nextTick(function() { + addListener(listener); + + setTimeout(function() { + process.nextTick(function() { + setImmediate(function() { + var b = setInterval(function() { + clearInterval(b); + currentMsg = 'setInterval - nested'; + throw new Error(currentMsg); + }); + expectCaught++; + currentMsg = 'setImmediate - nested'; + throw new Error(currentMsg); + }); + expectCaught++; + currentMsg = 'process.nextTick - nested'; + throw new Error(currentMsg); + }); + expectCaught++; + setTimeout(function() { + currentMsg = 'setTimeout2 - nested'; + throw new Error(currentMsg); + }); + expectCaught++; + currentMsg = 'setTimeout - nested'; + throw new Error(currentMsg); + }); + expectCaught++; + + removeListener(listener); +}); + + +// FS +errorMsgs.push('fs - file does not exist'); +errorMsgs.push('fs - exists'); +errorMsgs.push('fs - realpath'); +process.nextTick(function() { + addListener(listener); + + fs.stat('does not exist', function(err, stats) { + currentMsg = 'fs - file does not exist'; + throw new Error(currentMsg); + }); + expectCaught++; + + fs.exists('hi all', function(exists) { + currentMsg = 'fs - exists'; + throw new Error(currentMsg); + }); + expectCaught++; + + fs.realpath('/some/path', function(err, resolved) { + currentMsg = 'fs - realpath'; + throw new Error(currentMsg); + }); + expectCaught++; + + removeListener(listener); +}); + + +// Nested FS +errorMsgs.push('fs - nested file does not exist'); +process.nextTick(function() { + addListener(listener); + + setTimeout(function() { + setImmediate(function() { + var b = setInterval(function() { + clearInterval(b); + process.nextTick(function() { + fs.stat('does not exist', function(err, stats) { + currentMsg = 'fs - nested file does not exist'; + throw new Error(currentMsg); + }); + expectCaught++; + }); + }); + }); + }); + + removeListener(listener); +}); + + +// Net +errorMsgs.push('net - connection listener'); +errorMsgs.push('net - client connect'); +errorMsgs.push('net - server listening'); +process.nextTick(function() { + addListener(listener); + + var server = net.createServer(function(c) { + server.close(); + currentMsg = 'net - connection listener'; + throw new Error(currentMsg); + }); + expectCaught++; + + server.listen(common.PORT, function() { + var client = net.connect(common.PORT, function() { + client.end(); + currentMsg = 'net - client connect'; + throw new Error(currentMsg); + }); + expectCaught++; + currentMsg = 'net - server listening'; + throw new Error(currentMsg); + }); + expectCaught++; + + removeListener(listener); +}); + + +// DNS +errorMsgs.push('dns - lookup'); +process.nextTick(function() { + addListener(listener); + + dns.lookup('localhost', function() { + currentMsg = 'dns - lookup'; + throw new Error(currentMsg); + }); + expectCaught++; + + removeListener(listener); +}); diff --git a/test/simple/test-asynclistener-multi-timeout.js b/test/simple/test-asynclistener-multi-timeout.js new file mode 100644 index 0000000000..30ec6dd9ae --- /dev/null +++ b/test/simple/test-asynclistener-multi-timeout.js @@ -0,0 +1,71 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +var addListener = process.addAsyncListener; +var removeListener = process.removeAsyncListener; +var caught = []; +var expect = []; + +function asyncL(a) {} + +var callbacksObj = { + error: function(value, er) { + process._rawDebug('caught', er.message); + caught.push(er.message); + return (expect.indexOf(er.message) !== -1); + } +}; + +var listener = process.createAsyncListener(asyncL, callbacksObj); + +process.on('exit', function(code) { + removeListener(listener); + + if (code > 0) + return; + + expect = expect.sort(); + caught = caught.sort(); + + process._rawDebug('expect', expect); + process._rawDebug('caught', caught); + assert.deepEqual(caught, expect, 'caught all expected errors'); + process._rawDebug('ok'); +}); + + +expect.push('immediate simple a'); +expect.push('immediate simple b'); +process.nextTick(function() { + addListener(listener); + // Tests for a setImmediate specific bug encountered while implementing + // AsyncListeners. + setImmediate(function() { + throw new Error('immediate simple a'); + }); + setImmediate(function() { + throw new Error('immediate simple b'); + }); + removeListener(listener); +}); diff --git a/test/simple/test-asynclistener-throw-before-infinite-recursion.js b/test/simple/test-asynclistener-throw-before-infinite-recursion.js new file mode 100644 index 0000000000..fcf0fd69e8 --- /dev/null +++ b/test/simple/test-asynclistener-throw-before-infinite-recursion.js @@ -0,0 +1,47 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); + +// If there is an uncaughtException listener then the error thrown from +// "before" will be considered handled, thus calling setImmediate to +// finish execution of the nextTickQueue. This in turn will cause "before" +// to fire again, entering into an infinite loop. +// So the asyncQueue is cleared from the returned setImmediate in +// _fatalException to prevent this from happening. +var cntr = 0; + + +process.addAsyncListener(function() { }, { + before: function() { + if (++cntr > 1) { + // Can't throw since uncaughtException will also catch that. + process._rawDebug('Error: Multiple before callbacks called'); + process.exit(1); + } + throw new Error('before'); + } +}); + +process.on('uncaughtException', function() { }); + +process.nextTick(); diff --git a/test/simple/test-asynclistener.js b/test/simple/test-asynclistener.js new file mode 100644 index 0000000000..c5110de9ee --- /dev/null +++ b/test/simple/test-asynclistener.js @@ -0,0 +1,185 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'); +var assert = require('assert'); +var net = require('net'); +var fs = require('fs'); +var dgram = require('dgram'); + +var addListener = process.addAsyncListener; +var removeListener = process.removeAsyncListener; +var actualAsync = 0; +var expectAsync = 0; + +function onAsync() { + actualAsync++; +} + +var listener = process.createAsyncListener(onAsync); + +process.on('exit', function() { + process._rawDebug('expected', expectAsync); + process._rawDebug('actual ', actualAsync); + // TODO(trevnorris): Not a great test. If one was missed, but others + // overflowed then the test would still pass. + assert.ok(actualAsync >= expectAsync); +}); + + +// Test listeners side-by-side +process.nextTick(function() { + addListener(listener); + + var b = setInterval(function() { + clearInterval(b); + }); + expectAsync++; + + var c = setInterval(function() { + clearInterval(c); + }); + expectAsync++; + + setTimeout(function() { }); + expectAsync++; + + setTimeout(function() { }); + expectAsync++; + + process.nextTick(function() { }); + expectAsync++; + + process.nextTick(function() { }); + expectAsync++; + + setImmediate(function() { }); + expectAsync++; + + setImmediate(function() { }); + expectAsync++; + + setTimeout(function() { }, 10); + expectAsync++; + + setTimeout(function() { }, 10); + expectAsync++; + + removeListener(listener); +}); + + +// Async listeners should propagate with nested callbacks +process.nextTick(function() { + addListener(listener); + var interval = 3; + + process.nextTick(function() { + setTimeout(function() { + setImmediate(function() { + var i = setInterval(function() { + if (--interval <= 0) + clearInterval(i); + }); + expectAsync++; + }); + expectAsync++; + process.nextTick(function() { + setImmediate(function() { + setTimeout(function() { }, 20); + expectAsync++; + }); + expectAsync++; + }); + expectAsync++; + }); + expectAsync++; + }); + expectAsync++; + + removeListener(listener); +}); + + +// Test triggers with two async listeners +process.nextTick(function() { + addListener(listener); + addListener(listener); + + setTimeout(function() { + process.nextTick(function() { }); + expectAsync += 2; + }); + expectAsync += 2; + + removeListener(listener); + removeListener(listener); +}); + + +// Test callbacks from fs I/O +process.nextTick(function() { + addListener(listener); + + fs.stat('something random', function(err, stat) { }); + expectAsync++; + + setImmediate(function() { + fs.stat('random again', function(err, stat) { }); + expectAsync++; + }); + expectAsync++; + + removeListener(listener); +}); + + +// Test net I/O +process.nextTick(function() { + addListener(listener); + + var server = net.createServer(function(c) { }); + expectAsync++; + + server.listen(common.PORT, function() { + server.close(); + expectAsync++; + }); + expectAsync++; + + removeListener(listener); +}); + + +// Test UDP +process.nextTick(function() { + addListener(listener); + + var server = dgram.createSocket('udp4'); + expectAsync++; + + server.bind(common.PORT); + + server.close(); + expectAsync++; + + removeListener(listener); +}); diff --git a/test/simple/test-domain-http-server.js b/test/simple/test-domain-http-server.js index 666f5d190a..57e8ac4d14 100644 --- a/test/simple/test-domain-http-server.js +++ b/test/simple/test-domain-http-server.js @@ -45,11 +45,10 @@ var server = http.createServer(function(req, res) { res.end(er.stack || er.message || 'Unknown error'); }); - var data; dom.run(function() { // Now, an action that has the potential to fail! // if you request 'baz', then it'll throw a JSON circular ref error. - data = JSON.stringify(objects[req.url.replace(/[^a-z]/g, '')]); + var data = JSON.stringify(objects[req.url.replace(/[^a-z]/g, '')]); // this line will throw if you pick an unknown key assert(data !== undefined, 'Data should not be undefined'); @@ -64,8 +63,6 @@ server.listen(common.PORT, next); function next() { console.log('listening on localhost:%d', common.PORT); - // now hit it a few times - var dom = domain.create(); var requests = 0; var responses = 0;