diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index d7fc0b409..964553b03 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -1290,7 +1290,7 @@ void Main::refreshBlockCount() auto d = ethereum()->blockChain().details(); BlockQueueStatus b = ethereum()->blockQueueStatus(); SyncStatus sync = ethereum()->syncStatus(); - QString syncStatus = EthereumHost::stateName(sync.state); + QString syncStatus = QString("PV%1 %2").arg(sync.protocolVersion).arg(EthereumHost::stateName(sync.state)); if (sync.state == SyncState::Hashes) syncStatus += QString(": %1/%2%3").arg(sync.hashesReceived).arg(sync.hashesEstimated ? "~" : "").arg(sync.hashesTotal); if (sync.state == SyncState::Blocks || sync.state == SyncState::NewBlocks) diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index 04bb44d0b..b8c613a54 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -38,7 +38,8 @@ using namespace dev; using namespace dev::eth; using namespace p2p; -unsigned const c_chainReorgSize = 30000; +unsigned const c_chainReorgSize = 30000; /// Added to estimated hashes to account for potential chain reorganiation +unsigned const c_hashSubchainSize = 8192; /// PV61 subchain size BlockChainSync::BlockChainSync(EthereumHost& _host): m_host(_host) @@ -75,15 +76,18 @@ void BlockChainSync::onPeerStatus(std::shared_ptr _peer) { RecursiveGuard l(x_sync); DEV_INVARIANT_CHECK; + std::shared_ptr session = _peer->session(); + if (!session) + return; // Expired if (_peer->m_genesisHash != host().chain().genesisHash()) _peer->disable("Invalid genesis hash"); else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion) _peer->disable("Invalid protocol version."); else if (_peer->m_networkId != host().networkId()) _peer->disable("Invalid network identifier."); - else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos) + else if (session->info().clientVersion.find("/v0.7.0/") != string::npos) _peer->disable("Blacklisted client version."); - else if (host().isBanned(_peer->session()->id())) + else if (host().isBanned(session->id())) _peer->disable("Peer banned for previous bad behaviour."); else { @@ -91,7 +95,6 @@ void BlockChainSync::onPeerStatus(std::shared_ptr _peer) _peer->m_expectedHashes = hashes; onNewPeer(_peer); } - DEV_INVARIANT_CHECK; } unsigned BlockChainSync::estimatedHashes() const @@ -114,7 +117,6 @@ void BlockChainSync::requestBlocks(std::shared_ptr _peer) { clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; pauseSync(); - _peer->setIdle(); return; } _peer->requestBlocks(); @@ -137,13 +139,14 @@ void BlockChainSync::logNewBlock(h256 const& _h) void BlockChainSync::onPeerBlocks(std::shared_ptr _peer, RLP const& _r) { RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; unsigned itemCount = _r.itemCount(); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); - _peer->setIdle(); if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting) - clog(NetWarn) << "Unexpected Blocks received!"; + { + clog(NetMessageSummary) << "Ignoring unexpected blocks"; + return; + } if (m_state == SyncState::Waiting) { clog(NetAllDetail) << "Ignored blocks while waiting"; @@ -184,6 +187,7 @@ void BlockChainSync::onPeerBlocks(std::shared_ptr _peer, RLP const case ImportResult::BadChain: logNewBlock(h); _peer->disable("Malformed block received."); + restartSync(); return; case ImportResult::FutureTimeKnown: @@ -282,7 +286,7 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr _peer, RLP con case ImportResult::FutureTimeUnknown: case ImportResult::UnknownParent: logNewBlock(h); - clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; + clog(NetMessageDetail) << "Received block with no known parent. Resyncing..."; resetSyncFor(_peer, h, _r[1].toInt()); break; default:; @@ -291,7 +295,6 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr _peer, RLP con DEV_GUARDED(_peer->x_knownBlocks) _peer->m_knownBlocks.insert(h); } - DEV_INVARIANT_CHECK; } PV60Sync::PV60Sync(EthereumHost& _host): @@ -305,6 +308,7 @@ SyncStatus PV60Sync::status() const RecursiveGuard l(x_sync); SyncStatus res; res.state = m_state; + res.protocolVersion = EthereumHost::c_oldProtocolVersion; if (m_state == SyncState::Hashes) { res.hashesTotal = m_estimatedHashes; @@ -344,26 +348,30 @@ void PV60Sync::restartSync() { resetSync(); host().bq().clear(); - if (isSyncing()) - transition(m_syncer.lock(), SyncState::Idle); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Idle); } void PV60Sync::completeSync() { - if (isSyncing()) - transition(m_syncer.lock(), SyncState::Idle); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Idle); } void PV60Sync::pauseSync() { - if (isSyncing()) - setState(m_syncer.lock(), SyncState::Waiting, true); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Waiting, true); } void PV60Sync::continueSync() { - if (isSyncing()) - transition(m_syncer.lock(), SyncState::Blocks); + std::shared_ptr syncer = m_syncer.lock(); + if (syncer) + transition(syncer, SyncState::Blocks); } void PV60Sync::onNewPeer(std::shared_ptr _peer) @@ -381,26 +389,10 @@ void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, boo RLPStream s; if (_s == SyncState::Hashes) { - if (m_state == SyncState::Idle) - { - if (isSyncing(_peer)) - clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; - - m_syncingLatestHash = _peer->m_latestHash; - m_syncingTotalDifficulty = _peer->m_totalDifficulty; - setState(_peer, _s, true); - _peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash); - DEV_INVARIANT_CHECK; - return; - } - else if (m_state == SyncState::Hashes) + if (m_state == SyncState::Idle || m_state == SyncState::Hashes) { - if (!isSyncing(_peer)) - clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; - - setState(_peer, _s, true); - _peer->requestHashes(m_syncingLastReceivedHash); - DEV_INVARIANT_CHECK; + m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize; + syncHashes(_peer); return; } } @@ -462,7 +454,6 @@ void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, boo } else if (_s == SyncState::Idle) { - host().foreachPeer([this](std::shared_ptr _p) { _p->setIdle(); return true; }); if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { clog(NetMessageDetail) << "Finishing blocks fetch..."; @@ -473,7 +464,6 @@ void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, boo // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. _peer->m_sub.doneFetch(); - _peer->setIdle(); setState(_peer, SyncState::Idle, false); } else if (m_state == SyncState::Hashes) @@ -502,7 +492,9 @@ void PV60Sync::setNeedsSyncing(std::shared_ptr _peer, h256 const& if (_peer->m_latestHash) noteNeedsSyncing(_peer); - _peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); + shared_ptr session = _peer->session(); + if (session) + session->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); } bool PV60Sync::needsSyncing(std::shared_ptr _peer) const @@ -562,7 +554,6 @@ void PV60Sync::attemptSync(std::shared_ptr _peer) else { clog(NetAllDetail) << "Yes. Their chain is better."; - m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize; transition(_peer, SyncState::Hashes); } } @@ -573,7 +564,9 @@ void PV60Sync::noteNeedsSyncing(std::shared_ptr _peer) if (isSyncing()) { clog(NetAllDetail) << "Sync in progress: Just set to help out."; - if (m_state == SyncState::Blocks) + if (m_state == SyncState::Hashes && _peer->m_asking == Asking::Nothing) + requestSubchain(_peer); + else if (m_state == SyncState::Blocks) requestBlocks(_peer); } else @@ -649,20 +642,45 @@ void PV60Sync::noteDoneBlocks(std::shared_ptr _peer, bool _clemenc } resetSync(); downloadMan().reset(); + } _peer->m_sub.doneFetch(); +} + +void PV60Sync::syncHashes(std::shared_ptr _peer) +{ + if (m_state == SyncState::Idle) + { + if (isSyncing(_peer)) + clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; + + m_syncingLatestHash = _peer->m_latestHash; + m_syncingTotalDifficulty = _peer->m_totalDifficulty; + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash); + } + else if (m_state == SyncState::Hashes) + { + if (!isSyncing(_peer)) + clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingLastReceivedHash); } } void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; - _peer->setIdle(); if (!isSyncing(_peer)) { clog(NetMessageSummary) << "Ignoring hashes since not syncing"; return; } + if (_peer->m_syncHash != (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash)) + { + clog(NetMessageSummary) << "Ignoring unexpected hashes"; + return; + } if (_hashes.size() == 0) { transition(_peer, SyncState::Blocks); @@ -684,7 +702,8 @@ void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; - transition(_peer, SyncState::Idle); + _peer->disable("Bad blocks"); + restartSync(); return; } else if (status == QueueStatus::Unknown) @@ -711,7 +730,6 @@ void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _h void PV60Sync::onPeerNewHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer))) { clog(NetMessageSummary) << "Ignoring since we're already downloading."; @@ -769,8 +787,33 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr _peer, h256s const& void PV60Sync::abortSync() { // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. - host().foreachPeer([this](std::shared_ptr _p) { _p->setIdle(); return true; }); - setState(std::shared_ptr(), SyncState::Idle, false, true); + bool continueSync = false; + if (m_state == SyncState::Blocks) + { + // Main syncer aborted, try to find a replacement + host().foreachPeer([&](std::shared_ptr _p) + { + if (_p->m_asking == Asking::Blocks) + { + setState(_p, SyncState::Blocks, true, true); // will kick off other peers to help if available. + continueSync = true; + return false; + } + if (_p->m_asking == Asking::Nothing && shouldGrabBlocks(_p)) + { + transition(_p, SyncState::Blocks); + clog(NetMessageDetail) << "New sync peer selected"; + continueSync = true; + return false; + } + return true; + }); + } + if (!continueSync) + { + // Just set to idle. Hashchain is keept, Sync will be continued if there are more peers to sync with + setState(std::shared_ptr(), SyncState::Idle, false, true); + } DEV_INVARIANT_CHECK; } @@ -778,37 +821,366 @@ void PV60Sync::onPeerAborting() { RecursiveGuard l(x_sync); // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. - if (m_syncer.expired()) + if (m_syncer.expired() && m_state != SyncState::Idle) + { + clog(NetWarn) << "Syncing peer disconnected, restarting sync"; + m_syncer.reset(); abortSync(); + } DEV_INVARIANT_CHECK; } bool PV60Sync::invariants() const { if (m_state == SyncState::Idle && isSyncing()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Idle while peer syncing")); if (m_state != SyncState::Idle && !isSyncing()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Active while peer not syncing")); if (m_state == SyncState::Hashes) { if (!m_syncingLatestHash) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_syncingLatestHash is not set while downloading hashes")); if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash)) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Received hashes but the hashes list is empty (or the other way around)")); } if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { if (downloadMan().isComplete()) - return false; + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Block download complete but the state is still Blocks")); + } + if (m_state == SyncState::Waiting && !host().bq().isActive()) + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Waiting while block queue is idle")); + return true; +} + +PV61Sync::PV61Sync(EthereumHost& _host): + PV60Sync(_host) +{ +} + +void PV61Sync::syncHashes(std::shared_ptr _peer) +{ + if (_peer->m_protocolVersion != host().protocolVersion()) + { + m_readyChainMap.clear(); + m_completeChainMap.clear(); + m_downloadingChainMap.clear(); + m_syncingBlockNumber = 0; + m_chainSyncPeers.clear(); + m_knownHashes.clear(); + m_hashScanComplete = false; + PV60Sync::syncHashes(_peer); + return; } if (m_state == SyncState::Idle) { - bool busy = false; - host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; }); - if (busy) - return false; + if (isSyncing(_peer)) + clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; + + if (m_syncingBlockNumber == 0) + m_syncingBlockNumber = host().chain().number() + c_hashSubchainSize; + m_syncingTotalDifficulty = _peer->m_totalDifficulty; + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingBlockNumber, 1); } - if (m_state == SyncState::Waiting && !host().bq().isActive()) - return false; + else if (m_state == SyncState::Hashes) + { + if (!isSyncing(_peer)) + clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + + m_syncingBlockNumber += c_hashSubchainSize; + setState(_peer, SyncState::Hashes, true); + _peer->requestHashes(m_syncingBlockNumber, 1); + } +} + +void PV61Sync::requestSubchain(std::shared_ptr _peer) +{ + auto syncPeer = m_chainSyncPeers.find(_peer); + if (syncPeer != m_chainSyncPeers.end()) + { + // Already downoading, request next batch + h256s& d = m_downloadingChainMap.at(syncPeer->second); + _peer->requestHashes(d.back()); + } + else if (needsSyncing(_peer)) + { + if (!m_readyChainMap.empty()) + { + clog(NetAllDetail) << "Helping with hashchin download"; + h256s& d = m_readyChainMap.begin()->second; + _peer->requestHashes(d.back()); + m_downloadingChainMap[m_readyChainMap.begin()->first] = move(d); + m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first; + m_readyChainMap.erase(m_readyChainMap.begin()); + } + else if (!m_downloadingChainMap.empty() && m_hashScanComplete) + { + // Lead syncer is done, just grab whatever we can + h256s& d = m_downloadingChainMap.begin()->second; + _peer->requestHashes(d.back()); + } + } +} + +void PV61Sync::requestSubchains() +{ + host().foreachPeer([this](std::shared_ptr _p) + { + if (_p->m_asking == Asking::Nothing) + requestSubchain(_p); + return true; + }); +} + +void PV61Sync::completeSubchain(std::shared_ptr _peer, unsigned _n) +{ + m_completeChainMap[_n] = move(m_downloadingChainMap.at(_n)); + m_downloadingChainMap.erase(_n); + for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end(); ++s) + if (s->second == _n) //TODO: optimize this + { + m_chainSyncPeers.erase(s); + break; + } + + _peer->m_syncHashNumber = 0; + + auto syncer = m_syncer.lock(); + if (!syncer) + { + restartSync(); + return; + } + + if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && m_hashScanComplete) + { + //Done chain-get + m_syncingNeededBlocks.clear(); + for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h) + m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.begin(), h->second.end()); + m_completeChainMap.clear(); + m_knownHashes.clear(); + m_syncingBlockNumber = 0; + transition(syncer, SyncState::Blocks); + } + else + requestSubchain(_peer); +} + +void PV61Sync::restartSync() +{ + m_completeChainMap.clear(); + m_readyChainMap.clear(); + m_downloadingChainMap.clear(); + m_chainSyncPeers.clear(); + m_syncingBlockNumber = 0; + m_knownHashes.clear(); + m_hashScanComplete = false; + PV60Sync::restartSync(); +} + +void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) +{ + RecursiveGuard l(x_sync); + if (m_syncingBlockNumber == 0 || (_peer == m_syncer.lock() && _peer->m_protocolVersion != host().protocolVersion())) + { + // Syncing in pv60 mode + PV60Sync::onPeerHashes(_peer, _hashes); + return; + } + if (_hashes.size() == 0) + { + if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) + { + // End of hash chain, add last chunk to download + m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _peer->m_latestHash })); + m_hashScanComplete = true; + _peer->m_syncHashNumber = 0; + requestSubchain(_peer); + } + else + { + auto syncPeer = m_chainSyncPeers.find(_peer); + if (syncPeer == m_chainSyncPeers.end()) + clog(NetMessageDetail) << "Hashes response from unexpected peer"; + else + { + // Peer does not have request hashes, move back from downloading to ready + unsigned number = syncPeer->second; + m_chainSyncPeers.erase(_peer); + m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); + m_downloadingChainMap.erase(number); + resetNeedsSyncing(_peer); + requestSubchains(); + } + } + return; + } + if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) + { + // Got new subchain marker + assert(_hashes.size() == 1); + m_knownHashes.insert(_hashes[0]); + m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _hashes[0] })); + if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes) + { + _peer->disable("Too many hashes from lead peer"); + restartSync(); + return; + } + transition(_peer, SyncState::Hashes); + requestSubchains(); + } + else + { + auto syncPeer = m_chainSyncPeers.find(_peer); + unsigned number = 0; + if (syncPeer == m_chainSyncPeers.end()) + { + //check downlading peers + for (auto const& downloader: m_downloadingChainMap) + if (downloader.second.back() == _peer->m_syncHash) + { + number = downloader.first; + break; + } + } + else + number = syncPeer->second; + if (number == 0) + { + clog(NetAllDetail) << "Hashes response from unexpected/expired peer"; + return; + } + + auto downloadingPeer = m_downloadingChainMap.find(number); + if (downloadingPeer == m_downloadingChainMap.end() || downloadingPeer->second.back() != _peer->m_syncHash) + { + // Too late, other peer has already downloaded our hashes + m_chainSyncPeers.erase(_peer); + requestSubchain(_peer); + return; + } + + h256s& hashes = downloadingPeer->second; + unsigned knowns = 0; + unsigned unknowns = 0; + for (unsigned i = 0; i < _hashes.size(); ++i) + { + auto h = _hashes[i]; + auto status = host().bq().blockStatus(h); + if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h) || !!m_knownHashes.count(h)) + { + clog(NetMessageSummary) << "Subchain download complete"; + m_chainSyncPeers.erase(_peer); + completeSubchain(_peer, number); + return; + } + else if (status == QueueStatus::Bad) + { + cwarn << "block hash bad!" << h << ". Bailing..."; + _peer->disable("Bad hashes"); + if (isSyncing(_peer)) + restartSync(); + else + { + //try with other peer + m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); + m_downloadingChainMap.erase(number); + m_chainSyncPeers.erase(_peer); + } + return; + } + else if (status == QueueStatus::Unknown) + { + unknowns++; + hashes.push_back(h); + } + else + knowns++; + } + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << hashes.back(); + if (hashes.size() > c_hashSubchainSize) + { + _peer->disable("Too many subchain hashes"); + restartSync(); + return; + } + requestSubchain(_peer); + } + DEV_INVARIANT_CHECK; +} + +void PV61Sync::onPeerAborting() +{ + RecursiveGuard l(x_sync); + // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. + for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end();) + { + if (s->first.expired()) + { + unsigned number = s->second; + m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); + m_downloadingChainMap.erase(number); + m_chainSyncPeers.erase(s++); + } + else + ++s; + } + if (m_syncer.expired()) + { + if (m_state == SyncState::Hashes) + { + // Main syncer aborted, other peers are probably still downloading hashes, just set one of them as syncer + host().foreachPeer([&](std::shared_ptr _p) + { + if (_p->m_asking != Asking::Hashes) + return true; + setState(_p, SyncState::Hashes, true, true); + return false; + }); + } + + if (m_syncer.expired()) + PV60Sync::onPeerAborting(); + } + else if (isPV61Syncing() && m_state == SyncState::Hashes) + requestSubchains(); + DEV_INVARIANT_CHECK; +} + +SyncStatus PV61Sync::status() const +{ + RecursiveGuard l(x_sync); + SyncStatus res = PV60Sync::status(); + if (m_state == SyncState::Hashes && isPV61Syncing()) + { + res.protocolVersion = 61; + res.hashesReceived = 0; + for (auto const& d : m_readyChainMap) + res.hashesReceived += d.second.size(); + for (auto const& d : m_downloadingChainMap) + res.hashesReceived += d.second.size(); + for (auto const& d : m_completeChainMap) + res.hashesReceived += d.second.size(); + } + return res; +} + +bool PV61Sync::isPV61Syncing() const +{ + return m_syncingBlockNumber != 0; +} + +bool PV61Sync::invariants() const +{ + if (m_state == SyncState::Hashes) + { + if (isPV61Syncing() && !m_syncingBlockNumber) + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Syncing in PV61 with no block number set")); + } + else if (!PV60Sync::invariants()) + return false; return true; } diff --git a/libethereum/BlockChainSync.h b/libethereum/BlockChainSync.h index 793b05b5b..5501075fd 100644 --- a/libethereum/BlockChainSync.h +++ b/libethereum/BlockChainSync.h @@ -131,8 +131,8 @@ private: /** - * @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete - * Syncs to peers and keeps up to date + * @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash download is complete + * syncs to peers and keeps up to date */ /** @@ -227,7 +227,7 @@ protected: void pauseSync() override; void resetSyncFor(std::shared_ptr _peer, h256 const& _latestHash, u256 const& _td) override; -private: +protected: /// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer void transition(std::shared_ptr _peer, SyncState _s, bool _force = false, bool _needHelp = true); @@ -261,6 +261,12 @@ private: /// Called when peer done downloading blocks void noteDoneBlocks(std::shared_ptr _who, bool _clemency); + /// Start chainhash sync + virtual void syncHashes(std::shared_ptr _peer); + + /// Request subchain, no-op for pv60 + virtual void requestSubchain(std::shared_ptr /*_peer*/) {} + /// Abort syncing void abortSync(); @@ -275,5 +281,40 @@ private: u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync. std::weak_ptr m_syncer; ///< Peer we are currently syncing with }; + +/** + * @brief Syncrhonization over PV61. Selects a single peer and requests every c_hashSubchainSize hash, splitting the hashchain into subchains and downloading each subchain in parallel. + * Syncs to peers and keeps up to date + */ +class PV61Sync: public PV60Sync +{ +public: + PV61Sync(EthereumHost& _host); + +protected: + void restartSync() override; + void requestSubchain(std::shared_ptr _peer) override; + void syncHashes(std::shared_ptr _peer) override; + void onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) override; + void onPeerAborting() override; + SyncStatus status() const override; + bool invariants() const override; + +private: + /// Called when subchain is complete. Check if if hashchain is fully downloaded and proceed to downloading blocks + void completeSubchain(std::shared_ptr _peer, unsigned _n); + /// Find a subchain for peers to downloading + void requestSubchains(); + /// Check if downloading hashes in parallel + bool isPV61Syncing() const; + + std::map m_completeChainMap; ///< Fully downloaded subchains + std::map m_readyChainMap; ///< Subchains ready for download + std::map m_downloadingChainMap; ///< Subchains currently being downloading. In sync with m_chainSyncPeers + std::map, unsigned, std::owner_less>> m_chainSyncPeers; ///< Peers to m_downloadingSubchain number map + h256Hash m_knownHashes; ///< Subchain start markers. Used to track suchain completion + unsigned m_syncingBlockNumber = 0; ///< Current subchain marker + bool m_hashScanComplete = false; ///< True if leading peer completed hashchain scan and we have a list of subchains ready +}; } } diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 3a5138ca3..5e0a523d8 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -21,6 +21,7 @@ #include "BlockQueue.h" #include +#include #include #include #include @@ -473,7 +474,13 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max) bool BlockQueue::invariants() const { Guard l(m_verification); - return m_readySet.size() == m_verified.size() + m_unverified.size() + m_verifying.size(); + if (!(m_readySet.size() == m_verified.size() + m_unverified.size() + m_verifying.size())) + { + std::stringstream s; + s << "Failed BlockQueue invariant: m_readySet: " << m_readySet.size() << " m_verified: " << m_verified.size() << " m_unverified: " << m_unverified.size() << " m_verifying" << m_verifying.size(); + BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment(s.str())); + } + return true; } void BlockQueue::noteReady_WITH_LOCK(h256 const& _good) diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 35a27ff5e..d03e04d85 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -91,6 +91,7 @@ enum class SyncState struct SyncStatus { SyncState state = SyncState::Idle; + unsigned protocolVersion = 0; unsigned hashesTotal = 0; unsigned hashesReceived = 0; bool hashesEstimated = false; diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 55a1d1bf0..d1d6e8838 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -230,10 +230,10 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) } } -BlockChainSync& EthereumHost::sync() +BlockChainSync* EthereumHost::sync() { if (m_sync) - return *m_sync; // We only chose sync strategy once + return m_sync.get(); // We only chose sync strategy once bool pv61 = false; foreachPeer([&](std::shared_ptr _p) @@ -242,38 +242,43 @@ BlockChainSync& EthereumHost::sync() pv61 = true; return !pv61; }); - m_sync.reset(pv61 ? new PV60Sync(*this) : new PV60Sync(*this)); - return *m_sync; + m_sync.reset(pv61 ? new PV61Sync(*this) : new PV60Sync(*this)); + return m_sync.get(); } void EthereumHost::onPeerStatus(std::shared_ptr _peer) { Guard l(x_sync); - sync().onPeerStatus(_peer); + if (sync()) + sync()->onPeerStatus(_peer); } void EthereumHost::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) { Guard l(x_sync); - sync().onPeerHashes(_peer, _hashes); + if (sync()) + sync()->onPeerHashes(_peer, _hashes); } void EthereumHost::onPeerBlocks(std::shared_ptr _peer, RLP const& _r) { Guard l(x_sync); - sync().onPeerBlocks(_peer, _r); + if (sync()) + sync()->onPeerBlocks(_peer, _r); } void EthereumHost::onPeerNewHashes(std::shared_ptr _peer, h256s const& _hashes) { Guard l(x_sync); - sync().onPeerNewHashes(_peer, _hashes); + if (sync()) + sync()->onPeerNewHashes(_peer, _hashes); } void EthereumHost::onPeerNewBlock(std::shared_ptr _peer, RLP const& _r) { Guard l(x_sync); - sync().onPeerNewBlock(_peer, _r); + if (sync()) + sync()->onPeerNewBlock(_peer, _r); } void EthereumHost::onPeerTransactions(std::shared_ptr _peer, RLP const& _r) @@ -312,8 +317,15 @@ void EthereumHost::onPeerTransactions(std::shared_ptr _peer, RLP c void EthereumHost::onPeerAborting() { Guard l(x_sync); - if (m_sync) - m_sync->onPeerAborting(); + try + { + if (m_sync) + m_sync->onPeerAborting(); + } + catch (Exception&) + { + cwarn << "Exception on peer destruciton: " << boost::current_exception_diagnostic_information(); + } } bool EthereumHost::isSyncing() const diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index a4b9d0006..5c5f16ff7 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -115,7 +115,7 @@ private: virtual void onStarting() override { startWorking(); } virtual void onStopping() override { stopWorking(); } - BlockChainSync& sync(); + BlockChainSync* sync(); BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index eea5d23dc..3b7a72b63 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -127,20 +127,19 @@ void EthereumPeer::requestStatus() m_requireTransactions = true; RLPStream s; bool latest = m_peerCapabilityVersion == host()->protocolVersion(); - prep(s, StatusPacket, latest ? 6 : 5) + prep(s, StatusPacket, 5) << (latest ? host()->protocolVersion() : EthereumHost::c_oldProtocolVersion) << host()->networkId() << host()->chain().details().totalDifficulty << host()->chain().currentHash() << host()->chain().genesisHash(); - if (latest) - s << u256(host()->chain().number()); sealAndSend(s); } void EthereumPeer::requestHashes(u256 _number, unsigned _count) { assert(m_asking == Asking::Nothing); + assert(m_protocolVersion == host()->protocolVersion()); m_syncHashNumber = _number; m_syncHash = h256(); setAsking(Asking::Hashes); @@ -198,7 +197,7 @@ void EthereumPeer::requestBlocks() void EthereumPeer::setAsking(Asking _a) { m_asking = _a; - m_lastAsk = chrono::system_clock::now(); + m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); auto s = session(); if (s) @@ -211,7 +210,8 @@ void EthereumPeer::setAsking(Asking _a) void EthereumPeer::tick() { auto s = session(); - if (s && (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing)) + time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); + if (s && (now - m_lastAsk > 10 && m_asking != Asking::Nothing)) // timeout s->disconnect(PingTimeout); } @@ -228,6 +228,7 @@ bool EthereumPeer::isCriticalSyncing() const bool EthereumPeer::interpret(unsigned _id, RLP const& _r) { + m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); try { switch (_id) @@ -240,21 +241,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) m_latestHash = _r[3].toHash(); m_genesisHash = _r[4].toHash(); if (m_peerCapabilityVersion == host()->protocolVersion()) - { - if (_r.itemCount() != 6) - { - clog(NetImpolite) << "Peer does not support PV61+ status extension."; - m_protocolVersion = EthereumHost::c_oldProtocolVersion; - } - else - { - m_protocolVersion = host()->protocolVersion(); - m_latestBlockNumber = _r[5].toInt(); - } - } + m_protocolVersion = host()->protocolVersion(); - clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash; - setAsking(Asking::Nothing); + clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash; + setIdle(); host()->onPeerStatus(dynamic_pointer_cast(dynamic_pointer_cast(shared_from_this()))); break; } @@ -311,6 +301,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) clog(NetWarn) << "Peer giving us hashes when we didn't ask for them."; break; } + setIdle(); h256s hashes(itemCount); for (unsigned i = 0; i < itemCount; ++i) hashes[i] = _r[i].toHash(); @@ -357,7 +348,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) if (m_asking != Asking::Blocks) clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them."; else + { + setIdle(); host()->onPeerBlocks(dynamic_pointer_cast(shared_from_this()), _r); + } break; } case NewBlockPacket: @@ -381,9 +375,9 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) return false; } } - catch (Exception const& _e) + catch (Exception const&) { - clog(NetWarn) << "Peer causing an Exception:" << _e.what() << _r; + clog(NetWarn) << "Peer causing an Exception:" << boost::current_exception_diagnostic_information() << _r; } catch (std::exception const& _e) { diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index e8842c8af..23d52b346 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -135,14 +135,12 @@ private: /// What, if anything, we last asked the other peer for. Asking m_asking = Asking::Nothing; /// When we asked for it. Allows a time out. - std::chrono::system_clock::time_point m_lastAsk; + std::atomic m_lastAsk; /// These are determined through either a Status message or from NewBlock. h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. h256 m_genesisHash; ///< Peer's genesis hash - u256 m_latestBlockNumber; ///< Number of the latest block this peer has - /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. diff --git a/libp2p/RLPXSocket.h b/libp2p/RLPXSocket.h index 389418c76..58613bf82 100644 --- a/libp2p/RLPXSocket.h +++ b/libp2p/RLPXSocket.h @@ -45,7 +45,7 @@ public: bool isConnected() const { return m_socket.is_open(); } void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} } - bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } } + bi::tcp::endpoint remoteEndpoint() { boost::system::error_code ec; return m_socket.remote_endpoint(ec); } bi::tcp::socket& ref() { return m_socket; } protected: @@ -53,4 +53,4 @@ protected: }; } -} \ No newline at end of file +} diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 08c411ca0..803824558 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -334,11 +334,12 @@ void Session::drop(DisconnectReason _reason) void Session::disconnect(DisconnectReason _reason) { clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")"; + size_t peerCount = m_server->peerCount(); //needs to be outside of lock to avoid deadlocking with other thread that capture x_info/x_sessions in reverse order DEV_GUARDED(x_info) StructuredLogger::p2pDisconnected( m_info.id.abridged(), m_peer->endpoint, // TODO: may not be 100% accurate - m_server->peerCount() + peerCount ); if (m_socket->ref().is_open()) {