From f73b46d2c1fcf82653a5eba59f5449c3a2da6711 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 6 Oct 2014 22:58:51 +0200 Subject: [PATCH] Work on making states good and various docs. --- libethereum/CommonNet.h | 3 +- libethereum/EthereumHost.cpp | 42 ++++++++-------- libethereum/EthereumHost.h | 6 +-- libethereum/EthereumPeer.cpp | 96 ++++++++++++++++++++++-------------- libethereum/EthereumPeer.h | 30 ++++++++--- 5 files changed, 109 insertions(+), 68 deletions(-) diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index c7f6929e3..15b05bff3 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -59,9 +59,10 @@ enum EthereumPacket BlockHashesPacket, GetBlocksPacket, BlocksPacket, + NewBlockPacket, }; -enum class Grabbing +enum class Asking { State, Hashes, diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 61b0ec70a..4be0065aa 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -77,28 +77,28 @@ void EthereumHost::notePeerStateChanged(EthereumPeer* _who) // TODO: FIX: BUG: Better state management! // if already downloading hash-chain, ignore. - if (m_grabbing != Grabbing::Nothing) + if (m_grabbing != Asking::Nothing) { for (auto const& i: peers()) - if (i->cap()->m_grabbing == m_grabbing || m_grabbing == Grabbing::Presync) + if (i->cap()->m_grabbing == m_grabbing || m_grabbing == Asking::Presync) { clog(NetAllDetail) << "Already downloading chain. Just set to help out."; _who->ensureGettingChain(); return; } - m_grabbing = Grabbing::Nothing; + m_grabbing = Asking::Nothing; } // otherwise check to see if we should be downloading... _who->tryGrabbingHashChain(); } -void EthereumHost::updateGrabbing(Grabbing _g) +void EthereumHost::updateGrabbing(Asking _g) { m_grabbing = _g; - if (_g == Grabbing::Nothing) + if (_g == Asking::Nothing) readyForSync(); - else if (_g == Grabbing::Chain) + else if (_g == Asking::Chain) for (auto j: peers()) j->cap()->ensureGettingChain(); } @@ -109,8 +109,8 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from) if (_from->m_neededBlocks.empty()) { - _from->setGrabbing(Grabbing::Nothing); - updateGrabbing(Grabbing::Nothing); + _from->setGrabbing(Asking::Nothing); + updateGrabbing(Asking::Nothing); return; } @@ -119,8 +119,8 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from) if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == _from->m_latestHash)) { clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; - _from->setGrabbing(Grabbing::Nothing); - updateGrabbing(Grabbing::Nothing); + _from->setGrabbing(Asking::Nothing); + updateGrabbing(Asking::Nothing); return; } @@ -130,8 +130,8 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from) m_man.resetToChain(_from->m_neededBlocks); m_latestBlockSent = _from->m_latestHash; - _from->setGrabbing(Grabbing::Chain); - updateGrabbing(Grabbing::Chain); + _from->setGrabbing(Asking::Chain); + updateGrabbing(Asking::Chain); } void EthereumHost::readyForSync() @@ -140,9 +140,9 @@ void EthereumHost::readyForSync() for (auto j: peers()) { j->cap()->tryGrabbingHashChain(); - if (j->cap()->m_grabbing == Grabbing::Hashes) + if (j->cap()->m_grabbing == Asking::Hashes) { - m_grabbing = Grabbing::Hashes; + m_grabbing = Asking::Hashes; return; } } @@ -155,15 +155,15 @@ void EthereumHost::noteDoneBlocks(EthereumPeer* _who) { // Done our chain-get. clog(NetNote) << "Chain download complete."; - updateGrabbing(Grabbing::Nothing); + updateGrabbing(Asking::Nothing); m_man.reset(); } - if (_who->m_grabbing == Grabbing::Chain) + if (_who->m_grabbing == Asking::Chain) { // Done our chain-get. clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; // TODO: note that peer is BADBADBAD! - updateGrabbing(Grabbing::Nothing); + updateGrabbing(Asking::Nothing); m_man.reset(); } } @@ -192,7 +192,7 @@ void EthereumHost::doWork() void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash) { - bool resendAll = (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent); + bool resendAll = (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent); { lock_guard l(m_incomingLock); for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) @@ -218,7 +218,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash } ep->clearKnownTransactions(); - if (n) + if (n || ep->m_requireTransactions) { RLPStream ts; EthereumPeer::prep(ts); @@ -233,7 +233,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash void EthereumHost::reset() { - m_grabbing = Grabbing::Nothing; + m_grabbing = Asking::Nothing; m_man.resetToChain(h256s()); @@ -247,7 +247,7 @@ void EthereumHost::reset() void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) { // If we've finished our initial sync send any new blocks. - if (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) + if (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) { // TODO: clean up h256s hs; diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 49b8077a8..8cbfbf678 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -70,7 +70,7 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } - bool isSyncing() const { return m_grabbing == Grabbing::Chain; } + bool isSyncing() const { return m_grabbing == Asking::Chain; } private: void noteHavePeerState(EthereumPeer* _who); @@ -103,7 +103,7 @@ private: virtual void onStopping() { stopWorking(); } void readyForSync(); - void updateGrabbing(Grabbing _g); + void updateGrabbing(Asking _g); BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. @@ -111,7 +111,7 @@ private: u256 m_networkId; - Grabbing m_grabbing = Grabbing::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id. + Asking m_grabbing = Asking::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id. DownloadMan m_man; diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 9b8139289..ee5db6a4b 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -44,7 +44,7 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h): EthereumPeer::~EthereumPeer() { - giveUpOnFetch(); + finishSync(); } EthereumHost* EthereumPeer::host() const @@ -119,7 +119,7 @@ void EthereumPeer::tryGrabbingHashChain() } } -void EthereumPeer::giveUpOnFetch() +void EthereumPeer::finishSync() { clogS(NetNote) << "Finishing fetch..."; @@ -169,12 +169,12 @@ bool EthereumPeer::interpret(RLP const& _r) { clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)"; addRating(_r.itemCount() - 1); - RecursiveGuard l(m_incomingLock); Guard l(x_knownTransactions); for (unsigned i = 1; i < _r.itemCount(); ++i) { - m_incomingTransactions.push_back(_r[i].data().toBytes()); m_knownTransactions.insert(sha3(_r[i].data())); + if (!_tq.import(_r[i].data())) // if we already had the transaction, then don't bother sending it on. + host()->m_transactionsSent.insert(sha3(*it)); } break; } @@ -245,23 +245,59 @@ bool EthereumPeer::interpret(RLP const& _r) sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n)); break; } + case NewBlockPacket: + { + auto h = BlockInfo::headerHash(bd(_r[1].data())); + clogS(NetMessageSummary) << "NewBlock: " << h.abridged(); + + if (_r.itemCount() != 3) + disable("NewBlock without 2 data fields."); + else + { + switch (host()->m_bq.import(_r[1].data(), host()->m_chain)) + { + case ImportResult::Success: + case ImportResult::FutureTime: + addRating(1); + break; + + case ImportResult::Malformed: + disable("Malformed block received."); + break; + + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + break; + + case ImportResult::UnknownParent: + clogS(NetMessageSummary) << "Received block with no known parent. Resyncing..."; + setNeedsSyncing(h, _r[2].toInt()); + break; + } + Guard l(x_knownBlocks); + m_knownBlocks.insert(h); + } + break; + } case BlocksPacket: { clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreBlocks"); + if (m_asking != Asking::Blocks) + clogS(NetWarn) << "Unexpected Blocks received!"; + if (_r.itemCount() == 1) { - // Couldn't get any from last batch - probably got to this peer's latest block - just give up. - if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper) - giveUpOnFetch(); + // Got to this peer's latest block - just give up. + if (m_asking == Asking::Blocks) + finishSync(); break; } unsigned success = 0; - unsigned got = 0; - unsigned bad = 0; - unsigned unknown = 0; unsigned future = 0; + unsigned unknown = 0; + unsigned got = 0; for (unsigned i = 1; i < _r.itemCount(); ++i) { @@ -276,12 +312,13 @@ bool EthereumPeer::interpret(RLP const& _r) switch (host()->m_bq.import(_r[i].data(), host()->m_chain)) { case ImportResult::Success: + addRating(1); success++; break; case ImportResult::Malformed: - bad++; - break; + disable("Malformed block received."); + return true; case ImportResult::FutureTime: future++; @@ -298,17 +335,6 @@ bool EthereumPeer::interpret(RLP const& _r) } } - if (unknown && m_asking == Asking::Nothing) - { - // TODO: kick off resync. - } - - if (bad) - { - // TODO: punish peer - } - - addRating(used); unsigned knownParents = 0; unsigned unknownParents = 0; if (g_logVerbosity >= NetMessageSummary::verbosity) @@ -331,9 +357,9 @@ bool EthereumPeer::interpret(RLP const& _r) } } } - clogS(NetMessageSummary) << dec << knownParents << "known parents," << unknownParents << "unknown," << used << "used."; - if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper) - continueGettingChain(); + clogS(NetMessageSummary) << dec << success << "known parents," << unknownParents << "unknown," << used << "used."; + if (m_asking == Asking::Blocks) + continueSync(); break; } default: @@ -342,17 +368,15 @@ bool EthereumPeer::interpret(RLP const& _r) return true; } -void EthereumPeer::ensureGettingChain() +void EthereumPeer::ensureAskingBlocks() { - if (m_helping) + if (m_asking != Asking::Nothing) return; // Already asked & waiting for some. - // Help otherwise, unless we're already the Chain grabber. - setHelping(true); - continueGettingChain(); + continueSync(); } -void EthereumPeer::continueGettingChain() +void EthereumPeer::continueSync() { // If we're getting the hashes already, then we shouldn't be asking for the chain. if (m_asking == Asking::Hashes) @@ -370,7 +394,7 @@ void EthereumPeer::continueGettingChain() sealAndSend(s); } else - giveUpOnFetch(); + finishSync(); } /* @@ -387,10 +411,10 @@ void EthereumPeer::continueGettingChain() * presync nothing */ -void EthereumPeer::setAsking(Asking _a, Syncing _s) +void EthereumPeer::setAsking(Asking _a, bool _isSyncing) { m_asking = _a; - m_syncing = _s; + m_isSyncing = _isSyncing; session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); - session()->addNote("sync", _s == Syncing::Done ? "done" : _s == Syncing::Waiting ? "wait" : _s == Syncing::Executing ? "exec" : "?"); + session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? "needed" : "ok")); } diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index da020f503..d5628829c 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -64,26 +64,43 @@ private: void tryGrabbingHashChain(); /// Ensure that we are waiting for a bunch of blocks from our peer. - void ensureGettingChain(); + void ensureAskingBlocks(); /// Ensure that we are waiting for a bunch of blocks from our peer. - void continueGettingChain(); + void continueSync(); - void giveUpOnFetch(); + void finishSync(); void clearKnownTransactions() { std::lock_guard l(x_knownTransactions); m_knownTransactions.clear(); } - void setAsking(Asking _g, bool _helping = false); - void setHelping(bool _helping = false) { setAsking(m_asking, _helping); } + void setAsking(Asking _g, bool _isSyncing); + + void setNeedsSyncing(h256 _latestHash, u256 _td) { m_latestHash = _latestHash; m_totalDifficulty = _td; } + bool needsSyncing() const { return !!m_latestHash; } + bool isSyncing() const { return m_isSyncing; } + /// Peer's protocol version. unsigned m_protocolVersion; + /// Peer's network id. u256 m_networkId; + /// What, if anything, we last asked the other peer for. Asking m_asking; - Syncing m_syncing; + /// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once. + bool m_isSyncing = false; + + /// These are determined through either a Status message or from NewBlock. h256 m_latestHash; ///< Peer's latest block's hash. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. + /// Once a sync is started on this peer, they are cleared. + + /// This is built as we ask for hashes. Once no more hashes are given, we present this to the + /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. h256s m_neededBlocks; ///< The blocks that we should download from this peer. + /// Once we're asking for blocks, this becomes in use. + DownloadSub m_sub; + + /// Have we received a GetTransactions packet that we haven't yet answered? bool m_requireTransactions; Mutex x_knownBlocks; @@ -91,7 +108,6 @@ private: std::set m_knownTransactions; std::mutex x_knownTransactions; - DownloadSub m_sub; }; }