From cfcd722c51460716fe6ebaa1449d36a906d09fb5 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 29 Jun 2015 11:48:32 +0200 Subject: [PATCH 01/13] pv61 sync re-enable pv60 --- alethzero/MainWin.cpp | 2 +- libethereum/BlockChainSync.cpp | 345 ++++++++++++++++++++++++++++++--- libethereum/BlockChainSync.h | 46 ++++- libethereum/CommonNet.h | 1 + libethereum/EthereumHost.cpp | 23 ++- libethereum/EthereumHost.h | 2 +- libethereum/EthereumPeer.cpp | 17 +- libethereum/EthereumPeer.h | 2 +- libp2p/Session.cpp | 5 +- 9 files changed, 392 insertions(+), 51 deletions(-) diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index 821022abc..1fa122cea 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -1252,7 +1252,7 @@ void Main::refreshBlockCount() auto d = ethereum()->blockChain().details(); BlockQueueStatus b = ethereum()->blockQueueStatus(); SyncStatus sync = ethereum()->syncStatus(); - QString syncStatus = EthereumHost::stateName(sync.state); + QString syncStatus = QString("PV%1 %2").arg(sync.protocolVersion).arg(EthereumHost::stateName(sync.state)); if (sync.state == SyncState::Hashes) syncStatus += QString(": %1/%2%3").arg(sync.hashesReceived).arg(sync.hashesEstimated ? "~" : "").arg(sync.hashesTotal); if (sync.state == SyncState::Blocks || sync.state == SyncState::NewBlocks) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 63ac620ce..85e894e5d 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -38,7 +38,8 @@ using namespace dev; using namespace dev::eth; using namespace p2p; -unsigned const c_chainReorgSize = 30000; +unsigned const c_chainReorgSize = 30000; /// Added to estimated hashes to account for potential chain reorganiation +unsigned const c_hashSubchainSize = 8192; /// PV61 subchain size BlockChainSync::BlockChainSync(EthereumHost& _host): m_host(_host) @@ -114,7 +115,6 @@ void BlockChainSync::requestBlocks(std::shared_ptr _peer) { clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; pauseSync(); - _peer->setIdle(); return; } _peer->requestBlocks(); @@ -137,11 +137,9 @@ void BlockChainSync::logNewBlock(h256 const& _h) void BlockChainSync::onPeerBlocks(std::shared_ptr _peer, RLP const& _r) { RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; unsigned itemCount = _r.itemCount(); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); - _peer->setIdle(); if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting) clog(NetWarn) << "Unexpected Blocks received!"; if (m_state == SyncState::Waiting) @@ -305,6 +303,7 @@ SyncStatus PV60Sync::status() const RecursiveGuard l(x_sync); SyncStatus res; res.state = m_state; + res.protocolVersion = EthereumHost::c_oldProtocolVersion; if (m_state == SyncState::Hashes) { res.hashesTotal = m_estimatedHashes; @@ -381,26 +380,10 @@ void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, boo RLPStream s; if (_s == SyncState::Hashes) { - if (m_state == SyncState::Idle) - { - if (isSyncing(_peer)) - clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; - - m_syncingLatestHash = _peer->m_latestHash; - m_syncingTotalDifficulty = _peer->m_totalDifficulty; - setState(_peer, _s, true); - _peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash); - DEV_INVARIANT_CHECK; - return; - } - else if (m_state == SyncState::Hashes) + if (m_state == SyncState::Idle || m_state == SyncState::Hashes) { - if (!isSyncing(_peer)) - clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; - - setState(_peer, _s, true); - _peer->requestHashes(m_syncingLastReceivedHash); - DEV_INVARIANT_CHECK; + m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize; + syncHashes(_peer); return; } } @@ -462,7 +445,6 @@ void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, boo } else if (_s == SyncState::Idle) { - host().foreachPeer([this](std::shared_ptr _p) { _p->setIdle(); return true; }); if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { clog(NetMessageDetail) << "Finishing blocks fetch..."; @@ -473,7 +455,6 @@ void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, boo // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. _peer->m_sub.doneFetch(); - _peer->setIdle(); setState(_peer, SyncState::Idle, false); } else if (m_state == SyncState::Hashes) @@ -562,7 +543,6 @@ void PV60Sync::attemptSync(std::shared_ptr _peer) else { clog(NetAllDetail) << "Yes. Their chain is better."; - m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize; transition(_peer, SyncState::Hashes); } } @@ -573,7 +553,9 @@ void PV60Sync::noteNeedsSyncing(std::shared_ptr _peer) if (isSyncing()) { clog(NetAllDetail) << "Sync in progress: Just set to help out."; - if (m_state == SyncState::Blocks) + if (m_state == SyncState::Hashes && _peer->m_asking == Asking::Nothing) + requestSubchain(_peer); + else if (m_state == SyncState::Blocks) requestBlocks(_peer); } else @@ -649,20 +631,45 @@ void PV60Sync::noteDoneBlocks(std::shared_ptr _peer, bool _clemenc } resetSync(); downloadMan().reset(); + } _peer->m_sub.doneFetch(); +} + +void PV60Sync::syncHashes(std::shared_ptr _peer) +{ + if (m_state == SyncState::Idle) + { + if (isSyncing(_peer)) + clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; + + m_syncingLatestHash = _peer->m_latestHash; + m_syncingTotalDifficulty = _peer->m_totalDifficulty; + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash); + } + else if (m_state == SyncState::Hashes) + { + if (!isSyncing(_peer)) + clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingLastReceivedHash); } } void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; - _peer->setIdle(); if (!isSyncing(_peer)) { clog(NetMessageSummary) << "Ignoring hashes since not syncing"; return; } + if (_peer->m_syncHash != (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash)) + { + clog(NetMessageSummary) << "Ignoring unexpected hashes"; + return; + } if (_hashes.size() == 0) { transition(_peer, SyncState::Blocks); @@ -711,7 +718,6 @@ void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h void PV60Sync::onPeerNewHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer))) { clog(NetMessageSummary) << "Ignoring since we're already downloading."; @@ -769,7 +775,6 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr _peer, h256s const& void PV60Sync::abortSync() { // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. - host().foreachPeer([this](std::shared_ptr _p) { _p->setIdle(); return true; }); setState(std::shared_ptr(), SyncState::Idle, false, true); DEV_INVARIANT_CHECK; } @@ -820,3 +825,281 @@ bool PV60Sync::invariants() const return false; return true; } + +PV61Sync::PV61Sync(EthereumHost& _host): + PV60Sync(_host) +{ +} + +void PV61Sync::syncHashes(std::shared_ptr _peer) +{ + if (_peer->m_protocolVersion != host().protocolVersion()) + { + m_readyChainMap.clear(); + m_completeChainMap.clear(); + m_downloadingChainMap.clear(); + m_syncingBlockNumber = 0; + m_chainSyncPeers.clear(); + m_knownHashes.clear(); + PV60Sync::syncHashes(_peer); + return; + } + if (m_state == SyncState::Idle) + { + if (isSyncing(_peer)) + clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; + + if (m_syncingBlockNumber == 0) + m_syncingBlockNumber = host().chain().number() + c_hashSubchainSize; + m_syncingTotalDifficulty = _peer->m_totalDifficulty; + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingBlockNumber, 1); + } + else if (m_state == SyncState::Hashes) + { + if (!isSyncing(_peer)) + clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + + m_syncingBlockNumber += c_hashSubchainSize; + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingBlockNumber, 1); + } +} + +void PV61Sync::requestSubchain(std::shared_ptr _peer) +{ + auto syncPeer = m_chainSyncPeers.find(_peer); + if (syncPeer != m_chainSyncPeers.end()) + { + // Already downoading, request next batch + h256s& d = m_downloadingChainMap.at(syncPeer->second); + _peer->requestHashes(d.back()); + } + else if (needsSyncing(_peer) && !m_readyChainMap.empty()) + { + clog(NetAllDetail) << "Helping with hashchin download"; + h256s& d = m_readyChainMap.begin()->second; + _peer->requestHashes(d.back()); + m_downloadingChainMap[m_readyChainMap.begin()->first] = move(d); + m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first; + m_readyChainMap.erase(m_readyChainMap.begin()); + } +} + +void PV61Sync::requestSubchains() +{ + host().foreachPeer([this](std::shared_ptr _p) + { + if (_p->m_asking == Asking::Nothing) + requestSubchain(_p); + return true; + }); +} + +void PV61Sync::completeSubchain(std::shared_ptr _peer, unsigned _n) +{ + m_completeChainMap[_n] = move(m_downloadingChainMap.at(_n)); + m_downloadingChainMap.erase(_n); + _peer->m_syncHashNumber = 0; + + auto syncer = m_syncer.lock(); + if (!syncer) + { + restartSync(); + return; + } + + if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && syncer->m_asking == Asking::Nothing) + { + //Done chain-get + m_syncingNeededBlocks.clear(); + for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h) + m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.begin(), h->second.end()); + m_completeChainMap.clear(); + m_knownHashes.clear(); + m_syncingBlockNumber = 0; + transition(syncer, SyncState::Blocks); + } + else + requestSubchain(_peer); +} + +void PV61Sync::restartSync() +{ + m_completeChainMap.clear(); + m_readyChainMap.clear(); + m_downloadingChainMap.clear(); + m_chainSyncPeers.clear(); + m_syncingBlockNumber = 0; + m_knownHashes.clear(); + PV60Sync::restartSync(); +} + +void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) +{ + RecursiveGuard l(x_sync); + if (m_syncingBlockNumber == 0 || (_peer == m_syncer.lock() && _peer->m_protocolVersion != host().protocolVersion())) + { + // Syncing in pv60 mode + PV60Sync::onPeerHashes(_peer, _hashes); + return; + } + if (_hashes.size() == 0) + { + if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) + { + // End of hash chain, add last chunk to download + m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _peer->m_latestHash })); + _peer->m_syncHashNumber = 0; + requestSubchain(_peer); + } + else + { + auto syncPeer = m_chainSyncPeers.find(_peer); + if (syncPeer == m_chainSyncPeers.end()) + clog(NetWarn) << "Hashes response from unexpected peer"; + else + { + // Peer does not have request hashes, move back from downloading to ready + unsigned number = syncPeer->second; + m_chainSyncPeers.erase(_peer); + m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); + m_downloadingChainMap.erase(number); + resetNeedsSyncing(_peer); + requestSubchains(); + } + } + return; + } + if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) + { + // Got new subchain marker + assert(_hashes.size() == 1); + m_knownHashes.insert(_hashes[0]); + m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _hashes[0] })); + if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes) + { + _peer->disable("Too many hashes from lead peer"); + restartSync(); + return; + } + transition(_peer, SyncState::Hashes); + requestSubchains(); + } + else + { + auto syncPeer = m_chainSyncPeers.find(_peer); + if (syncPeer == m_chainSyncPeers.end()) + { + clog(NetWarn) << "Hashes response from unexpected peer"; + return; + } + unsigned number = syncPeer->second; + h256s& hashes = m_downloadingChainMap.at(number); + + unsigned knowns = 0; + unsigned unknowns = 0; + for (unsigned i = 0; i < _hashes.size(); ++i) + { + auto h = _hashes[i]; + auto status = host().bq().blockStatus(h); + if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h) || !!m_knownHashes.count(h)) + { + clog(NetMessageSummary) << "Subchain download complete"; + m_chainSyncPeers.erase(_peer); + completeSubchain(_peer, number); + return; + } + else if (status == QueueStatus::Bad) + { + cwarn << "block hash bad!" << h << ". Bailing..."; + restartSync(); + return; + } + else if (status == QueueStatus::Unknown) + { + unknowns++; + hashes.push_back(h); + } + else + knowns++; + } + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << hashes.back(); + if (hashes.size() > c_hashSubchainSize) + { + _peer->disable("Too many subchain hashes"); + restartSync(); + return; + } + requestSubchain(_peer); + } + DEV_INVARIANT_CHECK; +} + +void PV61Sync::onPeerAborting() +{ + RecursiveGuard l(x_sync); + // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. + for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end();) + { + if (s->first.expired()) + { + unsigned number = s->second; + m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); + m_downloadingChainMap.erase(number); + m_chainSyncPeers.erase(s++); + } + else + ++s; + } + if (m_syncer.expired()) + abortSync(); + else if (isPV61Syncing()) + requestSubchains(); + DEV_INVARIANT_CHECK; +} + +SyncStatus PV61Sync::status() const +{ + RecursiveGuard l(x_sync); + SyncStatus res = PV60Sync::status(); + if (m_state == SyncState::Hashes && isPV61Syncing()) + { + res.protocolVersion = 61; + res.hashesReceived = 0; + for (auto const& d : m_readyChainMap) + res.hashesReceived += d.second.size(); + for (auto const& d : m_downloadingChainMap) + res.hashesReceived += d.second.size(); + for (auto const& d : m_completeChainMap) + res.hashesReceived += d.second.size(); + } + return res; +} + +bool PV61Sync::isPV61Syncing() const +{ + return m_syncingBlockNumber != 0; +} + +bool PV61Sync::invariants() const +{ + if (m_downloadingChainMap.size() != m_chainSyncPeers.size()) + return false; + if (m_state == SyncState::Idle && isSyncing()) + return false; + if (m_state != SyncState::Idle && !isSyncing()) + return false; + if (m_state == SyncState::Hashes) + { + bool hashes = false; + host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); + if (!hashes) + return false; + if (isPV61Syncing() && !m_syncingBlockNumber) + return false; + } + else if (!PV60Sync::invariants()) + return false; + return true; +} diff --git a/libethereum/BlockChainSync.h b/libethereum/BlockChainSync.h index 449dd49d1..01727a0cb 100644 --- a/libethereum/BlockChainSync.h +++ b/libethereum/BlockChainSync.h @@ -132,8 +132,8 @@ private: /** - * @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete - * Syncs to peers and keeps up to date + * @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash download is complete + * syncs to peers and keeps up to date */ /** @@ -228,7 +228,7 @@ protected: void pauseSync() override; void resetSyncFor(std::shared_ptr _peer, h256 const& _latestHash, u256 const& _td) override; -private: +protected: /// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer void transition(std::shared_ptr _peer, SyncState _s, bool _force = false, bool _needHelp = true); @@ -262,6 +262,12 @@ private: /// Called when peer done downloading blocks void noteDoneBlocks(std::shared_ptr _who, bool _clemency); + /// Start chainhash sync + virtual void syncHashes(std::shared_ptr _peer); + + /// Request subchain, no-op for pv60 + virtual void requestSubchain(std::shared_ptr /*_peer*/) {} + /// Abort syncing void abortSync(); @@ -276,5 +282,39 @@ private: u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync. std::weak_ptr m_syncer; ///< Peer we are currently syncing with }; + +/** + * @brief Syncrhonization over PV61. Selects a single peer and requests every c_hashSubchainSize hash, splitting the hashchain into subchains and downloading each subchain in parallel. + * Syncs to peers and keeps up to date + */ +class PV61Sync: public PV60Sync +{ +public: + PV61Sync(EthereumHost& _host); + +protected: + void restartSync() override; + void requestSubchain(std::shared_ptr _peer) override; + void syncHashes(std::shared_ptr _peer) override; + void onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) override; + void onPeerAborting() override; + SyncStatus status() const override; + bool invariants() const override; + +private: + /// Called when subchain is complete. Check if if hashchain is fully downloaded and proceed to downloading blocks + void completeSubchain(std::shared_ptr _peer, unsigned _n); + /// Find a subchain for peers to downloading + void requestSubchains(); + /// Check if downloading hashes in parallel + bool isPV61Syncing() const; + + std::map m_completeChainMap; ///< Fully downloaded subchains + std::map m_readyChainMap; ///< Subchains ready for download + std::map m_downloadingChainMap; ///< Subchains currently being downloading. In sync with m_chainSyncPeers + std::map, unsigned, std::owner_less>> m_chainSyncPeers; ///< Peers to m_downloadingSubchain number map + h256Hash m_knownHashes; ///< Subchain start markers. Used to track suchain completion + unsigned m_syncingBlockNumber = 0; ///< Current subchain marker +}; } } diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 35a27ff5e..d03e04d85 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -91,6 +91,7 @@ enum class SyncState struct SyncStatus { SyncState state = SyncState::Idle; + unsigned protocolVersion = 0; unsigned hashesTotal = 0; unsigned hashesReceived = 0; bool hashesEstimated = false; diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 9980f4339..5bd16ac12 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -230,10 +230,10 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) } } -BlockChainSync& EthereumHost::sync() +BlockChainSync* EthereumHost::sync() { if (m_sync) - return *m_sync; // We only chose sync strategy once + return m_sync.get(); // We only chose sync strategy once bool pv61 = false; foreachPeer([&](std::shared_ptr _p) @@ -242,38 +242,43 @@ BlockChainSync& EthereumHost::sync() pv61 = true; return !pv61; }); - m_sync.reset(pv61 ? new PV60Sync(*this) : new PV60Sync(*this)); - return *m_sync; + m_sync.reset(pv61 ? new PV61Sync(*this) : new PV60Sync(*this)); + return m_sync.get(); } void EthereumHost::onPeerStatus(std::shared_ptr _peer) { Guard l(x_sync); - sync().onPeerStatus(_peer); + if (sync()) + sync()->onPeerStatus(_peer); } void EthereumHost::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) { Guard l(x_sync); - sync().onPeerHashes(_peer, _hashes); + if (sync()) + sync()->onPeerHashes(_peer, _hashes); } void EthereumHost::onPeerBlocks(std::shared_ptr _peer, RLP const& _r) { Guard l(x_sync); - sync().onPeerBlocks(_peer, _r); + if (sync()) + sync()->onPeerBlocks(_peer, _r); } void EthereumHost::onPeerNewHashes(std::shared_ptr _peer, h256s const& _hashes) { Guard l(x_sync); - sync().onPeerNewHashes(_peer, _hashes); + if (sync()) + sync()->onPeerNewHashes(_peer, _hashes); } void EthereumHost::onPeerNewBlock(std::shared_ptr _peer, RLP const& _r) { Guard l(x_sync); - sync().onPeerNewBlock(_peer, _r); + if (sync()) + sync()->onPeerNewBlock(_peer, _r); } void EthereumHost::onPeerTransactions(std::shared_ptr _peer, RLP const& _r) diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 182c3d4cd..37c495ec5 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -116,7 +116,7 @@ private: virtual void onStarting() override { startWorking(); } virtual void onStopping() override { stopWorking(); } - BlockChainSync& sync(); + BlockChainSync* sync(); BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 88878334c..9e7dd6898 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -141,6 +141,7 @@ void EthereumPeer::requestStatus() void EthereumPeer::requestHashes(u256 _number, unsigned _count) { assert(m_asking == Asking::Nothing); + assert(m_protocolVersion == host()->protocolVersion()); m_syncHashNumber = _number; m_syncHash = h256(); setAsking(Asking::Hashes); @@ -198,7 +199,7 @@ void EthereumPeer::requestBlocks() void EthereumPeer::setAsking(Asking _a) { m_asking = _a; - m_lastAsk = chrono::system_clock::now(); + m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); auto s = session(); if (s) @@ -211,9 +212,14 @@ void EthereumPeer::setAsking(Asking _a) void EthereumPeer::tick() { auto s = session(); - if (s && (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing)) + time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); + if (s && (now - m_lastAsk > 10 && m_asking != Asking::Nothing)) + { + clog(NetWarn) << "timeout: " << (now - m_lastAsk) << " " << + (m_asking == Asking::Nothing ? "nothing" : m_asking == Asking::State ? "state" : m_asking == Asking::Hashes ? "hashes" : m_asking == Asking::Blocks ? "blocks" : "?"); // timeout s->disconnect(PingTimeout); + } } bool EthereumPeer::isConversing() const @@ -228,6 +234,7 @@ bool EthereumPeer::isCriticalSyncing() const bool EthereumPeer::interpret(unsigned _id, RLP const& _r) { + m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); try { switch (_id) @@ -254,7 +261,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash; - setAsking(Asking::Nothing); + setIdle(); host()->onPeerStatus(dynamic_pointer_cast(dynamic_pointer_cast(shared_from_this()))); break; } @@ -311,6 +318,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) clog(NetWarn) << "Peer giving us hashes when we didn't ask for them."; break; } + setIdle(); h256s hashes(itemCount); for (unsigned i = 0; i < itemCount; ++i) hashes[i] = _r[i].toHash(); @@ -357,7 +365,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) if (m_asking != Asking::Blocks) clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them."; else + { + setIdle(); host()->onPeerBlocks(dynamic_pointer_cast(shared_from_this()), _r); + } break; } case NewBlockPacket: diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index d506ffb3f..faf3a0650 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -136,7 +136,7 @@ private: /// What, if anything, we last asked the other peer for. Asking m_asking = Asking::Nothing; /// When we asked for it. Allows a time out. - std::chrono::system_clock::time_point m_lastAsk; + std::atomic m_lastAsk; /// These are determined through either a Status message or from NewBlock. h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 3d0bd0b6a..b79a225a2 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -367,7 +367,7 @@ void Session::drop(DisconnectReason _reason) if (socket.is_open()) try { - clog(NetConnect) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")"; + clog(NetWarn) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")"; boost::system::error_code ec; socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); socket.close(); @@ -386,11 +386,12 @@ void Session::drop(DisconnectReason _reason) void Session::disconnect(DisconnectReason _reason) { clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")"; + size_t peerCount = m_server->peerCount(); //needs to be outside of lock to avoid deadlocking with other thread that capture x_info/x_sessions in reverse order DEV_GUARDED(x_info) StructuredLogger::p2pDisconnected( m_info.id.abridged(), m_peer->endpoint, // TODO: may not be 100% accurate - m_server->peerCount() + peerCount ); if (m_socket->ref().is_open()) { From cc9865b271a995104703cbabed7e4c4112ecfd1d Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 30 Jun 2015 18:09:46 +0200 Subject: [PATCH 02/13] detailed invariant exceptions --- libethereum/BlockChainSync.cpp | 32 +++++++++++--------------------- libethereum/EthereumPeer.cpp | 4 ++-- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 85e894e5d..7fa4a574d 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -791,38 +791,32 @@ void PV60Sync::onPeerAborting() bool PV60Sync::invariants() const { if (m_state == SyncState::Idle && isSyncing()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Idle while peer syncing")); if (m_state != SyncState::Idle && !isSyncing()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Active while peer not syncing")); if (m_state == SyncState::Hashes) { bool hashes = false; host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); if (!hashes) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for hashes")); if (!m_syncingLatestHash) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_syncingLatestHash is not set while downloading hashes")); if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash)) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Received hashes but the hashes list is empty (or the other way around)")); } if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { bool blocks = false; host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; }); if (!blocks) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for blocks")); if (downloadMan().isComplete()) - return false; - } - if (m_state == SyncState::Idle) - { - bool busy = false; - host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; }); - if (busy) + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Block download complete but the state is still Blocks")); return false; } if (m_state == SyncState::Waiting && !host().bq().isActive()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Waiting while block queue is idle")); return true; } @@ -1085,19 +1079,15 @@ bool PV61Sync::isPV61Syncing() const bool PV61Sync::invariants() const { if (m_downloadingChainMap.size() != m_chainSyncPeers.size()) - return false; - if (m_state == SyncState::Idle && isSyncing()) - return false; - if (m_state != SyncState::Idle && !isSyncing()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_downloadingChainMap and m_chainSyncPeers out of sync")); if (m_state == SyncState::Hashes) { bool hashes = false; host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); if (!hashes) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for hashes")); if (isPV61Syncing() && !m_syncingBlockNumber) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Syncing in PV61 with no block number set")); } else if (!PV60Sync::invariants()) return false; diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 9e7dd6898..e8565fafc 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -392,9 +392,9 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) return false; } } - catch (Exception const& _e) + catch (Exception const&) { - clog(NetWarn) << "Peer causing an Exception:" << _e.what() << _r; + clog(NetWarn) << "Peer causing an Exception:" << boost::current_exception_diagnostic_information() << _r; } catch (std::exception const& _e) { From fc9d28081b52955d0a78955a16795459207be968 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 30 Jun 2015 18:23:18 +0200 Subject: [PATCH 03/13] removed extra status field for pv61 --- libethereum/EthereumPeer.cpp | 23 +++-------------------- libethereum/EthereumPeer.h | 2 -- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index e8565fafc..1ad4aec7b 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -127,14 +127,12 @@ void EthereumPeer::requestStatus() m_requireTransactions = true; RLPStream s; bool latest = m_peerCapabilityVersion == host()->protocolVersion(); - prep(s, StatusPacket, latest ? 6 : 5) + prep(s, StatusPacket, 5) << (latest ? host()->protocolVersion() : EthereumHost::c_oldProtocolVersion) << host()->networkId() << host()->chain().details().totalDifficulty << host()->chain().currentHash() << host()->chain().genesisHash(); - if (latest) - s << u256(host()->chain().number()); sealAndSend(s); } @@ -214,12 +212,8 @@ void EthereumPeer::tick() auto s = session(); time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); if (s && (now - m_lastAsk > 10 && m_asking != Asking::Nothing)) - { - clog(NetWarn) << "timeout: " << (now - m_lastAsk) << " " << - (m_asking == Asking::Nothing ? "nothing" : m_asking == Asking::State ? "state" : m_asking == Asking::Hashes ? "hashes" : m_asking == Asking::Blocks ? "blocks" : "?"); // timeout s->disconnect(PingTimeout); - } } bool EthereumPeer::isConversing() const @@ -247,20 +241,9 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) m_latestHash = _r[3].toHash(); m_genesisHash = _r[4].toHash(); if (m_peerCapabilityVersion == host()->protocolVersion()) - { - if (_r.itemCount() != 6) - { - clog(NetImpolite) << "Peer does not support PV61+ status extension."; - m_protocolVersion = EthereumHost::c_oldProtocolVersion; - } - else - { - m_protocolVersion = host()->protocolVersion(); - m_latestBlockNumber = _r[5].toInt(); - } - } + m_protocolVersion = host()->protocolVersion(); - clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash; + clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash; setIdle(); host()->onPeerStatus(dynamic_pointer_cast(dynamic_pointer_cast(shared_from_this()))); break; diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index faf3a0650..ef9fa542a 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -142,8 +142,6 @@ private: h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. h256 m_genesisHash; ///< Peer's genesis hash - u256 m_latestBlockNumber; ///< Number of the latest block this peer has - /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. From c438d60ae8353ef9f9424e619886603ce684052c Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 29 Jun 2015 22:19:24 +0200 Subject: [PATCH 04/13] removed obsolete invariant --- libethereum/BlockChainSync.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 7fa4a574d..88b68f614 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -813,7 +813,6 @@ bool PV60Sync::invariants() const BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for blocks")); if (downloadMan().isComplete()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Block download complete but the state is still Blocks")); - return false; } if (m_state == SyncState::Waiting && !host().bq().isActive()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Waiting while block queue is idle")); From 307c14dfbb4d37922d4e76a758bd8bf3165d6b26 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 30 Jun 2015 19:11:04 +0200 Subject: [PATCH 05/13] more invariant diagnostics --- libethereum/BlockQueue.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 074278d9a..a09c2e320 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -21,6 +21,7 @@ #include "BlockQueue.h" #include +#include #include #include #include @@ -471,7 +472,13 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max) bool BlockQueue::invariants() const { Guard l(m_verification); - return m_readySet.size() == m_verified.size() + m_unverified.size() + m_verifying.size(); + if (!(m_readySet.size() == m_verified.size() + m_unverified.size() + m_verifying.size())) + { + std::stringstream s; + s << "Failed BlockQueue invariant: m_readySet: " << m_readySet.size() << " m_verified: " << m_verified.size() << " m_unverified: " << m_unverified.size() << " m_verifying" << m_verifying.size(); + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment(s.str())); + } + return true; } void BlockQueue::noteReady_WITH_LOCK(h256 const& _good) From 2a31247150cc688513ea8b86e52bb5f4545dd141 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 30 Jun 2015 19:57:04 +0200 Subject: [PATCH 06/13] diable peers giving bad chain hashes --- libethereum/BlockChainSync.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 88b68f614..7368d7632 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -141,7 +141,10 @@ void BlockChainSync::onPeerBlocks(std::shared_ptr _peer, RLP const clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting) - clog(NetWarn) << "Unexpected Blocks received!"; + { + clog(NetMessageSummary) << "Ignoring unexpected blocks"; + return; + } if (m_state == SyncState::Waiting) { clog(NetAllDetail) << "Ignored blocks while waiting"; @@ -182,6 +185,7 @@ void BlockChainSync::onPeerBlocks(std::shared_ptr _peer, RLP const case ImportResult::BadChain: logNewBlock(h); _peer->disable("Malformed block received."); + restartSync(); return; case ImportResult::FutureTimeKnown: @@ -691,7 +695,8 @@ void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; - transition(_peer, SyncState::Idle); + _peer->disable("Bad blocks"); + restartSync(); return; } else if (status == QueueStatus::Unknown) @@ -1006,6 +1011,7 @@ void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; + _peer->disable("Bad blocks"); restartSync(); return; } From befae80d10febaa45f2587405683475a31325b26 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 1 Jul 2015 17:10:22 +0200 Subject: [PATCH 07/13] more diagnostics --- libethereum/BlockChainSync.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 7368d7632..9dda66b7d 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -284,7 +284,7 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr _peer, RLP con case ImportResult::FutureTimeUnknown: case ImportResult::UnknownParent: logNewBlock(h); - clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; + clog(NetMessageDetail) << "Received block with no known parent. Resyncing..."; resetSyncFor(_peer, h, _r[1].toInt()); break; default:; @@ -788,8 +788,12 @@ void PV60Sync::onPeerAborting() { RecursiveGuard l(x_sync); // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. - if (m_syncer.expired()) + if (m_syncer.expired() && m_state != SyncState::Idle) + { + clog(NetWarn) << "Syncing peer disconnected, restarting sync"; + m_syncer.reset(); abortSync(); + } DEV_INVARIANT_CHECK; } @@ -1051,8 +1055,12 @@ void PV61Sync::onPeerAborting() else ++s; } - if (m_syncer.expired()) + if (m_syncer.expired() && m_state != SyncState::Idle) + { + clog(NetWarn) << "Syncing peer disconnected, restarting sync"; + m_syncer.reset(); abortSync(); + } else if (isPV61Syncing()) requestSubchains(); DEV_INVARIANT_CHECK; From f3850a5e8a437b0107e8427276e9de55a97ad18f Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 1 Jul 2015 17:34:40 +0200 Subject: [PATCH 08/13] even more diagnostics --- libethereum/EthereumHost.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 5bd16ac12..dc6e46ad4 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -317,8 +317,15 @@ void EthereumHost::onPeerTransactions(std::shared_ptr _peer, RLP c void EthereumHost::onPeerAborting() { Guard l(x_sync); - if (m_sync) - m_sync->onPeerAborting(); + try + { + if (m_sync) + m_sync->onPeerAborting(); + } + catch (Exception&) + { + cwarn << "Exception on peer destruciton: " << boost::current_exception_diagnostic_information(); + } } bool EthereumHost::isSyncing() const From 382c4768261c2bdef7110f70d4066b1e1c2fe5c4 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 2 Jul 2015 09:18:16 +0200 Subject: [PATCH 09/13] removed incorrect invariants (subject to race condition) --- libethereum/BlockChainSync.cpp | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 9dda66b7d..282a3d564 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -805,10 +805,6 @@ bool PV60Sync::invariants() const BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Active while peer not syncing")); if (m_state == SyncState::Hashes) { - bool hashes = false; - host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); - if (!hashes) - BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for hashes")); if (!m_syncingLatestHash) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_syncingLatestHash is not set while downloading hashes")); if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash)) @@ -816,10 +812,6 @@ bool PV60Sync::invariants() const } if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { - bool blocks = false; - host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; }); - if (!blocks) - BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for blocks")); if (downloadMan().isComplete()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Block download complete but the state is still Blocks")); } @@ -1095,10 +1087,6 @@ bool PV61Sync::invariants() const BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_downloadingChainMap and m_chainSyncPeers out of sync")); if (m_state == SyncState::Hashes) { - bool hashes = false; - host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); - if (!hashes) - BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("No peers asking for hashes")); if (isPV61Syncing() && !m_syncingBlockNumber) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Syncing in PV61 with no block number set")); } From 4983db8e58086c8e287c503e7be4f5aedfdc95bb Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 2 Jul 2015 12:26:29 +0200 Subject: [PATCH 10/13] removed unneeded invariant checks, fixed disconnect message priority --- libethereum/BlockChainSync.cpp | 2 -- libp2p/Session.cpp | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 9dda66b7d..30dbf2964 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -92,7 +92,6 @@ void BlockChainSync::onPeerStatus(std::shared_ptr _peer) _peer->m_expectedHashes = hashes; onNewPeer(_peer); } - DEV_INVARIANT_CHECK; } unsigned BlockChainSync::estimatedHashes() const @@ -293,7 +292,6 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr _peer, RLP con DEV_GUARDED(_peer->x_knownBlocks) _peer->m_knownBlocks.insert(h); } - DEV_INVARIANT_CHECK; } PV60Sync::PV60Sync(EthereumHost& _host): diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index e44c365f4..f28d25e88 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -315,7 +315,7 @@ void Session::drop(DisconnectReason _reason) if (socket.is_open()) try { - clog(NetWarn) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")"; + clog(NetConnect) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")"; boost::system::error_code ec; socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); socket.close(); From 9cea8c669ee2c99f5b98a606a00660735831636d Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 2 Jul 2015 16:17:33 +0200 Subject: [PATCH 11/13] keep the chain on syncer aborting during blocks downloading --- libethereum/BlockChainSync.cpp | 83 +++++++++++++++++++++++++--------- 1 file changed, 61 insertions(+), 22 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 30dbf2964..647ebb4aa 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -76,15 +76,18 @@ void BlockChainSync::onPeerStatus(std::shared_ptr _peer) { RecursiveGuard l(x_sync); DEV_INVARIANT_CHECK; + std::shared_ptr session = _peer->session(); + if (!session) + return; // Expired if (_peer->m_genesisHash != host().chain().genesisHash()) _peer->disable("Invalid genesis hash"); else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion) _peer->disable("Invalid protocol version."); else if (_peer->m_networkId != host().networkId()) _peer->disable("Invalid network identifier."); - else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos) + else if (session->info().clientVersion.find("/v0.7.0/") != string::npos) _peer->disable("Blacklisted client version."); - else if (host().isBanned(_peer->session()->id())) + else if (host().isBanned(session->id())) _peer->disable("Peer banned for previous bad behaviour."); else { @@ -345,26 +348,30 @@ void PV60Sync::restartSync() { resetSync(); host().bq().clear(); - if (isSyncing()) - transition(m_syncer.lock(), SyncState::Idle); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Idle); } void PV60Sync::completeSync() { - if (isSyncing()) - transition(m_syncer.lock(), SyncState::Idle); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Idle); } void PV60Sync::pauseSync() { - if (isSyncing()) - setState(m_syncer.lock(), SyncState::Waiting, true); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Waiting, true); } void PV60Sync::continueSync() { - if (isSyncing()) - transition(m_syncer.lock(), SyncState::Blocks); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Blocks); } void PV60Sync::onNewPeer(std::shared_ptr _peer) @@ -485,7 +492,9 @@ void PV60Sync::setNeedsSyncing(std::shared_ptr _peer, h256 const& if (_peer->m_latestHash) noteNeedsSyncing(_peer); - _peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); + shared_ptr session = _peer->session(); + if (session) + session->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); } bool PV60Sync::needsSyncing(std::shared_ptr _peer) const @@ -778,7 +787,33 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr _peer, h256s const& void PV60Sync::abortSync() { // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. - setState(std::shared_ptr(), SyncState::Idle, false, true); + bool continueSync = false; + if (m_state == SyncState::Blocks) + { + // Main syncer aborted, try to find a replacement + host().foreachPeer([&](std::shared_ptr _p) + { + if (_p->m_asking == Asking::Blocks) + { + setState(_p, SyncState::Blocks, true, true); // will kick off other peers to help if available. + continueSync = true; + return false; + } + if (_p->m_asking == Asking::Nothing && shouldGrabBlocks(_p)) + { + transition(_p, SyncState::Blocks); + clog(NetMessageDetail) << "New sync peer selected"; + continueSync = true; + return false; + } + return true; + }); + } + if (!continueSync) + { + // Just set to idle. Hashchain is keept, Sync will be continued if there are more peers to sync with + setState(std::shared_ptr(), SyncState::Idle, false, true); + } DEV_INVARIANT_CHECK; } @@ -957,7 +992,7 @@ void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h { auto syncPeer = m_chainSyncPeers.find(_peer); if (syncPeer == m_chainSyncPeers.end()) - clog(NetWarn) << "Hashes response from unexpected peer"; + clog(NetMessageDetail) << "Hashes response from unexpected peer"; else { // Peer does not have request hashes, move back from downloading to ready @@ -1013,8 +1048,16 @@ void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; - _peer->disable("Bad blocks"); - restartSync(); + _peer->disable("Bad hashes"); + if (isSyncing(_peer)) + restartSync(); + else + { + //try with other peer + m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); + m_downloadingChainMap.erase(number); + m_chainSyncPeers.erase(_peer); + } return; } else if (status == QueueStatus::Unknown) @@ -1053,13 +1096,9 @@ void PV61Sync::onPeerAborting() else ++s; } - if (m_syncer.expired() && m_state != SyncState::Idle) - { - clog(NetWarn) << "Syncing peer disconnected, restarting sync"; - m_syncer.reset(); - abortSync(); - } - else if (isPV61Syncing()) + if (m_syncer.expired()) + PV60Sync::onPeerAborting(); + else if (isPV61Syncing() && m_state == SyncState::Hashes) requestSubchains(); DEV_INVARIANT_CHECK; } From de121cddcf13580511f9fe6d47d55fcd45f281e5 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 2 Jul 2015 18:02:35 +0200 Subject: [PATCH 12/13] download hashes from multiple peers when starving --- libethereum/BlockChainSync.cpp | 56 ++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 82a06b3b2..97684efb1 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -902,14 +902,23 @@ void PV61Sync::requestSubchain(std::shared_ptr _peer) h256s& d = m_downloadingChainMap.at(syncPeer->second); _peer->requestHashes(d.back()); } - else if (needsSyncing(_peer) && !m_readyChainMap.empty()) + else if (needsSyncing(_peer)) { - clog(NetAllDetail) << "Helping with hashchin download"; - h256s& d = m_readyChainMap.begin()->second; - _peer->requestHashes(d.back()); - m_downloadingChainMap[m_readyChainMap.begin()->first] = move(d); - m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first; - m_readyChainMap.erase(m_readyChainMap.begin()); + if (!m_readyChainMap.empty()) + { + clog(NetAllDetail) << "Helping with hashchin download"; + h256s& d = m_readyChainMap.begin()->second; + _peer->requestHashes(d.back()); + m_downloadingChainMap[m_readyChainMap.begin()->first] = move(d); + m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first; + m_readyChainMap.erase(m_readyChainMap.begin()); + } + else if (!m_downloadingChainMap.empty() && !m_completeChainMap.empty()) + { + // Lead syncer is done, just grab whatever we can + h256s& d = m_downloadingChainMap.begin()->second; + _peer->requestHashes(d.back()); + } } } @@ -1016,12 +1025,24 @@ void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h else { auto syncPeer = m_chainSyncPeers.find(_peer); + unsigned number = 0; if (syncPeer == m_chainSyncPeers.end()) { - clog(NetWarn) << "Hashes response from unexpected peer"; + //check downlading peers + for (auto const& downloader: m_downloadingChainMap) + if (downloader.second.back() == _peer->m_syncHash) + { + number = downloader.first; + break; + } + } + else + number = syncPeer->second; + if (number == 0) + { + clog(NetAllDetail) << "Hashes response from unexpected/expired peer"; return; } - unsigned number = syncPeer->second; h256s& hashes = m_downloadingChainMap.at(number); unsigned knowns = 0; @@ -1089,7 +1110,22 @@ void PV61Sync::onPeerAborting() ++s; } if (m_syncer.expired()) - PV60Sync::onPeerAborting(); + { + if (m_state == SyncState::Hashes) + { + // Main syncer aborted, other peers are probably still downloading hashes, just set one of them as syncer + host().foreachPeer([&](std::shared_ptr _p) + { + if (_p->m_asking != Asking::Hashes) + return true; + setState(_p, SyncState::Hashes, true, true); + return false; + }); + } + + if (m_syncer.expired()) + PV60Sync::onPeerAborting(); + } else if (isPV61Syncing() && m_state == SyncState::Hashes) requestSubchains(); DEV_INVARIANT_CHECK; From cf54c31c3ceb7ceb653debb2bb66af616474f8de Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 2 Jul 2015 19:30:43 +0200 Subject: [PATCH 13/13] fixed starved download, removed obsolete invariant --- libethereum/BlockChainSync.cpp | 27 ++++++++++++++++++++++----- libethereum/BlockChainSync.h | 1 + libp2p/RLPXSocket.h | 4 ++-- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 97684efb1..b8c613a54 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -868,6 +868,7 @@ void PV61Sync::syncHashes(std::shared_ptr _peer) m_syncingBlockNumber = 0; m_chainSyncPeers.clear(); m_knownHashes.clear(); + m_hashScanComplete = false; PV60Sync::syncHashes(_peer); return; } @@ -913,7 +914,7 @@ void PV61Sync::requestSubchain(std::shared_ptr _peer) m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first; m_readyChainMap.erase(m_readyChainMap.begin()); } - else if (!m_downloadingChainMap.empty() && !m_completeChainMap.empty()) + else if (!m_downloadingChainMap.empty() && m_hashScanComplete) { // Lead syncer is done, just grab whatever we can h256s& d = m_downloadingChainMap.begin()->second; @@ -936,6 +937,13 @@ void PV61Sync::completeSubchain(std::shared_ptr _peer, unsigned _n { m_completeChainMap[_n] = move(m_downloadingChainMap.at(_n)); m_downloadingChainMap.erase(_n); + for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end(); ++s) + if (s->second == _n) //TODO: optimize this + { + m_chainSyncPeers.erase(s); + break; + } + _peer->m_syncHashNumber = 0; auto syncer = m_syncer.lock(); @@ -945,7 +953,7 @@ void PV61Sync::completeSubchain(std::shared_ptr _peer, unsigned _n return; } - if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && syncer->m_asking == Asking::Nothing) + if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && m_hashScanComplete) { //Done chain-get m_syncingNeededBlocks.clear(); @@ -968,6 +976,7 @@ void PV61Sync::restartSync() m_chainSyncPeers.clear(); m_syncingBlockNumber = 0; m_knownHashes.clear(); + m_hashScanComplete = false; PV60Sync::restartSync(); } @@ -986,6 +995,7 @@ void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h { // End of hash chain, add last chunk to download m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _peer->m_latestHash })); + m_hashScanComplete = true; _peer->m_syncHashNumber = 0; requestSubchain(_peer); } @@ -1043,8 +1053,17 @@ void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h clog(NetAllDetail) << "Hashes response from unexpected/expired peer"; return; } - h256s& hashes = m_downloadingChainMap.at(number); + auto downloadingPeer = m_downloadingChainMap.find(number); + if (downloadingPeer == m_downloadingChainMap.end() || downloadingPeer->second.back() != _peer->m_syncHash) + { + // Too late, other peer has already downloaded our hashes + m_chainSyncPeers.erase(_peer); + requestSubchain(_peer); + return; + } + + h256s& hashes = downloadingPeer->second; unsigned knowns = 0; unsigned unknowns = 0; for (unsigned i = 0; i < _hashes.size(); ++i) @@ -1156,8 +1175,6 @@ bool PV61Sync::isPV61Syncing() const bool PV61Sync::invariants() const { - if (m_downloadingChainMap.size() != m_chainSyncPeers.size()) - BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_downloadingChainMap and m_chainSyncPeers out of sync")); if (m_state == SyncState::Hashes) { if (isPV61Syncing() && !m_syncingBlockNumber) diff --git a/libethereum/BlockChainSync.h b/libethereum/BlockChainSync.h index 01727a0cb..213169add 100644 --- a/libethereum/BlockChainSync.h +++ b/libethereum/BlockChainSync.h @@ -315,6 +315,7 @@ private: std::map, unsigned, std::owner_less>> m_chainSyncPeers; ///< Peers to m_downloadingSubchain number map h256Hash m_knownHashes; ///< Subchain start markers. Used to track suchain completion unsigned m_syncingBlockNumber = 0; ///< Current subchain marker + bool m_hashScanComplete = false; ///< True if leading peer completed hashchain scan and we have a list of subchains ready }; } } diff --git a/libp2p/RLPXSocket.h b/libp2p/RLPXSocket.h index 389418c76..58613bf82 100644 --- a/libp2p/RLPXSocket.h +++ b/libp2p/RLPXSocket.h @@ -45,7 +45,7 @@ public: bool isConnected() const { return m_socket.is_open(); } void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} } - bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } } + bi::tcp::endpoint remoteEndpoint() { boost::system::error_code ec; return m_socket.remote_endpoint(ec); } bi::tcp::socket& ref() { return m_socket; } protected: @@ -53,4 +53,4 @@ protected: }; } -} \ No newline at end of file +}