diff --git a/src/node_isolate.cc b/src/node_isolate.cc index f6fab56805..47e5b66e3c 100644 --- a/src/node_isolate.cc +++ b/src/node_isolate.cc @@ -87,17 +87,23 @@ public: uv_mutex_unlock(&mutex_); } - T Consume() { + bool Consume(T& item) { + ngx_queue_t* q = NULL; + uv_mutex_lock(&mutex_); - ngx_queue_t* q = ngx_queue_head(&queue_); - ngx_queue_remove(q); + if (!ngx_queue_empty(&queue_)) { + q = ngx_queue_head(&queue_); + ngx_queue_remove(q); + } uv_mutex_unlock(&mutex_); + if (q == NULL) return false; + Message* m = ngx_queue_data(q, Message, queue_); - T item = m->item_; + item = m->item_; delete m; - return item; + return true; } private: @@ -140,8 +146,8 @@ private: } void OnMessage() { - T item = queue_.Consume(); - callback_(item, arg_); + T item; + while (queue_.Consume(item)) callback_(item, arg_); } void* arg_; diff --git a/test/simple/test-isolates-ping-pong.js b/test/simple/test-isolates-ping-pong.js index 4fc2e0453e..4340ff6e25 100644 --- a/test/simple/test-isolates-ping-pong.js +++ b/test/simple/test-isolates-ping-pong.js @@ -1,6 +1,11 @@ var isolates = process.binding('isolates'); +var assert = require('assert'); -var N = 4; // # of child isolates +var N_ISOLATES = 4; +var N_MESSAGES = 20; +var N_MESSAGES_PER_TICK = 4; + +assert(N_MESSAGES % N_MESSAGES_PER_TICK == 0); if (process.tid === 1) master(); @@ -8,32 +13,56 @@ else child(); function master() { - for (var i = 0; i < N; ++i) spawn(); + for (var i = 0; i < N_ISOLATES; ++i) spawn(); function spawn() { var isolate = isolates.create(process.argv); + + var gotExit = false; // exit event emitted? + var msgId = 0; // message IDs seen so far + var tick = 0; + isolate.onexit = function() { - console.error("onexit isolate #%d", isolate.tid); + gotExit = true; }; - isolate.onmessage = function(m) { - console.error("parent received message '%s'", m); - isolate.send(Buffer('ACK ' + m)); + + isolate.onmessage = function(buf) { + var msg = JSON.parse(buf); + assert.equal(msg.id, msgId + 1); // verify that messages arrive in order + assert.equal(msg.tick, tick); // and on the proper tick (=full mq drain) + msgId = msg.id; + if (msgId % N_MESSAGES_PER_TICK == 0) tick++; + isolate.send(buf); }; + + process.on('exit', function() { + assert.equal(gotExit, true); + assert.equal(msgId, N_MESSAGES); + assert.equal(tick, N_MESSAGES / N_MESSAGES_PER_TICK); + }); } } function child() { - var n = 0; + var msgId = 0; + var tick = 0; function send() { - if (++n > 10) return; - process._send(Buffer('SYN' + n)); - setTimeout(send, 10); + // Send multiple messages, verify that the message queue + // is completely drained on each tick of the event loop. + for (var i = 0; i < N_MESSAGES_PER_TICK; ++i) { + process.send({tick:tick, id:++msgId}); + } + + if (msgId < N_MESSAGES) { + setTimeout(send, 10); + } + + tick++; } send(); process._onmessage = function(m) { - console.error("child %d received message '%s'", process.tid, m); }; }