Browse Source

Various threading fixes.

cl-refactor
Gav Wood 10 years ago
parent
commit
192761a4b1
  1. 9
      CMakeLists.txt
  2. 2
      alethzero/CMakeLists.txt
  3. 4
      alethzero/Transact.cpp
  4. 28
      exp/main.cpp
  5. 4
      libdevcore/Worker.cpp
  6. 31
      libethcore/Miner.h
  7. 34
      libethereum/Client.cpp
  8. 9
      libethereum/Client.h
  9. 33
      libethereum/Farm.h

9
CMakeLists.txt

@ -243,6 +243,15 @@ elseif (BUNDLE STREQUAL "full")
set(TOOLS ON)
set(TESTS ON)
set(FATDB ON)
elseif (BUNDLE STREQUAL "core")
set(SERPENT OFF)
set(SOLIDITY ON)
set(USENPM OFF)
set(GUI ON)
set(NCURSES OFF)
set(TOOLS ON)
set(TESTS OFF)
set(FATDB ON)
elseif (BUNDLE STREQUAL "tests")
set(SERPENT ${DECENT_PLATFORM})
set(SOLIDITY ON)

2
alethzero/CMakeLists.txt

@ -59,7 +59,7 @@ target_link_libraries(${EXECUTABLE} jsqrc)
target_link_libraries(${EXECUTABLE} natspec)
target_link_libraries(${EXECUTABLE} ${MHD_LIBRARIES})
if (NOT ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC"))
if (SERPENT)
target_link_libraries(${EXECUTABLE} serpent)
endif()

4
alethzero/Transact.cpp

@ -37,7 +37,7 @@
#include <libnatspec/NatspecExpressionEvaluator.h>
#include <libethereum/Client.h>
#include <libethereum/Utility.h>
#ifndef _MSC_VER
#if ETH_SERPENT
#include <libserpent/funcs.h>
#include <libserpent/util.h>
#endif
@ -220,7 +220,7 @@ static tuple<vector<string>, bytes, string> userInputToCode(string const& _user,
errors.push_back("Solidity: Uncaught exception");
}
}
#ifndef _MSC_VER
#if ETH_SERPENT
else if (sourceIsSerpent(_user))
{
try

28
exp/main.cpp

@ -40,6 +40,7 @@
#include <libethereum/Farm.h>
#include <libethereum/AccountDiff.h>
#include <libethereum/DownloadMan.h>
#include <libethereum/Client.h>
#include <liblll/All.h>
#include <libwhisper/WhisperPeer.h>
#include <libwhisper/WhisperHost.h>
@ -153,7 +154,7 @@ int main()
return 0;
}
#else
#elif 0
void mine(State& s, BlockChain const& _bc)
{
@ -169,7 +170,7 @@ void mine(State& s, BlockChain const& _bc)
while (!completed)
this_thread::sleep_for(chrono::milliseconds(20));
}
#elif 0
int main()
{
cnote << "Testing State...";
@ -224,5 +225,28 @@ int main()
return 0;
}
#else
int main()
{
string tempDir = boost::filesystem::temp_directory_path().string() + "/" + toString(chrono::system_clock::now().time_since_epoch().count());
KeyPair myMiner = sha3("Gav's Miner");
p2p::Host net("Test");
cdebug << "Path:" << tempDir;
Client c(&net, tempDir);
c.setAddress(myMiner.address());
this_thread::sleep_for(chrono::milliseconds(1000));
c.startMining();
this_thread::sleep_for(chrono::milliseconds(6000));
c.stopMining();
return 0;
}
#endif

4
libdevcore/Worker.cpp

@ -29,7 +29,7 @@ using namespace dev;
void Worker::startWorking(IfRunning _ir)
{
cnote << "startWorking for thread" << m_name;
// cnote << "startWorking for thread" << m_name;
Guard l(x_work);
if (m_work && m_work->joinable())
@ -56,7 +56,7 @@ void Worker::startWorking(IfRunning _ir)
void Worker::stopWorking()
{
cnote << "stopWorking for thread" << m_name;
// cnote << "stopWorking for thread" << m_name;
Guard l(x_work);
if (!m_work || !m_work->joinable())
return;

31
libethcore/Miner.h

@ -73,11 +73,12 @@ public:
* @param _finder The miner that found it.
* @return true iff the solution was good (implying that mining should be .
*/
virtual bool submitProof(Solution const& _p, WorkPackage& io_wp, Miner* _finder) = 0;
virtual bool submitProof(Solution const& _p, Miner* _finder) = 0;
};
/**
* @brief A miner - a member and adoptee of the Farm.
* @warning Not threadsafe. It is assumed Farm will synchronise calls to/from this class.
*/
template <class PoW> class GenericMiner
{
@ -96,15 +97,17 @@ public:
void setWork(WorkPackage const& _work = WorkPackage())
{
Guard l(x_work);
auto old = m_work;
m_work = _work;
if (!!m_work)
{
Guard l(x_work);
m_work = _work;
}
if (!!_work)
{
pause();
kickOff();
}
else if (!m_work && !!old)
else if (!_work && !!old)
pause();
m_hashCount = 0;
}
@ -137,16 +140,18 @@ protected:
*/
bool submitProof(Solution const& _s)
{
if (m_farm)
if (!m_farm)
return true;
if (m_farm->submitProof(_s, this))
{
if (!m_farm->submitProof(_s, m_work, this))
return false;
Guard l(x_work);
m_work.reset();
return true;
}
return true;
return false;
}
WorkPackage const& work() const { return m_work; }
WorkPackage const& work() const { Guard l(x_work); return m_work; }
void accumulateHashes(unsigned _n) { m_hashCount += _n; }
@ -154,10 +159,10 @@ private:
FarmFace* m_farm = nullptr;
unsigned m_index;
Mutex x_work;
WorkPackage m_work;
uint64_t m_hashCount = 0;
WorkPackage m_work;
mutable Mutex x_work;
};
}

34
libethereum/Client.cpp

@ -192,6 +192,27 @@ bool Client::isSyncing() const
return false;
}
void Client::startedWorking()
{
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
cdebug << "startedWorking()";
WriteGuard l(x_stateDB);
cdebug << m_bc.number() << m_bc.currentHash();
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
m_preMine.sync(m_bc);
m_postMine = m_preMine;
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
}
void Client::doneWorking()
{
// Synchronise the state according to the head of the block chain.
@ -403,10 +424,11 @@ bool Client::submitWork(ProofOfWork::Solution const& _solution)
return false;
newBlock = m_postMine.blockData();
}
m_bq.import(&newBlock, m_bc);
/*
ImportRoute ir = m_bc.attemptImport(newBlock, m_stateDB);
if (!ir.first.empty())
onChainChanged(ir);
onChainChanged(ir);*/
return true;
}
@ -500,9 +522,9 @@ void Client::onChainChanged(ImportRoute const& _ir)
m_postMine = m_preMine;
changeds.insert(PendingChangedFilter);
x_stateDB.unlock();
x_stateDB.unlock_shared();
onPostStateChanged();
x_stateDB.lock();
x_stateDB.lock_shared();
}
}
@ -514,8 +536,12 @@ void Client::onPostStateChanged()
cnote << "Post state changed: Restarting mining...";
{
WriteGuard l(x_stateDB);
cdebug << "Pre:" << m_preMine.info();
m_postMine.commitToMine(m_bc);
m_miningInfo = m_postMine.info();
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
}
m_farm.setWork(m_miningInfo);

9
libethereum/Client.h

@ -169,7 +169,7 @@ public:
/// Start mining.
/// NOT thread-safe - call it & stopMining only from a single thread
void startMining() override { if (m_turboMining) m_farm.startGPU(); else m_farm.startCPU(); }
void startMining() override { if (m_turboMining) m_farm.startGPU(); else m_farm.startCPU(); onPostStateChanged(); }
/// Stop mining.
/// NOT thread-safe
void stopMining() override { m_farm.stop(); }
@ -229,11 +229,14 @@ protected:
void noteChanged(h256Set const& _filters);
private:
/// Called when Worker is starting.
void startedWorking() override;
/// Do some work. Handles blockchain maintenance and mining.
virtual void doWork();
void doWork() override;
/// Called when Worker is exiting.
virtual void doneWorking();
void doneWorking() override;
/// Magically called when the chain has changed. An import route is provided.
/// Called by either submitWork() or in our main thread through syncBlockQueue().

33
libethereum/Farm.h

@ -61,8 +61,7 @@ public:
*/
void setWork(BlockInfo const& _bi)
{
WriteGuard l(x_work);
ReadGuard l2(x_miners);
WriteGuard l(x_minerWork);
m_header = _bi;
auto p = PoW::package(m_header);
if (p.headerHash == m_work.headerHash)
@ -90,14 +89,14 @@ public:
*/
void stop()
{
WriteGuard l(x_miners);
WriteGuard l(x_minerWork);
m_miners.clear();
m_work.reset();
}
bool isMining() const
{
ReadGuard l(x_miners);
return !m_miners.empty();
return !!m_work;
}
/**
@ -109,7 +108,7 @@ public:
MiningProgress p;
p.ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - m_lastStart).count();
{
ReadGuard l2(x_miners);
ReadGuard l2(x_minerWork);
for (auto const& i: m_miners)
p.hashes += i->hashCount();
}
@ -127,7 +126,7 @@ public:
*/
void onSolutionFound(SolutionFound const& _handler) { m_onSolutionFound = _handler; }
WorkPackage work() const { ReadGuard l(x_work); return m_work; }
WorkPackage work() const { ReadGuard l(x_minerWork); return m_work; }
private:
/**
@ -136,19 +135,14 @@ private:
* @param _wp The WorkPackage that the Solution is for.
* @return true iff the solution was good (implying that mining should be .
*/
bool submitProof(Solution const& _s, WorkPackage& _wp, Miner* _m) override
bool submitProof(Solution const& _s, Miner* _m) override
{
ReadGuard l(x_work);
if (_wp.headerHash != m_work.headerHash)
return false;
if (m_onSolutionFound && m_onSolutionFound(_s))
{
ReadGuard l(x_miners);
WriteGuard ul(x_minerWork);
for (std::shared_ptr<Miner> const& m: m_miners)
if (m.get() != _m)
m->setWork();
_wp.reset();
m_work.reset();
return true;
}
@ -161,8 +155,7 @@ private:
template <class MinerType>
bool start()
{
ReadGuard l(x_work);
WriteGuard l2(x_miners);
WriteGuard l(x_minerWork);
if (!m_miners.empty() && !!std::dynamic_pointer_cast<MinerType>(m_miners[0]))
return true;
m_miners.clear();
@ -181,17 +174,15 @@ private:
m_lastStart = std::chrono::steady_clock::now();
}
mutable SharedMutex x_miners;
mutable SharedMutex x_minerWork;
std::vector<std::shared_ptr<Miner>> m_miners;
WorkPackage m_work;
BlockInfo m_header;
mutable SharedMutex x_progress;
mutable MiningProgress m_progress;
std::chrono::steady_clock::time_point m_lastStart;
mutable SharedMutex x_work;
WorkPackage m_work;
BlockInfo m_header;
SolutionFound m_onSolutionFound;
};

Loading…
Cancel
Save