From 3f61b506dbb064b39e74920f02569f8f7194dfcc Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 6 Oct 2014 14:25:11 +0200 Subject: [PATCH] Remove incoming queue. Put things straight into actual queues. Make state items more fitting. --- libethereum/BlockQueue.cpp | 14 +++-- libethereum/BlockQueue.h | 17 ++++- libethereum/CommonNet.h | 10 ++- libethereum/EthereumHost.cpp | 39 +++--------- libethereum/EthereumHost.h | 7 --- libethereum/EthereumPeer.cpp | 118 ++++++++++++++++++++++++++--------- libethereum/EthereumPeer.h | 6 +- libp2p/Session.cpp | 14 ++--- 8 files changed, 141 insertions(+), 84 deletions(-) diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index a845965d5..67e48a59d 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -29,7 +29,7 @@ using namespace std; using namespace dev; using namespace dev::eth; -bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) +ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) { // Check if we already know this block. h256 h = BlockInfo::headerHash(_block); @@ -42,7 +42,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) { // Already know about this one. cblockq << "Already known."; - return false; + return ImportResult::AlreadyKnown; } // VERIFY: populates from the block and checks the block is internally coherent. @@ -59,7 +59,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) catch (Exception const& _e) { cwarn << "Ignoring malformed block: " << _e.description(); - return false; + return ImportResult::Malformed; } #endif @@ -67,7 +67,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) if (_bc.details(h)) { cblockq << "Already known in chain."; - return false; + return ImportResult::AlreadyInChain; } UpgradeGuard ul(l); @@ -77,6 +77,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) { m_future.insert(make_pair((unsigned)bi.timestamp, _block.toBytes())); cblockq << "OK - queued for future."; + return ImportResult::FutureTime; } else { @@ -87,6 +88,8 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) cblockq << "OK - queued as unknown parent:" << bi.parentHash.abridged(); m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes()))); m_unknownSet.insert(h); + + return ImportResult::UnknownParent; } else { @@ -96,10 +99,9 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) m_readySet.insert(h); noteReadyWithoutWriteGuard(h); + return ImportResult::Success; } } - - return true; } void BlockQueue::tick(BlockChain const& _bc) diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index 20bc4ce59..210b9eeb0 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -37,6 +37,16 @@ class BlockChain; struct BlockQueueChannel: public LogChannel { static const char* name() { return "[]Q"; } static const int verbosity = 4; }; #define cblockq dev::LogOutputStream() +enum class ImportResult +{ + Success = 0, + UnknownParent, + FutureTime, + AlreadyInChain, + AlreadyKnown, + Malformed +}; + /** * @brief A queue of blocks. Sits between network or other I/O and the BlockChain. * Sorts them ready for blockchain insertion (with the BlockChain::sync() method). @@ -46,7 +56,7 @@ class BlockQueue { public: /// Import a block into the queue. - bool import(bytesConstRef _tx, BlockChain const& _bc); + ImportResult import(bytesConstRef _tx, BlockChain const& _bc); /// Notes that time has moved on and some blocks that used to be "in the future" may no be valid. void tick(BlockChain const& _bc); @@ -67,6 +77,9 @@ public: /// Clear everything. void clear() { WriteGuard l(m_lock); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); } + /// Return first block with an unknown parent. + h256 firstUnknown() const { ReadGuard l(m_lock); return m_unknownSet.size() ? *m_unknownSet.begin() : h256(); } + private: void noteReadyWithoutWriteGuard(h256 _b); void notePresentWithoutWriteGuard(bytesConstRef _block); @@ -77,7 +90,7 @@ private: std::vector m_ready; ///< List of blocks, in correct order, ready for chain-import. std::set m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain. std::multimap> m_unknown; ///< For transactions that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. - std::multimap m_future; ///< Set of blocks that are not yet valid. + std::multimap m_future; ///< Set of blocks that are not yet valid. }; } diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index c53b644e4..c7f6929e3 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -65,10 +65,16 @@ enum class Grabbing { State, Hashes, - Chain, - ChainHelper, + Blocks, Nothing }; +enum class Syncing +{ + Waiting, + Executing, + Done +}; + } } diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index e7e80c28a..61b0ec70a 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -70,9 +70,9 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq) return false; } -void EthereumHost::noteHavePeerState(EthereumPeer* _who) +void EthereumHost::notePeerStateChanged(EthereumPeer* _who) { - clog(NetAllDetail) << "Have peer state."; + clog(NetAllDetail) << "Peer state changed."; // TODO: FIX: BUG: Better state management! @@ -80,7 +80,7 @@ void EthereumHost::noteHavePeerState(EthereumPeer* _who) if (m_grabbing != Grabbing::Nothing) { for (auto const& i: peers()) - if (i->cap()->m_grabbing == m_grabbing || m_grabbing == Grabbing::State) + if (i->cap()->m_grabbing == m_grabbing || m_grabbing == Grabbing::Presync) { clog(NetAllDetail) << "Already downloading chain. Just set to help out."; _who->ensureGettingChain(); @@ -205,9 +205,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash // Send any new transactions. for (auto const& p: peers()) - { - auto ep = p->cap(); - if (ep) + if (auto ep = p->cap()) { bytes b; unsigned n = 0; @@ -231,7 +229,6 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash } ep->m_requireTransactions = false; } - } } void EthereumHost::reset() @@ -249,36 +246,20 @@ void EthereumHost::reset() void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) { - // Import new blocks - { - lock_guard l(m_incomingLock); - for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it) - if (_bq.import(&*it, m_chain)) - {} - else{} // TODO: don't forward it. - m_incomingBlocks.clear(); - } - // If we've finished our initial sync send any new blocks. if (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) { + // TODO: clean up + h256s hs; + hs.push_back(_currentHash); RLPStream ts; EthereumPeer::prep(ts); bytes bs; - unsigned c = 0; - for (auto h: m_chain.treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true)) - { + for (auto h: hs) bs += m_chain.block(h); - ++c; - } - clog(NetMessageSummary) << "Sending" << c << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; - if (c > 1000) - { - cwarn << "Gaa this would be an awful lot of new blocks. Not bothering"; - return; - } + clog(NetMessageSummary) << "Sending" << hs.size() << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; - ts.appendList(1 + c).append(BlocksPacket).appendRaw(bs, c); + ts.appendList(1 + hs.size()).append(BlocksPacket).appendRaw(bs, hs.size()); bytes b; ts.swapOut(b); seal(b); diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 07ef92513..49b8077a8 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -85,9 +85,6 @@ private: /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. void doWork(); - /// Called by peer to add incoming transactions. - void addIncomingTransaction(bytes const& _bytes) { std::lock_guard l(m_incomingLock); m_incomingTransactions.push_back(_bytes); } - void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock); void maintainBlocks(BlockQueue& _bq, h256 _currentBlock); @@ -116,10 +113,6 @@ private: Grabbing m_grabbing = Grabbing::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id. - mutable std::recursive_mutex m_incomingLock; - std::vector m_incomingTransactions; - std::vector m_incomingBlocks; - DownloadMan m_man; h256 m_latestBlockSent; diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index b45abfbff..9b8139289 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -38,7 +38,7 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h): Capability(_s, _h), m_sub(host()->m_man) { - setGrabbing(Grabbing::State); + setAsking(Asking::State, Syncing::Done); sendStatus(); } @@ -75,13 +75,19 @@ void EthereumPeer::startInitialSync() sealAndSend(s); } - host()->noteHavePeerState(this); + host()->notePeerStateChanged(this); } void EthereumPeer::tryGrabbingHashChain() { + if (m_asking != Asking::Nothing) + { + clogS(NetAllDetail) << "Can't synced with this peer - outstanding asks."; + return; + } + // if already done this, then ignore. - if (m_grabbing != Grabbing::State) + if (m_syncing == Syncing::Done) { clogS(NetAllDetail) << "Already synced with this peer."; return; @@ -95,7 +101,7 @@ void EthereumPeer::tryGrabbingHashChain() if (td >= m_totalDifficulty) { clogS(NetAllDetail) << "No. Our chain is better."; - setGrabbing(Grabbing::Nothing); + setAsking(Asking::Nothing, Syncing::Done); return; // All good - we have the better chain. } @@ -103,8 +109,8 @@ void EthereumPeer::tryGrabbingHashChain() { clogS(NetAllDetail) << "Yes. Their chain is better."; - host()->updateGrabbing(Grabbing::Hashes); - setGrabbing(Grabbing::Hashes); + host()->updateGrabbing(Asking::Hashes); + setAsking(Asking::Hashes, Syncing::Executing); RLPStream s; prep(s).appendList(3); s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk; @@ -118,10 +124,10 @@ void EthereumPeer::giveUpOnFetch() clogS(NetNote) << "Finishing fetch..."; // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. - if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper) + if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper) { host()->noteDoneBlocks(this); - setGrabbing(Grabbing::Nothing); + setAsking(Asking::Nothing); } // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. @@ -163,12 +169,11 @@ bool EthereumPeer::interpret(RLP const& _r) { clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)"; addRating(_r.itemCount() - 1); - lock_guard l(host()->m_incomingLock); + RecursiveGuard l(m_incomingLock); + Guard l(x_knownTransactions); for (unsigned i = 1; i < _r.itemCount(); ++i) { - host()->addIncomingTransaction(_r[i].data().toBytes()); - - lock_guard l(x_knownTransactions); + m_incomingTransactions.push_back(_r[i].data().toBytes()); m_knownTransactions.insert(sha3(_r[i].data())); } break; @@ -193,7 +198,7 @@ bool EthereumPeer::interpret(RLP const& _r) { clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreHashes"); - if (m_grabbing != Grabbing::Hashes) + if (m_asking != Asking::Hashes) { cwarn << "Peer giving us hashes when we didn't ask for them."; break; @@ -247,21 +252,62 @@ bool EthereumPeer::interpret(RLP const& _r) if (_r.itemCount() == 1) { // Couldn't get any from last batch - probably got to this peer's latest block - just give up. - if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper) + if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper) giveUpOnFetch(); break; } - unsigned used = 0; + unsigned success = 0; + unsigned got = 0; + unsigned bad = 0; + unsigned unknown = 0; + unsigned future = 0; + for (unsigned i = 1; i < _r.itemCount(); ++i) { auto h = BlockInfo::headerHash(_r[i].data()); m_sub.noteBlock(h); - if (host()->noteBlock(h, _r[i].data())) - used++; - Guard l(x_knownBlocks); - m_knownBlocks.insert(h); + + { + Guard l(x_knownBlocks); + m_knownBlocks.insert(h); + } + + switch (host()->m_bq.import(_r[i].data(), host()->m_chain)) + { + case ImportResult::Success: + success++; + break; + + case ImportResult::Malformed: + bad++; + break; + + case ImportResult::FutureTime: + future++; + break; + + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + got++; + break; + + case ImportResult::UnknownParent: + unknown++; + break; + } } + + if (unknown && m_asking == Asking::Nothing) + { + // TODO: kick off resync. + } + + if (bad) + { + // TODO: punish peer + } + addRating(used); unsigned knownParents = 0; unsigned unknownParents = 0; @@ -286,7 +332,7 @@ bool EthereumPeer::interpret(RLP const& _r) } } clogS(NetMessageSummary) << dec << knownParents << "known parents," << unknownParents << "unknown," << used << "used."; - if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper) + if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper) continueGettingChain(); break; } @@ -298,20 +344,18 @@ bool EthereumPeer::interpret(RLP const& _r) void EthereumPeer::ensureGettingChain() { - if (m_grabbing == Grabbing::ChainHelper) + if (m_helping) return; // Already asked & waiting for some. - // Switch to ChainHelper otherwise, unless we're already the Chain grabber. - if (m_grabbing != Grabbing::Chain) - setGrabbing(Grabbing::ChainHelper); - + // Help otherwise, unless we're already the Chain grabber. + setHelping(true); continueGettingChain(); } void EthereumPeer::continueGettingChain() { // If we're getting the hashes already, then we shouldn't be asking for the chain. - if (m_grabbing == Grabbing::Hashes) + if (m_asking == Asking::Hashes) return; auto blocks = m_sub.nextFetch(c_maxBlocksAsk); @@ -329,8 +373,24 @@ void EthereumPeer::continueGettingChain() giveUpOnFetch(); } -void EthereumPeer::setGrabbing(Grabbing _g) +/* + * Possible asking/syncing states for two peers: + * state/ presync + * presync hashes + * presync chain (transiently) + * presync+ chain + * presync nothing + * hashes nothing + * chain hashes + * presync chain (transiently) + * presync+ chain + * presync nothing + */ + +void EthereumPeer::setAsking(Asking _a, Syncing _s) { - m_grabbing = _g; - session()->addNote("grab", _g == Grabbing::Nothing ? "nothing" : _g == Grabbing::State ? "state" : _g == Grabbing::Hashes ? "hashes" : _g == Grabbing::Chain ? "chain" : _g == Grabbing::ChainHelper ? "chainhelper" : "?"); + m_asking = _a; + m_syncing = _s; + session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); + session()->addNote("sync", _s == Syncing::Done ? "done" : _s == Syncing::Waiting ? "wait" : _s == Syncing::Executing ? "exec" : "?"); } diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index 052af3c7e..da020f503 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -71,12 +71,14 @@ private: void giveUpOnFetch(); void clearKnownTransactions() { std::lock_guard l(x_knownTransactions); m_knownTransactions.clear(); } - void setGrabbing(Grabbing _g); + void setAsking(Asking _g, bool _helping = false); + void setHelping(bool _helping = false) { setAsking(m_asking, _helping); } unsigned m_protocolVersion; u256 m_networkId; - Grabbing m_grabbing; + Asking m_asking; + Syncing m_syncing; h256 m_latestHash; ///< Peer's latest block's hash. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index ea8db2127..84a33db65 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -87,7 +87,7 @@ bool Session::interpret(RLP const& _r) if (m_server->havePeer(m_id)) { // Already connected. - cwarn << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint(); + clogS(NetWarn) << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint(); disconnect(DuplicatePeer); return false; } @@ -240,7 +240,7 @@ void Session::sendDestroy(bytes& _msg) if (!checkPacket(bytesConstRef(&_msg))) { - cwarn << "INVALID PACKET CONSTRUCTED!"; + clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!"; } bytes buffer = bytes(std::move(_msg)); @@ -253,7 +253,7 @@ void Session::send(bytesConstRef _msg) if (!checkPacket(_msg)) { - cwarn << "INVALID PACKET CONSTRUCTED!"; + clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!"; } bytes buffer = bytes(_msg.toBytes()); @@ -288,7 +288,7 @@ void Session::write() // must check queue, as write callback can occur following dropped() if (ec) { - cwarn << "Error sending: " << ec.message(); + clogS(NetWarn) << "Error sending: " << ec.message(); dropped(); return; } @@ -363,7 +363,7 @@ void Session::doRead() if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) { // got here with length of 1241... - cwarn << "Error reading: " << ec.message(); + clogS(NetWarn) << "Error reading: " << ec.message(); dropped(); } else if (ec && length == 0) @@ -380,7 +380,7 @@ void Session::doRead() { if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) { - cwarn << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4)); + clogS(NetWarn) << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4)); disconnect(BadProtocol); return; } @@ -396,7 +396,7 @@ void Session::doRead() if (!checkPacket(data)) { cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl; - cwarn << "INVALID MESSAGE RECEIVED"; + clogS(NetWarn) << "INVALID MESSAGE RECEIVED"; disconnect(BadProtocol); return; }