Browse Source

isolates: implement message passing

Parent and child isolates can now pass arbitrary binary messages between each
other. The messages are sent and received through a thread-safe queue that
wakes up the event loop of the receiving thread.
v0.7.4-release
Ben Noordhuis 13 years ago
parent
commit
dadc30318f
  1. 6
      src/node.cc
  2. 58
      src/node_internals.h
  3. 359
      src/node_isolate.cc
  4. 24
      src/node_isolate.h
  5. 39
      test/simple/test-isolates-ping-pong.js
  6. 9
      test/simple/test-isolates.js

6
src/node.cc

@ -2664,6 +2664,12 @@ void StartThread(node::Isolate* isolate,
process_l->Set(String::NewSymbol("tid"),
Integer::NewFromUnsigned(isolate->id_));
// TODO check (isolate->channel_ != NULL)
if (isolate->id_ > 1) {
process_l->Set(String::NewSymbol("_send"),
FunctionTemplate::New(Isolate::Send)->GetFunction());
}
// FIXME crashes with "CHECK(heap->isolate() == Isolate::Current()) failed"
//v8_typed_array::AttachBindings(v8::Context::GetCurrent()->Global());

58
src/node_internals.h

@ -47,59 +47,13 @@ void StartThread(Isolate* isolate, int argc, char** argv);
#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0]))
#endif
//
// isolates support
//
#if HAVE_ISOLATES
# if _WIN32
# define THREAD __declspec(thread)
# else
# define THREAD __thread
# endif
# define TLS(type, name) THREAD type* __tls_##name
# define VAR(name) (*__tls_##name)
# define EMPTY(name) (__tls_##name == NULL)
# define ASSIGN(name, val) ((__tls_##name) = P(val))
# define LAZY_ASSIGN(name, val) \
do if (!__tls_##name) ((__tls_##name) = P(val)); while (0)
template <class T> inline v8::Persistent<T>* P(v8::Handle<T> v)
{
return new v8::Persistent<T>(v8::Persistent<T>::New(v));
}
inline v8::Persistent<v8::String>* P(const char* symbol)
{
return new v8::Persistent<v8::String>(
v8::Persistent<v8::String>::New(
v8::String::NewSymbol(symbol)));
}
#else // !HAVE_ISOLATES
# define THREAD /* nothing */
# define TLS(type, name) type name
# define VAR(name) (name)
# define EMPTY(name) ((name).IsEmpty())
# define ASSIGN(name, val) ((name) = P(val))
# define LAZY_ASSIGN(name, val) \
do if ((name).IsEmpty()) (name) = P(val); while (0)
template <class T> inline v8::Persistent<T> P(v8::Handle<T> v)
{
return v8::Persistent<T>(v);
}
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
TypeName(const TypeName&); \
void operator=(const TypeName&)
inline v8::Persistent<v8::String> P(const char* symbol)
{
return v8::Persistent<v8::String>::New(
v8::String::NewSymbol(symbol));
}
#endif // HAVE_ISOLATES
#define DISALLOW_IMPLICIT_CONSTRUCTORS(TypeName) \
TypeName(); \
DISALLOW_COPY_AND_ASSIGN(TypeName)
} // namespace node

359
src/node_isolate.cc

@ -19,10 +19,12 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <v8.h>
#include <node.h>
#include <node_buffer.h>
#include <node_isolate.h>
#include <node_internals.h>
#include <v8.h>
#include <node_object_wrap.h>
#include <stdlib.h>
#include <string.h>
@ -34,6 +36,7 @@ namespace node {
using v8::Arguments;
using v8::Array;
using v8::False;
using v8::FunctionTemplate;
using v8::Handle;
using v8::HandleScope;
using v8::Integer;
@ -41,12 +44,11 @@ using v8::Local;
using v8::Null;
using v8::Object;
using v8::ObjectTemplate;
using v8::Persistent;
using v8::String;
using v8::True;
using v8::Value;
static char magic_isolate_cookie_[] = "magic isolate cookie";
using v8::Undefined;
static volatile bool initialized;
static volatile int id;
@ -55,6 +57,165 @@ static uv_mutex_t list_lock;
static ngx_queue_t list_head;
#ifdef NDEBUG
# define IF_DEBUG(expr)
#else
# define IF_DEBUG(expr) expr;
#endif
template <class T>
class Queue {
public:
Queue() {
if (uv_mutex_init(&mutex_)) abort();
ngx_queue_init(&queue_);
}
~Queue() {
IF_DEBUG({
uv_mutex_lock(&mutex_);
assert(ngx_queue_empty(&queue_));
uv_mutex_unlock(&mutex_);
})
uv_mutex_destroy(&mutex_);
}
void Produce(T item) {
Message* m = new Message;
m->item_ = item;
uv_mutex_lock(&mutex_);
ngx_queue_insert_tail(&queue_, &m->queue_);
uv_mutex_unlock(&mutex_);
}
T Consume() {
uv_mutex_lock(&mutex_);
ngx_queue_t* q = ngx_queue_head(&queue_);
ngx_queue_remove(q);
uv_mutex_unlock(&mutex_);
Message* m = ngx_queue_data(q, Message, queue_);
T item = m->item_;
delete m;
return item;
}
private:
struct Message {
ngx_queue_t queue_;
T item_;
};
ngx_queue_t queue_;
uv_mutex_t mutex_;
};
template <class T>
class Channel {
public:
typedef void (*Callback)(T item, void* arg);
Channel(uv_loop_t* loop, Callback callback, void* arg) {
callback_ = callback;
arg_ = arg;
uv_async_init(loop, &async_, OnMessage);
uv_unref(loop);
}
~Channel() {
uv_ref(async_.loop);
uv_close(reinterpret_cast<uv_handle_t*>(&async_), NULL);
}
void Send(T item) {
queue_.Produce(item);
uv_async_send(&async_);
}
private:
static void OnMessage(uv_async_t* handle, int status) {
Channel* c = container_of(handle, Channel, async_);
c->OnMessage();
}
void OnMessage() {
T item = queue_.Consume();
callback_(item, arg_);
}
void* arg_;
Callback callback_;
uv_async_t async_;
Queue<T> queue_;
};
struct IsolateMessage {
size_t size_;
char* data_;
IsolateMessage(const char* data, size_t size) {
// make a copy for now
size_ = size;
data_ = new char[size];
memcpy(data_, data, size);
}
~IsolateMessage() {
delete[] data_;
}
static void Free(char* data, void* arg) {
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
assert(data == msg->data_);
delete msg;
}
};
class IsolateChannel: public Channel<IsolateMessage*> {
public:
IsolateChannel(uv_loop_t* loop, Callback callback, void* arg)
: Channel<IsolateMessage*>(loop, callback, arg)
{
}
};
Handle<Value> Isolate::Send(const Arguments& args) {
HandleScope scope;
Isolate* isolate = Isolate::GetCurrent();
assert(Buffer::HasInstance(args[0]));
assert(isolate->send_channel_ != NULL);
Local<Object> obj = args[0]->ToObject();
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);
IsolateMessage* msg = new IsolateMessage(data, size);
isolate->send_channel_->Send(msg);
return Undefined();
}
void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
HandleScope scope;
Isolate* self = static_cast<Isolate*>(arg);
assert(uv_thread_self() == self->tid_);
NODE_ISOLATE_CHECK(self);
Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
}
void Isolate::Initialize() {
if (!initialized) {
initialized = true;
@ -93,6 +254,9 @@ void Isolate::JoinAll() {
Isolate::Isolate() {
send_channel_ = NULL; // set (and deleted) by the parent isolate
recv_channel_ = NULL;
uv_mutex_lock(&list_lock);
assert(initialized && "node::Isolate::Initialize() hasn't been called");
@ -123,6 +287,13 @@ Isolate::Isolate() {
}
Isolate::~Isolate() {
if (!argv_) return;
for (size_t i = 0; argv_[i]; ++i) delete[] argv_[i];
delete[] argv_;
}
struct globals* Isolate::Globals() {
return &globals_;
}
@ -131,12 +302,12 @@ struct globals* Isolate::Globals() {
void Isolate::AtExit(AtExitCallback callback, void* arg) {
struct AtExitCallbackInfo* it = new AtExitCallbackInfo;
NODE_ISOLATE_CHECK(this);
//NODE_ISOLATE_CHECK(this);
it->callback_ = callback;
it->arg_ = arg;
ngx_queue_insert_head(&at_exit_callbacks_, &it->at_exit_callbacks_);
ngx_queue_insert_head(&at_exit_callbacks_, &it->queue_);
}
@ -157,14 +328,27 @@ void Isolate::Enter() {
}
void Isolate::Exit() {
NODE_ISOLATE_CHECK(this);
v8_context_->Exit();
v8_isolate_->Exit();
}
void Isolate::Dispose() {
uv_mutex_lock(&list_lock);
NODE_ISOLATE_CHECK(this);
struct AtExitCallbackInfo* it;
ngx_queue_t* q;
while (!ngx_queue_empty(&at_exit_callbacks_)) {
ngx_queue_t* q = ngx_queue_head(&at_exit_callbacks_);
ngx_queue_remove(q);
AtExitCallbackInfo* it = ngx_queue_data(q, AtExitCallbackInfo, queue_);
it->callback_(it->arg_);
delete it;
}
assert(v8_context_->InContext());
v8_context_->Exit();
@ -184,18 +368,117 @@ void Isolate::Dispose() {
}
struct IsolateWrap: public ObjectWrap {
public:
IsolateWrap(Isolate* parent_isolate) {
parent_isolate_ = parent_isolate;
uv_loop_t* parent_loop = parent_isolate->GetLoop();
recv_channel_ = new IsolateChannel(
parent_loop, IsolateWrap::OnMessage, this);
isolate_ = new Isolate;
send_channel_ = new IsolateChannel(
isolate_->loop_, Isolate::OnMessage, isolate_);
isolate_->send_channel_ = recv_channel_;
isolate_->recv_channel_ = send_channel_;
// TODO this could be folded into the regular channel
uv_async_init(parent_loop, &child_exit_, AfterChildExit);
isolate_->AtExit(AtChildExit, this);
HandleScope scope;
Local<ObjectTemplate> tpl = ObjectTemplate::New();
tpl->SetInternalFieldCount(1);
Local<Object> obj = tpl->NewInstance();
Wrap(obj);
Ref(); // unref'd when the child isolate exits
obj->Set(String::NewSymbol("tid"),
Integer::New(isolate_->id_));
obj->Set(String::NewSymbol("send"),
FunctionTemplate::New(Send)->GetFunction());
}
~IsolateWrap() {
delete isolate_;
delete recv_channel_;
delete send_channel_;
}
Isolate* GetIsolate() {
return isolate_;
}
private:
// runs in the child thread
static void AtChildExit(void* arg) {
IsolateWrap* self = static_cast<IsolateWrap*>(arg);
uv_async_send(&self->child_exit_);
}
// runs in the parent thread
static void AfterChildExit(uv_async_t* handle, int status) {
IsolateWrap* self = container_of(handle, IsolateWrap, child_exit_);
self->OnExit();
}
void OnExit() {
if (uv_thread_join(&isolate_->tid_)) abort();
uv_close(reinterpret_cast<uv_handle_t*>(&child_exit_), NULL);
MakeCallback(handle_, "onexit", 0, NULL);
Unref(); // child is dead, it's safe to GC the JS object now
}
static void OnMessage(IsolateMessage* msg, void* arg) {
IsolateWrap* self = static_cast<IsolateWrap*>(arg);
self->OnMessage(msg);
}
void OnMessage(IsolateMessage* msg) {
assert(uv_thread_self() != isolate_->tid_);
NODE_ISOLATE_CHECK(parent_isolate_);
HandleScope scope;
Buffer* buf = Buffer::New(
msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
}
// TODO merge with Isolate::Send(), it's almost identical
static Handle<Value> Send(const Arguments& args) {
HandleScope scope;
IsolateWrap* self = Unwrap<IsolateWrap>(args.This());
assert(Buffer::HasInstance(args[0]));
Local<Object> obj = args[0]->ToObject();
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);
IsolateMessage* msg = new IsolateMessage(data, size);
self->send_channel_->Send(msg);
return Undefined();
}
DISALLOW_IMPLICIT_CONSTRUCTORS(IsolateWrap);
Isolate* isolate_;
Isolate* parent_isolate_;
IsolateChannel* send_channel_;
IsolateChannel* recv_channel_;
uv_async_t child_exit_; // side effect: keeps the parent's event loop alive
// until the child exits
};
static void RunIsolate(void* arg) {
node::Isolate* isolate = reinterpret_cast<node::Isolate*>(arg);
Isolate* isolate = static_cast<Isolate*>(arg);
isolate->Enter();
// TODO in the future when v0.6 is dead, move StartThread and related
// handles into node_isolate.cc. It is currently organized like this to
// minimize diff (and thus merge conflicts) between the legacy v0.6
// branch.
StartThread(isolate, isolate->argc_, isolate->argv_);
isolate->Dispose();
delete isolate;
}
@ -207,9 +490,9 @@ static Handle<Value> CreateIsolate(const Arguments& args) {
Local<Array> argv = args[0].As<Array>();
assert(argv->Length() >= 2);
// Note that isolate lock is aquired in the constructor here. It will not
// be unlocked until RunIsolate starts and calls isolate->Enter().
Isolate* isolate = new node::Isolate();
Isolate* current_isolate = node::Isolate::GetCurrent();
IsolateWrap* wrap = new IsolateWrap(current_isolate);
Isolate* isolate = wrap->GetIsolate();
// Copy over arguments into isolate
isolate->argc_ = argv->Length();
@ -222,20 +505,10 @@ static Handle<Value> CreateIsolate(const Arguments& args) {
}
isolate->argv_[isolate->argc_] = NULL;
if (uv_thread_create(&isolate->tid_, RunIsolate, isolate)) {
delete isolate;
return Null();
}
// TODO instead of ObjectTemplate - have a special wrapper.
Local<ObjectTemplate> tpl = ObjectTemplate::New();
tpl->SetInternalFieldCount(2);
Local<Object> obj = tpl->NewInstance();
obj->SetPointerInInternalField(0, magic_isolate_cookie_);
obj->SetPointerInInternalField(1, isolate);
return scope.Close(obj);
if (uv_thread_create(&isolate->tid_, RunIsolate, isolate))
return Null(); // wrap is collected by the GC
else
return wrap->handle_;
}
@ -245,30 +518,10 @@ static Handle<Value> CountIsolate(const Arguments& args) {
}
static Handle<Value> JoinIsolate(const Arguments& args) {
HandleScope scope;
assert(args[0]->IsObject());
Local<Object> obj = args[0]->ToObject();
assert(obj->InternalFieldCount() == 2);
assert(obj->GetPointerFromInternalField(0) == magic_isolate_cookie_);
Isolate* ti = reinterpret_cast<Isolate*>(
obj->GetPointerFromInternalField(1));
if (uv_thread_join(&ti->tid_))
return False(); // error
else
return True(); // ok
}
void InitIsolates(Handle<Object> target) {
HandleScope scope;
NODE_SET_METHOD(target, "create", CreateIsolate);
NODE_SET_METHOD(target, "count", CountIsolate);
NODE_SET_METHOD(target, "join", JoinIsolate);
}

24
src/node_isolate.h

@ -42,6 +42,10 @@
namespace node {
class IsolateWrap;
class IsolateChannel;
class IsolateMessage;
class Isolate {
public:
char** argv_;
@ -55,6 +59,7 @@ public:
typedef void (*AtExitCallback)(void* arg);
static void JoinAll();
static v8::Handle<v8::Value> Send(const v8::Arguments& args);
static Isolate* GetCurrent() {
return reinterpret_cast<Isolate*>(v8::Isolate::GetCurrent()->GetData());
@ -86,29 +91,34 @@ public:
// This constructor is used for every non-main thread
Isolate();
~Isolate() {
if (argv_) {
delete argv_;
}
}
~Isolate();
void Enter();
void Exit();
/* Shutdown the isolate. Call this method at thread death. */
void Dispose();
private:
friend class IsolateWrap;
struct AtExitCallbackInfo {
ngx_queue_t at_exit_callbacks_;
AtExitCallback callback_;
ngx_queue_t queue_;
void* arg_;
};
static void OnMessage(IsolateMessage*, void*);
// Forbid implicit constructors and copy constructors
void operator=(const Isolate&) {}
Isolate(const Isolate&) {}
ngx_queue_t at_exit_callbacks_;
v8::Persistent<v8::Context> v8_context_;
v8::Isolate* v8_isolate_;
IsolateChannel* send_channel_;
IsolateChannel* recv_channel_;
uv_loop_t* loop_;
// Each isolate is a member of the static list_head.

39
test/simple/test-isolates-ping-pong.js

@ -0,0 +1,39 @@
var isolates = process.binding('isolates');
var N = 4; // # of child isolates
if (process.tid === 1)
master();
else
child();
function master() {
for (var i = 0; i < N; ++i) spawn();
function spawn() {
var isolate = isolates.create(process.argv);
isolate.onexit = function() {
console.error("onexit isolate #%d", isolate.tid);
};
isolate.onmessage = function(m) {
console.error("parent received message '%s'", m);
isolate.send(Buffer('ACK ' + m));
};
}
}
function child() {
var n = 0;
function send() {
if (++n > 10) return;
process._send(Buffer('SYN' + n));
setTimeout(send, 10);
}
send();
process._onmessage = function(m) {
console.error("child %d received message '%s'", process.tid, m);
};
}

9
test/simple/test-isolates.js

@ -6,7 +6,14 @@ console.log("count: %d", isolates.count());
if (process.tid === 1) {
var isolate = isolates.create(process.argv);
//process._joinIsolate(isolate);
isolate.onmessage = function() {
console.error("onmessage");
};
isolate.onexit = function() {
console.error("onexit");
};
console.error("master");
fs.stat(__dirname, function(err, stat) {
if (err) throw err;

Loading…
Cancel
Save