From f10469774789d707aa133793fc6efc20e24d9682 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Fri, 17 Apr 2015 14:53:22 +0200 Subject: [PATCH] Redo thread safety in Client. Rework some Worker threading stuff. --- libdevcore/Common.cpp | 2 +- libdevcore/Worker.cpp | 20 +++--- libdevcore/Worker.h | 5 +- libdevcrypto/OverlayDB.cpp | 7 --- libdevcrypto/OverlayDB.h | 1 - libethcore/Ethash.cpp | 27 ++++---- libethereum/Client.cpp | 123 +++++++++++++++---------------------- libethereum/Client.h | 11 ++-- libethereum/State.cpp | 2 + 9 files changed, 88 insertions(+), 110 deletions(-) diff --git a/libdevcore/Common.cpp b/libdevcore/Common.cpp index 7cdc433f3..649e79310 100644 --- a/libdevcore/Common.cpp +++ b/libdevcore/Common.cpp @@ -27,7 +27,7 @@ using namespace dev; namespace dev { -char const* Version = "0.9.9"; +char const* Version = "0.9.10"; } diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp index 8c1fbb9c7..0f30a0aff 100644 --- a/libdevcore/Worker.cpp +++ b/libdevcore/Worker.cpp @@ -43,30 +43,32 @@ void Worker::startWorking(IfRunning _ir) } catch (...) {} cnote << "Spawning" << m_name; m_stop = false; + m_stopped = false; m_work.reset(new thread([&]() { setThreadName(m_name.c_str()); startedWorking(); workLoop(); - m_work->detach(); cnote << "Finishing up worker thread"; doneWorking(); + ETH_GUARDED(x_work) + m_work->detach(); + m_stopped = true; })); } void Worker::stopWorking() { // cnote << "stopWorking for thread" << m_name; - Guard l(x_work); - if (!m_work || !m_work->joinable()) - return; + ETH_GUARDED(x_work) + if (!m_work || !m_work->joinable()) + return; cnote << "Stopping" << m_name; m_stop = true; - try { - m_work->join(); - } - catch (...) {} - m_work.reset(); + while (!m_stopped) + this_thread::sleep_for(chrono::microseconds(50)); + ETH_GUARDED(x_work) + m_work.reset(); cnote << "Stopped" << m_name; } diff --git a/libdevcore/Worker.h b/libdevcore/Worker.h index 287ff6d6f..aad1dfc0e 100644 --- a/libdevcore/Worker.h +++ b/libdevcore/Worker.h @@ -59,7 +59,7 @@ protected: void stopWorking(); /// Returns if worker thread is present. - bool isWorking() const { Guard l(x_work); return !!m_work; } + bool isWorking() const { Guard l(x_work); return !!m_work && m_work->joinable(); } /// Called after thread is started from startWorking(). virtual void startedWorking() {} @@ -83,7 +83,8 @@ private: mutable Mutex x_work; ///< Lock for the network existance. std::unique_ptr m_work; ///< The network thread. - bool m_stop = false; + std::atomic m_stop = {false}; + std::atomic m_stopped = {false}; }; } diff --git a/libdevcrypto/OverlayDB.cpp b/libdevcrypto/OverlayDB.cpp index 4bd698f57..5f8aea667 100644 --- a/libdevcrypto/OverlayDB.cpp +++ b/libdevcrypto/OverlayDB.cpp @@ -34,13 +34,6 @@ OverlayDB::~OverlayDB() cnote << "Closing state DB"; } -void OverlayDB::setDB(ldb::DB* _db, bool _clearOverlay) -{ - m_db = std::shared_ptr(_db); - if (_clearOverlay) - m_over.clear(); -} - void OverlayDB::commit() { if (m_db) diff --git a/libdevcrypto/OverlayDB.h b/libdevcrypto/OverlayDB.h index d027afbd4..7f7736ac1 100644 --- a/libdevcrypto/OverlayDB.h +++ b/libdevcrypto/OverlayDB.h @@ -42,7 +42,6 @@ public: ~OverlayDB(); ldb::DB* db() const { return m_db.get(); } - void setDB(ldb::DB* _db, bool _clearOverlay = true); void commit(); void rollback(); diff --git a/libethcore/Ethash.cpp b/libethcore/Ethash.cpp index 977149e7a..16d17b1e8 100644 --- a/libethcore/Ethash.cpp +++ b/libethcore/Ethash.cpp @@ -295,21 +295,24 @@ void Ethash::GPUMiner::kickOff() void Ethash::GPUMiner::workLoop() { // take local copy of work since it may end up being overwritten by kickOff/pause. - WorkPackage w = work(); - if (!m_miner || m_minerSeed != w.seedHash) - { - m_minerSeed = w.seedHash; + try { + WorkPackage w = work(); + if (!m_miner || m_minerSeed != w.seedHash) + { + m_minerSeed = w.seedHash; - delete m_miner; - m_miner = new ethash_cl_miner; + delete m_miner; + m_miner = new ethash_cl_miner; - auto p = EthashAux::params(m_minerSeed); - auto cb = [&](void* d) { EthashAux::full(m_minerSeed, bytesRef((byte*)d, p.full_size)); }; - m_miner->init(p, cb, 32, s_deviceId); - } + auto p = EthashAux::params(m_minerSeed); + auto cb = [&](void* d) { EthashAux::full(m_minerSeed, bytesRef((byte*)d, p.full_size)); }; + m_miner->init(p, cb, 32, s_deviceId); + } - uint64_t upper64OfBoundary = (uint64_t)(u64)((u256)w.boundary >> 192); - m_miner->search(w.headerHash.data(), upper64OfBoundary, *m_hook); + uint64_t upper64OfBoundary = (uint64_t)(u64)((u256)w.boundary >> 192); + m_miner->search(w.headerHash.data(), upper64OfBoundary, *m_hook); + } + catch (...) {} } void Ethash::GPUMiner::pause() diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index b9df1cf90..1684ef54d 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -197,16 +197,17 @@ void Client::startedWorking() // Synchronise the state according to the head of the block chain. // TODO: currently it contains keys for *all* blocks. Make it remove old ones. cdebug << "startedWorking()"; - WriteGuard l(x_stateDB); cdebug << m_bc.number() << m_bc.currentHash(); - cdebug << "Pre:" << m_preMine.info(); cdebug << "Post:" << m_postMine.info(); cdebug << "Pre:" << m_preMine.info().headerHash(WithoutNonce) << "; Post:" << m_postMine.info().headerHash(WithoutNonce); - m_preMine.sync(m_bc); - m_postMine = m_preMine; + ETH_WRITE_GUARDED(x_preMine) + m_preMine.sync(m_bc); + ETH_WRITE_GUARDED(x_postMine) + ETH_READ_GUARDED(x_preMine) + m_postMine = m_preMine; cdebug << "Pre:" << m_preMine.info(); cdebug << "Post:" << m_postMine.info(); @@ -217,13 +218,18 @@ void Client::doneWorking() { // Synchronise the state according to the head of the block chain. // TODO: currently it contains keys for *all* blocks. Make it remove old ones. - WriteGuard l(x_stateDB); - m_preMine.sync(m_bc); - m_postMine = m_preMine; + ETH_WRITE_GUARDED(x_preMine) + m_preMine.sync(m_bc); + ETH_WRITE_GUARDED(x_postMine) + ETH_READ_GUARDED(x_preMine) + m_postMine = m_preMine; } void Client::killChain() { + WriteGuard l(x_postMine); + WriteGuard l2(x_preMine); + bool wasMining = isMining(); if (wasMining) stopMining(); @@ -235,8 +241,8 @@ void Client::killChain() m_preMine = State(); m_postMine = State(); +// ETH_WRITE_GUARDED(x_stateDB) // no point doing this yet since we can't control where else it's open yet. { - WriteGuard l(x_stateDB); m_stateDB = OverlayDB(); m_stateDB = State::openDB(Defaults::dbPath(), WithExisting::Kill); } @@ -258,15 +264,16 @@ void Client::killChain() void Client::clearPending() { h256Set changeds; + ETH_WRITE_GUARDED(x_postMine) { - WriteGuard l(x_stateDB); if (!m_postMine.pending().size()) return; // for (unsigned i = 0; i < m_postMine.pending().size(); ++i) // appendFromNewPending(m_postMine.logBloom(i), changeds); changeds.insert(PendingChangedFilter); m_tq.clear(); - m_postMine = m_preMine; + ETH_READ_GUARDED(x_preMine) + m_postMine = m_preMine; } startMining(); @@ -364,29 +371,6 @@ std::list Client::miningHistory() return ret; } -/*void Client::setupState(State& _s) -{ - { - ReadGuard l(x_stateDB); - cwork << "SETUP MINE"; - _s = m_postMine; - } - if (m_paranoia) - { - if (_s.amIJustParanoid(m_bc)) - { - cnote << "I'm just paranoid. Block is fine."; - _s.commitToMine(m_bc); - } - else - { - cwarn << "I'm not just paranoid. Cannot mine. Please file a bug report."; - } - } - else - _s.commitToMine(m_bc); -}*/ - ExecutionResult Client::call(Address _dest, bytes const& _data, u256 _gas, u256 _value, u256 _gasPrice, Address const& _from) { ExecutionResult ret; @@ -394,11 +378,9 @@ ExecutionResult Client::call(Address _dest, bytes const& _data, u256 _gas, u256 { State temp; // cdebug << "Nonce at " << toAddress(_secret) << " pre:" << m_preMine.transactionsFrom(toAddress(_secret)) << " post:" << m_postMine.transactionsFrom(toAddress(_secret)); - { - ReadGuard l(x_stateDB); + ETH_READ_GUARDED(x_postMine) temp = m_postMine; - temp.addBalance(_from, _value + _gasPrice * _gas); - } + temp.addBalance(_from, _value + _gasPrice * _gas); Executive e(temp, LastHashes(), 0); if (!e.call(_dest, _dest, _from, _value, _gasPrice, &_data, _gas, _from)) e.go(); @@ -420,10 +402,11 @@ bool Client::submitWork(ProofOfWork::Solution const& _solution) { bytes newBlock; { - WriteGuard l(x_stateDB); + WriteGuard l(x_postMine); if (!m_postMine.completeMine(_solution)) return false; newBlock = m_postMine.blockData(); + // OPTIMISE: very inefficient to not utilise the existing OverlayDB in m_postMine that contains all trie changes. } m_bq.import(&newBlock, m_bc, true); /* @@ -439,13 +422,9 @@ void Client::syncBlockQueue() cwork << "BQ ==> CHAIN ==> STATE"; { - WriteGuard l(x_stateDB); - OverlayDB db = m_stateDB; - ETH_WRITE_UNGUARDED(x_stateDB) - tie(ir.first, ir.second, m_syncBlockQueue) = m_bc.sync(m_bq, db, 100); + tie(ir.first, ir.second, m_syncBlockQueue) = m_bc.sync(m_bq, m_stateDB, 100); if (ir.first.empty()) return; - m_stateDB = db; } onChainChanged(ir); } @@ -458,25 +437,26 @@ void Client::syncTransactionQueue() h256Set changeds; TransactionReceipts newPendingReceipts; - ETH_WRITE_GUARDED(x_stateDB) + ETH_WRITE_GUARDED(x_postMine) newPendingReceipts = m_postMine.sync(m_bc, m_tq, *m_gp); - if (newPendingReceipts.size()) - { + if (newPendingReceipts.empty()) + return; + + ETH_READ_GUARDED(x_postMine) for (size_t i = 0; i < newPendingReceipts.size(); i++) appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3()); - changeds.insert(PendingChangedFilter); + changeds.insert(PendingChangedFilter); - // TODO: Tell farm about new transaction (i.e. restartProofOfWork mining). - onPostStateChanged(); + // TODO: Tell farm about new transaction (i.e. restartProofOfWork mining). + onPostStateChanged(); - // Tell watches about the new transactions. - noteChanged(changeds); + // Tell watches about the new transactions. + noteChanged(changeds); - // Tell network about the new transactions. - if (auto h = m_host.lock()) - h->noteNewTransactions(); - } + // Tell network about the new transactions. + if (auto h = m_host.lock()) + h->noteNewTransactions(); } void Client::onChainChanged(ImportRoute const& _ir) @@ -514,18 +494,21 @@ void Client::onChainChanged(ImportRoute const& _ir) // RESTART MINING // LOCKS REALLY NEEDED? - ETH_WRITE_GUARDED(x_stateDB) - if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address()) - { - if (isMining()) - cnote << "New block on chain."; + bool preChanged = false; + ETH_WRITE_GUARDED(x_preMine) + preChanged = m_preMine.sync(m_bc); + if (preChanged || m_postMine.address() != m_preMine.address()) + { + if (isMining()) + cnote << "New block on chain."; - m_postMine = m_preMine; - changeds.insert(PendingChangedFilter); + ETH_WRITE_GUARDED(x_postMine) + ETH_READ_GUARDED(x_preMine) + m_postMine = m_preMine; + changeds.insert(PendingChangedFilter); - ETH_WRITE_UNGUARDED(x_stateDB) - onPostStateChanged(); - } + onPostStateChanged(); + } noteChanged(changeds); } @@ -536,7 +519,7 @@ void Client::onPostStateChanged() if (isMining()) { { - WriteGuard l(x_stateDB); + WriteGuard l(x_postMine); m_postMine.commitToMine(m_bc); m_miningInfo = m_postMine.info(); } @@ -606,15 +589,13 @@ void Client::checkWatchGarbage() { // watches garbage collection vector toUninstall; - { - Guard l(x_filtersWatches); + ETH_GUARDED(x_filtersWatches) for (auto key: keysOf(m_watches)) if (m_watches[key].lastPoll != chrono::system_clock::time_point::max() && chrono::system_clock::now() - m_watches[key].lastPoll > chrono::seconds(20)) { toUninstall.push_back(key); cnote << "GC: Uninstall" << key << "(" << chrono::duration_cast(chrono::system_clock::now() - m_watches[key].lastPoll).count() << "s old)"; } - } for (auto i: toUninstall) uninstallWatch(i); @@ -627,7 +608,6 @@ void Client::checkWatchGarbage() State Client::asOf(h256 const& _block) const { - ReadGuard l(x_stateDB); return State(m_stateDB, bc(), _block); } @@ -638,19 +618,16 @@ void Client::prepareForTransaction() State Client::state(unsigned _txi, h256 _block) const { - ReadGuard l(x_stateDB); return State(m_stateDB, m_bc, _block).fromPending(_txi); } eth::State Client::state(h256 _block) const { - ReadGuard l(x_stateDB); return State(m_stateDB, m_bc, _block); } eth::State Client::state(unsigned _txi) const { - ReadGuard l(x_stateDB); return m_postMine.fromPending(_txi); } diff --git a/libethereum/Client.h b/libethereum/Client.h index 1dfa45d7e..aa2dd61b8 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -144,7 +144,7 @@ public: dev::eth::State state(unsigned _txi) const; /// Get the object representing the current state of Ethereum. - dev::eth::State postState() const { ReadGuard l(x_stateDB); return m_postMine; } + dev::eth::State postState() const { ReadGuard l(x_postMine); return m_postMine; } /// Get the object representing the current canonical blockchain. CanonBlockChain const& blockChain() const { return m_bc; } /// Get some information on the block queue. @@ -152,7 +152,7 @@ public: // Mining stuff: - void setAddress(Address _us) { WriteGuard l(x_stateDB); m_preMine.setAddress(_us); } + void setAddress(Address _us) { WriteGuard l(x_preMine); m_preMine.setAddress(_us); } /// Check block validity prior to mining. bool miningParanoia() const { return m_paranoia; } @@ -214,8 +214,8 @@ protected: /// Works properly with LatestBlock and PendingBlock. using ClientBase::asOf; virtual State asOf(h256 const& _block) const override; - virtual State preMine() const override { ReadGuard l(x_stateDB); return m_preMine; } - virtual State postMine() const override { ReadGuard l(x_stateDB); return m_postMine; } + virtual State preMine() const override { ReadGuard l(x_preMine); return m_preMine; } + virtual State postMine() const override { ReadGuard l(x_postMine); return m_postMine; } virtual void prepareForTransaction() override; /// Collate the changed filters for the bloom filter of the given pending transaction. @@ -271,9 +271,10 @@ private: BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). std::shared_ptr m_gp; ///< The gas pricer. - mutable SharedMutex x_stateDB; ///< Lock on the state DB, effectively a lock on m_postMine. OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it. + mutable SharedMutex x_preMine; ///< Lock on the OverlayDB and other attributes of m_preMine. State m_preMine; ///< The present state of the client. + mutable SharedMutex x_postMine; ///< Lock on the OverlayDB and other attributes of m_postMine. State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). BlockInfo m_miningInfo; ///< The header we're attempting to mine on (derived from m_postMine). diff --git a/libethereum/State.cpp b/libethereum/State.cpp index 65f267b0f..ec5023ff8 100644 --- a/libethereum/State.cpp +++ b/libethereum/State.cpp @@ -690,7 +690,9 @@ void State::cleanup(bool _fullCommit) paranoia("immediately before database commit", true); // Commit the new trie to disk. + cnote << "Commiting to disk..."; m_db.commit(); + cnote << "Committed."; paranoia("immediately after database commit", true); m_previousBlock = m_currentBlock;