diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 3965a214e..8b2427e42 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -237,7 +237,7 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) void EthereumHost::onPeerStatus(EthereumPeer* _peer) { - Guard l(x_sync); + RecursiveGuard l(x_sync); if (_peer->m_genesisHash != m_chain.genesisHash()) _peer->disable("Invalid genesis hash"); else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion) @@ -252,10 +252,13 @@ void EthereumHost::onPeerStatus(EthereumPeer* _peer) { if (_peer->m_protocolVersion != protocolVersion()) estimatePeerHashes(_peer); - else if (_peer->m_latestBlockNumber > m_chain.number()) + else + { + if (_peer->m_latestBlockNumber > m_chain.number()) _peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number(); - if (m_hashMan.chainSize() < _peer->m_expectedHashes) - m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); + if (m_hashMan.chainSize() < _peer->m_expectedHashes) + m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); + } continueSync(_peer); } } @@ -265,7 +268,7 @@ void EthereumHost::estimatePeerHashes(EthereumPeer* _peer) BlockInfo block = m_chain.info(); time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp; time_t now = time(0); - unsigned blockCount = 1000; + unsigned blockCount = 30000; if (lastBlockTime > now) clog(NetWarn) << "Clock skew? Latest block is in the future"; else @@ -276,7 +279,7 @@ void EthereumHost::estimatePeerHashes(EthereumPeer* _peer) void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) { - Guard l(x_sync); + RecursiveGuard l(x_sync); assert(_peer->m_asking == Asking::Nothing); onPeerHashes(_peer, _hashes, false); } @@ -285,13 +288,22 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool { if (_hashes.empty()) { - onPeerDoneHashes(_peer, true); + continueSync(); return; } + + bool syncByNumber = _peer->m_syncHashNumber; + if (!syncByNumber && _peer->m_latestHash != m_syncingLatestHash) + { + // Obsolete hashes, discard + continueSync(_peer); + return; + } + unsigned knowns = 0; unsigned unknowns = 0; h256s neededBlocks; - bool syncByNumber = !m_syncingLatestHash; + unsigned firstNumber = _peer->m_syncHashNumber - _hashes.size(); for (unsigned i = 0; i < _hashes.size(); ++i) { _peer->addRating(1); @@ -321,8 +333,11 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool } else knowns++; + if (!syncByNumber) m_syncingLatestHash = h; + else + _peer->m_hashSub.noteHash(firstNumber + i, 1); } if (syncByNumber) { @@ -368,13 +383,14 @@ void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain) { m_man.resetToChain(m_hashes); m_hashes.clear(); + m_hashMan.reset(m_chain.number() + 1); } continueSync(); } void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) { - Guard l(x_sync); + RecursiveGuard l(x_sync); assert(_peer->m_asking == Asking::Nothing); unsigned itemCount = _r.itemCount(); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); @@ -450,7 +466,7 @@ void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) { - Guard l(x_sync); + RecursiveGuard l(x_sync); if (isSyncing_UNSAFE()) { clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading."; @@ -462,7 +478,7 @@ void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) { - Guard l(x_sync); + RecursiveGuard l(x_sync); if (isSyncing_UNSAFE()) { clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading."; @@ -549,6 +565,16 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) } } +void EthereumHost::onPeerAborting(EthereumPeer* _peer) +{ + RecursiveGuard l(x_sync); + if (_peer->isSyncing()) + { + _peer->setIdle(); + continueSync(); + } +} + void EthereumHost::continueSync() { clog(NetAllDetail) << "Getting help with downloading hashes and blocks"; @@ -562,21 +588,33 @@ void EthereumHost::continueSync() void EthereumHost::continueSync(EthereumPeer* _peer) { assert(_peer->m_asking == Asking::Nothing); - bool otherPeerSync = false; + bool otherPeerV60Sync = false; + bool otherPeerV61Sync = false; if (m_needSyncHashes && peerShouldGrabChain(_peer)) { foreachPeer([&](EthereumPeer* _p) { - if (_p != _peer && _p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion()) - otherPeerSync = true; // Already have a peer downloading hash chain with old protocol, do nothing + if (_p != _peer && _p->m_asking == Asking::Hashes) + { + if (_p->m_protocolVersion != protocolVersion()) + otherPeerV60Sync = true; // Already have a peer downloading hash chain with old protocol, do nothing + else + otherPeerV61Sync = true; // Already have a peer downloading hash chain with V61+ protocol, join if supported + } }); - if (otherPeerSync) + if (otherPeerV60Sync && !m_hashes.empty()) { /// Downloading from other peer with v60 protocol, nothing ese we can do _peer->setIdle(); return; } - if (_peer->m_protocolVersion == protocolVersion() && !m_syncingLatestHash) + if (otherPeerV61Sync && _peer->m_protocolVersion != protocolVersion()) + { + /// Downloading from other peer with v61+ protocol which this peer does not support, + _peer->setIdle(); + return; + } + if (_peer->m_protocolVersion == protocolVersion() && !m_hashMan.isComplete()) _peer->requestHashes(); /// v61+ and not catching up to a particular hash else { @@ -586,7 +624,8 @@ void EthereumHost::continueSync(EthereumPeer* _peer) m_syncingLatestHash =_peer->m_latestHash; m_syncingTotalDifficulty = _peer->m_totalDifficulty; } - _peer->requestHashes(m_syncingLatestHash); + if (_peer->m_totalDifficulty >= m_syncingTotalDifficulty) + _peer->requestHashes(m_syncingLatestHash); } } else if (m_needSyncBlocks && peerShouldGrabBlocks(_peer)) // Check if this peer can help with downloading blocks diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 8ca815a17..d9628de3a 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -70,7 +70,7 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } - bool isSyncing() const { Guard l(x_sync); return isSyncing_UNSAFE(); } + bool isSyncing() const { RecursiveGuard l(x_sync); return isSyncing_UNSAFE(); } bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); } void noteNewTransactions() { m_newTransactions = true; } @@ -82,6 +82,7 @@ public: void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions + void onPeerAborting(EthereumPeer* _peer); ///< Called by peer when it is disconnecting DownloadMan& downloadMan() { return m_man; } HashDownloadMan& hashDownloadMan() { return m_hashMan; } @@ -141,7 +142,7 @@ private: bool m_newTransactions = false; bool m_newBlocks = false; - mutable Mutex x_sync; + mutable RecursiveMutex x_sync; bool m_needSyncHashes = true; ///< Indicates if need to downlad hashes bool m_needSyncBlocks = true; ///< Indicates if we still need to download some blocks h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync. diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index d6b0b50c3..72015a5e9 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -40,7 +40,6 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, Cap m_hashSub(host()->hashDownloadMan()), m_peerCapabilityVersion(_cap.second) { - m_syncHashNumber = host()->chain().number() + 1; requestStatus(); } @@ -52,8 +51,7 @@ EthereumPeer::~EthereumPeer() void EthereumPeer::abortSync() { - if (isSyncing()) - setIdle(); + host()->onPeerAborting(this); } EthereumHost* EthereumPeer::host() const @@ -105,6 +103,7 @@ void EthereumPeer::requestHashes() { assert(m_asking == Asking::Nothing); m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk); + m_syncHash = h256(); setAsking(Asking::Hashes); RLPStream s; prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk; @@ -119,6 +118,8 @@ void EthereumPeer::requestHashes(h256 const& _lastHash) RLPStream s; prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk; clog(NetMessageDetail) << "Requesting block hashes staring from " << _lastHash; + m_syncHash = _lastHash; + m_syncHashNumber = 0; sealAndSend(s); } @@ -150,7 +151,7 @@ void EthereumPeer::setAsking(Asking _a) void EthereumPeer::tick() { - if (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing) + if (chrono::system_clock::now() - m_lastAsk > chrono::seconds(20) && m_asking != Asking::Nothing) // timeout session()->disconnect(PingTimeout); } @@ -240,12 +241,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) setAsking(Asking::Nothing); h256s hashes(itemCount); for (unsigned i = 0; i < itemCount; ++i) - { hashes[i] = _r[i].toHash(); - m_hashSub.noteHash(m_syncHashNumber + i, 1); - } - m_syncHashNumber += itemCount; + if (m_syncHashNumber > 0) + m_syncHashNumber += itemCount; host()->onPeerHashes(this, hashes); break; } diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index 94dc60501..f7a2e12af 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -128,8 +128,9 @@ private: /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. - unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer. - unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to + unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer. + unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+) + h256 m_syncHash; ///< Latest hash we sync to (PV60) /// Once we're asking for blocks, this becomes in use. DownloadSub m_sub;