From 6aaee1f711081d70423697401e4fb7c9226528ab Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 18 Apr 2015 23:50:39 +0200 Subject: [PATCH] Avoid threading issues. --- libdevcore/Log.cpp | 78 +++++++++++++++++++++++++++++++++- libdevcore/Log.h | 22 ++++++---- libdevcore/Worker.cpp | 87 +++++++++++++++++++++++--------------- libdevcore/Worker.h | 28 ++++++++---- libethcore/Ethash.cpp | 5 ++- libethereum/Farm.h | 15 ++++--- libp2p/Session.cpp | 2 + libwhisper/WhisperHost.cpp | 2 +- 8 files changed, 179 insertions(+), 60 deletions(-) diff --git a/libdevcore/Log.cpp b/libdevcore/Log.cpp index 7196ea358..0edc3e039 100644 --- a/libdevcore/Log.cpp +++ b/libdevcore/Log.cpp @@ -23,6 +23,7 @@ #include #include +#include #include "Guards.h" using namespace std; using namespace dev; @@ -31,13 +32,87 @@ using namespace dev; int dev::g_logVerbosity = 5; map dev::g_logOverride; -ThreadLocalLogName dev::t_logThreadName("main"); +/// Associate a name with each thread for nice logging. +struct ThreadLocalLogName +{ + ThreadLocalLogName(std::string const& _name) { m_name.reset(new string(_name)); } + boost::thread_specific_ptr m_name; +}; + +/// Associate a name with each thread for nice logging. +struct ThreadLocalLogContext +{ + ThreadLocalLogContext() = default; + + void push(std::string const& _name) + { + if (!m_contexts.get()) + m_contexts.reset(new vector); + m_contexts->push_back(_name); + } + + void pop() + { + m_contexts->pop_back(); + } + + string join(string const& _prior) + { + string ret; + if (m_contexts.get()) + for (auto const& i: *m_contexts) + ret += _prior + i; + return ret; + } + + boost::thread_specific_ptr> m_contexts; +}; + +ThreadLocalLogContext g_logThreadContext; + +ThreadLocalLogName g_logThreadName("main"); + +void dev::ThreadContext::push(string const& _n) +{ + g_logThreadContext.push(_n); +} + +void dev::ThreadContext::pop() +{ + g_logThreadContext.pop(); +} + +string dev::ThreadContext::join(string const& _prior) +{ + return g_logThreadContext.join(_prior); +} // foward declare without all of Windows.h #ifdef _WIN32 extern "C" __declspec(dllimport) void __stdcall OutputDebugStringA(const char* lpOutputString); #endif +string dev::getThreadName() +{ +#ifdef __linux__ + char buffer[128]; + pthread_getname_np(pthread_self(), buffer, 127); + buffer[127] = 0; + return buffer; +#else + return g_logThreadName.m_name.get() ? *g_logThreadName.m_name.get() : ""; +#endif +} + +void dev::setThreadName(string const& _n) +{ +#ifdef __linux__ + pthread_setname_np(pthread_self(), _n.c_str()); +#else + g_logThreadName.m_name.reset(new std::string(_n)); +#endif +} + void dev::simpleDebugOut(std::string const& _s, char const*) { static Mutex s_lock; @@ -55,4 +130,3 @@ void dev::simpleDebugOut(std::string const& _s, char const*) } std::function dev::g_logPost = simpleDebugOut; - diff --git a/libdevcore/Log.h b/libdevcore/Log.h index 2e111332c..812ec0886 100644 --- a/libdevcore/Log.h +++ b/libdevcore/Log.h @@ -53,18 +53,24 @@ extern std::function g_logPost; /// or equal to the currently output verbosity (g_logVerbosity). extern std::map g_logOverride; -/// Associate a name with each thread for nice logging. -struct ThreadLocalLogName +#define ETH_THREAD_CONTEXT(name) for (std::pair __eth_thread_context(name, true); p.second; p.second = false) + +class ThreadContext { - ThreadLocalLogName(std::string _name) { m_name.reset(new std::string(_name)); }; - boost::thread_specific_ptr m_name; +public: + ThreadContext(std::string const& _info) { push(_info); } + ~ThreadContext() { pop(); } + + static void push(std::string const& _n); + static void pop(); + static std::string join(std::string const& _prior); }; -/// The current thread's name. -extern ThreadLocalLogName t_logThreadName; +/// Set the current thread's log name. +void setThreadName(std::string const& _n); /// Set the current thread's log name. -inline void setThreadName(char const* _n) { t_logThreadName.m_name.reset(new std::string(_n)); } +std::string getThreadName(); /// The default logging channels. Each has an associated verbosity and three-letter prefix (name() ). /// Channels should inherit from LogChannel and define name() and verbosity. @@ -92,7 +98,7 @@ public: char buf[24]; if (strftime(buf, 24, "%X", localtime(&rawTime)) == 0) buf[0] = '\0'; // empty if case strftime fails - m_sstr << Id::name() << " [ " << buf << " | " << (t_logThreadName.m_name.get() ? *t_logThreadName.m_name.get() : std::string("")) << (_term ? " ] " : ""); + m_sstr << Id::name() << " [ " << buf << " | " << getThreadName() << ThreadContext::join(" | ") << (_term ? " ] " : ""); } } diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp index 0f30a0aff..19c0d9751 100644 --- a/libdevcore/Worker.cpp +++ b/libdevcore/Worker.cpp @@ -27,54 +27,75 @@ using namespace std; using namespace dev; -void Worker::startWorking(IfRunning _ir) +void Worker::startWorking() { // cnote << "startWorking for thread" << m_name; Guard l(x_work); - - if (m_work && m_work->joinable()) - try { - if (_ir == IfRunning::Detach) - m_work->detach(); - else if (_ir == IfRunning::Join) - m_work->join(); - else - return; - } catch (...) {} - cnote << "Spawning" << m_name; - m_stop = false; - m_stopped = false; - m_work.reset(new thread([&]() + m_state = WorkerState::Starting; + if (!m_work) { - setThreadName(m_name.c_str()); - startedWorking(); - workLoop(); - cnote << "Finishing up worker thread"; - doneWorking(); - ETH_GUARDED(x_work) - m_work->detach(); - m_stopped = true; - })); + m_work.reset(new thread([&]() + { + setThreadName(m_name.c_str()); + while (m_state != WorkerState::Killing) + { + WorkerState ex = WorkerState::Starting; + m_state.compare_exchange_strong(ex, WorkerState::Started); + + startedWorking(); + workLoop(); + cnote << "Finishing up worker thread"; + doneWorking(); + +// ex = WorkerState::Stopping; +// m_state.compare_exchange_strong(ex, WorkerState::Stopped); + + ex = m_state.exchange(WorkerState::Stopped); + if (ex == WorkerState::Killing) + m_state.exchange(ex); + + while (m_state == WorkerState::Stopped) + this_thread::sleep_for(chrono::milliseconds(20)); + } + })); + cnote << "Spawning" << m_name; + } + while (m_state != WorkerState::Started) + this_thread::sleep_for(chrono::microseconds(20)); } void Worker::stopWorking() { // cnote << "stopWorking for thread" << m_name; ETH_GUARDED(x_work) - if (!m_work || !m_work->joinable()) - return; - cnote << "Stopping" << m_name; - m_stop = true; - while (!m_stopped) - this_thread::sleep_for(chrono::microseconds(50)); + if (m_work) + { + cnote << "Stopping" << m_name; + WorkerState ex = WorkerState::Started; + m_state.compare_exchange_strong(ex, WorkerState::Stopping); + + while (m_state != WorkerState::Stopped) + this_thread::sleep_for(chrono::microseconds(20)); + } +} + +void Worker::terminate() +{ +// cnote << "stopWorking for thread" << m_name; ETH_GUARDED(x_work) - m_work.reset(); - cnote << "Stopped" << m_name; + if (m_work) + { + cnote << "Terminating" << m_name; + m_state.exchange(WorkerState::Killing); + + m_work->join(); + m_work.reset(); + } } void Worker::workLoop() { - while (!m_stop) + while (m_state == WorkerState::Started) { if (m_idleWaitMs) this_thread::sleep_for(chrono::milliseconds(m_idleWaitMs)); diff --git a/libdevcore/Worker.h b/libdevcore/Worker.h index aad1dfc0e..fbc4d7042 100644 --- a/libdevcore/Worker.h +++ b/libdevcore/Worker.h @@ -36,6 +36,15 @@ enum class IfRunning Detach }; +enum class WorkerState +{ + Starting, + Started, + Stopping, + Stopped, + Killing +}; + class Worker { protected: @@ -47,19 +56,19 @@ protected: /// Move-assignment. Worker& operator=(Worker&& _m) { std::swap(m_name, _m.m_name); return *this; } - virtual ~Worker() { stopWorking(); } + virtual ~Worker() { terminate(); } /// Allows changing worker name if work is stopped. void setName(std::string _n) { if (!isWorking()) m_name = _n; } /// Starts worker thread; causes startedWorking() to be called. - void startWorking(IfRunning _ir = IfRunning::Fail); + void startWorking(); /// Stop worker thread; causes call to stopWorking(). void stopWorking(); - + /// Returns if worker thread is present. - bool isWorking() const { Guard l(x_work); return !!m_work && m_work->joinable(); } + bool isWorking() const { Guard l(x_work); return m_state == WorkerState::Started; } /// Called after thread is started from startWorking(). virtual void startedWorking() {} @@ -69,22 +78,25 @@ protected: /// Overrides doWork(); should call shouldStop() often and exit when true. virtual void workLoop(); - bool shouldStop() const { return m_stop; } + bool shouldStop() const { return m_state != WorkerState::Started; } /// Called when is to be stopped, just prior to thread being joined. virtual void doneWorking() {} /// Blocks caller into worker thread has finished. - void join() const { Guard l(x_work); try { if (m_work) m_work->join(); } catch (...) {} } +// void join() const { Guard l(x_work); try { if (m_work) m_work->join(); } catch (...) {} } private: + /// Stop and never start again. + void terminate(); + std::string m_name; + unsigned m_idleWaitMs = 0; mutable Mutex x_work; ///< Lock for the network existance. std::unique_ptr m_work; ///< The network thread. - std::atomic m_stop = {false}; - std::atomic m_stopped = {false}; + std::atomic m_state = {WorkerState::Starting}; }; } diff --git a/libethcore/Ethash.cpp b/libethcore/Ethash.cpp index 16d17b1e8..8e31b3c51 100644 --- a/libethcore/Ethash.cpp +++ b/libethcore/Ethash.cpp @@ -237,7 +237,7 @@ protected: return true; } } - return false; + return m_owner->shouldStop(); } virtual bool searched(uint64_t _startNonce, uint32_t _count) override @@ -246,7 +246,7 @@ protected: // std::cerr << "Searched " << _count << " from " << _startNonce << std::endl; m_owner->accumulateHashes(_count); m_last = _startNonce + _count; - if (m_abort) + if (m_abort || m_owner->shouldStop()) { m_aborted = true; return true; @@ -266,6 +266,7 @@ unsigned Ethash::GPUMiner::s_deviceId = 0; Ethash::GPUMiner::GPUMiner(ConstructionInfo const& _ci): Miner(_ci), + Worker("gpuminer"), m_hook(new EthashCLHook(this)) { } diff --git a/libethereum/Farm.h b/libethereum/Farm.h index 6263faf1b..fda9d64c3 100644 --- a/libethereum/Farm.h +++ b/libethereum/Farm.h @@ -155,12 +155,15 @@ private: { if (m_onSolutionFound && m_onSolutionFound(_s)) { - WriteGuard ul(x_minerWork); - for (std::shared_ptr const& m: m_miners) - if (m.get() != _m) - m->setWork(); - m_work.reset(); - return true; + if (x_minerWork.try_lock()) + { + for (std::shared_ptr const& m: m_miners) + if (m.get() != _m) + m->setWork(); + m_work.reset(); + x_minerWork.unlock(); + return true; + } } return false; } diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 0ea7c33e2..06cd7f7a1 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -398,6 +398,7 @@ void Session::doRead() auto self(shared_from_this()); ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length) { + ThreadContext tc(toString(socketId())); if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) { clogS(NetWarn) << "Error reading: " << ec.message(); @@ -433,6 +434,7 @@ void Session::doRead() auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size; ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) { + ThreadContext tc(toString(socketId())); if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) { clogS(NetWarn) << "Error reading: " << ec.message(); diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index 97a24e112..281ad2bc5 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -34,7 +34,7 @@ using namespace dev::shh; #endif #define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << session()->socketId() << "] " -WhisperHost::WhisperHost() +WhisperHost::WhisperHost(): Worker("shh") { }