From 6b80953aa3bfa4423f3e5f950474e12142169890 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 12 Jun 2015 10:30:44 +0200 Subject: [PATCH] State management in EthereumHost, better sync progress reporting. Invariants checking --- alethzero/Main.ui | 9 ++- alethzero/MainWin.cpp | 13 +++- libethereum/Client.cpp | 4 +- libethereum/Client.h | 2 +- libethereum/CommonNet.h | 25 +++++--- libethereum/DownloadMan.h | 5 -- libethereum/EthereumHost.cpp | 113 +++++++++++++++++++++++++---------- libethereum/EthereumHost.h | 32 ++++++---- 8 files changed, 138 insertions(+), 65 deletions(-) diff --git a/alethzero/Main.ui b/alethzero/Main.ui index efb6256af..f798437e6 100644 --- a/alethzero/Main.ui +++ b/alethzero/Main.ui @@ -44,7 +44,14 @@ 0 bytes used - + + + + + + + + diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index 4d96b77ff..a2abb1068 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -198,6 +198,7 @@ Main::Main(QWidget *parent) : statusBar()->addPermanentWidget(ui->balance); statusBar()->addPermanentWidget(ui->peerCount); statusBar()->addPermanentWidget(ui->mineStatus); + statusBar()->addPermanentWidget(ui->syncStatus); statusBar()->addPermanentWidget(ui->chainStatus); statusBar()->addPermanentWidget(ui->blockCount); @@ -1245,9 +1246,15 @@ void Main::refreshBlockCount() { auto d = ethereum()->blockChain().details(); BlockQueueStatus b = ethereum()->blockQueueStatus(); - HashChainStatus h = ethereum()->hashChainStatus(); - ui->chainStatus->setText(QString("%10/%11%12 hashes %3 importing %4 ready %5 verifying %6 unverified %7 future %8 unknown %9 bad %1 #%2") - .arg(m_privateChain.size() ? "[" + m_privateChain + "] " : "testnet").arg(d.number).arg(b.importing).arg(b.verified).arg(b.verifying).arg(b.unverified).arg(b.future).arg(b.unknown).arg(b.bad).arg(h.received).arg(h.estimated ? "~" : "").arg(h.total)); + SyncStatus sync = ethereum()->syncStatus(); + QString syncStatus = EthereumHost::stateName(sync.state); + if (sync.state == SyncState::HashesParallel || sync.state == SyncState::HashesSingle) + syncStatus += QString(": %1/%2%3").arg(sync.hashesReceived).arg(sync.hashesEstimated ? "~" : "").arg(sync.hashesTotal); + if (sync.state == SyncState::Blocks || sync.state == SyncState::NewBlocks) + syncStatus += QString(": %1/%2").arg(sync.blocksReceived).arg(sync.blocksTotal); + ui->syncStatus->setText(syncStatus); + ui->chainStatus->setText(QString("%3 importing %4 ready %5 verifying %6 unverified %7 future %8 unknown %9 bad %1 #%2") + .arg(m_privateChain.size() ? "[" + m_privateChain + "] " : "testnet").arg(d.number).arg(b.importing).arg(b.verified).arg(b.verifying).arg(b.unverified).arg(b.future).arg(b.unknown).arg(b.bad)); } void Main::on_turboMining_triggered() diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index d1ede38ac..23e4f86c8 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -904,8 +904,8 @@ void Client::flushTransactions() doWork(); } -HashChainStatus Client::hashChainStatus() const +SyncStatus Client::syncStatus() const { auto h = m_host.lock(); - return h ? h->status() : HashChainStatus { 0, 0, false }; + return h ? h->status() : SyncStatus(); } diff --git a/libethereum/Client.h b/libethereum/Client.h index cba93290b..5cae50ed0 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -157,7 +157,7 @@ public: /// Get some information on the block queue. BlockQueueStatus blockQueueStatus() const { return m_bq.status(); } /// Get some information on the block queue. - HashChainStatus hashChainStatus() const; + SyncStatus syncStatus() const; /// Get the block queue. BlockQueue const& blockQueue() const { return m_bq; } diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 2eb2d77c8..b960c8f3f 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -77,18 +77,27 @@ enum class Asking Nothing }; -enum class Syncing +enum class SyncState { - Waiting, - Executing, - Done + Idle, ///< Initial chain sync complete. Waiting for new packets + WaitingQueue, ///< Block downloading paused. Waiting for block queue to process blocks and free space + HashesNegotiate, ///< Waiting for first hashes to arrive + HashesSingle, ///< Locked on and downloading hashes from a single peer + HashesParallel, ///< Downloading hashes from multiple peers over + Blocks, ///< Downloading blocks + NewBlocks, ///< Downloading blocks learned from NewHashes packet + + Size /// Must be kept last }; -struct HashChainStatus +struct SyncStatus { - unsigned total; - unsigned received; - bool estimated; + SyncState state = SyncState::Idle; + unsigned hashesTotal = 0; + unsigned hashesReceived = 0; + bool hashesEstimated = false; + unsigned blocksTotal = 0; + unsigned blocksReceived = 0; }; } diff --git a/libethereum/DownloadMan.h b/libethereum/DownloadMan.h index 0c27e84ea..ac99e1d36 100644 --- a/libethereum/DownloadMan.h +++ b/libethereum/DownloadMan.h @@ -253,11 +253,6 @@ public: return m_got.full(); } - unsigned gotCount() const - { - return m_got.size(); - } - size_t chainSize() const { ReadGuard l(m_lock); return m_chainCount; } size_t chainEmpty() const { ReadGuard l(m_lock); return m_chainCount == 0; } void foreachSub(std::function const& _f) const { ReadGuard l(x_subs); for(auto i: m_subs) _f(*i); } diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 4609fddb6..661f34b11 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -41,6 +41,8 @@ using namespace p2p; unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common unsigned const c_chainReorgSize = 30000; +char const* const EthereumHost::s_stateNames[static_cast(SyncState::Size)] = {"Idle", "WaitingQueue", "HashesNegotiate", "HashesSingle", "HashesParallel", "Blocks", "NewBlocks" }; + EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): HostCapability(), Worker ("ethsync"), @@ -49,6 +51,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu m_bq (_bq), m_networkId (_networkId) { + setState(SyncState::HashesNegotiate); m_latestBlockSent = _ch.currentHash(); m_hashMan.reset(m_chain.number() + 1); m_bqRoomAvailable = m_bq.onRoomAvailable([this](){ m_continueSync = true; }); @@ -79,9 +82,7 @@ void EthereumHost::reset() foreachPeer([](EthereumPeer* _p) { _p->abortSync(); }); m_man.resetToChain(h256s()); m_hashMan.reset(m_chain.number() + 1); - m_needSyncBlocks = true; - m_needSyncHashes = true; - m_syncingNewHashes = false; + setState(SyncState::HashesNegotiate); m_syncingLatestHash = h256(); m_syncingTotalDifficulty = 0; m_latestBlockSent = h256(); @@ -91,10 +92,18 @@ void EthereumHost::reset() void EthereumHost::resetSyncTo(h256 const& _h) { - m_needSyncHashes = true; - m_needSyncBlocks = true; + setState(SyncState::HashesNegotiate); m_syncingLatestHash = _h; - m_syncingNewHashes = false; +} + + +void EthereumHost::setState(SyncState _s) +{ + if (m_state != _s) + { + clog(NetAllDetail) << "SyncState changed from " << stateName(m_state) << " to " << stateName(_s); + m_state = _s; + } } void EthereumHost::doWork() @@ -255,6 +264,7 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) void EthereumHost::onPeerStatus(EthereumPeer* _peer) { RecursiveGuard l(x_sync); + DEV_INVARIANT_CHECK; if (_peer->m_genesisHash != m_chain.genesisHash()) _peer->disable("Invalid genesis hash"); else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion) @@ -274,13 +284,14 @@ void EthereumHost::onPeerStatus(EthereumPeer* _peer) _peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number(); if (_peer->m_expectedHashes > estimatedHashes) _peer->disable("Too many hashes"); - else if (m_needSyncHashes && m_hashMan.chainSize() < _peer->m_expectedHashes) + else if (needHashes() && m_hashMan.chainSize() < _peer->m_expectedHashes) m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); } else _peer->m_expectedHashes = estimatedHashes; continueSync(_peer); } + DEV_INVARIANT_CHECK; } unsigned EthereumHost::estimateHashes() @@ -309,6 +320,7 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete) { + DEV_INVARIANT_CHECK; if (_hashes.empty()) { _peer->m_hashSub.doneFetch(); @@ -376,8 +388,8 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool if (_complete) { clog(NetMessageSummary) << "Start new blocks download..."; - m_needSyncBlocks = true; - m_syncingNewHashes = true; + m_syncingLatestHash = h256(); + setState(SyncState::NewBlocks); m_man.resetToChain(m_hashes); m_hashes.clear(); m_hashMan.reset(m_chain.number() + 1); @@ -386,40 +398,41 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool else if (syncByNumber && m_hashMan.isComplete()) { // Done our chain-get. - m_needSyncHashes = false; clog(NetNote) << "Hashes download complete."; - // 1/100th for each useful block hash. - _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? - m_hashMan.reset(m_chain.number() + 1); - continueSync(); + onPeerDoneHashes(_peer, false); } else if (m_hashes.size() > _peer->m_expectedHashes) { _peer->disable("Too many hashes"); m_hashes.clear(); m_syncingLatestHash = h256(); + setState(SyncState::HashesNegotiate); continueSync(); ///Try with some other peer, keep the chain } else continueSync(_peer); /// Grab next hashes + DEV_INVARIANT_CHECK; } void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain) { assert(_peer->m_asking == Asking::Nothing); - m_needSyncHashes = false; + m_syncingLatestHash = h256(); + setState(SyncState::Blocks); if (_peer->m_protocolVersion != protocolVersion() || _localChain) { m_man.resetToChain(m_hashes); - m_hashes.clear(); - m_hashMan.reset(m_chain.number() + 1); + _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? } + m_hashMan.reset(m_chain.number() + 1); + m_hashes.clear(); continueSync(); } void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) { RecursiveGuard l(x_sync); + DEV_INVARIANT_CHECK; _peer->setAsking(Asking::Nothing); unsigned itemCount = _r.itemCount(); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); @@ -484,19 +497,21 @@ void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received."; - if (m_syncingNewHashes && unknown > 0) + if (m_state == SyncState::NewBlocks && unknown > 0) { _peer->m_latestHash = lastUnknown; resetSyncTo(lastUnknown); } continueSync(_peer); + DEV_INVARIANT_CHECK; } void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); - if (isSyncing_UNSAFE() || _peer->isConversing()) + DEV_INVARIANT_CHECK; + if (isSyncing() || _peer->isConversing()) { clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading."; return; @@ -504,12 +519,14 @@ void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) clog(NetNote) << "New block hash discovered: syncing without help."; _peer->m_syncHashNumber = 0; onPeerHashes(_peer, _hashes, true); + DEV_INVARIANT_CHECK; } void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) { RecursiveGuard l(x_sync); - if ((isSyncing_UNSAFE() || _peer->isConversing()) && !m_syncingNewHashes) + DEV_INVARIANT_CHECK; + if ((isSyncing() || _peer->isConversing()) && m_state != SyncState::NewBlocks) { clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading."; return; @@ -563,6 +580,7 @@ void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) if (sync) continueSync(); } + DEV_INVARIANT_CHECK; } void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) @@ -607,6 +625,8 @@ void EthereumHost::onPeerAborting(EthereumPeer* _peer) void EthereumHost::continueSync() { + if (m_state == SyncState::WaitingQueue) + setState(m_lastActiveState); clog(NetAllDetail) << "Continuing sync for all peers"; foreachPeer([&](EthereumPeer* _p) { @@ -617,10 +637,11 @@ void EthereumHost::continueSync() void EthereumHost::continueSync(EthereumPeer* _peer) { + DEV_INVARIANT_CHECK; assert(_peer->m_asking == Asking::Nothing); bool otherPeerV60Sync = false; bool otherPeerV61Sync = false; - if (m_needSyncHashes) + if (needHashes()) { if (!peerShouldGrabChain(_peer)) { @@ -652,7 +673,7 @@ void EthereumHost::continueSync(EthereumPeer* _peer) } if (_peer->m_protocolVersion == protocolVersion() && !m_hashMan.isComplete()) { - m_syncingV61 = true; + setState(SyncState::HashesParallel); _peer->requestHashes(); /// v61+ and not catching up to a particular hash } else @@ -666,20 +687,19 @@ void EthereumHost::continueSync(EthereumPeer* _peer) if (_peer->m_totalDifficulty >= m_syncingTotalDifficulty) { _peer->requestHashes(m_syncingLatestHash); - m_syncingV61 = false; + setState(SyncState::HashesSingle); m_estimatedHashes = _peer->m_expectedHashes - (_peer->m_protocolVersion == protocolVersion() ? 0 : c_chainReorgSize); } else _peer->setIdle(); } } - else if (m_needSyncBlocks) + else if (needBlocks()) { if (m_man.isComplete()) { // Done our chain-get. - m_needSyncBlocks = false; - m_syncingNewHashes = false; + setState(SyncState::Idle); clog(NetNote) << "Chain download complete."; // 1/100th for each useful block hash. _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? @@ -700,6 +720,8 @@ void EthereumHost::continueSync(EthereumPeer* _peer) else if (m_bq.knownFull()) { clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; + m_lastActiveState = m_state; + setState(SyncState::WaitingQueue); _peer->setIdle(); } else @@ -708,6 +730,7 @@ void EthereumHost::continueSync(EthereumPeer* _peer) } else _peer->setIdle(); + DEV_INVARIANT_CHECK; } bool EthereumHost::peerCanHelp(EthereumPeer* _peer) const @@ -754,16 +777,42 @@ bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const } } -bool EthereumHost::isSyncing_UNSAFE() const +bool EthereumHost::isSyncing() const { - return m_needSyncBlocks || m_needSyncHashes; + return m_state != SyncState::Idle; } -HashChainStatus EthereumHost::status() +SyncStatus EthereumHost::status() const { RecursiveGuard l(x_sync); - if (m_syncingV61) - return HashChainStatus { static_cast(m_hashMan.chainSize()), static_cast(m_hashMan.gotCount()), false }; - return HashChainStatus { m_estimatedHashes > 0 ? m_estimatedHashes : 0, static_cast(m_hashes.size()), m_estimatedHashes > 0 }; + SyncStatus res; + res.state = m_state; + if (m_state == SyncState::HashesParallel) + { + res.hashesReceived = m_hashMan.hashesGot().size(); + res.hashesTotal = m_hashMan.chainSize(); + } + else if (m_state == SyncState::HashesSingle) + { + res.hashesTotal = m_estimatedHashes; + res.hashesReceived = static_cast(m_hashes.size()); + res.hashesEstimated = true; + } + else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks || m_state == SyncState::WaitingQueue) + { + res.blocksTotal = m_man.chainSize(); + res.blocksReceived = m_man.blocksGot().size(); + } + return res; } + +bool EthereumHost::invariants() const +{ + if (m_state == SyncState::HashesNegotiate && !m_hashes.empty()) + return false; + if (needBlocks() && (m_syncingLatestHash || !m_hashes.empty())) + return false; + + return true; +} diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 392e234f3..098d893ee 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -54,9 +54,10 @@ class BlockQueue; * @warning None of this is thread-safe. You have been warned. * @doWork Syncs to peers and sends new blocks and transactions. */ -class EthereumHost: public p2p::HostCapability, Worker +class EthereumHost: public p2p::HostCapability, Worker, HasInvariants { public: + /// Start server, but don't listen. EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); @@ -70,7 +71,7 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } - bool isSyncing() const { RecursiveGuard l(x_sync); return isSyncing_UNSAFE(); } + bool isSyncing() const; bool isBanned(p2p::NodeId const& _id) const { return !!m_banned.count(_id); } void noteNewTransactions() { m_newTransactions = true; } @@ -87,17 +88,21 @@ public: DownloadMan& downloadMan() { return m_man; } HashDownloadMan& hashDownloadMan() { return m_hashMan; } BlockChain const& chain() { return m_chain; } - HashChainStatus status(); + SyncStatus status() const; + static char const* stateName(SyncState _s) { return s_stateNames[static_cast(_s)]; } static unsigned const c_oldProtocolVersion; private: + static char const* const s_stateNames[static_cast(SyncState::Size)]; + std::tuple>, std::vector>, std::vector>> randomSelection(unsigned _percent = 25, std::function const& _allow = [](EthereumPeer const*){ return true; }); void foreachPeerPtr(std::function)> const& _f) const; void foreachPeer(std::function const& _f) const; - bool isSyncing_UNSAFE() const; void resetSyncTo(h256 const& _h); + bool needHashes() const { return m_state == SyncState::HashesNegotiate || m_state == SyncState::HashesSingle || m_state == SyncState::HashesParallel; } + bool needBlocks() const { return m_state == SyncState::Blocks || m_state == SyncState::NewBlocks; } /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. void doWork(); @@ -128,6 +133,9 @@ private: bool peerCanHelp(EthereumPeer* _peer) const; unsigned estimateHashes(); void estimatePeerHashes(EthereumPeer* _peer); + void setState(SyncState _s); + + bool invariants() const override; BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. @@ -148,15 +156,13 @@ private: bool m_newBlocks = false; mutable RecursiveMutex x_sync; - bool m_needSyncHashes = true; ///< Indicates if need to downlad hashes - bool m_needSyncBlocks = true; ///< Indicates if we still need to download some blocks - h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync. - u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync. - bool m_syncingNewHashes = false; ///< True if currently downloading hashes received with NewHashes - h256s m_hashes; ///< List of hashes with unknown block numbers. Used for PV60 chain downloading and catching up to a particular unknown - unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. - bool m_syncingV61 = false; ///< True if recent activity was over pv61+. Used for status reporting only. - bool m_continueSync = false; ///< True when the block queue has processed a block; we should restart grabbing blocks. + SyncState m_state = SyncState::Idle; ///< Current sync state + SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode + h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync. + h256s m_hashes; ///< List of hashes with unknown block numbers. Used for PV60 chain downloading and catching up to a particular unknown + unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. + bool m_continueSync = false; ///< True when the block queue has processed a block; we should restart grabbing blocks. }; }