From 611caef125367602a1ee7b9572cc37da225d6d6c Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 7 Oct 2014 15:43:49 +0200 Subject: [PATCH] Compilable etheruem network rewrite. --- libethereum/EthereumHost.cpp | 140 +++++++++++------------------------ libethereum/EthereumHost.h | 22 +++--- libethereum/EthereumPeer.cpp | 126 ++++++++++++++++++++++--------- libethereum/EthereumPeer.h | 48 ++++++++---- 4 files changed, 175 insertions(+), 161 deletions(-) diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 2c87d887a..3cdc442bf 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -52,7 +52,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu EthereumHost::~EthereumHost() { for (auto const& i: peers()) - i->cap()->giveUpOnFetch(); + i->cap()->abortSync(); } bool EthereumHost::ensureInitialised(TransactionQueue& _tq) @@ -70,108 +70,76 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq) return false; } -void EthereumHost::notePeerStateChanged(EthereumPeer* _who) +void EthereumHost::noteNeedsSyncing(EthereumPeer* _who) { - clog(NetAllDetail) << "Peer state changed."; - - // TODO: FIX: BUG: Better state management! - // if already downloading hash-chain, ignore. - if (m_grabbing != Asking::Nothing) + if (isSyncing()) { - for (auto const& i: peers()) - if (i->cap()->m_grabbing == m_grabbing || m_grabbing == Asking::Presync) - { - clog(NetAllDetail) << "Already downloading chain. Just set to help out."; - _who->ensureGettingChain(); - return; - } - m_grabbing = Asking::Nothing; + clog(NetAllDetail) << "Sync in progress: Just set to help out."; + if (m_syncer->m_asking == Asking::Blocks) + _who->transition(Asking::Blocks); } - - // otherwise check to see if we should be downloading... - _who->attemptSyncing(); + else + // otherwise check to see if we should be downloading... + _who->attemptSync(); } -void EthereumHost::updateGrabbing(Asking _g, EthereumPeer* _ignore) +void EthereumHost::updateSyncer(EthereumPeer* _syncer) { - m_grabbing = _g; - if (_g == Asking::Nothing) - readyForSync(); - else if (_g == Asking::Blocks) + if (_syncer) + { for (auto j: peers()) - if (j->cap().get() != _ignore && j->cap()->m_asking == Asking::Nothing) + if (j->cap().get() != _syncer && j->cap()->m_asking == Asking::Nothing) j->cap()->transition(Asking::Blocks); -} - -bool EthereumHost::shouldGrabBlocks(EthereumPeer* _from) -{ - auto td = _from->m_syncingTotalDifficulty; - auto lh = _from->m_syncingLatestHash; - - if (_from->m_syncingNeededBlocks.empty()) - { - updateGrabbing(Asking::Nothing); - return false; } - - clog(NetNote) << "Hash-chain COMPLETE:" << td << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_syncingNeededBlocks.size() << " blocks, ends" << _from->m_syncingNeededBlocks.back().abridged(); - - if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == lh)) - { - clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; - updateGrabbing(Asking::Nothing); - return false; - } - - clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << lh.abridged() << ", was" << m_latestBlockSent.abridged() << "]"; - - return true; -} - -void EthereumHost::readyForSync() -{ - // start grabbing next hash chain if there is one. - for (auto j: peers()) + else { - j->cap()->attemptSyncing(); - if (j->cap()->m_grabbing == Asking::Hashes) + // start grabbing next hash chain if there is one. + for (auto j: peers()) { - m_grabbing = Asking::Hashes; - return; + j->cap()->attemptSync(); + if (isSyncing()) + return; } + clog(NetNote) << "No more peers to sync with."; } - clog(NetNote) << "No more peers to sync with."; } -void EthereumHost::noteDoneBlocks(EthereumPeer* _who) +void EthereumHost::noteDoneBlocks(EthereumPeer* _who, bool _clemency) { if (m_man.isComplete()) { // Done our chain-get. clog(NetNote) << "Chain download complete."; - updateGrabbing(Asking::Nothing); + // 1/100th for each useful block hash. + _who->addRating(m_man.chain().size() / 100); m_man.reset(); } if (_who->isSyncing()) { - // Done our chain-get. - clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; - // TODO: note that peer is BADBADBAD! - updateGrabbing(Asking::Nothing); + if (_clemency) + clog(NetNote) << "Chain download failed. Aborted while incomplete."; + else + { + // Done our chain-get. + clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; + + m_banned.insert(_who->session()->id()); // We know who you are! + _who->disable("Peer sent hashes but was unable to provide the blocks."); + } m_man.reset(); } } -bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data) +void EthereumHost::reset() { - if (!m_chain.details(_hash)) - { - lock_guard l(m_incomingLock); - m_incomingBlocks.push_back(_data.toBytes()); - return true; - } - return false; + if (m_syncer) + m_syncer->abortSync(); + + m_man.resetToChain(h256s()); + + m_latestBlockSent = h256(); + m_transactionsSent.clear(); } void EthereumHost::doWork() @@ -187,16 +155,7 @@ void EthereumHost::doWork() void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash) { - bool resendAll = (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent); - { - lock_guard l(m_incomingLock); - for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) - if (_tq.import(&*it)) - {}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce... - else - m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on. - m_incomingTransactions.clear(); - } + bool resendAll = (!isSyncing() && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent); // Send any new transactions. for (auto const& p: peers()) @@ -226,23 +185,10 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash } } -void EthereumHost::reset() -{ - m_grabbing = Asking::Nothing; - - m_man.resetToChain(h256s()); - - m_incomingTransactions.clear(); - m_incomingBlocks.clear(); - - m_latestBlockSent = h256(); - m_transactionsSent.clear(); -} - void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) { // If we've finished our initial sync send any new blocks. - if (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) + if (!isSyncing() && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) { // TODO: clean up h256s hs; diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index b42ae7fa7..92e2c8c32 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -70,19 +70,16 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } - bool isSyncing() const { return m_grabbing == Asking::Chain; } + bool isSyncing() const { return !!m_syncer; } -private: - void noteHavePeerState(EthereumPeer* _who); - /// Session wants to pass us a block that we might not have. - /// @returns true if we didn't have it. - bool noteBlock(h256 _hash, bytesConstRef _data); + bool isBanned(h512 _id) const { return m_banned.count(_id); } - /// Session has finished getting the chain of hashes. - bool shouldGrabBlocks(EthereumPeer* _who); +private: + /// Session is tell us that we may need (re-)syncing with the peer. + void noteNeedsSyncing(EthereumPeer* _who); /// Called when the peer can no longer provide us with any needed blocks. - void noteDoneBlocks(EthereumPeer* _who); + void noteDoneBlocks(EthereumPeer* _who, bool _clemency); /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. void doWork(); @@ -104,8 +101,7 @@ private: virtual void onStarting() { startWorking(); } virtual void onStopping() { stopWorking(); } - void readyForSync(); - void updateGrabbing(Asking _g, EthereumPeer* _ignore); + void updateSyncer(EthereumPeer* _ignore); BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. @@ -113,12 +109,14 @@ private: u256 m_networkId; - Asking m_grabbing = Asking::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id. + EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr DownloadMan m_man; h256 m_latestBlockSent; h256Set m_transactionsSent; + + std::set m_banned; }; } diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 20a9fef52..175c0bbae 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -27,6 +27,8 @@ #include #include "BlockChain.h" #include "EthereumHost.h" +#include "TransactionQueue.h" +#include "BlockQueue.h" using namespace std; using namespace dev; using namespace dev::eth; @@ -38,13 +40,18 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h): Capability(_s, _h), m_sub(host()->m_man) { - setAsking(Asking::State, false); - sendStatus(); + transition(Asking::State); } EthereumPeer::~EthereumPeer() { - finishSync(); + abortSync(); +} + +void EthereumPeer::abortSync() +{ + if (isSyncing()) + transition(Asking::Nothing, true); } EthereumHost* EthereumPeer::host() const @@ -54,26 +61,46 @@ EthereumHost* EthereumPeer::host() const void EthereumPeer::sendStatus() { - RLPStream s; - prep(s); - s.appendList(6) << StatusPacket - << host()->protocolVersion() - << host()->networkId() - << host()->m_chain.details().totalDifficulty - << host()->m_chain.currentHash() - << host()->m_chain.genesisHash(); - sealAndSend(s); } /* * Possible asking/syncing states for two peers: */ -void EthereumPeer::transition(Asking _a) +string toString(Asking _a) +{ + switch (_a) + { + case Asking::Blocks: return "Blocks"; + case Asking::Hashes: return "Hashes"; + case Asking::Nothing: return "Nothing"; + case Asking::State: return "State"; + } + return "?"; +} + +void EthereumPeer::transition(Asking _a, bool _force) { + clogS(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); + RLPStream s; prep(s); - if (_a == Asking::Hashes) + if (_a == Asking::State) + { + if (m_asking == Asking::Nothing) + { + setAsking(Asking::State, false); + s.appendList(6) << StatusPacket + << host()->protocolVersion() + << host()->networkId() + << host()->m_chain.details().totalDifficulty + << host()->m_chain.currentHash() + << host()->m_chain.genesisHash(); + sealAndSend(s); + return; + } + } + else if (_a == Asking::Hashes) { if (m_asking == Asking::State || m_asking == Asking::Nothing) { @@ -87,7 +114,6 @@ void EthereumPeer::transition(Asking _a) setAsking(_a, true); s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk; m_syncingNeededBlocks = h256s(1, m_syncingLatestHash); - host()->updateGrabbing(Asking::Hashes); sealAndSend(s); return; } @@ -106,15 +132,17 @@ void EthereumPeer::transition(Asking _a) { if (m_asking == Asking::Hashes) { - if (host()->shouldGrabBlocks(this)) + if (shouldGrabBlocks()) { + clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash.abridged() << ", was" << host()->m_latestBlockSent.abridged() << "]"; + host()->m_man.resetToChain(m_syncingNeededBlocks); host()->m_latestBlockSent = m_syncingLatestHash; - - host()->updateGrabbing(Asking::Blocks, this); } else { + clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; + setAsking(Asking::Nothing, false); return; } @@ -145,7 +173,7 @@ void EthereumPeer::transition(Asking _a) // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. if (isSyncing()) - host()->noteDoneBlocks(this); + host()->noteDoneBlocks(this, _force); // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. m_sub.doneFetch(); @@ -156,28 +184,28 @@ void EthereumPeer::transition(Asking _a) { clogS(NetNote) << "Finishing hashes fetch..."; - if (isSyncing()) - host()->noteDoneBlocks(this); - setAsking(Asking::Nothing, false); } else if (m_asking == Asking::State) { setAsking(Asking::Nothing, false); - // TODO: Just got the state - should check to see if we can be of help downloading the chain if any. - // TODO: Otherwise, should put ourselves up for sync. + // Just got the state - should check to see if we can be of help downloading the chain if any. + // Otherwise, should put ourselves up for sync. + setNeedsSyncing(m_latestHash, m_totalDifficulty); } // Otherwise it's fine. We don't care if it's Nothing->Nothing. return; } - clogS(NetWarn) << "Invalid state transition:" << (int)_a << "from" << (int)m_asking << "/" << boolalpha << isSyncing() << needsSyncing(); + clogS(NetWarn) << "Invalid state transition:" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); } void EthereumPeer::setAsking(Asking _a, bool _isSyncing) { m_asking = _a; - m_isSyncing = _isSyncing; + if (_isSyncing != (host()->m_syncer == this)) + host()->updateSyncer(_isSyncing ? this : nullptr); + session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : "")); } @@ -187,11 +215,32 @@ void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td) m_latestHash = _latestHash; m_totalDifficulty = _td; - // TODO: should be "noteNeedsSyncing" or some such. - host()->notePeerStateChanged(this); + host()->noteNeedsSyncing(this); +} + +bool EthereumPeer::isSyncing() const +{ + return host()->m_syncer == this; } -void EthereumPeer::attemptSyncing() +bool EthereumPeer::shouldGrabBlocks() const +{ + auto td = m_syncingTotalDifficulty; + auto lh = m_syncingLatestHash; + auto ctd = host()->m_chain.details().totalDifficulty; + + if (m_syncingNeededBlocks.empty()) + return false; + + clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back().abridged(); + + if (td < ctd || (td == ctd && host()->m_chain.currentHash() == lh)) + return false; + + return true; +} + +void EthereumPeer::attemptSync() { if (m_asking != Asking::Nothing) { @@ -231,8 +280,8 @@ bool EthereumPeer::interpret(RLP const& _r) { m_protocolVersion = _r[1].toInt(); m_networkId = _r[2].toInt(); - auto totalDifficulty = _r[3].toInt(); - auto latestHash = _r[4].toHash(); + m_totalDifficulty = _r[3].toInt(); + m_latestHash = _r[4].toHash(); auto genesisHash = _r[5].toHash(); clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); @@ -245,6 +294,8 @@ bool EthereumPeer::interpret(RLP const& _r) disable("Invalid network identifier."); else if (session()->info().clientVersion.find("/v0.6.9/") != string::npos) disable("Blacklisted client version."); + else if (host()->isBanned(session()->id())) + disable("Peer banned for previous bad behaviour."); else { // Grab transactions off them. @@ -252,8 +303,7 @@ bool EthereumPeer::interpret(RLP const& _r) prep(s).appendList(1); s << GetTransactionsPacket; sealAndSend(s); - - setNeedsSyncing(latestHash, totalDifficulty); + transition(Asking::Nothing); } break; } @@ -269,9 +319,11 @@ bool EthereumPeer::interpret(RLP const& _r) Guard l(x_knownTransactions); for (unsigned i = 1; i < _r.itemCount(); ++i) { - m_knownTransactions.insert(sha3(_r[i].data())); - if (!_tq.import(_r[i].data())) // if we already had the transaction, then don't bother sending it on. - host()->m_transactionsSent.insert(sha3(*it)); + auto h = sha3(_r[i].data()); + m_knownTransactions.insert(h); + if (!host()->m_tq.import(_r[i].data())) + // if we already had the transaction, then don't bother sending it on. + host()->m_transactionsSent.insert(h); } break; } @@ -402,7 +454,7 @@ bool EthereumPeer::interpret(RLP const& _r) } case NewBlockPacket: { - auto h = BlockInfo::headerHash(bd(_r[1].data())); + auto h = BlockInfo::headerHash(_r[1].data()); clogS(NetMessageSummary) << "NewBlock: " << h.abridged(); if (_r.itemCount() != 3) diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index 92b56301d..a736516c2 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -49,41 +49,59 @@ class EthereumPeer: public p2p::Capability friend class EthereumHost; public: + /// Basic constructor. EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h); + + /// Basic destructor. virtual ~EthereumPeer(); + /// What is our name? static std::string name() { return "eth"; } + /// What is the ethereum subprotocol host object. EthereumHost* host() const; private: + /// Interpret an incoming message. virtual bool interpret(RLP const& _r); + /// Send our status to peer. void sendStatus(); - void transition(Asking _wantState); - - void attemptSyncing(); + /// Transition state in a particular direction. + void transition(Asking _wantState, bool _force = false); - /// Ensure that we are waiting for a bunch of blocks from our peer. - void ensureAskingBlocks(); + /// Attempt to begin syncing with this peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks. + void attemptSync(); - void finishSync(); + /// Abort the sync operation. + void abortSync(); + /// Clear all known transactions. void clearKnownTransactions() { std::lock_guard l(x_knownTransactions); m_knownTransactions.clear(); } + + /// Update our asking state. void setAsking(Asking _g, bool _isSyncing); + /// Update our syncing requirements state. void setNeedsSyncing(h256 _latestHash, u256 _td); + + /// Do we presently need syncing with this peer? bool needsSyncing() const { return !!m_latestHash; } - bool isSyncing() const { return m_isSyncing; } - + + /// Are we presently syncing with this peer? + bool isSyncing() const; + + /// Check whether the session should bother grabbing the peer's blocks. + bool shouldGrabBlocks() const; + /// Peer's protocol version. unsigned m_protocolVersion; /// Peer's network id. u256 m_networkId; /// What, if anything, we last asked the other peer for. - Asking m_asking; + Asking m_asking = Asking::Nothing; /// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once. bool m_isSyncing = false; @@ -95,9 +113,9 @@ private: /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. - h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. - h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. - u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. + h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. + h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. /// Once we're asking for blocks, this becomes in use. DownloadSub m_sub; @@ -106,9 +124,9 @@ private: bool m_requireTransactions; Mutex x_knownBlocks; - std::set m_knownBlocks; - std::set m_knownTransactions; - std::mutex x_knownTransactions; + h256Set m_knownBlocks; ///< Blocks that the peer already knows about (that don't need to be sent to them). + Mutex x_knownTransactions; + h256Set m_knownTransactions; ///< Transactions that the peer already knows of. };