From 6df207655c4f0db9257e296c2852c20bc32ec850 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 7 Oct 2014 00:37:31 +0200 Subject: [PATCH] More work on the peer state transition system --- libethereum/EthereumHost.cpp | 39 ++--- libethereum/EthereumHost.h | 6 +- libethereum/EthereumPeer.cpp | 329 +++++++++++++++++++---------------- libethereum/EthereumPeer.h | 18 +- 4 files changed, 206 insertions(+), 186 deletions(-) diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 4be0065aa..2c87d887a 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -90,48 +90,43 @@ void EthereumHost::notePeerStateChanged(EthereumPeer* _who) } // otherwise check to see if we should be downloading... - _who->tryGrabbingHashChain(); + _who->attemptSyncing(); } -void EthereumHost::updateGrabbing(Asking _g) +void EthereumHost::updateGrabbing(Asking _g, EthereumPeer* _ignore) { m_grabbing = _g; if (_g == Asking::Nothing) readyForSync(); - else if (_g == Asking::Chain) + else if (_g == Asking::Blocks) for (auto j: peers()) - j->cap()->ensureGettingChain(); + if (j->cap().get() != _ignore && j->cap()->m_asking == Asking::Nothing) + j->cap()->transition(Asking::Blocks); } -void EthereumHost::noteHaveChain(EthereumPeer* _from) +bool EthereumHost::shouldGrabBlocks(EthereumPeer* _from) { - auto td = _from->m_totalDifficulty; + auto td = _from->m_syncingTotalDifficulty; + auto lh = _from->m_syncingLatestHash; - if (_from->m_neededBlocks.empty()) + if (_from->m_syncingNeededBlocks.empty()) { - _from->setGrabbing(Asking::Nothing); updateGrabbing(Asking::Nothing); - return; + return false; } - clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged(); + clog(NetNote) << "Hash-chain COMPLETE:" << td << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_syncingNeededBlocks.size() << " blocks, ends" << _from->m_syncingNeededBlocks.back().abridged(); - if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == _from->m_latestHash)) + if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == lh)) { clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; - _from->setGrabbing(Asking::Nothing); updateGrabbing(Asking::Nothing); - return; + return false; } - clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << _from->m_latestHash.abridged() << ", was" << m_latestBlockSent.abridged() << "]"; + clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << lh.abridged() << ", was" << m_latestBlockSent.abridged() << "]"; - // Looks like it's the best yet for total difficulty. Set to download. - m_man.resetToChain(_from->m_neededBlocks); - m_latestBlockSent = _from->m_latestHash; - - _from->setGrabbing(Asking::Chain); - updateGrabbing(Asking::Chain); + return true; } void EthereumHost::readyForSync() @@ -139,7 +134,7 @@ void EthereumHost::readyForSync() // start grabbing next hash chain if there is one. for (auto j: peers()) { - j->cap()->tryGrabbingHashChain(); + j->cap()->attemptSyncing(); if (j->cap()->m_grabbing == Asking::Hashes) { m_grabbing = Asking::Hashes; @@ -158,7 +153,7 @@ void EthereumHost::noteDoneBlocks(EthereumPeer* _who) updateGrabbing(Asking::Nothing); m_man.reset(); } - if (_who->m_grabbing == Asking::Chain) + if (_who->isSyncing()) { // Done our chain-get. clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 8cbfbf678..b42ae7fa7 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -77,8 +77,10 @@ private: /// Session wants to pass us a block that we might not have. /// @returns true if we didn't have it. bool noteBlock(h256 _hash, bytesConstRef _data); + /// Session has finished getting the chain of hashes. - void noteHaveChain(EthereumPeer* _who); + bool shouldGrabBlocks(EthereumPeer* _who); + /// Called when the peer can no longer provide us with any needed blocks. void noteDoneBlocks(EthereumPeer* _who); @@ -103,7 +105,7 @@ private: virtual void onStopping() { stopWorking(); } void readyForSync(); - void updateGrabbing(Asking _g); + void updateGrabbing(Asking _g, EthereumPeer* _ignore); BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index ee5db6a4b..20a9fef52 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -38,7 +38,7 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h): Capability(_s, _h), m_sub(host()->m_man) { - setAsking(Asking::State, Syncing::Done); + setAsking(Asking::State, false); sendStatus(); } @@ -65,20 +65,133 @@ void EthereumPeer::sendStatus() sealAndSend(s); } -void EthereumPeer::startInitialSync() +/* + * Possible asking/syncing states for two peers: + */ + +void EthereumPeer::transition(Asking _a) { - // Grab transactions off them. + RLPStream s; + prep(s); + if (_a == Asking::Hashes) { - RLPStream s; - prep(s).appendList(1); - s << GetTransactionsPacket; - sealAndSend(s); + if (m_asking == Asking::State || m_asking == Asking::Nothing) + { + if (isSyncing()) + clogS(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; + + m_syncingLatestHash = m_latestHash; + m_syncingTotalDifficulty = m_totalDifficulty; + m_latestHash = h256(); + + setAsking(_a, true); + s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk; + m_syncingNeededBlocks = h256s(1, m_syncingLatestHash); + host()->updateGrabbing(Asking::Hashes); + sealAndSend(s); + return; + } + else if (m_asking == Asking::Hashes) + { + if (!isSyncing()) + clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; + + setAsking(_a, true); + s.appendList(3) << GetBlockHashesPacket << m_syncingNeededBlocks.back() << c_maxHashesAsk; + sealAndSend(s); + return; + } + } + else if (_a == Asking::Blocks) + { + if (m_asking == Asking::Hashes) + { + if (host()->shouldGrabBlocks(this)) + { + host()->m_man.resetToChain(m_syncingNeededBlocks); + host()->m_latestBlockSent = m_syncingLatestHash; + + host()->updateGrabbing(Asking::Blocks, this); + } + else + { + setAsking(Asking::Nothing, false); + return; + } + } + // run through into... + if (m_asking == Asking::Nothing || m_asking == Asking::Hashes || m_asking == Asking::Blocks) + { + // Looks like it's the best yet for total difficulty. Set to download. + setAsking(Asking::Blocks, true); + auto blocks = m_sub.nextFetch(c_maxBlocksAsk); + if (blocks.size()) + { + s.appendList(blocks.size() + 1) << GetBlocksPacket; + for (auto const& i: blocks) + s << i; + sealAndSend(s); + } + else + transition(Asking::Nothing); + return; + } } + else if (_a == Asking::Nothing) + { + if (m_asking == Asking::Blocks) + { + clogS(NetNote) << "Finishing block fetch..."; + + // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. + if (isSyncing()) + host()->noteDoneBlocks(this); + // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. + m_sub.doneFetch(); + + setAsking(Asking::Nothing, false); + } + else if (m_asking == Asking::Hashes) + { + clogS(NetNote) << "Finishing hashes fetch..."; + + if (isSyncing()) + host()->noteDoneBlocks(this); + + setAsking(Asking::Nothing, false); + } + else if (m_asking == Asking::State) + { + setAsking(Asking::Nothing, false); + // TODO: Just got the state - should check to see if we can be of help downloading the chain if any. + // TODO: Otherwise, should put ourselves up for sync. + } + // Otherwise it's fine. We don't care if it's Nothing->Nothing. + return; + } + + clogS(NetWarn) << "Invalid state transition:" << (int)_a << "from" << (int)m_asking << "/" << boolalpha << isSyncing() << needsSyncing(); +} + +void EthereumPeer::setAsking(Asking _a, bool _isSyncing) +{ + m_asking = _a; + m_isSyncing = _isSyncing; + session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); + session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : "")); +} + +void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td) +{ + m_latestHash = _latestHash; + m_totalDifficulty = _td; + + // TODO: should be "noteNeedsSyncing" or some such. host()->notePeerStateChanged(this); } -void EthereumPeer::tryGrabbingHashChain() +void EthereumPeer::attemptSyncing() { if (m_asking != Asking::Nothing) { @@ -87,7 +200,7 @@ void EthereumPeer::tryGrabbingHashChain() } // if already done this, then ignore. - if (m_syncing == Syncing::Done) + if (!needsSyncing()) { clogS(NetAllDetail) << "Already synced with this peer."; return; @@ -101,39 +214,15 @@ void EthereumPeer::tryGrabbingHashChain() if (td >= m_totalDifficulty) { clogS(NetAllDetail) << "No. Our chain is better."; - setAsking(Asking::Nothing, Syncing::Done); - return; // All good - we have the better chain. + transition(Asking::Nothing); } - - // Our chain isn't better - grab theirs. + else { clogS(NetAllDetail) << "Yes. Their chain is better."; - - host()->updateGrabbing(Asking::Hashes); - setAsking(Asking::Hashes, Syncing::Executing); - RLPStream s; - prep(s).appendList(3); - s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk; - m_neededBlocks = h256s(1, m_latestHash); - sealAndSend(s); + transition(Asking::Hashes); } } -void EthereumPeer::finishSync() -{ - clogS(NetNote) << "Finishing fetch..."; - - // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. - if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper) - { - host()->noteDoneBlocks(this); - setAsking(Asking::Nothing); - } - - // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. - m_sub.doneFetch(); -} - bool EthereumPeer::interpret(RLP const& _r) { switch (_r[0].toInt()) @@ -142,8 +231,8 @@ bool EthereumPeer::interpret(RLP const& _r) { m_protocolVersion = _r[1].toInt(); m_networkId = _r[2].toInt(); - m_totalDifficulty = _r[3].toInt(); - m_latestHash = _r[4].toHash(); + auto totalDifficulty = _r[3].toInt(); + auto latestHash = _r[4].toHash(); auto genesisHash = _r[5].toHash(); clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); @@ -157,7 +246,15 @@ bool EthereumPeer::interpret(RLP const& _r) else if (session()->info().clientVersion.find("/v0.6.9/") != string::npos) disable("Blacklisted client version."); else - startInitialSync(); + { + // Grab transactions off them. + RLPStream s; + prep(s).appendList(1); + s << GetTransactionsPacket; + sealAndSend(s); + + setNeedsSyncing(latestHash, totalDifficulty); + } break; } case GetTransactionsPacket: @@ -205,7 +302,7 @@ bool EthereumPeer::interpret(RLP const& _r) } if (_r.itemCount() == 1) { - host()->noteHaveChain(this); + transition(Asking::Blocks); return true; } for (unsigned i = 1; i < _r.itemCount(); ++i) @@ -213,17 +310,14 @@ bool EthereumPeer::interpret(RLP const& _r) auto h = _r[i].toHash(); if (host()->m_chain.isKnown(h)) { - host()->noteHaveChain(this); + transition(Asking::Blocks); return true; } else - m_neededBlocks.push_back(h); + m_syncingNeededBlocks.push_back(h); } // run through - ask for more. - RLPStream s; - prep(s).appendList(3); - s << GetBlockHashesPacket << m_neededBlocks.back() << c_maxHashesAsk; - sealAndSend(s); + transition(Asking::Hashes); break; } case GetBlocksPacket: @@ -245,40 +339,6 @@ bool EthereumPeer::interpret(RLP const& _r) sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n)); break; } - case NewBlockPacket: - { - auto h = BlockInfo::headerHash(bd(_r[1].data())); - clogS(NetMessageSummary) << "NewBlock: " << h.abridged(); - - if (_r.itemCount() != 3) - disable("NewBlock without 2 data fields."); - else - { - switch (host()->m_bq.import(_r[1].data(), host()->m_chain)) - { - case ImportResult::Success: - case ImportResult::FutureTime: - addRating(1); - break; - - case ImportResult::Malformed: - disable("Malformed block received."); - break; - - case ImportResult::AlreadyInChain: - case ImportResult::AlreadyKnown: - break; - - case ImportResult::UnknownParent: - clogS(NetMessageSummary) << "Received block with no known parent. Resyncing..."; - setNeedsSyncing(h, _r[2].toInt()); - break; - } - Guard l(x_knownBlocks); - m_knownBlocks.insert(h); - } - break; - } case BlocksPacket: { clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreBlocks"); @@ -289,8 +349,7 @@ bool EthereumPeer::interpret(RLP const& _r) if (_r.itemCount() == 1) { // Got to this peer's latest block - just give up. - if (m_asking == Asking::Blocks) - finishSync(); + transition(Asking::Nothing); break; } @@ -335,31 +394,44 @@ bool EthereumPeer::interpret(RLP const& _r) } } - unsigned knownParents = 0; - unsigned unknownParents = 0; - if (g_logVerbosity >= NetMessageSummary::verbosity) + clogS(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known."; + + if (m_asking == Asking::Blocks) + transition(Asking::Blocks); + break; + } + case NewBlockPacket: + { + auto h = BlockInfo::headerHash(bd(_r[1].data())); + clogS(NetMessageSummary) << "NewBlock: " << h.abridged(); + + if (_r.itemCount() != 3) + disable("NewBlock without 2 data fields."); + else { - unsigned ic = _r.itemCount(); - for (unsigned i = 1; i < ic; ++i) + switch (host()->m_bq.import(_r[1].data(), host()->m_chain)) { - auto h = BlockInfo::headerHash(_r[i].data()); - BlockInfo bi(_r[i].data()); - Guard l(x_knownBlocks); - if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) - { - unknownParents++; - clogS(NetAllDetail) << "Unknown parent" << bi.parentHash.abridged() << "of block" << h.abridged(); - } - else - { - knownParents++; - clogS(NetAllDetail) << "Known parent" << bi.parentHash.abridged() << "of block" << h.abridged(); - } + case ImportResult::Success: + case ImportResult::FutureTime: + addRating(1); + break; + + case ImportResult::Malformed: + disable("Malformed block received."); + break; + + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + break; + + case ImportResult::UnknownParent: + clogS(NetMessageSummary) << "Received block with no known parent. Resyncing..."; + setNeedsSyncing(h, _r[2].toInt()); + break; } + Guard l(x_knownBlocks); + m_knownBlocks.insert(h); } - clogS(NetMessageSummary) << dec << success << "known parents," << unknownParents << "unknown," << used << "used."; - if (m_asking == Asking::Blocks) - continueSync(); break; } default: @@ -367,54 +439,3 @@ bool EthereumPeer::interpret(RLP const& _r) } return true; } - -void EthereumPeer::ensureAskingBlocks() -{ - if (m_asking != Asking::Nothing) - return; // Already asked & waiting for some. - - continueSync(); -} - -void EthereumPeer::continueSync() -{ - // If we're getting the hashes already, then we shouldn't be asking for the chain. - if (m_asking == Asking::Hashes) - return; - - auto blocks = m_sub.nextFetch(c_maxBlocksAsk); - - if (blocks.size()) - { - RLPStream s; - prep(s); - s.appendList(blocks.size() + 1) << GetBlocksPacket; - for (auto const& i: blocks) - s << i; - sealAndSend(s); - } - else - finishSync(); -} - -/* - * Possible asking/syncing states for two peers: - * state/ presync - * presync hashes - * presync chain (transiently) - * presync+ chain - * presync nothing - * hashes nothing - * chain hashes - * presync chain (transiently) - * presync+ chain - * presync nothing - */ - -void EthereumPeer::setAsking(Asking _a, bool _isSyncing) -{ - m_asking = _a; - m_isSyncing = _isSyncing; - session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); - session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? "needed" : "ok")); -} diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index d5628829c..92b56301d 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -42,6 +42,7 @@ namespace eth /** * @brief The EthereumPeer class * @todo Document fully. + * @todo make state transitions thread-safe. */ class EthereumPeer: public p2p::Capability { @@ -59,21 +60,20 @@ private: virtual bool interpret(RLP const& _r); void sendStatus(); - void startInitialSync(); - void tryGrabbingHashChain(); + void transition(Asking _wantState); + + void attemptSyncing(); /// Ensure that we are waiting for a bunch of blocks from our peer. void ensureAskingBlocks(); - /// Ensure that we are waiting for a bunch of blocks from our peer. - void continueSync(); void finishSync(); void clearKnownTransactions() { std::lock_guard l(x_knownTransactions); m_knownTransactions.clear(); } void setAsking(Asking _g, bool _isSyncing); - void setNeedsSyncing(h256 _latestHash, u256 _td) { m_latestHash = _latestHash; m_totalDifficulty = _td; } + void setNeedsSyncing(h256 _latestHash, u256 _td); bool needsSyncing() const { return !!m_latestHash; } bool isSyncing() const { return m_isSyncing; } @@ -89,13 +89,15 @@ private: bool m_isSyncing = false; /// These are determined through either a Status message or from NewBlock. - h256 m_latestHash; ///< Peer's latest block's hash. + h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. - /// Once a sync is started on this peer, they are cleared. + /// Once a sync is started on this peer, they are cleared and moved into m_syncing*. /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. - h256s m_neededBlocks; ///< The blocks that we should download from this peer. + h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. + h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. /// Once we're asking for blocks, this becomes in use. DownloadSub m_sub;