Browse Source

Avoid threading issues.

cl-refactor
Gav Wood 10 years ago
parent
commit
6aaee1f711
  1. 78
      libdevcore/Log.cpp
  2. 22
      libdevcore/Log.h
  3. 87
      libdevcore/Worker.cpp
  4. 26
      libdevcore/Worker.h
  5. 5
      libethcore/Ethash.cpp
  6. 15
      libethereum/Farm.h
  7. 2
      libp2p/Session.cpp
  8. 2
      libwhisper/WhisperHost.cpp

78
libdevcore/Log.cpp

@ -23,6 +23,7 @@
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <thread>
#include "Guards.h" #include "Guards.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
@ -31,13 +32,87 @@ using namespace dev;
int dev::g_logVerbosity = 5; int dev::g_logVerbosity = 5;
map<type_info const*, bool> dev::g_logOverride; map<type_info const*, bool> dev::g_logOverride;
ThreadLocalLogName dev::t_logThreadName("main"); /// Associate a name with each thread for nice logging.
struct ThreadLocalLogName
{
ThreadLocalLogName(std::string const& _name) { m_name.reset(new string(_name)); }
boost::thread_specific_ptr<std::string> m_name;
};
/// Associate a name with each thread for nice logging.
struct ThreadLocalLogContext
{
ThreadLocalLogContext() = default;
void push(std::string const& _name)
{
if (!m_contexts.get())
m_contexts.reset(new vector<string>);
m_contexts->push_back(_name);
}
void pop()
{
m_contexts->pop_back();
}
string join(string const& _prior)
{
string ret;
if (m_contexts.get())
for (auto const& i: *m_contexts)
ret += _prior + i;
return ret;
}
boost::thread_specific_ptr<std::vector<std::string>> m_contexts;
};
ThreadLocalLogContext g_logThreadContext;
ThreadLocalLogName g_logThreadName("main");
void dev::ThreadContext::push(string const& _n)
{
g_logThreadContext.push(_n);
}
void dev::ThreadContext::pop()
{
g_logThreadContext.pop();
}
string dev::ThreadContext::join(string const& _prior)
{
return g_logThreadContext.join(_prior);
}
// foward declare without all of Windows.h // foward declare without all of Windows.h
#ifdef _WIN32 #ifdef _WIN32
extern "C" __declspec(dllimport) void __stdcall OutputDebugStringA(const char* lpOutputString); extern "C" __declspec(dllimport) void __stdcall OutputDebugStringA(const char* lpOutputString);
#endif #endif
string dev::getThreadName()
{
#ifdef __linux__
char buffer[128];
pthread_getname_np(pthread_self(), buffer, 127);
buffer[127] = 0;
return buffer;
#else
return g_logThreadName.m_name.get() ? *g_logThreadName.m_name.get() : "<unknown>";
#endif
}
void dev::setThreadName(string const& _n)
{
#ifdef __linux__
pthread_setname_np(pthread_self(), _n.c_str());
#else
g_logThreadName.m_name.reset(new std::string(_n));
#endif
}
void dev::simpleDebugOut(std::string const& _s, char const*) void dev::simpleDebugOut(std::string const& _s, char const*)
{ {
static Mutex s_lock; static Mutex s_lock;
@ -55,4 +130,3 @@ void dev::simpleDebugOut(std::string const& _s, char const*)
} }
std::function<void(std::string const&, char const*)> dev::g_logPost = simpleDebugOut; std::function<void(std::string const&, char const*)> dev::g_logPost = simpleDebugOut;

22
libdevcore/Log.h

@ -53,18 +53,24 @@ extern std::function<void(std::string const&, char const*)> g_logPost;
/// or equal to the currently output verbosity (g_logVerbosity). /// or equal to the currently output verbosity (g_logVerbosity).
extern std::map<std::type_info const*, bool> g_logOverride; extern std::map<std::type_info const*, bool> g_logOverride;
/// Associate a name with each thread for nice logging. #define ETH_THREAD_CONTEXT(name) for (std::pair<dev::ThreadContext, bool> __eth_thread_context(name, true); p.second; p.second = false)
struct ThreadLocalLogName
class ThreadContext
{ {
ThreadLocalLogName(std::string _name) { m_name.reset(new std::string(_name)); }; public:
boost::thread_specific_ptr<std::string> m_name; ThreadContext(std::string const& _info) { push(_info); }
~ThreadContext() { pop(); }
static void push(std::string const& _n);
static void pop();
static std::string join(std::string const& _prior);
}; };
/// The current thread's name. /// Set the current thread's log name.
extern ThreadLocalLogName t_logThreadName; void setThreadName(std::string const& _n);
/// Set the current thread's log name. /// Set the current thread's log name.
inline void setThreadName(char const* _n) { t_logThreadName.m_name.reset(new std::string(_n)); } std::string getThreadName();
/// The default logging channels. Each has an associated verbosity and three-letter prefix (name() ). /// The default logging channels. Each has an associated verbosity and three-letter prefix (name() ).
/// Channels should inherit from LogChannel and define name() and verbosity. /// Channels should inherit from LogChannel and define name() and verbosity.
@ -92,7 +98,7 @@ public:
char buf[24]; char buf[24];
if (strftime(buf, 24, "%X", localtime(&rawTime)) == 0) if (strftime(buf, 24, "%X", localtime(&rawTime)) == 0)
buf[0] = '\0'; // empty if case strftime fails buf[0] = '\0'; // empty if case strftime fails
m_sstr << Id::name() << " [ " << buf << " | " << (t_logThreadName.m_name.get() ? *t_logThreadName.m_name.get() : std::string("<unknown>")) << (_term ? " ] " : ""); m_sstr << Id::name() << " [ " << buf << " | " << getThreadName() << ThreadContext::join(" | ") << (_term ? " ] " : "");
} }
} }

87
libdevcore/Worker.cpp

@ -27,54 +27,75 @@
using namespace std; using namespace std;
using namespace dev; using namespace dev;
void Worker::startWorking(IfRunning _ir) void Worker::startWorking()
{ {
// cnote << "startWorking for thread" << m_name; // cnote << "startWorking for thread" << m_name;
Guard l(x_work); Guard l(x_work);
m_state = WorkerState::Starting;
if (m_work && m_work->joinable()) if (!m_work)
try {
if (_ir == IfRunning::Detach)
m_work->detach();
else if (_ir == IfRunning::Join)
m_work->join();
else
return;
} catch (...) {}
cnote << "Spawning" << m_name;
m_stop = false;
m_stopped = false;
m_work.reset(new thread([&]()
{ {
setThreadName(m_name.c_str()); m_work.reset(new thread([&]()
startedWorking(); {
workLoop(); setThreadName(m_name.c_str());
cnote << "Finishing up worker thread"; while (m_state != WorkerState::Killing)
doneWorking(); {
ETH_GUARDED(x_work) WorkerState ex = WorkerState::Starting;
m_work->detach(); m_state.compare_exchange_strong(ex, WorkerState::Started);
m_stopped = true;
})); startedWorking();
workLoop();
cnote << "Finishing up worker thread";
doneWorking();
// ex = WorkerState::Stopping;
// m_state.compare_exchange_strong(ex, WorkerState::Stopped);
ex = m_state.exchange(WorkerState::Stopped);
if (ex == WorkerState::Killing)
m_state.exchange(ex);
while (m_state == WorkerState::Stopped)
this_thread::sleep_for(chrono::milliseconds(20));
}
}));
cnote << "Spawning" << m_name;
}
while (m_state != WorkerState::Started)
this_thread::sleep_for(chrono::microseconds(20));
} }
void Worker::stopWorking() void Worker::stopWorking()
{ {
// cnote << "stopWorking for thread" << m_name; // cnote << "stopWorking for thread" << m_name;
ETH_GUARDED(x_work) ETH_GUARDED(x_work)
if (!m_work || !m_work->joinable()) if (m_work)
return; {
cnote << "Stopping" << m_name; cnote << "Stopping" << m_name;
m_stop = true; WorkerState ex = WorkerState::Started;
while (!m_stopped) m_state.compare_exchange_strong(ex, WorkerState::Stopping);
this_thread::sleep_for(chrono::microseconds(50));
while (m_state != WorkerState::Stopped)
this_thread::sleep_for(chrono::microseconds(20));
}
}
void Worker::terminate()
{
// cnote << "stopWorking for thread" << m_name;
ETH_GUARDED(x_work) ETH_GUARDED(x_work)
m_work.reset(); if (m_work)
cnote << "Stopped" << m_name; {
cnote << "Terminating" << m_name;
m_state.exchange(WorkerState::Killing);
m_work->join();
m_work.reset();
}
} }
void Worker::workLoop() void Worker::workLoop()
{ {
while (!m_stop) while (m_state == WorkerState::Started)
{ {
if (m_idleWaitMs) if (m_idleWaitMs)
this_thread::sleep_for(chrono::milliseconds(m_idleWaitMs)); this_thread::sleep_for(chrono::milliseconds(m_idleWaitMs));

26
libdevcore/Worker.h

@ -36,6 +36,15 @@ enum class IfRunning
Detach Detach
}; };
enum class WorkerState
{
Starting,
Started,
Stopping,
Stopped,
Killing
};
class Worker class Worker
{ {
protected: protected:
@ -47,19 +56,19 @@ protected:
/// Move-assignment. /// Move-assignment.
Worker& operator=(Worker&& _m) { std::swap(m_name, _m.m_name); return *this; } Worker& operator=(Worker&& _m) { std::swap(m_name, _m.m_name); return *this; }
virtual ~Worker() { stopWorking(); } virtual ~Worker() { terminate(); }
/// Allows changing worker name if work is stopped. /// Allows changing worker name if work is stopped.
void setName(std::string _n) { if (!isWorking()) m_name = _n; } void setName(std::string _n) { if (!isWorking()) m_name = _n; }
/// Starts worker thread; causes startedWorking() to be called. /// Starts worker thread; causes startedWorking() to be called.
void startWorking(IfRunning _ir = IfRunning::Fail); void startWorking();
/// Stop worker thread; causes call to stopWorking(). /// Stop worker thread; causes call to stopWorking().
void stopWorking(); void stopWorking();
/// Returns if worker thread is present. /// Returns if worker thread is present.
bool isWorking() const { Guard l(x_work); return !!m_work && m_work->joinable(); } bool isWorking() const { Guard l(x_work); return m_state == WorkerState::Started; }
/// Called after thread is started from startWorking(). /// Called after thread is started from startWorking().
virtual void startedWorking() {} virtual void startedWorking() {}
@ -69,22 +78,25 @@ protected:
/// Overrides doWork(); should call shouldStop() often and exit when true. /// Overrides doWork(); should call shouldStop() often and exit when true.
virtual void workLoop(); virtual void workLoop();
bool shouldStop() const { return m_stop; } bool shouldStop() const { return m_state != WorkerState::Started; }
/// Called when is to be stopped, just prior to thread being joined. /// Called when is to be stopped, just prior to thread being joined.
virtual void doneWorking() {} virtual void doneWorking() {}
/// Blocks caller into worker thread has finished. /// Blocks caller into worker thread has finished.
void join() const { Guard l(x_work); try { if (m_work) m_work->join(); } catch (...) {} } // void join() const { Guard l(x_work); try { if (m_work) m_work->join(); } catch (...) {} }
private: private:
/// Stop and never start again.
void terminate();
std::string m_name; std::string m_name;
unsigned m_idleWaitMs = 0; unsigned m_idleWaitMs = 0;
mutable Mutex x_work; ///< Lock for the network existance. mutable Mutex x_work; ///< Lock for the network existance.
std::unique_ptr<std::thread> m_work; ///< The network thread. std::unique_ptr<std::thread> m_work; ///< The network thread.
std::atomic<bool> m_stop = {false}; std::atomic<WorkerState> m_state = {WorkerState::Starting};
std::atomic<bool> m_stopped = {false};
}; };
} }

5
libethcore/Ethash.cpp

@ -237,7 +237,7 @@ protected:
return true; return true;
} }
} }
return false; return m_owner->shouldStop();
} }
virtual bool searched(uint64_t _startNonce, uint32_t _count) override virtual bool searched(uint64_t _startNonce, uint32_t _count) override
@ -246,7 +246,7 @@ protected:
// std::cerr << "Searched " << _count << " from " << _startNonce << std::endl; // std::cerr << "Searched " << _count << " from " << _startNonce << std::endl;
m_owner->accumulateHashes(_count); m_owner->accumulateHashes(_count);
m_last = _startNonce + _count; m_last = _startNonce + _count;
if (m_abort) if (m_abort || m_owner->shouldStop())
{ {
m_aborted = true; m_aborted = true;
return true; return true;
@ -266,6 +266,7 @@ unsigned Ethash::GPUMiner::s_deviceId = 0;
Ethash::GPUMiner::GPUMiner(ConstructionInfo const& _ci): Ethash::GPUMiner::GPUMiner(ConstructionInfo const& _ci):
Miner(_ci), Miner(_ci),
Worker("gpuminer"),
m_hook(new EthashCLHook(this)) m_hook(new EthashCLHook(this))
{ {
} }

15
libethereum/Farm.h

@ -155,12 +155,15 @@ private:
{ {
if (m_onSolutionFound && m_onSolutionFound(_s)) if (m_onSolutionFound && m_onSolutionFound(_s))
{ {
WriteGuard ul(x_minerWork); if (x_minerWork.try_lock())
for (std::shared_ptr<Miner> const& m: m_miners) {
if (m.get() != _m) for (std::shared_ptr<Miner> const& m: m_miners)
m->setWork(); if (m.get() != _m)
m_work.reset(); m->setWork();
return true; m_work.reset();
x_minerWork.unlock();
return true;
}
} }
return false; return false;
} }

2
libp2p/Session.cpp

@ -398,6 +398,7 @@ void Session::doRead()
auto self(shared_from_this()); auto self(shared_from_this());
ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length) ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
{ {
ThreadContext tc(toString(socketId()));
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{ {
clogS(NetWarn) << "Error reading: " << ec.message(); clogS(NetWarn) << "Error reading: " << ec.message();
@ -433,6 +434,7 @@ void Session::doRead()
auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size; auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size;
ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
{ {
ThreadContext tc(toString(socketId()));
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{ {
clogS(NetWarn) << "Error reading: " << ec.message(); clogS(NetWarn) << "Error reading: " << ec.message();

2
libwhisper/WhisperHost.cpp

@ -34,7 +34,7 @@ using namespace dev::shh;
#endif #endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
WhisperHost::WhisperHost() WhisperHost::WhisperHost(): Worker("shh")
{ {
} }

Loading…
Cancel
Save