Browse Source

isolates: drain message queue completely

v0.7.4-release
Ben Noordhuis 13 years ago
parent
commit
97e4b3a7bf
  1. 18
      src/node_isolate.cc
  2. 49
      test/simple/test-isolates-ping-pong.js

18
src/node_isolate.cc

@ -87,17 +87,23 @@ public:
uv_mutex_unlock(&mutex_); uv_mutex_unlock(&mutex_);
} }
T Consume() { bool Consume(T& item) {
ngx_queue_t* q = NULL;
uv_mutex_lock(&mutex_); uv_mutex_lock(&mutex_);
ngx_queue_t* q = ngx_queue_head(&queue_); if (!ngx_queue_empty(&queue_)) {
q = ngx_queue_head(&queue_);
ngx_queue_remove(q); ngx_queue_remove(q);
}
uv_mutex_unlock(&mutex_); uv_mutex_unlock(&mutex_);
if (q == NULL) return false;
Message* m = ngx_queue_data(q, Message, queue_); Message* m = ngx_queue_data(q, Message, queue_);
T item = m->item_; item = m->item_;
delete m; delete m;
return item; return true;
} }
private: private:
@ -140,8 +146,8 @@ private:
} }
void OnMessage() { void OnMessage() {
T item = queue_.Consume(); T item;
callback_(item, arg_); while (queue_.Consume(item)) callback_(item, arg_);
} }
void* arg_; void* arg_;

49
test/simple/test-isolates-ping-pong.js

@ -1,6 +1,11 @@
var isolates = process.binding('isolates'); var isolates = process.binding('isolates');
var assert = require('assert');
var N = 4; // # of child isolates var N_ISOLATES = 4;
var N_MESSAGES = 20;
var N_MESSAGES_PER_TICK = 4;
assert(N_MESSAGES % N_MESSAGES_PER_TICK == 0);
if (process.tid === 1) if (process.tid === 1)
master(); master();
@ -8,32 +13,56 @@ else
child(); child();
function master() { function master() {
for (var i = 0; i < N; ++i) spawn(); for (var i = 0; i < N_ISOLATES; ++i) spawn();
function spawn() { function spawn() {
var isolate = isolates.create(process.argv); var isolate = isolates.create(process.argv);
var gotExit = false; // exit event emitted?
var msgId = 0; // message IDs seen so far
var tick = 0;
isolate.onexit = function() { isolate.onexit = function() {
console.error("onexit isolate #%d", isolate.tid); gotExit = true;
}; };
isolate.onmessage = function(m) {
console.error("parent received message '%s'", m); isolate.onmessage = function(buf) {
isolate.send(Buffer('ACK ' + m)); var msg = JSON.parse(buf);
assert.equal(msg.id, msgId + 1); // verify that messages arrive in order
assert.equal(msg.tick, tick); // and on the proper tick (=full mq drain)
msgId = msg.id;
if (msgId % N_MESSAGES_PER_TICK == 0) tick++;
isolate.send(buf);
}; };
process.on('exit', function() {
assert.equal(gotExit, true);
assert.equal(msgId, N_MESSAGES);
assert.equal(tick, N_MESSAGES / N_MESSAGES_PER_TICK);
});
} }
} }
function child() { function child() {
var n = 0; var msgId = 0;
var tick = 0;
function send() { function send() {
if (++n > 10) return; // Send multiple messages, verify that the message queue
process._send(Buffer('SYN' + n)); // is completely drained on each tick of the event loop.
for (var i = 0; i < N_MESSAGES_PER_TICK; ++i) {
process.send({tick:tick, id:++msgId});
}
if (msgId < N_MESSAGES) {
setTimeout(send, 10); setTimeout(send, 10);
} }
tick++;
}
send(); send();
process._onmessage = function(m) { process._onmessage = function(m) {
console.error("child %d received message '%s'", process.tid, m);
}; };
} }

Loading…
Cancel
Save