Browse Source

Merge pull request #2542 from chfast/concurrent_queue

Concurrent queue
cl-refactor
Gav Wood 10 years ago
parent
commit
14ab4ab91b
  1. 60
      libdevcore/concurrent_queue.h
  2. 43
      libevm/SmartVM.cpp

60
libdevcore/concurrent_queue.h

@ -0,0 +1,60 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <utility>
#include <queue>
#include <condition_variable>
#include <mutex>
namespace dev
{
/// Concurrent queue.
/// You can push and pop elements to/from the queue. Pop will block until the queue is not empty.
/// The default backend (_QueueT) is std::queue. It can be changed to any type that has
/// proper push(), pop(), empty() and front() methods.
template<typename _T, typename _QueueT = std::queue<_T>>
class concurrent_queue
{
public:
template<typename _U>
void push(_U&& _elem)
{
{
std::lock_guard<decltype(x_mutex)> guard{x_mutex};
m_queue.push(std::forward<_U>(_elem));
}
m_cv.notify_one();
}
_T pop()
{
std::unique_lock<std::mutex> lock{x_mutex};
m_cv.wait(lock, [this]{ return !m_queue.empty(); });
auto item = std::move(m_queue.front());
m_queue.pop();
return item;
}
private:
_QueueT m_queue;
std::mutex x_mutex;
std::condition_variable m_cv;
};
}

43
libevm/SmartVM.cpp

@ -21,9 +21,7 @@
#include "SmartVM.h" #include "SmartVM.h"
#include <unordered_map> #include <unordered_map>
#include <thread> #include <thread>
#include <mutex> #include <libdevcore/concurrent_queue.h>
#include <condition_variable>
#include <queue>
#include <libdevcore/Log.h> #include <libdevcore/Log.h>
#include <libdevcore/SHA3.h> #include <libdevcore/SHA3.h>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
@ -51,34 +49,26 @@ namespace
{ {
bytes code; bytes code;
h256 codeHash; h256 codeHash;
static JitTask createStopSentinel() { return JitTask(); }
bool isStopSentinel()
{
assert((!code.empty() || !codeHash) && "'empty code => empty hash' invariand failed");
return code.empty();
}
}; };
class JitWorker class JitWorker
{ {
bool m_finished = false;
std::mutex x_mutex;
std::condition_variable m_cv;
std::thread m_worker; std::thread m_worker;
std::queue<JitTask> m_queue; concurrent_queue<JitTask> m_queue;
bool pop(JitTask& o_task)
{
std::unique_lock<std::mutex> lock{x_mutex};
m_cv.wait(lock, [this]{ return m_finished || !m_queue.empty(); });
if (m_finished)
return false;
assert(!m_queue.empty());
o_task = std::move(m_queue.front());
m_queue.pop();
return true;
}
void work() void work()
{ {
clog(JitInfo) << "JIT worker started."; clog(JitInfo) << "JIT worker started.";
JitTask task; JitTask task;
while (pop(task)) while (!(task = m_queue.pop()).isStopSentinel())
{ {
clog(JitInfo) << "Compilation... " << task.codeHash; clog(JitInfo) << "Compilation... " << task.codeHash;
evmjit::JIT::compile(task.code.data(), task.code.size(), eth2jit(task.codeHash)); evmjit::JIT::compile(task.code.data(), task.code.size(), eth2jit(task.codeHash));
@ -93,18 +83,11 @@ namespace
~JitWorker() ~JitWorker()
{ {
DEV_GUARDED(x_mutex) push(JitTask::createStopSentinel());
m_finished = true;
m_cv.notify_one();
m_worker.join(); m_worker.join();
} }
void push(JitTask&& _task) void push(JitTask&& _task) { m_queue.push(std::move(_task)); }
{
DEV_GUARDED(x_mutex)
m_queue.push(std::move(_task));
m_cv.notify_one();
}
}; };
} }

Loading…
Cancel
Save