Browse Source

Redo thread safety in Client. Rework some Worker threading stuff.

cl-refactor
Gav Wood 10 years ago
parent
commit
f104697747
  1. 2
      libdevcore/Common.cpp
  2. 20
      libdevcore/Worker.cpp
  3. 5
      libdevcore/Worker.h
  4. 7
      libdevcrypto/OverlayDB.cpp
  5. 1
      libdevcrypto/OverlayDB.h
  6. 27
      libethcore/Ethash.cpp
  7. 123
      libethereum/Client.cpp
  8. 11
      libethereum/Client.h
  9. 2
      libethereum/State.cpp

2
libdevcore/Common.cpp

@ -27,7 +27,7 @@ using namespace dev;
namespace dev
{
char const* Version = "0.9.9";
char const* Version = "0.9.10";
}

20
libdevcore/Worker.cpp

@ -43,30 +43,32 @@ void Worker::startWorking(IfRunning _ir)
} catch (...) {}
cnote << "Spawning" << m_name;
m_stop = false;
m_stopped = false;
m_work.reset(new thread([&]()
{
setThreadName(m_name.c_str());
startedWorking();
workLoop();
m_work->detach();
cnote << "Finishing up worker thread";
doneWorking();
ETH_GUARDED(x_work)
m_work->detach();
m_stopped = true;
}));
}
void Worker::stopWorking()
{
// cnote << "stopWorking for thread" << m_name;
Guard l(x_work);
if (!m_work || !m_work->joinable())
return;
ETH_GUARDED(x_work)
if (!m_work || !m_work->joinable())
return;
cnote << "Stopping" << m_name;
m_stop = true;
try {
m_work->join();
}
catch (...) {}
m_work.reset();
while (!m_stopped)
this_thread::sleep_for(chrono::microseconds(50));
ETH_GUARDED(x_work)
m_work.reset();
cnote << "Stopped" << m_name;
}

5
libdevcore/Worker.h

@ -59,7 +59,7 @@ protected:
void stopWorking();
/// Returns if worker thread is present.
bool isWorking() const { Guard l(x_work); return !!m_work; }
bool isWorking() const { Guard l(x_work); return !!m_work && m_work->joinable(); }
/// Called after thread is started from startWorking().
virtual void startedWorking() {}
@ -83,7 +83,8 @@ private:
mutable Mutex x_work; ///< Lock for the network existance.
std::unique_ptr<std::thread> m_work; ///< The network thread.
bool m_stop = false;
std::atomic<bool> m_stop = {false};
std::atomic<bool> m_stopped = {false};
};
}

7
libdevcrypto/OverlayDB.cpp

@ -34,13 +34,6 @@ OverlayDB::~OverlayDB()
cnote << "Closing state DB";
}
void OverlayDB::setDB(ldb::DB* _db, bool _clearOverlay)
{
m_db = std::shared_ptr<ldb::DB>(_db);
if (_clearOverlay)
m_over.clear();
}
void OverlayDB::commit()
{
if (m_db)

1
libdevcrypto/OverlayDB.h

@ -42,7 +42,6 @@ public:
~OverlayDB();
ldb::DB* db() const { return m_db.get(); }
void setDB(ldb::DB* _db, bool _clearOverlay = true);
void commit();
void rollback();

27
libethcore/Ethash.cpp

@ -295,21 +295,24 @@ void Ethash::GPUMiner::kickOff()
void Ethash::GPUMiner::workLoop()
{
// take local copy of work since it may end up being overwritten by kickOff/pause.
WorkPackage w = work();
if (!m_miner || m_minerSeed != w.seedHash)
{
m_minerSeed = w.seedHash;
try {
WorkPackage w = work();
if (!m_miner || m_minerSeed != w.seedHash)
{
m_minerSeed = w.seedHash;
delete m_miner;
m_miner = new ethash_cl_miner;
delete m_miner;
m_miner = new ethash_cl_miner;
auto p = EthashAux::params(m_minerSeed);
auto cb = [&](void* d) { EthashAux::full(m_minerSeed, bytesRef((byte*)d, p.full_size)); };
m_miner->init(p, cb, 32, s_deviceId);
}
auto p = EthashAux::params(m_minerSeed);
auto cb = [&](void* d) { EthashAux::full(m_minerSeed, bytesRef((byte*)d, p.full_size)); };
m_miner->init(p, cb, 32, s_deviceId);
}
uint64_t upper64OfBoundary = (uint64_t)(u64)((u256)w.boundary >> 192);
m_miner->search(w.headerHash.data(), upper64OfBoundary, *m_hook);
uint64_t upper64OfBoundary = (uint64_t)(u64)((u256)w.boundary >> 192);
m_miner->search(w.headerHash.data(), upper64OfBoundary, *m_hook);
}
catch (...) {}
}
void Ethash::GPUMiner::pause()

123
libethereum/Client.cpp

@ -197,16 +197,17 @@ void Client::startedWorking()
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
cdebug << "startedWorking()";
WriteGuard l(x_stateDB);
cdebug << m_bc.number() << m_bc.currentHash();
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce);
m_preMine.sync(m_bc);
m_postMine = m_preMine;
ETH_WRITE_GUARDED(x_preMine)
m_preMine.sync(m_bc);
ETH_WRITE_GUARDED(x_postMine)
ETH_READ_GUARDED(x_preMine)
m_postMine = m_preMine;
cdebug << "Pre:" << m_preMine.info();
cdebug << "Post:" << m_postMine.info();
@ -217,13 +218,18 @@ void Client::doneWorking()
{
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
WriteGuard l(x_stateDB);
m_preMine.sync(m_bc);
m_postMine = m_preMine;
ETH_WRITE_GUARDED(x_preMine)
m_preMine.sync(m_bc);
ETH_WRITE_GUARDED(x_postMine)
ETH_READ_GUARDED(x_preMine)
m_postMine = m_preMine;
}
void Client::killChain()
{
WriteGuard l(x_postMine);
WriteGuard l2(x_preMine);
bool wasMining = isMining();
if (wasMining)
stopMining();
@ -235,8 +241,8 @@ void Client::killChain()
m_preMine = State();
m_postMine = State();
// ETH_WRITE_GUARDED(x_stateDB) // no point doing this yet since we can't control where else it's open yet.
{
WriteGuard l(x_stateDB);
m_stateDB = OverlayDB();
m_stateDB = State::openDB(Defaults::dbPath(), WithExisting::Kill);
}
@ -258,15 +264,16 @@ void Client::killChain()
void Client::clearPending()
{
h256Set changeds;
ETH_WRITE_GUARDED(x_postMine)
{
WriteGuard l(x_stateDB);
if (!m_postMine.pending().size())
return;
// for (unsigned i = 0; i < m_postMine.pending().size(); ++i)
// appendFromNewPending(m_postMine.logBloom(i), changeds);
changeds.insert(PendingChangedFilter);
m_tq.clear();
m_postMine = m_preMine;
ETH_READ_GUARDED(x_preMine)
m_postMine = m_preMine;
}
startMining();
@ -364,29 +371,6 @@ std::list<MineInfo> Client::miningHistory()
return ret;
}
/*void Client::setupState(State& _s)
{
{
ReadGuard l(x_stateDB);
cwork << "SETUP MINE";
_s = m_postMine;
}
if (m_paranoia)
{
if (_s.amIJustParanoid(m_bc))
{
cnote << "I'm just paranoid. Block is fine.";
_s.commitToMine(m_bc);
}
else
{
cwarn << "I'm not just paranoid. Cannot mine. Please file a bug report.";
}
}
else
_s.commitToMine(m_bc);
}*/
ExecutionResult Client::call(Address _dest, bytes const& _data, u256 _gas, u256 _value, u256 _gasPrice, Address const& _from)
{
ExecutionResult ret;
@ -394,11 +378,9 @@ ExecutionResult Client::call(Address _dest, bytes const& _data, u256 _gas, u256
{
State temp;
// cdebug << "Nonce at " << toAddress(_secret) << " pre:" << m_preMine.transactionsFrom(toAddress(_secret)) << " post:" << m_postMine.transactionsFrom(toAddress(_secret));
{
ReadGuard l(x_stateDB);
ETH_READ_GUARDED(x_postMine)
temp = m_postMine;
temp.addBalance(_from, _value + _gasPrice * _gas);
}
temp.addBalance(_from, _value + _gasPrice * _gas);
Executive e(temp, LastHashes(), 0);
if (!e.call(_dest, _dest, _from, _value, _gasPrice, &_data, _gas, _from))
e.go();
@ -420,10 +402,11 @@ bool Client::submitWork(ProofOfWork::Solution const& _solution)
{
bytes newBlock;
{
WriteGuard l(x_stateDB);
WriteGuard l(x_postMine);
if (!m_postMine.completeMine<ProofOfWork>(_solution))
return false;
newBlock = m_postMine.blockData();
// OPTIMISE: very inefficient to not utilise the existing OverlayDB in m_postMine that contains all trie changes.
}
m_bq.import(&newBlock, m_bc, true);
/*
@ -439,13 +422,9 @@ void Client::syncBlockQueue()
cwork << "BQ ==> CHAIN ==> STATE";
{
WriteGuard l(x_stateDB);
OverlayDB db = m_stateDB;
ETH_WRITE_UNGUARDED(x_stateDB)
tie(ir.first, ir.second, m_syncBlockQueue) = m_bc.sync(m_bq, db, 100);
tie(ir.first, ir.second, m_syncBlockQueue) = m_bc.sync(m_bq, m_stateDB, 100);
if (ir.first.empty())
return;
m_stateDB = db;
}
onChainChanged(ir);
}
@ -458,25 +437,26 @@ void Client::syncTransactionQueue()
h256Set changeds;
TransactionReceipts newPendingReceipts;
ETH_WRITE_GUARDED(x_stateDB)
ETH_WRITE_GUARDED(x_postMine)
newPendingReceipts = m_postMine.sync(m_bc, m_tq, *m_gp);
if (newPendingReceipts.size())
{
if (newPendingReceipts.empty())
return;
ETH_READ_GUARDED(x_postMine)
for (size_t i = 0; i < newPendingReceipts.size(); i++)
appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3());
changeds.insert(PendingChangedFilter);
changeds.insert(PendingChangedFilter);
// TODO: Tell farm about new transaction (i.e. restartProofOfWork mining).
onPostStateChanged();
// TODO: Tell farm about new transaction (i.e. restartProofOfWork mining).
onPostStateChanged();
// Tell watches about the new transactions.
noteChanged(changeds);
// Tell watches about the new transactions.
noteChanged(changeds);
// Tell network about the new transactions.
if (auto h = m_host.lock())
h->noteNewTransactions();
}
// Tell network about the new transactions.
if (auto h = m_host.lock())
h->noteNewTransactions();
}
void Client::onChainChanged(ImportRoute const& _ir)
@ -514,18 +494,21 @@ void Client::onChainChanged(ImportRoute const& _ir)
// RESTART MINING
// LOCKS REALLY NEEDED?
ETH_WRITE_GUARDED(x_stateDB)
if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address())
{
if (isMining())
cnote << "New block on chain.";
bool preChanged = false;
ETH_WRITE_GUARDED(x_preMine)
preChanged = m_preMine.sync(m_bc);
if (preChanged || m_postMine.address() != m_preMine.address())
{
if (isMining())
cnote << "New block on chain.";
m_postMine = m_preMine;
changeds.insert(PendingChangedFilter);
ETH_WRITE_GUARDED(x_postMine)
ETH_READ_GUARDED(x_preMine)
m_postMine = m_preMine;
changeds.insert(PendingChangedFilter);
ETH_WRITE_UNGUARDED(x_stateDB)
onPostStateChanged();
}
onPostStateChanged();
}
noteChanged(changeds);
}
@ -536,7 +519,7 @@ void Client::onPostStateChanged()
if (isMining())
{
{
WriteGuard l(x_stateDB);
WriteGuard l(x_postMine);
m_postMine.commitToMine(m_bc);
m_miningInfo = m_postMine.info();
}
@ -606,15 +589,13 @@ void Client::checkWatchGarbage()
{
// watches garbage collection
vector<unsigned> toUninstall;
{
Guard l(x_filtersWatches);
ETH_GUARDED(x_filtersWatches)
for (auto key: keysOf(m_watches))
if (m_watches[key].lastPoll != chrono::system_clock::time_point::max() && chrono::system_clock::now() - m_watches[key].lastPoll > chrono::seconds(20))
{
toUninstall.push_back(key);
cnote << "GC: Uninstall" << key << "(" << chrono::duration_cast<chrono::seconds>(chrono::system_clock::now() - m_watches[key].lastPoll).count() << "s old)";
}
}
for (auto i: toUninstall)
uninstallWatch(i);
@ -627,7 +608,6 @@ void Client::checkWatchGarbage()
State Client::asOf(h256 const& _block) const
{
ReadGuard l(x_stateDB);
return State(m_stateDB, bc(), _block);
}
@ -638,19 +618,16 @@ void Client::prepareForTransaction()
State Client::state(unsigned _txi, h256 _block) const
{
ReadGuard l(x_stateDB);
return State(m_stateDB, m_bc, _block).fromPending(_txi);
}
eth::State Client::state(h256 _block) const
{
ReadGuard l(x_stateDB);
return State(m_stateDB, m_bc, _block);
}
eth::State Client::state(unsigned _txi) const
{
ReadGuard l(x_stateDB);
return m_postMine.fromPending(_txi);
}

11
libethereum/Client.h

@ -144,7 +144,7 @@ public:
dev::eth::State state(unsigned _txi) const;
/// Get the object representing the current state of Ethereum.
dev::eth::State postState() const { ReadGuard l(x_stateDB); return m_postMine; }
dev::eth::State postState() const { ReadGuard l(x_postMine); return m_postMine; }
/// Get the object representing the current canonical blockchain.
CanonBlockChain const& blockChain() const { return m_bc; }
/// Get some information on the block queue.
@ -152,7 +152,7 @@ public:
// Mining stuff:
void setAddress(Address _us) { WriteGuard l(x_stateDB); m_preMine.setAddress(_us); }
void setAddress(Address _us) { WriteGuard l(x_preMine); m_preMine.setAddress(_us); }
/// Check block validity prior to mining.
bool miningParanoia() const { return m_paranoia; }
@ -214,8 +214,8 @@ protected:
/// Works properly with LatestBlock and PendingBlock.
using ClientBase::asOf;
virtual State asOf(h256 const& _block) const override;
virtual State preMine() const override { ReadGuard l(x_stateDB); return m_preMine; }
virtual State postMine() const override { ReadGuard l(x_stateDB); return m_postMine; }
virtual State preMine() const override { ReadGuard l(x_preMine); return m_preMine; }
virtual State postMine() const override { ReadGuard l(x_postMine); return m_postMine; }
virtual void prepareForTransaction() override;
/// Collate the changed filters for the bloom filter of the given pending transaction.
@ -271,9 +271,10 @@ private:
BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
std::shared_ptr<GasPricer> m_gp; ///< The gas pricer.
mutable SharedMutex x_stateDB; ///< Lock on the state DB, effectively a lock on m_postMine.
OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it.
mutable SharedMutex x_preMine; ///< Lock on the OverlayDB and other attributes of m_preMine.
State m_preMine; ///< The present state of the client.
mutable SharedMutex x_postMine; ///< Lock on the OverlayDB and other attributes of m_postMine.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
BlockInfo m_miningInfo; ///< The header we're attempting to mine on (derived from m_postMine).

2
libethereum/State.cpp

@ -690,7 +690,9 @@ void State::cleanup(bool _fullCommit)
paranoia("immediately before database commit", true);
// Commit the new trie to disk.
cnote << "Commiting to disk...";
m_db.commit();
cnote << "Committed.";
paranoia("immediately after database commit", true);
m_previousBlock = m_currentBlock;

Loading…
Cancel
Save