From 8be99bd36c9ed8686e46f3533b8ea42766309201 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 15 Feb 2014 21:16:51 +0000 Subject: [PATCH] Less hangy. --- TODO | 34 +++-- libethereum/BlockChain.cpp | 45 +++++-- libethereum/BlockChain.h | 11 +- libethereum/Client.cpp | 24 ++-- libethereum/PeerNetwork.cpp | 262 +++++++++++++++++------------------- libethereum/PeerNetwork.h | 9 +- test/peer.cpp | 2 +- 7 files changed, 201 insertions(+), 186 deletions(-) diff --git a/TODO b/TODO index 67db641b2..31047dbc8 100644 --- a/TODO +++ b/TODO @@ -7,12 +7,20 @@ Crypto stuff: - kFromMessage - Check all the tweak instructions. -Better handling of corrupt blocks. -- Kill DB & restart. - Network: -- Crypto on network. TLS? +- *** Exponential backoff on bad connection. +- *** Randomly select peers from incoming peers. +- NotInChain will be very bad for new peers - it'll run through until the genesis. + - Check how many it has first. +- Crypto on network - use id as public key? - Make work with IPv6 +- Peers rated. + - Useful/useless - new blocks/transactions or useful peers? + - Solid communications? +- Strategy for peer suggestion? + + + CLI client - Implement CLI option "--help". @@ -23,22 +31,24 @@ General: - Move over to new system. - Remove block chain on protocol change (i.e. store protocol with block chain). - Robustness - Remove aborts - Recover from all exceptions. +- Store version alongside BC DB. +- Better handling of corrupt blocks. + - Kill DB & restart. -### Gav +GUI +- Make address/block chain list model-based, JIT populated. +- Make everything else model-based +- Qt/QML class. -For PoC2: +For PoC3: +- Shared contract acceptance tests. - Use mining state for nonce. - -Network: -- NotInChain will be very bad for new peers - it'll run through until the genesis. - - Check how many it has first. - BUG: need to discard transactions if nonce too old, even when not mining. 
+ ### Marko Ubuntu builds diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index 146d09f0a..54fb52a06 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -168,19 +168,21 @@ void BlockChain::import(bytes const& _block, Overlay const& _db) auto tdIncrease = s.playback(&_block, bi, biParent, biGrandParent, true); td = pd.totalDifficulty + tdIncrease; -#if !NDEBUG +#if PARANOIA checkConsistency(); #endif // All ok - insert into DB - m_details[newHash] = BlockDetails((uint)pd.number + 1, td, bi.parentHash, {}); - m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)eth::ref(m_details[newHash].rlp())); + { + lock_guard l(m_lock); + m_details[newHash] = BlockDetails((uint)pd.number + 1, td, bi.parentHash, {}); + m_details[bi.parentHash].children.push_back(newHash); + } - m_details[bi.parentHash].children.push_back(newHash); + m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)eth::ref(m_details[newHash].rlp())); m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&bi.parentHash, 32), (ldb::Slice)eth::ref(m_details[bi.parentHash].rlp())); - m_db->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)ref(_block)); -#if !NDEBUG +#if PARANOIA checkConsistency(); #endif } @@ -193,7 +195,7 @@ void BlockChain::import(bytes const& _block, Overlay const& _db) // cnote << "Parent " << bi.parentHash << " has " << details(bi.parentHash).children.size() << " children."; // This might be the new last block... 
- if (td > m_details[m_lastBlockHash].totalDifficulty) + if (td > details(m_lastBlockHash).totalDifficulty) { m_lastBlockHash = newHash; m_detailsDB->Put(m_writeOptions, ldb::Slice("best"), ldb::Slice((char const*)&newHash, 32)); @@ -201,7 +203,7 @@ void BlockChain::import(bytes const& _block, Overlay const& _db) } else { - clog(BlockChainNote) << " Imported but not best (oTD:" << m_details[m_lastBlockHash].totalDifficulty << ", TD:" << td << ")"; + clog(BlockChainNote) << " Imported but not best (oTD:" << details(m_lastBlockHash).totalDifficulty << ", TD:" << td << ")"; } } @@ -230,14 +232,26 @@ bytesConstRef BlockChain::block(h256 _hash) const if (_hash == m_genesisHash) return &m_genesisBlock; - m_db->Get(m_readOptions, ldb::Slice((char const*)&_hash, 32), &m_cache[_hash]); - return bytesConstRef(&m_cache[_hash]); + string d; + m_db->Get(m_readOptions, ldb::Slice((char const*)&_hash, 32), &d); + + { + lock_guard l(m_lock); + swap(m_cache[_hash], d); + return bytesConstRef(&m_cache[_hash]); + } } BlockDetails const& BlockChain::details(h256 _h) const { - auto it = m_details.find(_h); - if (it == m_details.end()) + std::map::const_iterator it; + bool fetchRequired; + { + lock_guard l(m_lock); + it = m_details.find(_h); + fetchRequired = (it == m_details.end()); + } + if (fetchRequired) { std::string s; m_detailsDB->Get(m_readOptions, ldb::Slice((char const*)&_h, 32), &s); @@ -246,8 +260,11 @@ BlockDetails const& BlockChain::details(h256 _h) const // cout << "Not found in DB: " << _h << endl; return NullBlockDetails; } - bool ok; - tie(it, ok) = m_details.insert(make_pair(_h, BlockDetails(RLP(s)))); + { + lock_guard l(m_lock); + bool ok; + tie(it, ok) = m_details.insert(make_pair(_h, BlockDetails(RLP(s)))); + } } return it->second; } diff --git a/libethereum/BlockChain.h b/libethereum/BlockChain.h index f3e7a6693..75ae2a741 100644 --- a/libethereum/BlockChain.h +++ b/libethereum/BlockChain.h @@ -21,6 +21,7 @@ #pragma once +#include #include "Common.h" #include 
"AddressState.h" namespace ldb = leveldb; @@ -82,17 +83,14 @@ public: BlockDetails const& details(h256 _hash) const; BlockDetails const& details() const { return details(currentHash()); } - /// Get a given block (RLP format). + /// Get a given block (RLP format). Thread-safe. bytesConstRef block(h256 _hash) const; bytesConstRef block() const { return block(currentHash()); } - /// Get a given block (RLP format). + /// Get the hash of the current best block. Thread-safe. h256 currentHash() const { return m_lastBlockHash; } - /// Get the coinbase address of a given block. - Address coinbaseAddress(h256 _hash) const; - Address coinbaseAddress() const { return coinbaseAddress(currentHash()); } - + /// Get the hash of the genesis block. h256 genesisHash() const { return m_genesisHash; } std::vector> interestQueue() { std::vector> ret; swap(ret, m_interestQueue); return ret; } @@ -105,6 +103,7 @@ private: /// Get fully populated from disk DB. mutable std::map m_details; mutable std::map m_cache; + mutable std::mutex m_lock; /// The queue of transactions that have happened that we're interested in. std::map m_interest; diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index a3ef271e6..162919ba3 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -120,15 +120,19 @@ void Client::transact(Secret _secret, Address _dest, u256 _amount, u256s _data) void Client::work() { - m_lock.lock(); bool changed = false; // Process network events. // Synchronise block chain with network. // Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks. if (m_net) - if (m_net->process(m_bc, m_tq, m_stateDB)) + { + m_net->process(); + + lock_guard l(m_lock); + if (m_net->sync(m_bc, m_tq, m_stateDB)) changed = true; + } // Synchronise state to block chain. // This should remove any transactions on our queue that are included within our state. 
@@ -136,18 +140,21 @@ void Client::work() // This might mean reverting to an earlier state and replaying some blocks, or, (worst-case: // if there are no checkpoints before our fork) reverting to the genesis block and replaying // all blocks. - // Resynchronise state with block chain & trans - if (m_s.sync(m_bc)) + // Resynchronise state with block chain & trans { - changed = true; - m_mined = m_s; + lock_guard l(m_lock); + if (m_s.sync(m_bc)) + { + changed = true; + m_mined = m_s; + } } - m_lock.unlock(); if (m_doMine) { if (m_miningStarted) { + lock_guard l(m_lock); m_mined = m_s; m_mined.sync(m_tq); m_mined.commitToMine(m_bc); @@ -164,10 +171,9 @@ void Client::work() if (mineInfo.completed) { // Import block. - m_lock.lock(); + lock_guard l(m_lock); m_bc.attemptImport(m_mined.blockData(), m_stateDB); m_mineProgress.best = 0; - m_lock.unlock(); m_changed = true; m_miningStarted = true; // need to re-commit to mine. } diff --git a/libethereum/PeerNetwork.cpp b/libethereum/PeerNetwork.cpp index 66d74a8df..934b821e4 100644 --- a/libethereum/PeerNetwork.cpp +++ b/libethereum/PeerNetwork.cpp @@ -791,33 +791,25 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep) }); } -bool PeerServer::process(BlockChain& _bc) +bool PeerServer::sync() { bool ret = false; - m_ioService.poll(); - - auto n = chrono::steady_clock::now(); - bool fullProcess = (n > m_lastFullProcess + chrono::seconds(1)); - if (fullProcess) - m_lastFullProcess = n; - - if (fullProcess) - for (auto i = m_peers.begin(); i != m_peers.end();) + for (auto i = m_peers.begin(); i != m_peers.end();) + { + auto p = i->second.lock(); + if (p && p->m_socket.is_open() && + (p->m_disconnect == chrono::steady_clock::time_point::max() || chrono::steady_clock::now() - p->m_disconnect < chrono::seconds(1))) // kill old peers that should be disconnected. 
+ ++i; + else { - auto p = i->second.lock(); - if (p && p->m_socket.is_open() && - (p->m_disconnect == chrono::steady_clock::time_point::max() || chrono::steady_clock::now() - p->m_disconnect < chrono::seconds(1))) // kill old peers that should be disconnected. - ++i; - else - { - i = m_peers.erase(i); - ret = true; - } + i = m_peers.erase(i); + ret = true; } + } return ret; } -bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o) +bool PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o) { bool ret = false; @@ -830,14 +822,10 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o) for (auto const& i: _tq.transactions()) m_transactionsSent.insert(i.first); m_lastPeersRequest = chrono::steady_clock::time_point::min(); - m_lastFullProcess = chrono::steady_clock::time_point::min(); ret = true; } - auto n = chrono::steady_clock::now(); - bool fullProcess = (n > m_lastFullProcess + chrono::seconds(1)); - - if (process(_bc)) + if (sync()) ret = true; if (m_mode == NodeMode::Full) @@ -850,118 +838,115 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o) m_incomingTransactions.clear(); // Send any new transactions. - if (fullProcess) + for (auto j: m_peers) + if (auto p = j.second.lock()) + { + bytes b; + uint n = 0; + for (auto const& i: _tq.transactions()) + if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions) + { + b += i.second; + ++n; + m_transactionsSent.insert(i.first); + } + if (n) + { + RLPStream ts; + PeerSession::prep(ts); + ts.appendList(n + 1) << TransactionsPacket; + ts.appendRaw(b, n).swapOut(b); + seal(b); + p->send(&b); + } + p->m_knownTransactions.clear(); + p->m_requireTransactions = false; + } + + // Send any new blocks. + auto h = _bc.currentHash(); + if (h != m_latestBlockSent) { + // TODO: find where they diverge and send complete new branch. 
+ RLPStream ts; + PeerSession::prep(ts); + ts.appendList(2) << BlocksPacket; + bytes b; + ts.appendRaw(_bc.block(_bc.currentHash())).swapOut(b); + seal(b); for (auto j: m_peers) if (auto p = j.second.lock()) { - bytes b; - uint n = 0; - for (auto const& i: _tq.transactions()) - if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions) - { - b += i.second; - ++n; - m_transactionsSent.insert(i.first); - } - if (n) - { - RLPStream ts; - PeerSession::prep(ts); - ts.appendList(n + 1) << TransactionsPacket; - ts.appendRaw(b, n).swapOut(b); - seal(b); + if (!p->m_knownBlocks.count(_bc.currentHash())) p->send(&b); - } - p->m_knownTransactions.clear(); - p->m_requireTransactions = false; + p->m_knownBlocks.clear(); } + } + m_latestBlockSent = h; - // Send any new blocks. - auto h = _bc.currentHash(); - if (h != m_latestBlockSent) - { - // TODO: find where they diverge and send complete new branch. - RLPStream ts; - PeerSession::prep(ts); - ts.appendList(2) << BlocksPacket; - bytes b; - ts.appendRaw(_bc.block(_bc.currentHash())).swapOut(b); - seal(b); - for (auto j: m_peers) - if (auto p = j.second.lock()) + for (int accepted = 1, n = 0; accepted; ++n) + { + accepted = 0; + + if (m_incomingBlocks.size()) + for (auto it = prev(m_incomingBlocks.end());; --it) + { + try { - if (!p->m_knownBlocks.count(_bc.currentHash())) - p->send(&b); - p->m_knownBlocks.clear(); + _bc.import(*it, _o); + it = m_incomingBlocks.erase(it); + ++accepted; + ret = true; } - } - m_latestBlockSent = h; - - for (int accepted = 1, n = 0; accepted; ++n) - { - accepted = 0; - - if (m_incomingBlocks.size()) - for (auto it = prev(m_incomingBlocks.end());; --it) + catch (UnknownParent) { - try - { - _bc.import(*it, _o); - it = m_incomingBlocks.erase(it); - ++accepted; - ret = true; - } - catch (UnknownParent) - { - // Don't (yet) know its parent. Leave it for later. 
- m_unknownParentBlocks.push_back(*it); - it = m_incomingBlocks.erase(it); - } - catch (...) - { - // Some other error - erase it. - it = m_incomingBlocks.erase(it); - } - - if (it == m_incomingBlocks.begin()) - break; + // Don't (yet) know its parent. Leave it for later. + m_unknownParentBlocks.push_back(*it); + it = m_incomingBlocks.erase(it); } - if (!n && accepted) - { - for (auto i: m_unknownParentBlocks) - m_incomingBlocks.push_back(i); - m_unknownParentBlocks.clear(); + catch (...) + { + // Some other error - erase it. + it = m_incomingBlocks.erase(it); + } + + if (it == m_incomingBlocks.begin()) + break; } + if (!n && accepted) + { + for (auto i: m_unknownParentBlocks) + m_incomingBlocks.push_back(i); + m_unknownParentBlocks.clear(); } + } - // Connect to additional peers - while (m_peers.size() < m_idealPeerCount) + // Connect to additional peers + while (m_peers.size() < m_idealPeerCount) + { + if (m_incomingPeers.empty()) { - if (m_incomingPeers.empty()) + if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10)) { - if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10)) - { - RLPStream s; - bytes b; - (PeerSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b); - seal(b); - for (auto const& i: m_peers) - if (auto p = i.second.lock()) - if (p->isOpen()) - p->send(&b); - m_lastPeersRequest = chrono::steady_clock::now(); - } + RLPStream s; + bytes b; + (PeerSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b); + seal(b); + for (auto const& i: m_peers) + if (auto p = i.second.lock()) + if (p->isOpen()) + p->send(&b); + m_lastPeersRequest = chrono::steady_clock::now(); + } - if (!m_accepting) - ensureAccepting(); + if (!m_accepting) + ensureAccepting(); - break; - } - connect(m_incomingPeers.begin()->second); - m_incomingPeers.erase(m_incomingPeers.begin()); + break; } + connect(m_incomingPeers.begin()->second); + m_incomingPeers.erase(m_incomingPeers.begin()); } } @@ -969,29 +954,26 @@ bool 
PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o) // restricts your freedom but does so fairly. and that's the value proposition. // guarantees that everyone else respect the rules of the system. (i.e. obeys laws). - if (fullProcess) - { - // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. - for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2) - while (m_peers.size() > m_idealPeerCount) - { - // look for worst peer to kick off - // first work out how many are old enough to kick off. - shared_ptr worst; - unsigned agedPeers = 0; - for (auto i: m_peers) - if (auto p = i.second.lock()) - if ((m_mode != NodeMode::PeerServer || p->m_caps != 0x01) && chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. - { - ++agedPeers; - if ((!worst || p->m_rating < worst->m_rating || (p->m_rating == worst->m_rating && p->m_connect > worst->m_connect))) // kill older ones - worst = p; - } - if (!worst || agedPeers <= m_idealPeerCount) - break; - worst->disconnect(TooManyPeers); - } - } + // We'll keep at most twice as many as is ideal, halving what counts as "too young to kill" until we get there. + for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2) + while (m_peers.size() > m_idealPeerCount) + { + // look for worst peer to kick off + // first work out how many are old enough to kick off. + shared_ptr worst; + unsigned agedPeers = 0; + for (auto i: m_peers) + if (auto p = i.second.lock()) + if ((m_mode != NodeMode::PeerServer || p->m_caps != 0x01) && chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. 
+ { + ++agedPeers; + if ((!worst || p->m_rating < worst->m_rating || (p->m_rating == worst->m_rating && p->m_connect > worst->m_connect))) // kill older ones + worst = p; + } + if (!worst || agedPeers <= m_idealPeerCount) + break; + worst->disconnect(TooManyPeers); + } return ret; } diff --git a/libethereum/PeerNetwork.h b/libethereum/PeerNetwork.h index 9553120fa..f9ef49a40 100644 --- a/libethereum/PeerNetwork.h +++ b/libethereum/PeerNetwork.h @@ -162,10 +162,13 @@ public: void connect(bi::tcp::endpoint const& _ep); /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. + bool sync(BlockChain& _bc, TransactionQueue&, Overlay& _o); + bool sync(); + /// Conduct I/O, polling, syncing, whatever. /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. - bool process(BlockChain& _bc, TransactionQueue&, Overlay& _o); - bool process(BlockChain& _bc); + /// This won't touch or alter the blockchain. + void process() { m_ioService.poll(); } /// Set ideal number of peers. void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } @@ -222,8 +225,6 @@ private: std::chrono::steady_clock::time_point m_lastPeersRequest; unsigned m_idealPeerCount = 5; - std::chrono::steady_clock::time_point m_lastFullProcess; - std::vector m_addresses; std::vector m_peerAddresses; diff --git a/test/peer.cpp b/test/peer.cpp index c2d1de17c..879c32b06 100644 --- a/test/peer.cpp +++ b/test/peer.cpp @@ -56,7 +56,7 @@ int peerTest(int argc, char** argv) for (int i = 0; ; ++i) { this_thread::sleep_for(chrono::milliseconds(100)); - pn.process(ch); + pn.sync(); if (!(i % 10)) pn.pingAll(); }