diff --git a/src/node_v8_platform.cc b/src/node_v8_platform.cc index aed4661597..ad46e0cf6c 100644 --- a/src/node_v8_platform.cc +++ b/src/node_v8_platform.cc @@ -95,30 +95,19 @@ void Platform::WorkerBody(void* arg) { } -TaskQueue::TaskQueue() { - int err; - - for (size_t i = 0; i < ARRAY_SIZE(ring_); i += 1) - ring_[i] = nullptr; - - read_off_ = 0; - write_off_ = 0; - - err = uv_sem_init(&sem_, 0); - CHECK_EQ(err, 0); - - err = uv_cond_init(&cond_); - CHECK_EQ(err, 0); - - err = uv_mutex_init(&mutex_); - CHECK_EQ(err, 0); +TaskQueue::TaskQueue() : read_off_(0), write_off_(0) { + CHECK_EQ(0, uv_cond_init(&read_cond_)); + CHECK_EQ(0, uv_cond_init(&write_cond_)); + CHECK_EQ(0, uv_mutex_init(&mutex_)); } TaskQueue::~TaskQueue() { + uv_mutex_lock(&mutex_); CHECK_EQ(read_off_, write_off_); - uv_sem_destroy(&sem_); - uv_cond_destroy(&cond_); + uv_mutex_unlock(&mutex_); + uv_cond_destroy(&read_cond_); + uv_cond_destroy(&write_cond_); uv_mutex_destroy(&mutex_); } @@ -126,33 +115,53 @@ TaskQueue::~TaskQueue() { void TaskQueue::Push(Task* task) { uv_mutex_lock(&mutex_); - // Wait for empty cell - while (ring_[write_off_] != nullptr) - uv_cond_wait(&cond_, &mutex_); + while (can_write() == false) + uv_cond_wait(&write_cond_, &mutex_); // Wait until there is a free slot. ring_[write_off_] = task; - write_off_++; - write_off_ &= kRingMask; + write_off_ = next(write_off_); + uv_cond_signal(&read_cond_); uv_mutex_unlock(&mutex_); - - uv_sem_post(&sem_); } Task* TaskQueue::Shift() { - uv_sem_wait(&sem_); - uv_mutex_lock(&mutex_); - Task* task = ring_[read_off_]; - ring_[read_off_] = nullptr; - uv_cond_signal(&cond_); - read_off_++; - read_off_ &= kRingMask; + while (can_read() == false) + uv_cond_wait(&read_cond_, &mutex_); + + Task* task = ring_[read_off_]; + if (can_write() == false) + uv_cond_signal(&write_cond_); // Signal waiters that we freed up a slot. + read_off_ = next(read_off_); uv_mutex_unlock(&mutex_); return task; } +unsigned int TaskQueue::next(unsigned int n) { + return (n + 1) % ARRAY_SIZE(TaskQueue::ring_); +} + + +bool TaskQueue::can_read() const { + return read_off_ != write_off_; +} + + +// The read pointer chases the write pointer in the circular queue. +// This method checks that the write pointer hasn't advanced so much +// that it has gone full circle and caught up with the read pointer. +// +// can_write() returns false when there is an empty slot but the read pointer +// points to the first element and the write pointer to the last element. +// That should be rare enough that it is not worth the extra bookkeeping +// to work around that. It's not harmful either, just mildly inefficient. +bool TaskQueue::can_write() const { + return next(write_off_) != read_off_; +} + + } // namespace node diff --git a/src/node_v8_platform.h b/src/node_v8_platform.h index a51c3a10a6..44acd62db6 100644 --- a/src/node_v8_platform.h +++ b/src/node_v8_platform.h @@ -36,18 +36,15 @@ class TaskQueue { v8::Task* Shift(); private: - static const unsigned int kRingSize = 1024; - static const unsigned int kRingMask = kRingSize - 1; - - static_assert(kRingSize == (kRingSize & ~kRingMask), - "kRingSize is not a power of two"); - - uv_sem_t sem_; - uv_cond_t cond_; + static unsigned int next(unsigned int n); + bool can_read() const; + bool can_write() const; + uv_cond_t read_cond_; + uv_cond_t write_cond_; uv_mutex_t mutex_; unsigned int read_off_; unsigned int write_off_; - v8::Task* ring_[kRingSize]; + v8::Task* ring_[1024]; }; class Platform : public v8::Platform {