/* This file is part of cpp-ethereum. cpp-ethereum is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. cpp-ethereum is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ /** @file BlockChainSync.cpp * @author Gav Wood * @date 2014 */ #include "BlockChainSync.h" #include #include #include #include #include #include #include "BlockChain.h" #include "BlockQueue.h" #include "EthereumPeer.h" #include "EthereumHost.h" #include "DownloadMan.h" using namespace std; using namespace dev; using namespace dev::eth; using namespace p2p; unsigned const c_chainReorgSize = 30000; /// Added to estimated hashes to account for potential chain reorganiation unsigned const c_hashSubchainSize = 8192; /// PV61 subchain size std::ostream& dev::eth::operator<<(std::ostream& _out, SyncStatus const& _sync) { _out << "protocol: " << _sync.protocolVersion << endl; _out << "state: " << EthereumHost::stateName(_sync.state) << " "; if (_sync.state == SyncState::Hashes) _out << _sync.hashesReceived << "/" << (_sync.hashesEstimated ? "~" : "") << _sync.hashesTotal; if (_sync.state == SyncState::Blocks || _sync.state == SyncState::NewBlocks) _out << _sync.blocksReceived << "/" << _sync.blocksTotal; return _out; } BlockChainSync::BlockChainSync(EthereumHost& _host): m_host(_host) { m_bqRoomAvailable = host().bq().onRoomAvailable([this]() { RecursiveGuard l(x_sync); continueSync(); }); } BlockChainSync::~BlockChainSync() { RecursiveGuard l(x_sync); abortSync(); } DownloadMan const& BlockChainSync::downloadMan() const { return host().downloadMan(); } DownloadMan& BlockChainSync::downloadMan() { return host().downloadMan(); } void BlockChainSync::abortSync() { downloadMan().reset(); } void BlockChainSync::onPeerStatus(std::shared_ptr _peer) { RecursiveGuard l(x_sync); DEV_INVARIANT_CHECK; std::shared_ptr session = _peer->session(); if (!session) return; // Expired if (_peer->m_genesisHash != host().chain().genesisHash()) _peer->disable("Invalid genesis hash"); else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion) _peer->disable("Invalid protocol version."); else if (_peer->m_networkId != host().networkId()) _peer->disable("Invalid network identifier."); else if (session->info().clientVersion.find("/v0.7.0/") != string::npos) _peer->disable("Blacklisted client version."); else if (host().isBanned(session->id())) _peer->disable("Peer banned for previous bad behaviour."); else { unsigned hashes = estimatedHashes(); _peer->m_expectedHashes = hashes; onNewPeer(_peer); } } unsigned BlockChainSync::estimatedHashes() const { BlockInfo block = host().chain().info(); time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp; time_t now = time(0); unsigned blockCount = c_chainReorgSize; if (lastBlockTime > now) clog(NetWarn) << "Clock skew? Latest block is in the future"; else blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit; clog(NetAllDetail) << "Estimated hashes: " << blockCount; return blockCount; } void BlockChainSync::requestBlocks(std::shared_ptr _peer) { if (host().bq().knownFull()) { clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; pauseSync(); return; } _peer->requestBlocks(); if (_peer->m_asking != Asking::Blocks) //nothing to download { peerDoneBlocks(_peer); if (downloadMan().isComplete()) completeSync(); return; } } void BlockChainSync::logNewBlock(h256 const& _h) { if (m_state == SyncState::NewBlocks) clog(NetNote) << "NewBlock: " << _h; m_knownNewHashes.erase(_h); } void BlockChainSync::onPeerBlocks(std::shared_ptr _peer, RLP const& _r) { RecursiveGuard l(x_sync); unsigned itemCount = _r.itemCount(); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting) { clog(NetMessageSummary) << "Ignoring unexpected blocks"; return; } if (m_state == SyncState::Waiting) { clog(NetAllDetail) << "Ignored blocks while waiting"; return; } if (itemCount == 0) { // Got to this peer's latest block - just give up. peerDoneBlocks(_peer); if (downloadMan().isComplete()) completeSync(); return; } unsigned success = 0; unsigned future = 0; unsigned unknown = 0; unsigned got = 0; unsigned repeated = 0; u256 maxUnknownNumber = 0; h256 maxUnknown; for (unsigned i = 0; i < itemCount; ++i) { auto h = BlockInfo::headerHash(_r[i].data()); if (_peer->m_sub.noteBlock(h)) { _peer->addRating(10); switch (host().bq().import(_r[i].data(), host().chain())) { case ImportResult::Success: success++; logNewBlock(h); break; case ImportResult::Malformed: case ImportResult::BadChain: logNewBlock(h); _peer->disable("Malformed block received."); restartSync(); return; case ImportResult::FutureTimeKnown: logNewBlock(h); future++; break; case ImportResult::AlreadyInChain: case ImportResult::AlreadyKnown: got++; break; case ImportResult::FutureTimeUnknown: future++; //Fall through case ImportResult::UnknownParent: { unknown++; logNewBlock(h); if (m_state == SyncState::NewBlocks) { BlockInfo bi; bi.populateFromHeader(_r[i][0]); if (bi.number > maxUnknownNumber) { maxUnknownNumber = bi.number; maxUnknown = h; } } break; } default:; } } else { _peer->addRating(0); // -1? repeated++; } } clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received."; if (host().bq().unknownFull()) { clog(NetWarn) << "Too many unknown blocks, restarting sync"; restartSync(); return; } if (m_state == SyncState::NewBlocks && unknown > 0) { completeSync(); resetSyncFor(_peer, maxUnknown, std::numeric_limits::max()); //TODO: proper total difficuty } if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { if (downloadMan().isComplete()) completeSync(); else requestBlocks(_peer); // Some of the blocks might have been downloaded by helping peers, proceed anyway } DEV_INVARIANT_CHECK; } void BlockChainSync::onPeerNewBlock(std::shared_ptr _peer, RLP const& _r) { DEV_INVARIANT_CHECK; RecursiveGuard l(x_sync); auto h = BlockInfo::headerHash(_r[0].data()); if (_r.itemCount() != 2) _peer->disable("NewBlock without 2 data fields."); else { switch (host().bq().import(_r[0].data(), host().chain())) { case ImportResult::Success: _peer->addRating(100); logNewBlock(h); break; case ImportResult::FutureTimeKnown: //TODO: Rating dependent on how far in future it is. break; case ImportResult::Malformed: case ImportResult::BadChain: logNewBlock(h); _peer->disable("Malformed block received."); return; case ImportResult::AlreadyInChain: case ImportResult::AlreadyKnown: break; case ImportResult::FutureTimeUnknown: case ImportResult::UnknownParent: { logNewBlock(h); u256 totalDifficulty = _r[1].toInt(); if (totalDifficulty > _peer->m_totalDifficulty) { clog(NetMessageDetail) << "Received block with no known parent. Resyncing..."; resetSyncFor(_peer, h, totalDifficulty); } break; } default:; } DEV_GUARDED(_peer->x_knownBlocks) _peer->m_knownBlocks.insert(h); } } PV60Sync::PV60Sync(EthereumHost& _host): BlockChainSync(_host) { resetSync(); } SyncStatus PV60Sync::status() const { RecursiveGuard l(x_sync); SyncStatus res; res.state = m_state; res.protocolVersion = EthereumHost::c_oldProtocolVersion; if (m_state == SyncState::Hashes) { res.hashesTotal = m_estimatedHashes; res.hashesReceived = static_cast(m_syncingNeededBlocks.size()); res.hashesEstimated = true; } else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks || m_state == SyncState::Waiting) { res.blocksTotal = downloadMan().chainSize(); res.blocksReceived = downloadMan().blocksGot().size(); } return res; } void PV60Sync::setState(std::shared_ptr _peer, SyncState _s, bool _isSyncing, bool _needHelp) { bool changedState = (m_state != _s); m_state = _s; if (_isSyncing != (m_syncer.lock() == _peer) || (_isSyncing && changedState)) changeSyncer(_isSyncing ? _peer : nullptr, _needHelp); else if (_s == SyncState::Idle) changeSyncer(nullptr, _needHelp); assert(isSyncing() || _s == SyncState::Idle); } void PV60Sync::resetSync() { m_syncingLatestHash = h256(); m_syncingLastReceivedHash = h256(); m_syncingTotalDifficulty = 0; m_syncingNeededBlocks.clear(); } void PV60Sync::restartSync() { resetSync(); host().bq().clear(); std::shared_ptr syncer = m_syncer.lock(); if (syncer) transition(syncer, SyncState::Idle); } void PV60Sync::completeSync() { std::shared_ptr syncer = m_syncer.lock(); if (syncer) transition(syncer, SyncState::Idle); } void PV60Sync::pauseSync() { std::shared_ptr syncer = m_syncer.lock(); if (syncer) transition(syncer, SyncState::Waiting, true); } void PV60Sync::continueSync() { std::shared_ptr syncer = m_syncer.lock(); if (syncer) transition(syncer, SyncState::Blocks); } void PV60Sync::onNewPeer(std::shared_ptr _peer) { setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty); } void PV60Sync::transition(std::shared_ptr _peer, SyncState _s, bool _force, bool _needHelp) { clog(NetMessageSummary) << "Transition!" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : ""); if (m_state == SyncState::Idle && _s != SyncState::Idle) _peer->m_requireTransactions = true; if (_s == SyncState::Hashes) { if (m_state == SyncState::Idle || m_state == SyncState::Hashes) { m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize; syncHashes(_peer); return; } } else if (_s == SyncState::Blocks) { if (m_state == SyncState::Hashes) { if (!isSyncing(_peer)) { clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; return; } if (shouldGrabBlocks(_peer)) { clog(NetMessageDetail) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host().latestBlockSent() << "]"; downloadMan().resetToChain(m_syncingNeededBlocks); resetSync(); } else { clog(NetMessageDetail) << "Difficulty of hashchain not HIGHER. Ignoring."; resetSync(); setState(_peer, SyncState::Idle, false); return; } assert (isSyncing(_peer)); } // run through into... if (m_state == SyncState::Idle || m_state == SyncState::Hashes || m_state == SyncState::Blocks || m_state == SyncState::Waiting) { // Looks like it's the best yet for total difficulty. Set to download. setState(_peer, SyncState::Blocks, isSyncing(_peer), _needHelp); // will kick off other peers to help if available. requestBlocks(_peer); DEV_INVARIANT_CHECK; return; } } else if (_s == SyncState::NewBlocks) { if (m_state != SyncState::Idle && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting) clog(NetWarn) << "Bad state: Asking new blocks while syncing!"; else { setState(_peer, SyncState::NewBlocks, true, _needHelp); requestBlocks(_peer); DEV_INVARIANT_CHECK; return; } } else if (_s == SyncState::Waiting) { if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Hashes && m_state != SyncState::Waiting) clog(NetWarn) << "Bad state: Entering waiting state while not downloading blocks!"; else { setState(_peer, SyncState::Waiting, isSyncing(_peer), _needHelp); return; } } else if (_s == SyncState::Idle) { if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { clog(NetMessageDetail) << "Finishing blocks fetch..."; // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. if (isSyncing(_peer)) noteDoneBlocks(_peer, _force); // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. _peer->m_sub.doneFetch(); setState(_peer, SyncState::Idle, false); } else if (m_state == SyncState::Hashes) { clog(NetMessageDetail) << "Finishing hashes fetch..."; setState(_peer, SyncState::Idle, false); } // Otherwise it's fine. We don't care if it's Nothing->Nothing. DEV_INVARIANT_CHECK; return; } clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : ""); } void PV60Sync::resetSyncFor(std::shared_ptr _peer, h256 const& _latestHash, u256 const& _td) { setNeedsSyncing(_peer, _latestHash, _td); } void PV60Sync::setNeedsSyncing(std::shared_ptr _peer, h256 const& _latestHash, u256 const& _td) { _peer->m_latestHash = _latestHash; _peer->m_totalDifficulty = _td; if (_peer->m_latestHash) noteNeedsSyncing(_peer); shared_ptr session = _peer->session(); if (session) session->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); } bool PV60Sync::needsSyncing(std::shared_ptr _peer) const { return !!_peer->m_latestHash; } bool PV60Sync::isSyncing(std::shared_ptr _peer) const { return m_syncer.lock() == _peer; } bool PV60Sync::shouldGrabBlocks(std::shared_ptr _peer) const { auto td = _peer->m_totalDifficulty; auto lh = _peer->m_latestHash; auto ctd = host().chain().details().totalDifficulty; if (m_syncingNeededBlocks.empty()) return false; clog(NetMessageDetail) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back(); if (td < ctd || (td == ctd && host().chain().currentHash() == lh)) return false; return true; } void PV60Sync::attemptSync(std::shared_ptr _peer) { if (m_state != SyncState::Idle || _peer->m_asking != Asking::Nothing) { clog(NetAllDetail) << "Can't sync with this peer - outstanding asks."; return; } // if already done this, then ignore. if (!needsSyncing(_peer)) { clog(NetAllDetail) << "Already synced with this peer."; return; } unsigned n = host().chain().number(); u256 td = host().chain().details().totalDifficulty; if (host().bq().isActive()) td += host().bq().difficulty(); clog(NetAllDetail) << "Attempt chain-grab? Latest:" << (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash) << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty; if (td >= _peer->m_totalDifficulty) { clog(NetAllDetail) << "No. Our chain is better."; resetNeedsSyncing(_peer); transition(_peer, SyncState::Idle); } else { clog(NetAllDetail) << "Yes. Their chain is better."; transition(_peer, SyncState::Hashes); } } void PV60Sync::noteNeedsSyncing(std::shared_ptr _peer) { // if already downloading hash-chain, ignore. if (isSyncing()) { clog(NetAllDetail) << "Sync in progress: Just set to help out."; if (m_state == SyncState::Hashes && _peer->m_asking == Asking::Nothing) requestSubchain(_peer); else if (m_state == SyncState::Blocks) requestBlocks(_peer); } else // otherwise check to see if we should be downloading... attemptSync(_peer); } void PV60Sync::changeSyncer(std::shared_ptr _syncer, bool _needHelp) { if (_syncer) clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId(); else clog(NetAllDetail) << "Clearing syncer."; m_syncer = _syncer; if (isSyncing()) { if (_needHelp && (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)) host().foreachPeer([&](std::shared_ptr _p) { clog(NetMessageDetail) << "Getting help with downloading blocks"; if (_p != _syncer && _p->m_asking == Asking::Nothing) transition(_p, m_state); return true; }); } else { // start grabbing next hash chain if there is one. host().foreachPeer([this](std::shared_ptr _p) { attemptSync(_p); return !isSyncing(); }); if (!isSyncing()) { if (m_state != SyncState::Idle) setState(_syncer, SyncState::Idle); clog(NetMessageDetail) << "No more peers to sync with."; } } assert(isSyncing() || m_state == SyncState::Idle); } void PV60Sync::peerDoneBlocks(std::shared_ptr _peer) { noteDoneBlocks(_peer, false); } void PV60Sync::noteDoneBlocks(std::shared_ptr _peer, bool _clemency) { resetNeedsSyncing(_peer); if (downloadMan().isComplete()) { // Done our chain-get. clog(NetMessageDetail) << "Chain download complete."; // 1/100th for each useful block hash. _peer->addRating(downloadMan().chainSize() / 100); downloadMan().reset(); } else if (isSyncing(_peer)) { if (_clemency) clog(NetNote) << "Chain download failed. Aborted while incomplete."; else { // Done our chain-get. clog(NetWarn) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; clog(NetWarn) << downloadMan().remaining(); clog(NetWarn) << "WOULD BAN."; // m_banned.insert(_peer->session()->id()); // We know who you are! // _peer->disable("Peer sent hashes but was unable to provide the blocks."); } resetSync(); downloadMan().reset(); } _peer->m_sub.doneFetch(); } void PV60Sync::syncHashes(std::shared_ptr _peer) { if (m_state == SyncState::Idle) { if (isSyncing(_peer)) clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; m_syncingLatestHash = _peer->m_latestHash; m_syncingTotalDifficulty = _peer->m_totalDifficulty; setState(_peer, SyncState::Hashes, true); _peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash); } else if (m_state == SyncState::Hashes) { if (!isSyncing(_peer)) clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; setState(_peer, SyncState::Hashes, true); _peer->requestHashes(m_syncingLastReceivedHash); } } void PV60Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); if (!isSyncing(_peer)) { clog(NetMessageSummary) << "Ignoring hashes since not syncing"; return; } if (_peer->m_syncHash != (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash)) { clog(NetMessageSummary) << "Ignoring unexpected hashes"; return; } if (_hashes.size() == 0) { transition(_peer, SyncState::Blocks); return; } unsigned knowns = 0; unsigned unknowns = 0; for (unsigned i = 0; i < _hashes.size(); ++i) { auto h = _hashes[i]; auto status = host().bq().blockStatus(h); if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h)) { clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download..."; assert (isSyncing(_peer)); transition(_peer, SyncState::Blocks); return; } else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; _peer->disable("Bad blocks"); restartSync(); return; } else if (status == QueueStatus::Unknown) { unknowns++; m_syncingNeededBlocks.push_back(h); } else knowns++; m_syncingLastReceivedHash = h; } clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLastReceivedHash; if (m_syncingNeededBlocks.size() > _peer->m_expectedHashes) { _peer->disable("Too many hashes"); restartSync(); return; } // run through - ask for more. transition(_peer, SyncState::Hashes); DEV_INVARIANT_CHECK; } void PV60Sync::onPeerNewHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer))) { clog(NetMessageSummary) << "Ignoring since we're already downloading."; return; } clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help."; unsigned knowns = 0; unsigned unknowns = 0; for (auto const& h: _hashes) { _peer->addRating(1); DEV_GUARDED(_peer->x_knownBlocks) _peer->m_knownBlocks.insert(h); auto status = host().bq().blockStatus(h); if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h)) knowns++; else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; return; } else if (status == QueueStatus::Unknown) { unknowns++; m_syncingNeededBlocks.push_back(h); } else knowns++; } clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns"; if (unknowns > 0) { if (m_state == SyncState::NewBlocks) { clog(NetMessageDetail) << "Downloading new blocks and seeing new hashes. Trying grabbing blocks"; _peer->requestBlocks(m_syncingNeededBlocks); } else { clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help."; downloadMan().resetToChain(m_syncingNeededBlocks); transition(_peer, SyncState::NewBlocks, false, false); } for (auto const& h: m_syncingNeededBlocks) if (!m_knownNewHashes.count(h)) { m_knownNewHashes.insert(h); clog(NetNote) << "NewHash: " << h; } resetSync(); } DEV_INVARIANT_CHECK; } void PV60Sync::abortSync() { // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. bool continueSync = false; if (m_state == SyncState::Blocks) { // Main syncer aborted, try to find a replacement host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking == Asking::Blocks) { setState(_p, SyncState::Blocks, true, true); // will kick off other peers to help if available. continueSync = true; return false; } if (_p->m_asking == Asking::Nothing && shouldGrabBlocks(_p)) { transition(_p, SyncState::Blocks); clog(NetMessageDetail) << "New sync peer selected"; continueSync = true; return false; } return true; }); } if (!continueSync) { // Just set to idle. Hashchain is keept, Sync will be continued if there are more peers to sync with setState(std::shared_ptr(), SyncState::Idle, false, true); } DEV_INVARIANT_CHECK; } void PV60Sync::onPeerAborting() { RecursiveGuard l(x_sync); // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. if (m_syncer.expired() && m_state != SyncState::Idle) { clog(NetWarn) << "Syncing peer disconnected, restarting sync"; m_syncer.reset(); abortSync(); } DEV_INVARIANT_CHECK; } bool PV60Sync::invariants() const { if (m_state == SyncState::Idle && isSyncing()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Idle while peer syncing")); if (m_state != SyncState::Idle && !isSyncing()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Active while peer not syncing")); if (m_state == SyncState::Hashes) { if (!m_syncingLatestHash) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_syncingLatestHash is not set while downloading hashes")); if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash)) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Received hashes but the hashes list is empty (or the other way around)")); } if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { if (downloadMan().isComplete()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Block download complete but the state is still Blocks")); } if (m_state == SyncState::Waiting && !host().bq().isActive()) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Waiting while block queue is idle")); return true; } PV61Sync::PV61Sync(EthereumHost& _host): PV60Sync(_host) { } void PV61Sync::syncHashes(std::shared_ptr _peer) { if (_peer->m_protocolVersion != host().protocolVersion()) { m_readyChainMap.clear(); m_completeChainMap.clear(); m_downloadingChainMap.clear(); m_syncingBlockNumber = 0; m_chainSyncPeers.clear(); m_knownHashes.clear(); m_hashScanComplete = false; PV60Sync::syncHashes(_peer); return; } if (m_state == SyncState::Idle) { if (isSyncing(_peer)) clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; if (m_syncingBlockNumber == 0) m_syncingBlockNumber = host().chain().number() + c_hashSubchainSize; m_syncingTotalDifficulty = _peer->m_totalDifficulty; setState(_peer, SyncState::Hashes, true); _peer->requestHashes(m_syncingBlockNumber, 1); } else if (m_state == SyncState::Hashes) { if (!isSyncing(_peer)) clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; m_syncingBlockNumber += c_hashSubchainSize; setState(_peer, SyncState::Hashes, true); _peer->requestHashes(m_syncingBlockNumber, 1); } } void PV61Sync::requestSubchain(std::shared_ptr _peer) { auto syncPeer = m_chainSyncPeers.find(_peer); if (syncPeer != m_chainSyncPeers.end()) { // Already downoading, request next batch SubChain const& s = m_downloadingChainMap.at(syncPeer->second); _peer->requestHashes(s.lastHash); } else if (needsSyncing(_peer)) { if (!m_readyChainMap.empty()) { clog(NetAllDetail) << "Helping with hashchin download"; SubChain& s = m_readyChainMap.begin()->second; _peer->requestHashes(s.lastHash); m_downloadingChainMap[m_readyChainMap.begin()->first] = move(s); m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first; m_readyChainMap.erase(m_readyChainMap.begin()); } else if (!m_downloadingChainMap.empty() && m_hashScanComplete) // Lead syncer is done, just grab whatever we can _peer->requestHashes(m_downloadingChainMap.begin()->second.lastHash); } } void PV61Sync::requestSubchains() { host().foreachPeer([this](std::shared_ptr _p) { if (_p->m_asking == Asking::Nothing) requestSubchain(_p); return true; }); } void PV61Sync::completeSubchain(std::shared_ptr _peer, unsigned _n) { m_completeChainMap[_n] = move(m_downloadingChainMap.at(_n)); m_downloadingChainMap.erase(_n); for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end(); ++s) if (s->second == _n) //TODO: optimize this { m_chainSyncPeers.erase(s); break; } _peer->m_syncHashNumber = 0; auto syncer = m_syncer.lock(); if (!syncer) { restartSync(); return; } if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && m_hashScanComplete) { //Done chain-get m_syncingNeededBlocks.clear(); for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h) m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.hashes.begin(), h->second.hashes.end()); m_completeChainMap.clear(); m_knownHashes.clear(); m_syncingBlockNumber = 0; transition(syncer, SyncState::Blocks); } else requestSubchain(_peer); } void PV61Sync::restartSync() { m_completeChainMap.clear(); m_readyChainMap.clear(); m_downloadingChainMap.clear(); m_chainSyncPeers.clear(); m_syncingBlockNumber = 0; m_knownHashes.clear(); m_hashScanComplete = false; PV60Sync::restartSync(); } void PV61Sync::onPeerHashes(std::shared_ptr _peer, h256s const& _hashes) { RecursiveGuard l(x_sync); if (m_syncingBlockNumber == 0 || (_peer == m_syncer.lock() && _peer->m_protocolVersion != host().protocolVersion())) { // Syncing in pv60 mode PV60Sync::onPeerHashes(_peer, _hashes); return; } if (_hashes.size() == 0) { if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) { // End of hash chain, add last chunk to download m_readyChainMap.insert(make_pair(m_syncingBlockNumber, SubChain{ h256s{ _peer->m_latestHash }, _peer->m_latestHash })); m_hashScanComplete = true; _peer->m_syncHashNumber = 0; requestSubchain(_peer); } else { auto syncPeer = m_chainSyncPeers.find(_peer); if (syncPeer == m_chainSyncPeers.end()) clog(NetMessageDetail) << "Hashes response from unexpected peer"; else { // Peer does not have request hashes, move back from downloading to ready unsigned number = syncPeer->second; m_chainSyncPeers.erase(_peer); m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); m_downloadingChainMap.erase(number); resetNeedsSyncing(_peer); requestSubchains(); } } return; } if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) { // Got new subchain marker assert(_hashes.size() == 1); m_knownHashes.insert(_hashes[0]); m_readyChainMap.insert(make_pair(m_syncingBlockNumber, SubChain{ h256s{ _hashes[0] }, _hashes[0] })); if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes) { _peer->disable("Too many hashes from lead peer"); restartSync(); return; } transition(_peer, SyncState::Hashes); requestSubchains(); } else { auto syncPeer = m_chainSyncPeers.find(_peer); unsigned number = 0; if (syncPeer == m_chainSyncPeers.end()) { //check downlading peers for (auto const& downloader: m_downloadingChainMap) if (downloader.second.lastHash == _peer->m_syncHash) { number = downloader.first; break; } } else number = syncPeer->second; if (number == 0) { clog(NetAllDetail) << "Hashes response from unexpected/expired peer"; return; } auto downloadingPeer = m_downloadingChainMap.find(number); if (downloadingPeer == m_downloadingChainMap.end() || downloadingPeer->second.lastHash != _peer->m_syncHash) { // Too late, other peer has already downloaded our hashes m_chainSyncPeers.erase(_peer); requestSubchain(_peer); return; } SubChain& subChain = downloadingPeer->second; unsigned knowns = 0; unsigned unknowns = 0; for (unsigned i = 0; i < _hashes.size(); ++i) { auto h = _hashes[i]; auto status = host().bq().blockStatus(h); if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h) || !!m_knownHashes.count(h)) { clog(NetMessageSummary) << "Subchain download complete"; m_chainSyncPeers.erase(_peer); completeSubchain(_peer, number); return; } else if (status == QueueStatus::Bad) { cwarn << "block hash bad!" << h << ". Bailing..."; _peer->disable("Bad hashes"); if (isSyncing(_peer)) restartSync(); else { //try with other peer m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); m_downloadingChainMap.erase(number); m_chainSyncPeers.erase(_peer); } return; } else if (status == QueueStatus::Unknown) { unknowns++; subChain.hashes.push_back(h); } else knowns++; subChain.lastHash = h; } clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << subChain.lastHash; if (subChain.hashes.size() > c_hashSubchainSize) { _peer->disable("Too many subchain hashes"); restartSync(); return; } requestSubchain(_peer); } DEV_INVARIANT_CHECK; } void PV61Sync::onPeerAborting() { RecursiveGuard l(x_sync); // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end();) { if (s->first.expired()) { unsigned number = s->second; m_readyChainMap[number] = move(m_downloadingChainMap.at(number)); m_downloadingChainMap.erase(number); m_chainSyncPeers.erase(s++); } else ++s; } if (m_syncer.expired()) { if (m_state == SyncState::Hashes) { // Main syncer aborted, other peers are probably still downloading hashes, just set one of them as syncer host().foreachPeer([&](std::shared_ptr _p) { if (_p->m_asking != Asking::Hashes) return true; setState(_p, SyncState::Hashes, true, true); return false; }); } if (m_syncer.expired()) PV60Sync::onPeerAborting(); } else if (isPV61Syncing() && m_state == SyncState::Hashes) requestSubchains(); DEV_INVARIANT_CHECK; } SyncStatus PV61Sync::status() const { RecursiveGuard l(x_sync); SyncStatus res = PV60Sync::status(); res.protocolVersion = 61; if (m_state == SyncState::Hashes && isPV61Syncing()) { res.hashesReceived = 0; for (auto const& d : m_readyChainMap) res.hashesReceived += d.second.hashes.size(); for (auto const& d : m_downloadingChainMap) res.hashesReceived += d.second.hashes.size(); for (auto const& d : m_completeChainMap) res.hashesReceived += d.second.hashes.size(); } return res; } bool PV61Sync::isPV61Syncing() const { return m_syncingBlockNumber != 0; } bool PV61Sync::invariants() const { if (m_state == SyncState::Hashes) { if (isPV61Syncing() && !m_syncingBlockNumber) BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Syncing in PV61 with no block number set")); } else if (!PV60Sync::invariants()) return false; return true; }