diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 2e9b035bd..ceb742bf7 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -74,7 +74,21 @@ Client::Client(std::string const& _clientVersion, Address _us, std::string const if (_dbPath.size()) Defaults::setDBPath(_dbPath); m_vc.setOk(); - work(true); + work(); +} + +Client::~Client() +{ + if (m_work) + { + if (m_workState.load(std::memory_order_acquire) == Active) + m_workState.store(Deleting, std::memory_order_release); + while (m_workState.load(std::memory_order_acquire) != Deleted) + this_thread::sleep_for(chrono::milliseconds(10)); + m_work->join(); + m_work.reset(nullptr); + } + stopNetwork(); } void Client::ensureWorking() @@ -98,23 +112,9 @@ void Client::ensureWorking() })); } -Client::~Client() -{ - if (m_work) - { - if (m_workState.load(std::memory_order_acquire) == Active) - m_workState.store(Deleting, std::memory_order_release); - while (m_workState.load(std::memory_order_acquire) != Deleted) - this_thread::sleep_for(chrono::milliseconds(10)); - m_work->join(); - m_work.reset(nullptr); - } - stopNetwork(); -} - void Client::flushTransactions() { - work(true); + work(); } void Client::clearPending() @@ -127,6 +127,7 @@ void Client::clearPending() appendFromNewPending(m_postMine.bloom(i), changeds); changeds.insert(PendingChangedFilter); m_postMine = m_preMine; + m_miningStatus = Preparing; noteChanged(changeds); } @@ -302,15 +303,33 @@ void Client::connect(std::string const& _seedHost, unsigned short _port) void Client::startMining() { - ensureWorking(); + static const char* c_threadName = "miner"; + + if (!m_workMine) + m_workMine.reset(new thread([&]() + { + setThreadName(c_threadName); + m_workMineState.store(Active, std::memory_order_release); + m_miningStatus = Preparing; + while (m_workMineState.load(std::memory_order_acquire) != Deleting) + workMine(); + m_workMineState.store(Deleted, std::memory_order_release); + })); - m_doMine = true; - m_restartMining = true; + ensureWorking(); } void Client::stopMining() { - m_doMine = false; + if (m_workMine) + { + if (m_workMineState.load(std::memory_order_acquire) == Active) + m_workMineState.store(Deleting, std::memory_order_release); + while (m_workMineState.load(std::memory_order_acquire) != Deleted) + this_thread::sleep_for(chrono::milliseconds(10)); + m_workMine->join(); + } + m_workMine.reset(nullptr); } void Client::transact(Secret _secret, u256 _value, Address _dest, bytes const& _data, u256 _gas, u256 _gasPrice) @@ -404,85 +423,96 @@ void Client::workNet() this_thread::sleep_for(chrono::milliseconds(1)); } -void Client::work(bool _justQueue) +void Client::workMine() { - cworkin << "WORK"; - h256Set changeds; - // Do some mining. - if (!_justQueue && (m_pendingCount || m_forceMining)) + if ((m_pendingCount || m_forceMining) && m_miningStatus != Mined) { - // TODO: Separate "Miner" object. - if (m_doMine) + if (m_miningStatus == Preparing) { - if (m_restartMining) + m_miningStatus = Mining; + { + ReadGuard l(x_stateDB); + m_mineState = m_postMine; + } + + { + Guard l(x_mineProgress); m_mineProgress.best = (double)-1; m_mineProgress.hashes = 0; m_mineProgress.ms = 0; - WriteGuard l(x_stateDB); - if (m_paranoia) + } + + if (m_paranoia) + { + if (m_mineState.amIJustParanoid(m_bc)) { - if (m_postMine.amIJustParanoid(m_bc)) - { - cnote << "I'm just paranoid. Block is fine."; - m_postMine.commitToMine(m_bc); - } - else - { - cwarn << "I'm not just paranoid. Cannot mine. Please file a bug report."; - m_doMine = false; - } + cnote << "I'm just paranoid. Block is fine."; + m_mineState.commitToMine(m_bc); } else - m_postMine.commitToMine(m_bc); + { + cwarn << "I'm not just paranoid. Cannot mine. Please file a bug report."; + } } - m_restartMining = false; + else + m_mineState.commitToMine(m_bc); } - if (m_doMine) - { - cwork << "MINE"; + cwork << "MINE"; - // Mine for a while. - MineInfo mineInfo = m_postMine.mine(100, m_turboMining); + // Mine for a while. + MineInfo mineInfo = m_mineState.mine(100, m_turboMining); + { + Guard l(x_mineProgress); m_mineProgress.best = min(m_mineProgress.best, mineInfo.best); m_mineProgress.current = mineInfo.best; m_mineProgress.requirement = mineInfo.requirement; m_mineProgress.ms += 100; m_mineProgress.hashes += mineInfo.hashes; - WriteGuard l(x_stateDB); m_mineHistory.push_back(mineInfo); - if (mineInfo.completed) + } + if (mineInfo.completed) + { + // Import block. { - // Import block. + // Must lock stateDB here since we're actually pushing out to the database. + WriteGuard l(x_stateDB); cwork << "COMPLETE MINE"; - m_postMine.completeMine(); - cwork << "CHAIN <== postSTATE"; - h256s hs = m_bc.attemptImport(m_postMine.blockData(), m_stateDB); - if (hs.size()) - { - for (auto h: hs) - appendFromNewBlock(h, changeds); - changeds.insert(ChainChangedFilter); - //changeds.insert(PendingChangedFilter); // if we mined the new block, then we've probably reset the pending transactions. - } + m_mineState.completeMine(); } - } - else - { - cwork << "SLEEP"; - this_thread::sleep_for(chrono::milliseconds(100)); + m_miningStatus = Mined; } } else { - cwork << "SLEEP"; this_thread::sleep_for(chrono::milliseconds(100)); } +} + +void Client::work() +{ + cworkin << "WORK"; + h256Set changeds; + + if (m_miningStatus == Mined) + { + cwork << "CHAIN <== postSTATE"; + h256s hs = m_bc.attemptImport(m_mineState.blockData(), m_stateDB); + if (hs.size()) + { + for (auto h: hs) + appendFromNewBlock(h, changeds); + changeds.insert(ChainChangedFilter); + //changeds.insert(PendingChangedFilter); // if we mined the new block, then we've probably reset the pending transactions. + } + m_miningStatus = Preparing; + } + // Synchronise state to block chain. // This should remove any transactions on our queue that are included within our state. // It also guarantees that the state reflects the longest (valid!) chain on the block chain. @@ -507,13 +537,14 @@ void Client::work(bool _justQueue) if (newBlocks.size()) m_stateDB = db; + bool rsm = false; + cwork << "preSTATE <== CHAIN"; if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address()) { - if (m_doMine) - cnote << "New block on chain: Restarting mining operation."; - m_restartMining = true; // need to re-commit to mine. + cnote << "New block on chain: Restarting mining operation."; m_postMine = m_preMine; + rsm = true; changeds.insert(PendingChangedFilter); } @@ -526,11 +557,13 @@ void Client::work(bool _justQueue) appendFromNewPending(i, changeds); changeds.insert(PendingChangedFilter); - if (m_doMine) - cnote << "Additional transaction ready: Restarting mining operation."; - m_restartMining = true; + cnote << "Additional transaction ready: Restarting mining operation."; + rsm = true; } m_pendingCount = m_postMine.pending().size(); + + if (rsm) + m_miningStatus = Preparing; } cwork << "noteChanged" << changeds.size() << "items"; diff --git a/libethereum/Client.h b/libethereum/Client.h index f029e1d7f..ac0d07199 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -286,17 +286,19 @@ public: /// Get the coinbase address. Address address() const { return m_preMine.address(); } /// Start mining. + /// NOT thread-safe void startMining(); /// Stop mining. + /// NOT thread-safe void stopMining(); /// Are we mining now? - bool isMining() { return m_doMine; } + bool isMining() { return !!m_workMine; } /// Register a callback for information concerning mining. /// This callback will be in an arbitrary thread, blocking progress. JUST COPY THE DATA AND GET OUT. /// Check the progress of the mining. - MineProgress miningProgress() const { return m_mineProgress; } + MineProgress miningProgress() const { Guard l(x_mineProgress); return m_mineProgress; } /// Get and clear the mining history. - std::list miningHistory() { auto ret = m_mineHistory; m_mineHistory.clear(); return ret; } + std::list miningHistory() { Guard l(x_mineProgress); auto ret = m_mineHistory; m_mineHistory.clear(); return ret; } bool forceMining() const { return m_forceMining; } void setForceMining(bool _enable) { m_forceMining = _enable; } @@ -313,11 +315,14 @@ private: /// Do some work. Handles blockchain maintenance and mining. /// @param _justQueue If true will only processing the transaction queues. - void work(bool _justQueue = false); + void work(); /// Do some work on the network. void workNet(); + /// Do some work on the mining. + void workMine(); + /// Collate the changed filters for the bloom filter of the given pending transaction. /// Insert any filters that are activated into @a o_changed. void appendFromNewPending(h256 _pendingTransactionBloom, h256Set& o_changed) const; @@ -341,7 +346,8 @@ private: BlockChain m_bc; ///< Maintains block database. TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). - mutable boost::shared_mutex x_stateDB; // TODO: remove in favour of copying m_stateDB as required and thread-safing/copying State. Have a good think about what state objects there should be. Probably want 4 (pre, post, mining, user-visible). + // TODO: remove in favour of copying m_stateDB as required and thread-safing/copying State. Have a good think about what state objects there should be. Probably want 4 (pre, post, mining, user-visible). + mutable boost::shared_mutex x_stateDB; ///< Lock on the state DB, effectively a lock on m_postMine. OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it. State m_preMine; ///< The present state of the client. State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). @@ -354,13 +360,17 @@ private: std::unique_ptr m_work; ///< The work thread. std::atomic m_workState; + std::unique_ptr m_workMine;///< The work thread. + std::atomic m_workMineState; + enum MiningStatus { Preparing, Mining, Mined }; + MiningStatus m_miningStatus = Preparing;///< TODO: consider mutex/atomic variable. + State m_mineState; ///< The state on which we are mining, generally equivalent to m_postMine. bool m_paranoia = false; - bool m_doMine = false; ///< Are we supposed to be mining? bool m_turboMining = false; ///< Don't squander all of our time mining actually just sleeping. bool m_forceMining = false; ///< Mine even when there are no transactions pending? + mutable std::mutex x_mineProgress; ///< Lock for the mining progress & history. MineProgress m_mineProgress; std::list m_mineHistory; - mutable bool m_restartMining = false; mutable unsigned m_pendingCount = 0; mutable std::mutex m_filterLock;