Browse Source

Various threading fixes.

cl-refactor
Gav Wood 10 years ago
parent
commit
7c3a920e1d
  1. 9
      CMakeLists.txt
  2. 2
      alethzero/CMakeLists.txt
  3. 4
      alethzero/Transact.cpp
  4. 28
      exp/main.cpp
  5. 4
      libdevcore/Worker.cpp
  6. 31
      libethcore/Miner.h
  7. 34
      libethereum/Client.cpp
  8. 9
      libethereum/Client.h
  9. 33
      libethereum/Farm.h

9
CMakeLists.txt

@ -244,6 +244,15 @@ elseif (BUNDLE STREQUAL "full")
set(TOOLS ON) set(TOOLS ON)
set(TESTS ON) set(TESTS ON)
set(FATDB ON) set(FATDB ON)
elseif (BUNDLE STREQUAL "core")
set(SERPENT OFF)
set(SOLIDITY ON)
set(USENPM OFF)
set(GUI ON)
set(NCURSES OFF)
set(TOOLS ON)
set(TESTS OFF)
set(FATDB ON)
elseif (BUNDLE STREQUAL "tests") elseif (BUNDLE STREQUAL "tests")
set(SERPENT ${DECENT_PLATFORM}) set(SERPENT ${DECENT_PLATFORM})
set(SOLIDITY ON) set(SOLIDITY ON)

2
alethzero/CMakeLists.txt

@ -59,7 +59,7 @@ target_link_libraries(${EXECUTABLE} jsqrc)
target_link_libraries(${EXECUTABLE} natspec) target_link_libraries(${EXECUTABLE} natspec)
target_link_libraries(${EXECUTABLE} ${MHD_LIBRARIES}) target_link_libraries(${EXECUTABLE} ${MHD_LIBRARIES})
if (NOT ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC")) if (SERPENT)
target_link_libraries(${EXECUTABLE} serpent) target_link_libraries(${EXECUTABLE} serpent)
endif() endif()

4
alethzero/Transact.cpp

@ -37,7 +37,7 @@
#include <libnatspec/NatspecExpressionEvaluator.h> #include <libnatspec/NatspecExpressionEvaluator.h>
#include <libethereum/Client.h> #include <libethereum/Client.h>
#include <libethereum/Utility.h> #include <libethereum/Utility.h>
#ifndef _MSC_VER #if ETH_SERPENT
#include <libserpent/funcs.h> #include <libserpent/funcs.h>
#include <libserpent/util.h> #include <libserpent/util.h>
#endif #endif
@ -220,7 +220,7 @@ static tuple<vector<string>, bytes, string> userInputToCode(string const& _user,
errors.push_back("Solidity: Uncaught exception"); errors.push_back("Solidity: Uncaught exception");
} }
} }
#ifndef _MSC_VER #if ETH_SERPENT
else if (sourceIsSerpent(_user)) else if (sourceIsSerpent(_user))
{ {
try try

28
exp/main.cpp

@ -40,6 +40,7 @@
#include <libethereum/Farm.h> #include <libethereum/Farm.h>
#include <libethereum/AccountDiff.h> #include <libethereum/AccountDiff.h>
#include <libethereum/DownloadMan.h> #include <libethereum/DownloadMan.h>
#include <libethereum/Client.h>
#include <liblll/All.h> #include <liblll/All.h>
#include <libwhisper/WhisperPeer.h> #include <libwhisper/WhisperPeer.h>
#include <libwhisper/WhisperHost.h> #include <libwhisper/WhisperHost.h>
@ -153,7 +154,7 @@ int main()
return 0; return 0;
} }
#else #elif 0
void mine(State& s, BlockChain const& _bc) void mine(State& s, BlockChain const& _bc)
{ {
@ -169,7 +170,7 @@ void mine(State& s, BlockChain const& _bc)
while (!completed) while (!completed)
this_thread::sleep_for(chrono::milliseconds(20)); this_thread::sleep_for(chrono::milliseconds(20));
} }
#elif 0
int main() int main()
{ {
cnote << "Testing State..."; cnote << "Testing State...";
@ -224,5 +225,28 @@ int main()
return 0; return 0;
} }
#else
int main()
{
string tempDir = boost::filesystem::temp_directory_path().string() + "/" + toString(chrono::system_clock::now().time_since_epoch().count());
KeyPair myMiner = sha3("Gav's Miner");
p2p::Host net("Test");
cdebug << "Path:" << tempDir;
Client c(&net, tempDir);
c.setAddress(myMiner.address());
this_thread::sleep_for(chrono::milliseconds(1000));
c.startMining();
this_thread::sleep_for(chrono::milliseconds(6000));
c.stopMining();
return 0;
}
#endif #endif

4
libdevcore/Worker.cpp

@ -29,7 +29,7 @@ using namespace dev;
void Worker::startWorking(IfRunning _ir) void Worker::startWorking(IfRunning _ir)
{ {
cnote << "startWorking for thread" << m_name; // cnote << "startWorking for thread" << m_name;
Guard l(x_work); Guard l(x_work);
if (m_work && m_work->joinable()) if (m_work && m_work->joinable())
@ -56,7 +56,7 @@ void Worker::startWorking(IfRunning _ir)
void Worker::stopWorking() void Worker::stopWorking()
{ {
cnote << "stopWorking for thread" << m_name; // cnote << "stopWorking for thread" << m_name;
Guard l(x_work); Guard l(x_work);
if (!m_work || !m_work->joinable()) if (!m_work || !m_work->joinable())
return; return;

31
libethcore/Miner.h

@ -73,11 +73,12 @@ public:
* @param _finder The miner that found it. * @param _finder The miner that found it.
* @return true iff the solution was good (implying that mining should be . * @return true iff the solution was good (implying that mining should be .
*/ */
virtual bool submitProof(Solution const& _p, WorkPackage& io_wp, Miner* _finder) = 0; virtual bool submitProof(Solution const& _p, Miner* _finder) = 0;
}; };
/** /**
* @brief A miner - a member and adoptee of the Farm. * @brief A miner - a member and adoptee of the Farm.
* @warning Not threadsafe. It is assumed Farm will synchronise calls to/from this class.
*/ */
template <class PoW> class GenericMiner template <class PoW> class GenericMiner
{ {
@ -96,15 +97,17 @@ public:
void setWork(WorkPackage const& _work = WorkPackage()) void setWork(WorkPackage const& _work = WorkPackage())
{ {
Guard l(x_work);
auto old = m_work; auto old = m_work;
m_work = _work; {
if (!!m_work) Guard l(x_work);
m_work = _work;
}
if (!!_work)
{ {
pause(); pause();
kickOff(); kickOff();
} }
else if (!m_work && !!old) else if (!_work && !!old)
pause(); pause();
m_hashCount = 0; m_hashCount = 0;
} }
@ -137,16 +140,18 @@ protected:
*/ */
bool submitProof(Solution const& _s) bool submitProof(Solution const& _s)
{ {
if (m_farm) if (!m_farm)
return true;
if (m_farm->submitProof(_s, this))
{ {
if (!m_farm->submitProof(_s, m_work, this)) Guard l(x_work);
return false; m_work.reset();
return true; return true;
} }
return true; return false;
} }
WorkPackage const& work() const { return m_work; } WorkPackage const& work() const { Guard l(x_work); return m_work; }
void accumulateHashes(unsigned _n) { m_hashCount += _n; } void accumulateHashes(unsigned _n) { m_hashCount += _n; }
@ -154,10 +159,10 @@ private:
FarmFace* m_farm = nullptr; FarmFace* m_farm = nullptr;
unsigned m_index; unsigned m_index;
Mutex x_work;
WorkPackage m_work;
uint64_t m_hashCount = 0; uint64_t m_hashCount = 0;
WorkPackage m_work;
mutable Mutex x_work;
}; };
} }

34
libethereum/Client.cpp

@ -192,6 +192,27 @@ bool Client::isSyncing() const
return false; return false;
} }
void Client::startedWorking()
{
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
cdebug << "startedWorking()";
WriteGuard l(x_stateDB);
cdebug << m_bc.number() << m_bc.currentHash();
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
m_preMine.sync(m_bc);
m_postMine = m_preMine;
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
}
void Client::doneWorking() void Client::doneWorking()
{ {
// Synchronise the state according to the head of the block chain. // Synchronise the state according to the head of the block chain.
@ -403,10 +424,11 @@ bool Client::submitWork(ProofOfWork::Solution const& _solution)
return false; return false;
newBlock = m_postMine.blockData(); newBlock = m_postMine.blockData();
} }
m_bq.import(&newBlock, m_bc);
/*
ImportRoute ir = m_bc.attemptImport(newBlock, m_stateDB); ImportRoute ir = m_bc.attemptImport(newBlock, m_stateDB);
if (!ir.first.empty()) if (!ir.first.empty())
onChainChanged(ir); onChainChanged(ir);*/
return true; return true;
} }
@ -500,9 +522,9 @@ void Client::onChainChanged(ImportRoute const& _ir)
m_postMine = m_preMine; m_postMine = m_preMine;
changeds.insert(PendingChangedFilter); changeds.insert(PendingChangedFilter);
x_stateDB.unlock(); x_stateDB.unlock_shared();
onPostStateChanged(); onPostStateChanged();
x_stateDB.lock(); x_stateDB.lock_shared();
} }
} }
@ -514,8 +536,12 @@ void Client::onPostStateChanged()
cnote << "Post state changed: Restarting mining..."; cnote << "Post state changed: Restarting mining...";
{ {
WriteGuard l(x_stateDB); WriteGuard l(x_stateDB);
cdebug << "Pre:" << m_preMine.info();
m_postMine.commitToMine(m_bc); m_postMine.commitToMine(m_bc);
m_miningInfo = m_postMine.info(); m_miningInfo = m_postMine.info();
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
} }
m_farm.setWork(m_miningInfo); m_farm.setWork(m_miningInfo);

9
libethereum/Client.h

@ -169,7 +169,7 @@ public:
/// Start mining. /// Start mining.
/// NOT thread-safe - call it & stopMining only from a single thread /// NOT thread-safe - call it & stopMining only from a single thread
void startMining() override { if (m_turboMining) m_farm.startGPU(); else m_farm.startCPU(); } void startMining() override { if (m_turboMining) m_farm.startGPU(); else m_farm.startCPU(); onPostStateChanged(); }
/// Stop mining. /// Stop mining.
/// NOT thread-safe /// NOT thread-safe
void stopMining() override { m_farm.stop(); } void stopMining() override { m_farm.stop(); }
@ -229,11 +229,14 @@ protected:
void noteChanged(h256Set const& _filters); void noteChanged(h256Set const& _filters);
private: private:
/// Called when Worker is starting.
void startedWorking() override;
/// Do some work. Handles blockchain maintenance and mining. /// Do some work. Handles blockchain maintenance and mining.
virtual void doWork(); void doWork() override;
/// Called when Worker is exiting. /// Called when Worker is exiting.
virtual void doneWorking(); void doneWorking() override;
/// Magically called when the chain has changed. An import route is provided. /// Magically called when the chain has changed. An import route is provided.
/// Called by either submitWork() or in our main thread through syncBlockQueue(). /// Called by either submitWork() or in our main thread through syncBlockQueue().

33
libethereum/Farm.h

@ -61,8 +61,7 @@ public:
*/ */
void setWork(BlockInfo const& _bi) void setWork(BlockInfo const& _bi)
{ {
WriteGuard l(x_work); WriteGuard l(x_minerWork);
ReadGuard l2(x_miners);
m_header = _bi; m_header = _bi;
auto p = PoW::package(m_header); auto p = PoW::package(m_header);
if (p.headerHash == m_work.headerHash) if (p.headerHash == m_work.headerHash)
@ -90,14 +89,14 @@ public:
*/ */
void stop() void stop()
{ {
WriteGuard l(x_miners); WriteGuard l(x_minerWork);
m_miners.clear(); m_miners.clear();
m_work.reset();
} }
bool isMining() const bool isMining() const
{ {
ReadGuard l(x_miners); return !!m_work;
return !m_miners.empty();
} }
/** /**
@ -109,7 +108,7 @@ public:
MiningProgress p; MiningProgress p;
p.ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - m_lastStart).count(); p.ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - m_lastStart).count();
{ {
ReadGuard l2(x_miners); ReadGuard l2(x_minerWork);
for (auto const& i: m_miners) for (auto const& i: m_miners)
p.hashes += i->hashCount(); p.hashes += i->hashCount();
} }
@ -127,7 +126,7 @@ public:
*/ */
void onSolutionFound(SolutionFound const& _handler) { m_onSolutionFound = _handler; } void onSolutionFound(SolutionFound const& _handler) { m_onSolutionFound = _handler; }
WorkPackage work() const { ReadGuard l(x_work); return m_work; } WorkPackage work() const { ReadGuard l(x_minerWork); return m_work; }
private: private:
/** /**
@ -136,19 +135,14 @@ private:
* @param _wp The WorkPackage that the Solution is for. * @param _wp The WorkPackage that the Solution is for.
* @return true iff the solution was good (implying that mining should be . * @return true iff the solution was good (implying that mining should be .
*/ */
bool submitProof(Solution const& _s, WorkPackage& _wp, Miner* _m) override bool submitProof(Solution const& _s, Miner* _m) override
{ {
ReadGuard l(x_work);
if (_wp.headerHash != m_work.headerHash)
return false;
if (m_onSolutionFound && m_onSolutionFound(_s)) if (m_onSolutionFound && m_onSolutionFound(_s))
{ {
ReadGuard l(x_miners); WriteGuard ul(x_minerWork);
for (std::shared_ptr<Miner> const& m: m_miners) for (std::shared_ptr<Miner> const& m: m_miners)
if (m.get() != _m) if (m.get() != _m)
m->setWork(); m->setWork();
_wp.reset();
m_work.reset(); m_work.reset();
return true; return true;
} }
@ -161,8 +155,7 @@ private:
template <class MinerType> template <class MinerType>
bool start() bool start()
{ {
ReadGuard l(x_work); WriteGuard l(x_minerWork);
WriteGuard l2(x_miners);
if (!m_miners.empty() && !!std::dynamic_pointer_cast<MinerType>(m_miners[0])) if (!m_miners.empty() && !!std::dynamic_pointer_cast<MinerType>(m_miners[0]))
return true; return true;
m_miners.clear(); m_miners.clear();
@ -181,17 +174,15 @@ private:
m_lastStart = std::chrono::steady_clock::now(); m_lastStart = std::chrono::steady_clock::now();
} }
mutable SharedMutex x_miners; mutable SharedMutex x_minerWork;
std::vector<std::shared_ptr<Miner>> m_miners; std::vector<std::shared_ptr<Miner>> m_miners;
WorkPackage m_work;
BlockInfo m_header;
mutable SharedMutex x_progress; mutable SharedMutex x_progress;
mutable MiningProgress m_progress; mutable MiningProgress m_progress;
std::chrono::steady_clock::time_point m_lastStart; std::chrono::steady_clock::time_point m_lastStart;
mutable SharedMutex x_work;
WorkPackage m_work;
BlockInfo m_header;
SolutionFound m_onSolutionFound; SolutionFound m_onSolutionFound;
}; };

Loading…
Cancel
Save