From 2ad9a6d265ad2ab7b4d4ae97a3226082d50b3396 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 17 Jun 2015 10:06:00 +0200 Subject: [PATCH] reverted to old pv61 sync code --- libethcore/Common.h | 3 +- libethereum/BlockChain.cpp | 2 +- libethereum/BlockChainSync.cpp | 701 +++++++++++++++++++++++++++++++++ libethereum/BlockChainSync.h | 146 +++++++ libethereum/BlockQueue.cpp | 3 +- libethereum/CommonNet.h | 6 +- libethereum/EthereumHost.cpp | 578 +++------------------------ libethereum/EthereumHost.h | 64 +-- libethereum/EthereumPeer.cpp | 11 +- libethereum/EthereumPeer.h | 16 +- 10 files changed, 941 insertions(+), 589 deletions(-) create mode 100644 libethereum/BlockChainSync.cpp create mode 100644 libethereum/BlockChainSync.h diff --git a/libethcore/Common.h b/libethcore/Common.h index 6f23cb0e8..77ec9fa3b 100644 --- a/libethcore/Common.h +++ b/libethcore/Common.h @@ -100,7 +100,8 @@ enum class ImportResult { Success = 0, UnknownParent, - FutureTime, + FutureTimeKnown, + FutureTimeUnkwnown, AlreadyInChain, AlreadyKnown, Malformed, diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index bd6996a45..b667ba03f 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -372,7 +372,7 @@ pair BlockChain::attemptImport(bytes const& _block, O } catch (FutureTime&) { - return make_pair(ImportResult::FutureTime, make_pair(h256s(), h256s())); + return make_pair(ImportResult::FutureTimeKnown, make_pair(h256s(), h256s())); } catch (Exception& ex) { diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp new file mode 100644 index 000000000..ed43e0786 --- /dev/null +++ b/libethereum/BlockChainSync.cpp @@ -0,0 +1,701 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file EthereumHost.cpp + * @author Gav Wood + * @date 2014 + */ + +#include "BlockChainSync.h" + +#include +#include +#include +#include +#include +#include +#include +#include "BlockChain.h" +#include "TransactionQueue.h" +#include "BlockQueue.h" +#include "EthereumPeer.h" +#include "EthereumHost.h" +#include "DownloadMan.h" +using namespace std; +using namespace dev; +using namespace dev::eth; +using namespace p2p; + + +unsigned const c_chainReorgSize = 30000; + + +BlockChainSync::BlockChainSync(EthereumHost& _host): + m_host(_host) +{ +} + +BlockChainSync::~BlockChainSync() +{ + abortSync(); +} + +DownloadMan const& BlockChainSync::downloadMan() const +{ + return host().downloadMan(); +} + +DownloadMan& BlockChainSync::downloadMan() +{ + return host().downloadMan(); +} + +void BlockChainSync::abortSync() +{ + host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; }); + downloadMan().resetToChain(h256s()); +} + +void BlockChainSync::onPeerStatus(EthereumPeer*) +{ + +} + +unsigned BlockChainSync::estimateHashes() +{ + BlockInfo block = host().chain().info(); + time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp; + time_t now = time(0); + unsigned blockCount = c_chainReorgSize; + if (lastBlockTime > now) + clog(NetWarn) << "Clock skew? Latest block is in the future"; + else + blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit; + clog(NetAllDetail) << "Estimated hashes: " << blockCount; + return blockCount; +} + +PV60Sync::PV60Sync(EthereumHost& _host): + BlockChainSync(_host) +{ +} + +SyncStatus PV60Sync::status() const +{ + RecursiveGuard l(x_sync); + SyncStatus res; + res.state = m_state; + if (m_state == SyncState::Hashes) + { + res.hashesTotal = m_estimatedHashes; + res.hashesReceived = static_cast(m_syncingNeededBlocks.size()); + res.hashesEstimated = true; + } + else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks || m_state == SyncState::Waiting) + { + res.blocksTotal = downloadMan().chainSize(); + res.blocksReceived = downloadMan().blocksGot().size(); + } + return res; +} + +void PV60Sync::setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing, bool _needHelp) +{ + bool changedState = (m_state != _s); + m_state = _s; + + if (_isSyncing != (m_syncer == _peer) || (_isSyncing && changedState)) + changeSyncer(_isSyncing ? _peer : nullptr, _needHelp); + else if (_s == SyncState::Idle) + changeSyncer(nullptr, _needHelp); + + assert(!!m_syncer || _s == SyncState::Idle); + + if (!_isSyncing) + { + m_syncingLatestHash = h256(); + m_syncingTotalDifficulty = 0; + m_syncingNeededBlocks.clear(); + } +} + +void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _needHelp) +{ + clog(NetMessageSummary) << "Transition!" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : ""); + + //DEV_INVARIANT_CHECK; + if (m_state == SyncState::Idle && _s != SyncState::Idle) + _peer->m_requireTransactions = true; + + RLPStream s; + if (_s == SyncState::Hashes) + { + if (m_state == SyncState::Idle) + { + if (isSyncing(_peer)) + clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; + + m_syncingLatestHash = _peer->m_latestHash; + m_syncingTotalDifficulty = _peer->m_totalDifficulty; + setState(_peer, _s, true); + _peer->requestHashes(m_syncingLatestHash); + DEV_INVARIANT_CHECK; + return; + } + else if (m_state == SyncState::Hashes) + { + if (!isSyncing(_peer)) + clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + + setState(_peer, _s, true); + _peer->requestHashes(m_syncingLastReceivedHash); + DEV_INVARIANT_CHECK; + return; + } + } + else if (_s == SyncState::Blocks) + { + if (m_state == SyncState::Hashes) + { + if (!isSyncing(_peer)) + { + clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + return; + } + if (shouldGrabBlocks(_peer)) + { + clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host().latestBlockSent() << "]"; + downloadMan().resetToChain(m_syncingNeededBlocks); + } + else + { + clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; + m_syncingLatestHash = h256(); + setState(_peer, SyncState::Idle, false); + return; + } + assert (isSyncing(_peer)); + } + // run through into... + if (m_state == SyncState::Idle || m_state == SyncState::Hashes || m_state == SyncState::Blocks) + { + // Looks like it's the best yet for total difficulty. Set to download. + setState(_peer, SyncState::Blocks, isSyncing(_peer), _needHelp); // will kick off other peers to help if available. + requestBlocks(_peer); + DEV_INVARIANT_CHECK; + return; + } + } + else if (_s == SyncState::NewBlocks) + { + if (m_state != SyncState::Idle && m_state != SyncState::NewBlocks) + clog(NetWarn) << "Bad state: Asking new blocks while syncing!"; + else + { + setState(_peer, SyncState::NewBlocks, true, _needHelp); + requestBlocks(_peer); + DEV_INVARIANT_CHECK; + return; + } + } + else if (_s == SyncState::Idle) + { + host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; }); + if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) + { + clog(NetNote) << "Finishing blocks fetch..."; + + // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. + if (isSyncing(_peer)) + noteDoneBlocks(_peer, _force); + + // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. + _peer->m_sub.doneFetch(); + _peer->setIdle(); + setState(_peer, SyncState::Idle, false); + } + else if (m_state == SyncState::Hashes) + { + clog(NetNote) << "Finishing hashes fetch..."; + setState(_peer, SyncState::Idle, false); + } + // Otherwise it's fine. We don't care if it's Nothing->Nothing. + DEV_INVARIANT_CHECK; + return; + } + + clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : ""); +} + +void PV60Sync::requestBlocks(EthereumPeer* _peer) +{ + _peer->requestBlocks(); + if (_peer->m_asking != Asking::Blocks) //nothing to download + { + noteDoneBlocks(_peer, false); + if (downloadMan().isComplete()) + transition(_peer, SyncState::Idle); + return; + } +} + +void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td) +{ + _peer->m_latestHash = _latestHash; + _peer->m_totalDifficulty = _td; + + if (_peer->m_latestHash) + noteNeedsSyncing(_peer); + + _peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); +} + +bool PV60Sync::isSyncing(EthereumPeer* _peer) const +{ + return m_syncer == _peer; +} + +bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const +{ + auto td = _peer->m_totalDifficulty; + auto lh = _peer->m_latestHash; + auto ctd = host().chain().details().totalDifficulty; + + if (m_syncingNeededBlocks.empty()) + return false; + + clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back(); + + if (td < ctd || (td == ctd && host().chain().currentHash() == lh)) + return false; + + return true; +} + +void PV60Sync::attemptSync(EthereumPeer* _peer) +{ + if (m_state != SyncState::Idle) + { + clog(NetAllDetail) << "Can't sync with this peer - outstanding asks."; + return; + } + + // if already done this, then ignore. + if (!needsSyncing(_peer)) + { + clog(NetAllDetail) << "Already synced with this peer."; + return; + } + + h256 c = host().chain().currentHash(); + unsigned n = host().chain().number(); + u256 td = host().chain().details().totalDifficulty; + + clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty; + if (td >= _peer->m_totalDifficulty) + { + clog(NetAllDetail) << "No. Our chain is better."; + resetNeedsSyncing(_peer); + transition(_peer, SyncState::Idle); + } + else + { + clog(NetAllDetail) << "Yes. Their chain is better."; + m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize; + transition(_peer, SyncState::Hashes); + } +} + +void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer) +{ + // if already downloading hash-chain, ignore. + if (isSyncing()) + { + clog(NetAllDetail) << "Sync in progress: Just set to help out."; + if (m_state == SyncState::Blocks) + _peer->requestBlocks(); + } + else + // otherwise check to see if we should be downloading... + attemptSync(_peer); +} + +void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp) +{ + if (_syncer) + clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId(); + else + clog(NetAllDetail) << "Clearing syncer."; + + m_syncer = _syncer; + if (isSyncing()) + { + if (_needHelp && (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)) + host().foreachPeer([&](EthereumPeer* _p) + { + clog(NetNote) << "Getting help with downloading blocks"; + if (_p != _syncer && _p->m_asking == Asking::Nothing) + transition(_p, m_state); + return true; + }); + } + else + { + // start grabbing next hash chain if there is one. + host().foreachPeer([this](EthereumPeer* _p) + { + attemptSync(_p); + return !isSyncing(); + }); + if (!isSyncing()) + { + if (m_state != SyncState::Idle) + setState(_syncer, SyncState::Idle); + clog(NetNote) << "No more peers to sync with."; + } + } + assert(!!m_syncer || m_state == SyncState::Idle); +} + +void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency) +{ + resetNeedsSyncing(_peer); + if (downloadMan().isComplete()) + { + // Done our chain-get. + clog(NetNote) << "Chain download complete."; + // 1/100th for each useful block hash. + _peer->addRating(downloadMan().chainSize() / 100); + downloadMan().reset(); + } + else if (isSyncing(_peer)) + { + if (_clemency) + clog(NetNote) << "Chain download failed. Aborted while incomplete."; + else + { + // Done our chain-get. + clog(NetWarn) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; + clog(NetWarn) << downloadMan().remaining(); + clog(NetWarn) << "WOULD BAN."; +// m_banned.insert(_peer->session()->id()); // We know who you are! +// _peer->disable("Peer sent hashes but was unable to provide the blocks."); + } + downloadMan().reset(); + } + _peer->m_sub.doneFetch(); +} + +void PV60Sync::onPeerStatus(EthereumPeer* _peer) +{ + RecursiveGuard l(x_sync); + DEV_INVARIANT_CHECK; + if (_peer->m_genesisHash != host().chain().genesisHash()) + _peer->disable("Invalid genesis hash"); + else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion) + _peer->disable("Invalid protocol version."); + else if (_peer->m_networkId != host().networkId()) + _peer->disable("Invalid network identifier."); + else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos) + _peer->disable("Blacklisted client version."); + else if (host().isBanned(_peer->session()->id())) + _peer->disable("Peer banned for previous bad behaviour."); + else + { + unsigned estimatedHashes = estimateHashes(); + _peer->m_expectedHashes = estimatedHashes; + setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty); + } + DEV_INVARIANT_CHECK; +} + +void PV60Sync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) +{ + RecursiveGuard l(x_sync); + DEV_INVARIANT_CHECK; + unsigned itemCount = _r.itemCount(); + clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); + + _peer->setIdle(); + if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks) + clog(NetWarn) << "Unexpected Blocks received!"; + + if (itemCount == 0) + { + // Got to this peer's latest block - just give up. + noteDoneBlocks(_peer, false); + if (downloadMan().isComplete()) + transition(_peer, SyncState::Idle); + return; + } + + unsigned success = 0; + unsigned future = 0; + unsigned unknown = 0; + unsigned got = 0; + unsigned repeated = 0; + u256 maxDifficulty = 0; + h256 maxUnknown; + + for (unsigned i = 0; i < itemCount; ++i) + { + auto h = BlockInfo::headerHash(_r[i].data()); + if (_peer->m_sub.noteBlock(h)) + { + _peer->addRating(10); + switch (host().bq().import(_r[i].data(), host().chain())) + { + case ImportResult::Success: + success++; + break; + + case ImportResult::Malformed: + case ImportResult::BadChain: + _peer->disable("Malformed block received."); + return; + + case ImportResult::FutureTimeKnown: + future++; + break; + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + got++; + break; + + case ImportResult::FutureTimeUnkwnown: + future++; //Fall through + + case ImportResult::UnknownParent: + { + unknown++; + if (m_state == SyncState::NewBlocks) + { + BlockInfo bi; + bi.populateFromHeader(_r[i][0]); + if (bi.difficulty > maxDifficulty) + { + maxDifficulty = bi.difficulty; + maxUnknown = h; + } + } + break; + } + + default:; + } + } + else + { + _peer->addRating(0); // -1? + repeated++; + } + } + + clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received."; + + if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) + { + if (downloadMan().isComplete()) + transition(_peer, SyncState::Idle); + else if (!got) + transition(_peer, m_state); + else + noteDoneBlocks(_peer, false); + } + DEV_INVARIANT_CHECK; +} + +void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) +{ + RecursiveGuard l(x_sync); + DEV_INVARIANT_CHECK; + _peer->setIdle(); + if (!isSyncing(_peer)) + { + clog(NetMessageSummary) << "Ignoring hashes synce not syncing"; + return; + } + if (_hashes.size() == 0) + { + transition(_peer, SyncState::Blocks); + return; + } + unsigned knowns = 0; + unsigned unknowns = 0; + for (unsigned i = 0; i < _hashes.size(); ++i) + { + auto h = _hashes[i]; + auto status = host().bq().blockStatus(h); + if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h)) + { + clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download..."; + assert (isSyncing(_peer)); + transition(_peer, SyncState::Blocks); + return; + } + else if (status == QueueStatus::Bad) + { + cwarn << "block hash bad!" << h << ". Bailing..."; + transition(_peer, SyncState::Idle); + return; + } + else if (status == QueueStatus::Unknown) + { + unknowns++; + m_syncingNeededBlocks.push_back(h); + } + else + knowns++; + m_syncingLastReceivedHash = h; + } + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLastReceivedHash; + if (m_syncingNeededBlocks.size() > _peer->m_expectedHashes) + { + _peer->disable("Too many hashes"); + m_syncingNeededBlocks.clear(); + m_syncingLatestHash = h256(); + transition(_peer, SyncState::Idle); + return; + } + // run through - ask for more. + transition(_peer, SyncState::Hashes); + DEV_INVARIANT_CHECK; +} + + +void PV60Sync::abortSync(EthereumPeer* _peer) +{ + if (isSyncing(_peer)) + { + host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; }); + transition(_peer, SyncState::Idle, true); + } + DEV_INVARIANT_CHECK; +} + +void PV60Sync::onPeerAborting(EthereumPeer* _peer) +{ + abortSync(_peer); + DEV_INVARIANT_CHECK; +} + +void PV60Sync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) +{ + DEV_INVARIANT_CHECK; + RecursiveGuard l(x_sync); + auto h = BlockInfo::headerHash(_r[0].data()); + clog(NetMessageSummary) << "NewBlock: " << h; + + if (_r.itemCount() != 2) + _peer->disable("NewBlock without 2 data fields."); + else + { + switch (host().bq().import(_r[0].data(), host().chain())) + { + case ImportResult::Success: + _peer->addRating(100); + break; + case ImportResult::FutureTimeKnown: + //TODO: Rating dependent on how far in future it is. + break; + + case ImportResult::Malformed: + case ImportResult::BadChain: + _peer->disable("Malformed block received."); + return; + + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + break; + + case ImportResult::FutureTimeUnkwnown: + case ImportResult::UnknownParent: + clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; + setNeedsSyncing(_peer, h, _r[1].toInt()); + break; + default:; + } + + DEV_GUARDED(_peer->x_knownBlocks) + _peer->m_knownBlocks.insert(h); + } + DEV_INVARIANT_CHECK; +} + +void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) +{ + RecursiveGuard l(x_sync); + DEV_INVARIANT_CHECK; + if (isSyncing()) + { + clog(NetMessageSummary) << "Ignoring since we're already downloading."; + return; + } + unsigned knowns = 0; + unsigned unknowns = 0; + for (auto h: _hashes) + { + _peer->addRating(1); + DEV_GUARDED(_peer->x_knownBlocks) + _peer->m_knownBlocks.insert(h); + auto status = host().bq().blockStatus(h); + if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h)) + knowns++; + else if (status == QueueStatus::Bad) + { + cwarn << "block hash bad!" << h << ". Bailing..."; + return; + } + else if (status == QueueStatus::Unknown) + { + unknowns++; + m_syncingNeededBlocks.push_back(h); + } + else + knowns++; + } + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns"; + if (unknowns > 0) + { + clog(NetNote) << "Not syncing and new block hash discovered: syncing without help."; + downloadMan().resetToChain(m_syncingNeededBlocks); + transition(_peer, SyncState::NewBlocks, false, false); + } + DEV_INVARIANT_CHECK; +} + +bool PV60Sync::invariants() const +{ + if (m_state == SyncState::Idle && !!m_syncer) + return false; + if (m_state != SyncState::Idle && !m_syncer) + return false; + if (m_state == SyncState::Hashes) + { + bool hashes = false; + host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); + if (!hashes) + return false; + } + if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) + { + bool blocks = false; + host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; }); + if (!blocks) + return false; + } + return true; +} diff --git a/libethereum/BlockChainSync.h b/libethereum/BlockChainSync.h new file mode 100644 index 000000000..2c3f61de6 --- /dev/null +++ b/libethereum/BlockChainSync.h @@ -0,0 +1,146 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file EthereumHost.h + * @author Gav Wood + * @date 2014 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include "CommonNet.h" +#include "EthereumPeer.h" //TODO: forward decl +#include "DownloadMan.h" + + +namespace dev +{ + +class RLPStream; + +namespace eth +{ + +class EthereumHost; +class BlockQueue; + +/** + * @brief BlockChain synchronization strategy class + * @doWork Syncs to peers and sends new blocks and transactions. + */ +class BlockChainSync: public HasInvariants +{ +public: + BlockChainSync(EthereumHost& _host); + + /// Will block on network process events. + virtual ~BlockChainSync(); + void abortSync(); + + DownloadMan const& downloadMan() const; + DownloadMan& downloadMan(); + virtual bool isSyncing() const = 0; + virtual void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status + virtual void onPeerBlocks(EthereumPeer* _peer, RLP const& _r) = 0; ///< Called by peer once it has new blocks during syn + virtual void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) = 0; ///< Called by peer once it has new blocks + virtual void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; ///< Called by peer once it has new hashes + virtual void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; ///< Called by peer once it has another sequential block of hashes during sync + virtual void onPeerAborting(EthereumPeer* _peer) = 0; ///< Called by peer when it is disconnecting + virtual SyncStatus status() const = 0; + + static char const* stateName(SyncState _s) { return s_stateNames[static_cast(_s)]; } + +private: + static char const* const s_stateNames[static_cast(SyncState::Size)]; + + void setState(SyncState _s); + + bool invariants() const override = 0; + + EthereumHost& m_host; + Handler m_bqRoomAvailable; + HashDownloadMan m_hashMan; + +protected: + + EthereumHost& host() { return m_host; } + EthereumHost const& host() const { return m_host; } + unsigned estimateHashes(); + + mutable RecursiveMutex x_sync; + SyncState m_state = SyncState::Idle; ///< Current sync state + SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode + unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. +}; + +class PV60Sync: public BlockChainSync +{ +public: + + PV60Sync(EthereumHost& _host); + + bool isSyncing() const override { return !!m_syncer; } + void onPeerStatus(EthereumPeer* _peer) override; ///< Called by peer to report status + void onPeerBlocks(EthereumPeer* _peer, RLP const& _r) override; ///< Called by peer once it has new blocks during syn + void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) override; ///< Called by peer once it has new blocks + void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) override; ///< Called by peer once it has new hashes + void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) override; ///< Called by peer once it has another sequential block of hashes during sync + void onPeerAborting(EthereumPeer* _peer) override; ///< Called by peer when it is disconnecting + SyncStatus status() const override; + + void transition(EthereumPeer* _peer, SyncState _s, bool _force = false, bool _needHelp = true); + void resetNeedsSyncing(EthereumPeer* _peer) { setNeedsSyncing(_peer, h256(), 0); } + bool needsSyncing(EthereumPeer* _peer) const { return !!_peer->m_latestHash; } + + void setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td); + bool shouldGrabBlocks(EthereumPeer* _peer) const; + void attemptSync(EthereumPeer* _peer); + void setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false); + bool isSyncing(EthereumPeer* _peer) const; + void noteNeedsSyncing(EthereumPeer* _who); + void changeSyncer(EthereumPeer* _syncer, bool _needHelp); + void noteDoneBlocks(EthereumPeer* _who, bool _clemency); + void abortSync(EthereumPeer* _peer); + void requestBlocks(EthereumPeer* _peer); + + +private: + bool invariants() const override; + + h256s m_knownHashes; ///< List of block hashes we need to download. + + h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. + h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. + h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. + EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr + +}; +} +} diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index f142be62e..c8bbfc119 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -229,7 +229,8 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf; m_unknownSize += _block.size(); m_unknownCount++; - return ImportResult::FutureTime; + bool unknown = !m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash); + return unknown ? ImportResult::FutureTimeUnkwnown : ImportResult::FutureTimeKnown; } else { diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index b960c8f3f..35a27ff5e 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -80,10 +80,8 @@ enum class Asking enum class SyncState { Idle, ///< Initial chain sync complete. Waiting for new packets - WaitingQueue, ///< Block downloading paused. Waiting for block queue to process blocks and free space - HashesNegotiate, ///< Waiting for first hashes to arrive - HashesSingle, ///< Locked on and downloading hashes from a single peer - HashesParallel, ///< Downloading hashes from multiple peers over + Waiting, ///< Block downloading paused. Waiting for block queue to process blocks and free space + Hashes, ///< Downloading hashes from multiple peers over Blocks, ///< Downloading blocks NewBlocks, ///< Downloading blocks learned from NewHashes packet diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index e69bfa684..8b8a7c305 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -33,6 +33,8 @@ #include "BlockQueue.h" #include "EthereumPeer.h" #include "DownloadMan.h" +#include "BlockChainSync.h" + using namespace std; using namespace dev; using namespace dev::eth; @@ -41,7 +43,7 @@ using namespace p2p; unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common unsigned const c_chainReorgSize = 30000; -char const* const EthereumHost::s_stateNames[static_cast(SyncState::Size)] = {"Idle", "WaitingQueue", "HashesNegotiate", "HashesSingle", "HashesParallel", "Blocks", "NewBlocks" }; +char const* const EthereumHost::s_stateNames[static_cast(SyncState::Size)] = {"Idle", "Waiting", "Hashes", "Blocks", "NewBlocks" }; EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): HostCapability(), @@ -51,15 +53,12 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu m_bq (_bq), m_networkId (_networkId) { - setState(SyncState::HashesNegotiate); m_latestBlockSent = _ch.currentHash(); - m_hashMan.reset(m_chain.number() + 1); - m_bqRoomAvailable = m_bq.onRoomAvailable([this](){ m_continueSync = true; }); } EthereumHost::~EthereumHost() { - foreachPeer([](EthereumPeer* _p) { _p->abortSync(); }); + //foreachPeer([](EthereumPeer* _p) { _p->abortSync(); }); } bool EthereumHost::ensureInitialised() @@ -79,31 +78,13 @@ bool EthereumHost::ensureInitialised() void EthereumHost::reset() { - foreachPeer([](EthereumPeer* _p) { _p->abortSync(); }); - m_man.resetToChain(h256s()); - m_hashMan.reset(m_chain.number() + 1); - setState(SyncState::HashesNegotiate); - m_syncingLatestHash = h256(); - m_syncingTotalDifficulty = 0; + Guard l(x_sync); + if (m_sync) + m_sync->abortSync(); + m_sync.reset(); + m_latestBlockSent = h256(); m_transactionsSent.clear(); - m_hashes.clear(); -} - -void EthereumHost::resetSyncTo(h256 const& _h) -{ - setState(SyncState::HashesNegotiate); - m_syncingLatestHash = _h; -} - - -void EthereumHost::setState(SyncState _s) -{ - if (m_state != _s) - { - clog(NetAllDetail) << "SyncState changed from " << stateName(m_state) << " to " << stateName(_s); - m_state = _s; - } } void EthereumHost::doWork() @@ -125,14 +106,7 @@ void EthereumHost::doWork() } } - if (m_continueSync) - { - m_continueSync = false; - RecursiveGuard l(x_sync); - continueSync(); - } - - foreachPeer([](EthereumPeer* _p) { _p->tick(); }); + foreachPeer([](EthereumPeer* _p) { _p->tick(); return true; }); // return netChange; // TODO: Figure out what to do with netChange. @@ -174,24 +148,28 @@ void EthereumHost::maintainTransactions() cnote << "Sent" << n << "transactions to " << _p->session()->info().clientVersion; } _p->m_requireTransactions = false; + return true; }); } -void EthereumHost::foreachPeer(std::function const& _f) const +void EthereumHost::foreachPeer(std::function const& _f) const { foreachPeerPtr([&](std::shared_ptr _p) { if (_p) - _f(_p.get()); + return _f(_p.get()); + return true; }); } -void EthereumHost::foreachPeerPtr(std::function)> const& _f) const +void EthereumHost::foreachPeerPtr(std::function)> const& _f) const { for (auto s: peerSessions()) - _f(s.first->cap()); + if (!_f(s.first->cap())) + return; for (auto s: peerSessions(c_oldProtocolVersion)) //TODO: remove once v61+ is common - _f(s.first->cap(c_oldProtocolVersion)); + if (!_f(s.first->cap(c_oldProtocolVersion))) + return; } tuple>, vector>, vector>> EthereumHost::randomSelection(unsigned _percent, std::function const& _allow) @@ -263,326 +241,50 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) } } -void EthereumHost::onPeerStatus(EthereumPeer* _peer) +BlockChainSync& EthereumHost::sync() { - RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; - if (_peer->m_genesisHash != m_chain.genesisHash()) - _peer->disable("Invalid genesis hash"); - else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion) - _peer->disable("Invalid protocol version."); - else if (_peer->m_networkId != networkId()) - _peer->disable("Invalid network identifier."); - else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos) - _peer->disable("Blacklisted client version."); - else if (isBanned(_peer->session()->id())) - _peer->disable("Peer banned for previous bad behaviour."); - else + if (m_sync) + return *m_sync; // We only chose sync strategy once + + bool pv61 = false; + foreachPeer([&](EthereumPeer* _p) { - unsigned estimatedHashes = estimateHashes(); - if (_peer->m_protocolVersion == protocolVersion()) - { - if (_peer->m_latestBlockNumber > m_chain.number()) - _peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number(); - if (_peer->m_expectedHashes > estimatedHashes) - _peer->disable("Too many hashes"); - else if (needHashes() && m_hashMan.chainSize() < _peer->m_expectedHashes) - m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); - } - else - _peer->m_expectedHashes = estimatedHashes; - continueSync(_peer); - } - DEV_INVARIANT_CHECK; + if (_p->m_protocolVersion == protocolVersion()) + pv61 = true; + return !pv61; + }); + m_sync.reset(pv61 ? new PV60Sync(*this) : new PV60Sync(*this)); + return *m_sync; } -unsigned EthereumHost::estimateHashes() +void EthereumHost::onPeerStatus(EthereumPeer* _peer) { - BlockInfo block = m_chain.info(); - time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp; - time_t now = time(0); - unsigned blockCount = c_chainReorgSize; - if (lastBlockTime > now) - clog(NetWarn) << "Clock skew? Latest block is in the future"; - else - blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit; - clog(NetAllDetail) << "Estimated hashes: " << blockCount; - return blockCount; + Guard l(x_sync); + sync().onPeerStatus(_peer); } void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) { - RecursiveGuard l(x_sync); - if (_peer->m_syncHashNumber > 0) - _peer->m_syncHashNumber += _hashes.size(); - - _peer->setAsking(Asking::Nothing); - onPeerHashes(_peer, _hashes, false); -} - -void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete) -{ - DEV_INVARIANT_CHECK; - if (_hashes.empty()) - { - _peer->m_hashSub.doneFetch(); - continueSync(); - return; - } - - bool syncByNumber = _peer->m_syncHashNumber; - if (!syncByNumber && !_complete && _peer->m_syncHash != m_syncingLatestHash) - { - // Obsolete hashes, discard - continueSync(_peer); - return; - } - - unsigned knowns = 0; - unsigned unknowns = 0; - h256s neededBlocks; - unsigned firstNumber = _peer->m_syncHashNumber - _hashes.size(); - for (unsigned i = 0; i < _hashes.size(); ++i) - { - _peer->addRating(1); - auto h = _hashes[i]; - auto status = m_bq.blockStatus(h); - if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h)) - { - clog(NetMessageSummary) << "Block hash already known:" << h; - if (!syncByNumber) - { - m_hashes += neededBlocks; - clog(NetMessageSummary) << "Start blocks download..."; - onPeerDoneHashes(_peer, true); - return; - } - } - else if (status == QueueStatus::Bad) - { - cwarn << "block hash bad!" << h << ". Bailing..."; - _peer->setIdle(); - return; - } - else if (status == QueueStatus::Unknown) - { - unknowns++; - neededBlocks.push_back(h); - } - else - knowns++; - - if (!syncByNumber) - m_syncingLatestHash = h; - else - _peer->m_hashSub.noteHash(firstNumber + i, 1); - } - if (syncByNumber) - { - m_man.appendToChain(neededBlocks); // Append to download manager immediatelly - clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns"; - } - else - { - m_hashes += neededBlocks; // Append to local list - clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash; - } - if (_complete) - { - clog(NetMessageSummary) << "Start new blocks download..."; - m_syncingLatestHash = h256(); - setState(SyncState::NewBlocks); - m_man.resetToChain(m_hashes); - m_hashes.clear(); - m_hashMan.reset(m_chain.number() + 1); - continueSync(_peer); - } - else if (syncByNumber && m_hashMan.isComplete()) - { - // Done our chain-get. - clog(NetNote) << "Hashes download complete."; - onPeerDoneHashes(_peer, false); - } - else if (m_hashes.size() > _peer->m_expectedHashes) - { - _peer->disable("Too many hashes"); - m_hashes.clear(); - m_syncingLatestHash = h256(); - setState(SyncState::HashesNegotiate); - continueSync(); ///Try with some other peer, keep the chain - } - else - continueSync(_peer); /// Grab next hashes - DEV_INVARIANT_CHECK; -} - -void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain) -{ - assert(_peer->m_asking == Asking::Nothing); - m_syncingLatestHash = h256(); - setState(SyncState::Blocks); - if (_peer->m_protocolVersion != protocolVersion() || _localChain) - { - m_man.resetToChain(m_hashes); - _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? - } - m_hashMan.reset(m_chain.number() + 1); - m_hashes.clear(); - continueSync(); + Guard l(x_sync); + sync().onPeerHashes(_peer, _hashes); } void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) { - RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; - _peer->setAsking(Asking::Nothing); - unsigned itemCount = _r.itemCount(); - clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); - - if (itemCount == 0) - { - // Got to this peer's latest block - just give up. - clog(NetNote) << "Finishing blocks fetch..."; - // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. - _peer->m_sub.doneFetch(); - _peer->setIdle(); - return; - } - - unsigned success = 0; - unsigned future = 0; - unsigned unknown = 0; - unsigned got = 0; - unsigned repeated = 0; - h256 lastUnknown; - - for (unsigned i = 0; i < itemCount; ++i) - { - auto h = BlockInfo::headerHash(_r[i].data()); - if (_peer->m_sub.noteBlock(h)) - { - _peer->addRating(10); - switch (m_bq.import(_r[i].data(), m_chain)) - { - case ImportResult::Success: - success++; - break; - - case ImportResult::Malformed: - case ImportResult::BadChain: - _peer->disable("Malformed block received."); - return; - - case ImportResult::FutureTime: - future++; - break; - - case ImportResult::AlreadyInChain: - case ImportResult::AlreadyKnown: - got++; - break; - - case ImportResult::UnknownParent: - lastUnknown = h; - unknown++; - break; - - default:; - } - } - else - { - _peer->addRating(0); // -1? - repeated++; - } - } - - clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received."; - - if (m_state == SyncState::NewBlocks && unknown > 0) - { - _peer->m_latestHash = lastUnknown; - resetSyncTo(lastUnknown); - } - - continueSync(_peer); - DEV_INVARIANT_CHECK; + Guard l(x_sync); + sync().onPeerBlocks(_peer, _r); } void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) { - RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; - if (isSyncing() || _peer->isConversing()) - { - clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading."; - return; - } - clog(NetNote) << "New block hash discovered: syncing without help."; - _peer->m_syncHashNumber = 0; - onPeerHashes(_peer, _hashes, true); - DEV_INVARIANT_CHECK; + Guard l(x_sync); + sync().onPeerNewHashes(_peer, _hashes); } void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) { - RecursiveGuard l(x_sync); - DEV_INVARIANT_CHECK; - if ((isSyncing() || _peer->isConversing()) && m_state != SyncState::NewBlocks) - { - clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading."; - return; - } - auto h = BlockInfo::headerHash(_r[0].data()); - clog(NetMessageSummary) << "NewBlock: " << h; - - if (_r.itemCount() != 2) - _peer->disable("NewBlock without 2 data fields."); - else - { - bool sync = false; - switch (m_bq.import(_r[0].data(), m_chain)) - { - case ImportResult::Success: - _peer->addRating(100); - break; - case ImportResult::FutureTime: - //TODO: Rating dependent on how far in future it is. - break; - - case ImportResult::Malformed: - case ImportResult::BadChain: - _peer->disable("Malformed block received."); - return; - - case ImportResult::AlreadyInChain: - case ImportResult::AlreadyKnown: - break; - - case ImportResult::UnknownParent: - if (h) - { - u256 difficulty = _r[1].toInt(); - if (m_syncingTotalDifficulty < difficulty) - { - clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; - _peer->m_latestHash = h; - _peer->m_totalDifficulty = difficulty; - resetSyncTo(h);; - sync = true; - } - } - break; - default:; - } - - DEV_GUARDED(_peer->x_knownBlocks) - _peer->m_knownBlocks.insert(h); - - if (sync) - continueSync(); - } - DEV_INVARIANT_CHECK; + Guard l(x_sync); + sync().onPeerNewBlock(_peer, _r); } void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) @@ -615,202 +317,28 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerAborting(EthereumPeer* _peer) { - RecursiveGuard l(x_sync); - if (_peer->isConversing()) - { - _peer->setIdle(); - if (_peer->isCriticalSyncing()) - _peer->setRude(); - continueSync(); - } -} - -void EthereumHost::continueSync() -{ - if (m_state == SyncState::WaitingQueue) - setState(m_lastActiveState); - clog(NetAllDetail) << "Continuing sync for all peers"; - foreachPeer([&](EthereumPeer* _p) - { - if (_p->m_asking == Asking::Nothing) - continueSync(_p); - }); -} - -void EthereumHost::continueSync(EthereumPeer* _peer) -{ - DEV_INVARIANT_CHECK; - assert(_peer->m_asking == Asking::Nothing); - bool otherPeerV60Sync = false; - bool otherPeerV61Sync = false; - if (needHashes()) - { - if (!peerShouldGrabChain(_peer)) - { - _peer->setIdle(); - return; - } - - foreachPeer([&](EthereumPeer* _p) - { - if (_p != _peer && _p->m_asking == Asking::Hashes) - { - if (_p->m_protocolVersion != protocolVersion()) - otherPeerV60Sync = true; // Already have a peer downloading hash chain with old protocol, do nothing - else - otherPeerV61Sync = true; // Already have a peer downloading hash chain with V61+ protocol, join if supported - } - }); - if (otherPeerV60Sync && !m_hashes.empty()) - { - /// Downloading from other peer with v60 protocol, nothing else we can do - _peer->setIdle(); - return; - } - if (otherPeerV61Sync && _peer->m_protocolVersion != protocolVersion()) - { - /// Downloading from other peer with v61+ protocol which this peer does not support, - _peer->setIdle(); - return; - } - if (_peer->m_protocolVersion == protocolVersion() && !m_hashMan.isComplete()) - { - setState(SyncState::HashesParallel); - _peer->requestHashes(); /// v61+ and not catching up to a particular hash - } - else - { - // Restart/continue sync in single peer mode - if (!m_syncingLatestHash) - { - m_syncingLatestHash =_peer->m_latestHash; - m_syncingTotalDifficulty = _peer->m_totalDifficulty; - } - if (_peer->m_totalDifficulty >= m_syncingTotalDifficulty) - { - _peer->requestHashes(m_syncingLatestHash); - setState(SyncState::HashesSingle); - m_estimatedHashes = _peer->m_expectedHashes - (_peer->m_protocolVersion == protocolVersion() ? 0 : c_chainReorgSize); - } - else - _peer->setIdle(); - } - } - else if (needBlocks()) - { - if (m_man.isComplete()) - { - // Done our chain-get. - setState(SyncState::Idle); - clog(NetNote) << "Chain download complete."; - // 1/100th for each useful block hash. - _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? - m_man.reset(); - _peer->setIdle(); - return; - } - else if (peerCanHelp(_peer)) - { - // Check block queue status - if (m_bq.unknownFull()) - { - clog(NetWarn) << "Too many unknown blocks, restarting sync"; - m_bq.clear(); - reset(); - continueSync(); - } - else if (m_bq.knownFull()) - { - clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; - m_lastActiveState = m_state; - setState(SyncState::WaitingQueue); - _peer->setIdle(); - } - else - _peer->requestBlocks(); - } - } - else - _peer->setIdle(); - DEV_INVARIANT_CHECK; -} - -bool EthereumHost::peerCanHelp(EthereumPeer* _peer) const -{ - (void)_peer; - return true; -} - -bool EthereumHost::peerShouldGrabBlocks(EthereumPeer* _peer) const -{ - // this is only good for deciding whether to go ahead and grab a particular peer's hash chain, - // yet it's being used in determining whether to allow a peer help with downloading an existing - // chain of blocks. - auto td = _peer->m_totalDifficulty; - auto lh = m_syncingLatestHash; - auto ctd = m_chain.details().totalDifficulty; - - clog(NetAllDetail) << "Should grab blocks? " << td << "vs" << ctd; - if (td < ctd || (td == ctd && m_chain.currentHash() == lh)) - return false; - return true; -} - -bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const -{ - h256 c = m_chain.currentHash(); - unsigned n = m_chain.number(); - u256 td = m_chain.details().totalDifficulty; - - clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty; - if (td >= _peer->m_totalDifficulty) - { - clog(NetAllDetail) << "No. Our chain is better."; - return false; - } - else - { - clog(NetAllDetail) << "Yes. Their chain is better."; - return true; - } + Guard l(x_sync); + if (m_sync) + m_sync->onPeerAborting(_peer); } bool EthereumHost::isSyncing() const { - return m_state != SyncState::Idle; + Guard l(x_sync); + if (!m_sync) + return false; + return m_sync->isSyncing(); } SyncStatus EthereumHost::status() const { - RecursiveGuard l(x_sync); - SyncStatus res; - res.state = m_state; - if (m_state == SyncState::HashesParallel) - { - res.hashesReceived = m_hashMan.hashesGot().size(); - res.hashesTotal = m_hashMan.chainSize(); - } - else if (m_state == SyncState::HashesSingle) - { - res.hashesTotal = m_estimatedHashes; - res.hashesReceived = static_cast(m_hashes.size()); - res.hashesEstimated = true; - } - else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks || m_state == SyncState::WaitingQueue) - { - res.blocksTotal = m_man.chainSize(); - res.blocksReceived = m_man.blocksGot().size(); - } - return res; + Guard l(x_sync); + if (!m_sync) + return SyncStatus(); + return m_sync->status(); } - bool EthereumHost::invariants() const { - if (m_state == SyncState::HashesNegotiate && !m_hashes.empty()) - return false; - if (needBlocks() && (m_syncingLatestHash || !m_hashes.empty())) - return false; - return true; } diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 098d893ee..aed35c192 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -48,6 +48,7 @@ namespace eth class TransactionQueue; class BlockQueue; +class BlockChainSync; /** * @brief The EthereumHost class @@ -57,7 +58,6 @@ class BlockQueue; class EthereumHost: public p2p::HostCapability, Worker, HasInvariants { public: - /// Start server, but don't listen. EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); @@ -71,50 +71,42 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } + DownloadMan& downloadMan() { return m_man; } bool isSyncing() const; bool isBanned(p2p::NodeId const& _id) const { return !!m_banned.count(_id); } void noteNewTransactions() { m_newTransactions = true; } void noteNewBlocks() { m_newBlocks = true; } - void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status - void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks during syn - void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks - void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes - void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync - void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions - void onPeerAborting(EthereumPeer* _peer); ///< Called by peer when it is disconnecting - - DownloadMan& downloadMan() { return m_man; } - HashDownloadMan& hashDownloadMan() { return m_hashMan; } - BlockChain const& chain() { return m_chain; } + BlockChain const& chain() const { return m_chain; } + BlockQueue& bq() { return m_bq; } SyncStatus status() const; + h256 latestBlockSent() { return m_latestBlockSent; } static char const* stateName(SyncState _s) { return s_stateNames[static_cast(_s)]; } static unsigned const c_oldProtocolVersion; + void foreachPeerPtr(std::function)> const& _f) const; + void foreachPeer(std::function const& _f) const; + + void onPeerStatus(EthereumPeer* _peer); + void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); + void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); + void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); + void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); + void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); + void onPeerAborting(EthereumPeer* _peer); private: static char const* const s_stateNames[static_cast(SyncState::Size)]; std::tuple>, std::vector>, std::vector>> randomSelection(unsigned _percent = 25, std::function const& _allow = [](EthereumPeer const*){ return true; }); - void foreachPeerPtr(std::function)> const& _f) const; - void foreachPeer(std::function const& _f) const; - void resetSyncTo(h256 const& _h); - bool needHashes() const { return m_state == SyncState::HashesNegotiate || m_state == SyncState::HashesSingle || m_state == SyncState::HashesParallel; } - bool needBlocks() const { return m_state == SyncState::Blocks || m_state == SyncState::NewBlocks; } - /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. void doWork(); void maintainTransactions(); void maintainBlocks(h256 const& _currentBlock); - /// Get a bunch of needed blocks. - /// Removes them from our list of needed blocks. - /// @returns empty if there's no more blocks left to fetch, otherwise the blocks to fetch. - h256Hash neededBlocks(h256Hash const& _exclude); - /// Check to see if the network peer-state initialisation has happened. bool isInitialised() const { return (bool)m_latestBlockSent; } @@ -124,29 +116,16 @@ private: virtual void onStarting() { startWorking(); } virtual void onStopping() { stopWorking(); } - void continueSync(); /// Find something to do for all peers - void continueSync(EthereumPeer* _peer); /// Find some work to do for a peer - void onPeerDoneHashes(EthereumPeer* _peer, bool _new); /// Called when done downloading hashes from peer - void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete); - bool peerShouldGrabBlocks(EthereumPeer* _peer) const; - bool peerShouldGrabChain(EthereumPeer* _peer) const; - bool peerCanHelp(EthereumPeer* _peer) const; - unsigned estimateHashes(); - void estimatePeerHashes(EthereumPeer* _peer); - void setState(SyncState _s); + BlockChainSync& sync(); bool invariants() const override; BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). - Handler m_bqRoomAvailable; u256 m_networkId; - DownloadMan m_man; - HashDownloadMan m_hashMan; - h256 m_latestBlockSent; h256Hash m_transactionsSent; @@ -155,14 +134,9 @@ private: bool m_newTransactions = false; bool m_newBlocks = false; - mutable RecursiveMutex x_sync; - SyncState m_state = SyncState::Idle; ///< Current sync state - SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode - h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync. - u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync. - h256s m_hashes; ///< List of hashes with unknown block numbers. Used for PV60 chain downloading and catching up to a particular unknown - unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. - bool m_continueSync = false; ///< True when the block queue has processed a block; we should restart grabbing blocks. + mutable Mutex x_sync; + DownloadMan m_man; + std::unique_ptr m_sync; }; } diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 7a30f1ad9..8aca62b9a 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -30,6 +30,8 @@ #include "EthereumHost.h" #include "TransactionQueue.h" #include "BlockQueue.h" +#include "BlockChainSync.h" + using namespace std; using namespace dev; using namespace dev::eth; @@ -38,7 +40,6 @@ using namespace p2p; EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap): Capability(_s, _h, _i), m_sub(host()->downloadMan()), - m_hashSub(host()->hashDownloadMan()), m_peerCapabilityVersion(_cap.second) { session()->addNote("manners", isRude() ? "RUDE" : "nice"); @@ -91,8 +92,6 @@ string toString(Asking _a) void EthereumPeer::setIdle() { - m_sub.doneFetch(); - m_hashSub.doneFetch(); setAsking(Asking::Nothing); } @@ -113,14 +112,14 @@ void EthereumPeer::requestStatus() sealAndSend(s); } -void EthereumPeer::requestHashes() +void EthereumPeer::requestHashes(u256 _number, unsigned _count) { assert(m_asking == Asking::Nothing); - m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk); + m_syncHashNumber = _number; m_syncHash = h256(); setAsking(Asking::Hashes); RLPStream s; - prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk; + prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << _count; clog(NetMessageDetail) << "Requesting block hashes for numbers " << m_syncHashNumber << "-" << m_syncHashNumber + c_maxHashesAsk - 1; sealAndSend(s); } diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index a12b7a197..2ffbe9101 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -50,6 +50,9 @@ namespace eth class EthereumPeer: public p2p::Capability { friend class EthereumHost; //TODO: remove this + friend class BlockChainSync; //TODO: remove this + friend class PV60Sync; //TODO: remove this + friend class PV61Sync; //TODO: remove this public: /// Basic constructor. @@ -73,8 +76,8 @@ public: /// Abort sync and reset fetch void setIdle(); - /// Request hashes. Uses hash download manager to get hash number. v61+ protocol version only - void requestHashes(); + /// Request hashes by number. v61+ protocol version only + void requestHashes(u256 _number, unsigned _count); /// Request hashes for given parent hash. void requestHashes(h256 const& _lastHash); @@ -135,18 +138,19 @@ private: h256 m_genesisHash; ///< Peer's genesis hash u256 m_latestBlockNumber; ///< Number of the latest block this peer has + /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. + h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. + h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer. - unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+) + u256 m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+) h256 m_syncHash; ///< Latest hash we sync to (PV60) /// Once we're asking for blocks, this becomes in use. DownloadSub m_sub; - /// Once we're asking for hashes, this becomes in use. - HashDownloadSub m_hashSub; - u256 m_peerCapabilityVersion; ///< Protocol version this peer supports received as capability /// Have we received a GetTransactions packet that we haven't yet answered? bool m_requireTransactions = false;