From 268ca545ac51e680e0d9ecc9ad8a42c4beb15a3f Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Fri, 25 Jul 2014 20:37:55 +0200 Subject: [PATCH] Network a lot smoother. --- libethereum/BlockChain.cpp | 32 +++- libethereum/BlockChain.h | 9 +- libethereum/BlockQueue.cpp | 102 ++++++++++++ libethereum/BlockQueue.h | 66 ++++++++ libethereum/Client.cpp | 75 +++++---- libethereum/Client.h | 10 +- libethereum/Guards.h | 3 + libethereum/PeerServer.cpp | 275 +++++++++++++++++-------------- libethereum/PeerServer.h | 34 +++- libethereum/PeerSession.cpp | 36 ++-- libethereum/TransactionQueue.cpp | 35 ++-- libethereum/TransactionQueue.h | 15 +- 12 files changed, 478 insertions(+), 214 deletions(-) create mode 100644 libethereum/BlockQueue.cpp create mode 100644 libethereum/BlockQueue.h diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index 742818118..f26304db2 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -163,20 +163,40 @@ bool contains(T const& _t, V const& _v) return false; } -h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB) +h256s BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max) +{ + vector blocks; + _bq.drain(blocks); + + h256s ret; + for (auto const& block: blocks) + try + { + for (auto h: import(block, _stateDB)) + if (!_max--) + break; + else + ret.push_back(h); + } + catch (UnknownParent) + { + cwarn << "Unknown parent of block!!!" << eth::sha3(block).abridged(); + _bq.import(&block, *this); + } + catch (...){} + return ret; +} + +h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB) noexcept { -#if ETH_CATCH try -#endif { return import(_block, _stateDB); } -#if ETH_CATCH catch (...) { return h256s(); } -#endif } h256s BlockChain::import(bytes const& _block, OverlayDB const& _db) @@ -220,7 +240,7 @@ h256s BlockChain::import(bytes const& _block, OverlayDB const& _db) if (bi.timestamp > (u256)time(0)) { clog(BlockChainNote) << newHash << ": Future time " << bi.timestamp << " (now at " << time(0) << ")"; - // We don't know the parent (yet) - discard for now. It'll get resent to us if we find out about its ancestry later on. + // Block has a timestamp in the future. This is no good. throw FutureTime(); } diff --git a/libethereum/BlockChain.h b/libethereum/BlockChain.h index 052471537..1c4e5e8fd 100644 --- a/libethereum/BlockChain.h +++ b/libethereum/BlockChain.h @@ -28,6 +28,7 @@ #include "Guards.h" #include "BlockDetails.h" #include "AddressState.h" +#include "BlockQueue.h" namespace ldb = leveldb; namespace eth @@ -66,8 +67,12 @@ public: /// To be called from main loop every 100ms or so. void process(); - /// Attempt to import the given block. - h256s attemptImport(bytes const& _block, OverlayDB const& _stateDB); + /// Sync the chain with any incoming blocks. All blocks should, if processed in order + h256s sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max); + + /// Attempt to import the given block directly into the BlockChain and sync with the state DB. + /// @returns the block hashes of any blocks that came into/went out of the canonical block chain. + h256s attemptImport(bytes const& _block, OverlayDB const& _stateDB) noexcept; /// Import block into disk-backed DB /// @returns the block hashes of any blocks that came into/went out of the canonical block chain. diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp new file mode 100644 index 000000000..b688ae186 --- /dev/null +++ b/libethereum/BlockQueue.cpp @@ -0,0 +1,102 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file BlockQueue.cpp + * @author Gav Wood + * @date 2014 + */ + +#include "BlockQueue.h" + +#include +#include +#include +#include "BlockChain.h" +using namespace std; +using namespace eth; + +bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc) +{ + // Check if we already know this block. + h256 h = sha3(_block); + + UpgradableGuard l(m_lock); + if (m_readySet.count(h) || m_futureSet.count(h)) + // Already know about this one. + return false; + + // VERIFY: populates from the block and checks the block is internally coherent. + BlockInfo bi; + +#if ETH_CATCH + try +#endif + { + bi.populate(_block); + bi.verifyInternals(_block); + } +#if ETH_CATCH + catch (Exception const& _e) + { + cwarn << "Ignoring malformed block: " << _e.description(); + return false; + } +#endif + auto newHash = eth::sha3(_block); + + // Check block doesn't already exist first! + if (_bc.details(newHash)) + return false; + + // Check it's not crazy + if (bi.timestamp > (u256)time(0)) + return false; + + UpgradeGuard ul(l); + + // We now know it. + if (!m_readySet.count(bi.parentHash) && !_bc.details(bi.parentHash)) + { + // We don't know the parent (yet) - queue it up for later. It'll get resent to us if we find out about its ancestry later on. + m_future.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes()))); + m_futureSet.insert(h); + return true; + } + + // If valid, append to blocks. + m_ready.push_back(_block.toBytes()); + m_readySet.insert(h); + + noteReadyWithoutWriteGuard(h); + + return true; +} + +void BlockQueue::noteReadyWithoutWriteGuard(h256 _b) +{ + auto r = m_future.equal_range(_b); + h256s good; + for (auto it = r.first; it != r.second; ++it) + { + m_futureSet.erase(it->second.first); + m_ready.push_back(it->second.second); + m_readySet.erase(it->second.first); + good.push_back(it->second.first); + } + m_future.erase(r.first, r.second); + for (auto g: good) + noteReadyWithoutWriteGuard(g); +} diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h new file mode 100644 index 000000000..7ec68215a --- /dev/null +++ b/libethereum/BlockQueue.h @@ -0,0 +1,66 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file BlockQueue.h + * @author Gav Wood + * @date 2014 + */ + +#pragma once + +#include +#include +#include "libethcore/CommonEth.h" +#include "Guards.h" + +namespace eth +{ + +class BlockChain; + +/** + * @brief A queue of blocks. Sits between network or other I/O and the BlockChain. + * Sorts them ready for blockchain insertion (with the BlockChain::sync() method). + * @threadsafe + */ +class BlockQueue +{ +public: + /// Import a block into the queue. + bool import(bytesConstRef _tx, BlockChain const& _bc); + + /// Grabs the blocks that are ready, giving them in the correct order for insertion into the chain. + void drain(std::vector& o_out) { WriteGuard l(m_lock); swap(o_out, m_ready); m_readySet.clear(); } + + /// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain). + void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); } + + /// Get information on the items queued. + std::pair items() const { ReadGuard l(m_lock); return std::make_pair(m_ready.size(), m_future.size()); } + +private: + void noteReadyWithoutWriteGuard(h256 _b); + + mutable boost::shared_mutex m_lock; ///< General lock. + std::set m_readySet; ///< All blocks ready for chain-import. + std::vector m_ready; ///< List of blocks, in correct order, ready for chain-import. + std::set m_futureSet; ///< Set of all blocks whose parents are not ready/in-chain. + std::multimap> m_future; ///< For transactions that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. +}; + +} + + diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index d2905234d..2b2d1211e 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -198,40 +198,43 @@ void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHo { ensureWorking(); - ClientGuard l(this); - if (m_net.get()) - return; - try - { - m_net.reset(new PeerServer(m_clientVersion, m_bc, 0, _listenPort, _mode, _publicIP, _upnp)); - } - catch (std::exception const&) { - // Probably already have the port open. - cwarn << "Could not initialize with specified/default port. Trying system-assigned port"; - m_net.reset(new PeerServer(m_clientVersion, m_bc, 0, _mode, _publicIP, _upnp)); + Guard l(x_net); + if (m_net.get()) + return; + try + { + m_net.reset(new PeerServer(m_clientVersion, m_bc, 0, _listenPort, _mode, _publicIP, _upnp)); + } + catch (std::exception const&) + { + // Probably already have the port open. + cwarn << "Could not initialize with specified/default port. Trying system-assigned port"; + m_net.reset(new PeerServer(m_clientVersion, m_bc, 0, _mode, _publicIP, _upnp)); + } + + m_net->setIdealPeerCount(_peers); } - m_net->setIdealPeerCount(_peers); if (_seedHost.size()) connect(_seedHost, _port); } std::vector Client::peers() { - ClientGuard l(this); + Guard l(x_net); return m_net ? m_net->peers() : std::vector(); } size_t Client::peerCount() const { - ClientGuard l(this); + Guard l(x_net); return m_net ? m_net->peerCount() : 0; } void Client::connect(std::string const& _seedHost, unsigned short _port) { - ClientGuard l(this); + Guard l(x_net); if (!m_net.get()) return; m_net->connect(_seedHost, _port); @@ -239,7 +242,7 @@ void Client::connect(std::string const& _seedHost, unsigned short _port) void Client::stopNetwork() { - ClientGuard l(this); + Guard l(x_net); m_net.reset(nullptr); } @@ -308,27 +311,26 @@ void Client::work(bool _justQueue) // Process network events. // Synchronise block chain with network. // Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks. - if (m_net && !_justQueue) { - cdebug << "--- WORK: LOCK"; - ClientGuard l(this); - cdebug << "--- WORK: NETWORK"; - m_net->process(); // must be in guard for now since it uses the blockchain. - - // returns h256Set as block hashes, once for each block that has come in/gone out. - cdebug << "--- WORK: TQ <== NET ==> CHAIN"; - h256Set newBlocks = m_net->sync(m_bc, m_tq, m_stateDB, 100); - if (newBlocks.size()) + Guard l(x_net); + if (m_net && !_justQueue) { - for (auto i: newBlocks) - appendFromNewBlock(i, changeds); - changeds.insert(NewBlockFilter); + cdebug << "--- WORK: NETWORK"; + m_net->process(); // must be in guard for now since it uses the blockchain. + + // returns h256Set as block hashes, once for each block that has come in/gone out. + cdebug << "--- WORK: NET <==> TQ ; CHAIN ==> NET ==> BQ"; + m_net->sync(m_tq, m_bq); + + cdebug << "--- TQ:" << m_tq.items() << "; BQ:" << m_bq.items(); } } // Do some mining. if (!_justQueue) { + + // TODO: Separate "Miner" object. if (m_doMine) { if (m_restartMining) @@ -402,6 +404,21 @@ void Client::work(bool _justQueue) // Resynchronise state with block chain & trans { ClientGuard l(this); + + cdebug << "--- WORK: BQ ==> CHAIN ==> STATE"; + OverlayDB db = m_stateDB; + m_lock.unlock(); + h256s newBlocks = m_bc.sync(m_bq, db, 100); + if (newBlocks.size()) + { + for (auto i: newBlocks) + appendFromNewBlock(i, changeds); + changeds.insert(NewBlockFilter); + } + m_lock.lock(); + if (newBlocks.size()) + m_stateDB = db; + cdebug << "--- WORK: preSTATE <== CHAIN"; if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address()) { diff --git a/libethereum/Client.h b/libethereum/Client.h index d589d30ce..008cf0c52 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -252,9 +252,9 @@ public: /// Stop the network subsystem. void stopNetwork(); /// Is the network subsystem up? - bool haveNetwork() { return !!m_net; } - /// Get access to the peer server object. This will be null if the network isn't online. - PeerServer* peerServer() const { return m_net.get(); } + bool haveNetwork() { Guard l(x_net); return !!m_net; } + /// Get access to the peer server object. This will be null if the network isn't online. DANGEROUS! DO NOT USE! + PeerServer* peerServer() const { Guard l(x_net); return m_net.get(); } // Mining stuff: @@ -311,11 +311,13 @@ private: std::string m_clientVersion; ///< Our end-application client's name/version. VersionChecker m_vc; ///< Dummy object to check & update the protocol version. BlockChain m_bc; ///< Maintains block database. - TransactionQueue m_tq; ///< Maintains list of incoming transactions not yet on the block chain. + TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. + BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it. State m_preMine; ///< The present state of the client. State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). + mutable std::mutex x_net; ///< Lock for the network. std::unique_ptr m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required. std::unique_ptr m_work;///< The work thread. diff --git a/libethereum/Guards.h b/libethereum/Guards.h index cf047a3b3..64a95eb78 100644 --- a/libethereum/Guards.h +++ b/libethereum/Guards.h @@ -21,11 +21,14 @@ #pragma once +#include #include namespace eth { +using Guard = std::lock_guard; +using RecursiveGuard = std::lock_guard; using ReadGuard = boost::shared_lock; using UpgradableGuard = boost::upgrade_lock; using UpgradeGuard = boost::upgrade_to_unique_lock; diff --git a/libethereum/PeerServer.cpp b/libethereum/PeerServer.cpp index 6f9e779ae..73959f8f7 100644 --- a/libethereum/PeerServer.cpp +++ b/libethereum/PeerServer.cpp @@ -39,6 +39,7 @@ #include #include "BlockChain.h" #include "TransactionQueue.h" +#include "BlockQueue.h" #include "PeerSession.h" using namespace std; using namespace eth; @@ -106,9 +107,34 @@ PeerServer::PeerServer(std::string const& _clientVersion, BlockChain const& _ch, PeerServer::~PeerServer() { - for (auto const& i: m_peers) - if (auto p = i.second.lock()) - p->disconnect(ClientQuit); + disconnectPeers(); +} + +void PeerServer::registerPeer(std::shared_ptr _s) +{ + Guard l(x_peers); + m_peers[_s->m_id] = _s; +} + +void PeerServer::disconnectPeers() +{ + for (unsigned n = 0;; n = 0) + { + { + Guard l(x_peers); + for (auto i: m_peers) + if (auto p = i.second.lock()) + { + p->disconnect(ClientQuit); + n++; + } + } + if (!n) + break; + m_ioService.poll(); + usleep(100000); + } + delete m_upnp; } @@ -252,6 +278,7 @@ std::map PeerServer::potentialPeers() std::map ret; if (!m_public.address().is_unspecified()) ret.insert(make_pair(m_key.pub(), m_public)); + Guard l(x_peers); for (auto i: m_peers) if (auto j = i.second.lock()) { @@ -288,7 +315,7 @@ void PeerServer::ensureAccepting() clog(NetWarn) << "ERROR: " << _e.what(); } m_accepting = false; - if (ec.value() != 1 && (m_mode == NodeMode::PeerServer || m_peers.size() < m_idealPeerCount * 2)) + if (ec.value() != 1 && (m_mode == NodeMode::PeerServer || peerCount() < m_idealPeerCount * 2)) ensureAccepting(); }); } @@ -336,12 +363,12 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep) }); } -bool PeerServer::ensureInitialised(BlockChain& _bc, TransactionQueue& _tq) +bool PeerServer::ensureInitialised(TransactionQueue& _tq) { if (m_latestBlockSent == h256()) { // First time - just initialise. - m_latestBlockSent = _bc.currentHash(); + m_latestBlockSent = m_chain->currentHash(); clog(NetNote) << "Initialising: latest=" << m_latestBlockSent; for (auto const& i: _tq.transactions()) @@ -363,146 +390,141 @@ bool PeerServer::noteBlock(h256 _hash, bytesConstRef _data) return false; } -h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o, unsigned _max) +bool PeerServer::sync(TransactionQueue& _tq, BlockQueue& _bq) { - h256Set ret; - - bool netChange = ensureInitialised(_bc, _tq); + bool netChange = ensureInitialised(_tq); if (m_mode == NodeMode::Full) { - for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) - if (_tq.import(&*it)) - {}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce... - else - m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on. - m_incomingTransactions.clear(); + auto h = m_chain->currentHash(); - auto h = _bc.currentHash(); - bool resendAll = (h != m_latestBlockSent); + maintainTransactions(_tq, h); + maintainBlocks(_bq, h); - // Send any new transactions. - for (auto j: m_peers) - if (auto p = j.second.lock()) - { - bytes b; - uint n = 0; - for (auto const& i: _tq.transactions()) - if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions || resendAll) - { - b += i.second; - ++n; - m_transactionsSent.insert(i.first); - } - if (n) - { - RLPStream ts; - PeerSession::prep(ts); - ts.appendList(n + 1) << TransactionsPacket; - ts.appendRaw(b, n).swapOut(b); - seal(b); - p->send(&b); - } - p->m_knownTransactions.clear(); - p->m_requireTransactions = false; - } + // Connect to additional peers + growPeers(); + } - // Send any new blocks. - if (h != m_latestBlockSent) + // platform for consensus of social contract. + // restricts your freedom but does so fairly. and that's the value proposition. + // guarantees that everyone else respect the rules of the system. (i.e. obeys laws). + + prunePeers(); + + return netChange; +} + +void PeerServer::maintainTransactions(TransactionQueue& _tq, h256 _currentHash) +{ + bool resendAll = (_currentHash != m_latestBlockSent); + + for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) + if (_tq.import(&*it)) + {}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce... + else + m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on. + m_incomingTransactions.clear(); + + // Send any new transactions. + Guard l(x_peers); + for (auto j: m_peers) + if (auto p = j.second.lock()) { - // TODO: find where they diverge and send complete new branch. - RLPStream ts; - PeerSession::prep(ts); - ts.appendList(2) << BlocksPacket; bytes b; - ts.appendRaw(_bc.block(_bc.currentHash())).swapOut(b); - seal(b); - for (auto j: m_peers) - if (auto p = j.second.lock()) + uint n = 0; + for (auto const& i: _tq.transactions()) + if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions || resendAll) { - if (!p->m_knownBlocks.count(_bc.currentHash())) - p->send(&b); - p->m_knownBlocks.clear(); + b += i.second; + ++n; + m_transactionsSent.insert(i.first); } + if (n) + { + RLPStream ts; + PeerSession::prep(ts); + ts.appendList(n + 1) << TransactionsPacket; + ts.appendRaw(b, n).swapOut(b); + seal(b); + p->send(&b); + } + p->m_knownTransactions.clear(); + p->m_requireTransactions = false; } - m_latestBlockSent = h; +} - unsigned totalAccepted = 0; - for (int accepted = 1, n = 0; accepted && totalAccepted < _max; ++n) - { - accepted = 0; - lock_guard l(m_incomingLock); - if (m_incomingBlocks.size()) - for (auto it = prev(m_incomingBlocks.end()); totalAccepted < _max; --it) - { - try - { - for (auto h: _bc.import(*it, _o)) - ret.insert(h); - it = m_incomingBlocks.erase(it); - ++accepted; - ++totalAccepted; - netChange = true; - } - catch (UnknownParent) - { - // Don't (yet) know its parent. Leave it for later. - m_unknownParentBlocks.push_back(*it); - it = m_incomingBlocks.erase(it); - } - catch (...) - { - // Some other error - erase it. - it = m_incomingBlocks.erase(it); - } +void PeerServer::maintainBlocks(BlockQueue& _bq, h256 _currentHash) +{ + // Import new blocks + { + lock_guard l(m_incomingLock); + for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it) + if (_bq.import(&*it, *m_chain)) + {} + else{} // TODO: don't forward it. + m_incomingBlocks.clear(); + } - if (it == m_incomingBlocks.begin()) - break; - } - if (!n && accepted) + // Send any new blocks. + if (_currentHash != m_latestBlockSent) + { + // TODO: find where they diverge and send complete new branch. + RLPStream ts; + PeerSession::prep(ts); + ts.appendList(2) << BlocksPacket; + bytes b; + ts.appendRaw(m_chain->block()).swapOut(b); + seal(b); + + Guard l(x_peers); + for (auto j: m_peers) + if (auto p = j.second.lock()) { - for (auto i: m_unknownParentBlocks) - m_incomingBlocks.push_back(i); - m_unknownParentBlocks.clear(); + if (!p->m_knownBlocks.count(_currentHash)) + p->send(&b); + p->m_knownBlocks.clear(); } - } + } + m_latestBlockSent = _currentHash; +} - // Connect to additional peers - while (m_peers.size() < m_idealPeerCount) +void PeerServer::growPeers() +{ + Guard l(x_peers); + while (m_peers.size() < m_idealPeerCount) + { + if (m_freePeers.empty()) { - if (m_freePeers.empty()) + if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10)) { - if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10)) - { - RLPStream s; - bytes b; - (PeerSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b); - seal(b); - for (auto const& i: m_peers) - if (auto p = i.second.lock()) - if (p->isOpen()) - p->send(&b); - m_lastPeersRequest = chrono::steady_clock::now(); - } - + RLPStream s; + bytes b; + (PeerSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b); + seal(b); + for (auto const& i: m_peers) + if (auto p = i.second.lock()) + if (p->isOpen()) + p->send(&b); + m_lastPeersRequest = chrono::steady_clock::now(); + } - if (!m_accepting) - ensureAccepting(); - break; - } + if (!m_accepting) + ensureAccepting(); - auto x = time(0) % m_freePeers.size(); - m_incomingPeers[m_freePeers[x]].second++; - connect(m_incomingPeers[m_freePeers[x]].first); - m_freePeers.erase(m_freePeers.begin() + x); + break; } - } - // platform for consensus of social contract. - // restricts your freedom but does so fairly. and that's the value proposition. - // guarantees that everyone else respect the rules of the system. (i.e. obeys laws). + auto x = time(0) % m_freePeers.size(); + m_incomingPeers[m_freePeers[x]].second++; + connect(m_incomingPeers[m_freePeers[x]].first); + m_freePeers.erase(m_freePeers.begin() + x); + } +} +void PeerServer::prunePeers() +{ + Guard l(x_peers); // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2) while (m_peers.size() > m_idealPeerCount) @@ -524,12 +546,17 @@ h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o, worst->disconnect(TooManyPeers); } - (void)netChange; - return ret; + // Remove dead peers from list. + for (auto i = m_peers.begin(); i != m_peers.end();) + if (i->second.lock().get()) + ++i; + else + i = m_peers.erase(i); } std::vector PeerServer::peers(bool _updatePing) const { + Guard l(x_peers); if (_updatePing) const_cast(this)->pingAll(); this_thread::sleep_for(chrono::milliseconds(200)); @@ -543,6 +570,7 @@ std::vector PeerServer::peers(bool _updatePing) const void PeerServer::pingAll() { + Guard l(x_peers); for (auto& i: m_peers) if (auto j = i.second.lock()) j->ping(); @@ -550,6 +578,7 @@ void PeerServer::pingAll() bytes PeerServer::savePeers() const { + Guard l(x_peers); RLPStream ret; int n = 0; for (auto& i: m_peers) diff --git a/libethereum/PeerServer.h b/libethereum/PeerServer.h index 465a4dc41..ccb73eab1 100644 --- a/libethereum/PeerServer.h +++ b/libethereum/PeerServer.h @@ -30,12 +30,20 @@ #include #include #include "PeerNetwork.h" +#include "Guards.h" namespace ba = boost::asio; namespace bi = boost::asio::ip; namespace eth { +class TransactionQueue; +class BlockQueue; + +/** + * @brief The PeerServer class + * @warning None of this is thread-safe. You have been warned. + */ class PeerServer { friend class PeerSession; @@ -48,8 +56,12 @@ public: /// Start server, but don't listen. PeerServer(std::string const& _clientVersion, BlockChain const& _ch, unsigned int _networkId, NodeMode _m = NodeMode::Full); + /// Will block on network process events. ~PeerServer(); + /// Closes all peers. + void disconnectPeers(); + static unsigned protocolVersion(); unsigned networkId() { return m_networkId; } @@ -58,13 +70,15 @@ public: void connect(bi::tcp::endpoint const& _ep); /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. - h256Set sync(BlockChain& _bc, TransactionQueue&, OverlayDB& _o, unsigned _max); + bool sync(TransactionQueue&, BlockQueue& _bc); /// Conduct I/O, polling, syncing, whatever. /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. /// This won't touch alter the blockchain. void process() { if (isInitialised()) m_ioService.poll(); } + bool havePeer(Public _id) const { Guard l(x_peers); return m_peers.count(_id); } + /// Set ideal number of peers. void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } @@ -74,7 +88,7 @@ public: std::vector peers(bool _updatePing = false) const; /// Get number of peers connected; equivalent to, but faster than, peers().size(). - size_t peerCount() const { return m_peers.size(); } + size_t peerCount() const { Guard l(x_peers); return m_peers.size(); } /// Ping the peers, to update the latency information. void pingAll(); @@ -85,6 +99,8 @@ public: bytes savePeers() const; void restorePeers(bytesConstRef _b); + void registerPeer(std::shared_ptr _s); + private: /// Session wants to pass us a block that we might not have. /// @returns true if we didn't have it. @@ -95,10 +111,15 @@ private: void determinePublic(std::string const& _publicAddress, bool _upnp); void ensureAccepting(); + void growPeers(); + void prunePeers(); + void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock); + void maintainBlocks(BlockQueue& _bq, h256 _currentBlock); + /// Check to see if the network peer-state initialisation has happened. bool isInitialised() const { return m_latestBlockSent; } /// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first. - bool ensureInitialised(BlockChain& _bc, TransactionQueue& _tq); + bool ensureInitialised(TransactionQueue& _tq); std::map potentialPeers(); @@ -117,14 +138,15 @@ private: KeyPair m_key; unsigned m_networkId; + + mutable std::mutex x_peers; std::map> m_peers; + mutable std::recursive_mutex m_incomingLock; std::vector m_incomingTransactions; std::vector m_incomingBlocks; - mutable std::recursive_mutex m_incomingLock; - std::vector m_unknownParentBlocks; - std::vector m_freePeers; std::map> m_incomingPeers; + std::vector m_freePeers; h256 m_latestBlockSent; std::set m_transactionsSent; diff --git a/libethereum/PeerSession.cpp b/libethereum/PeerSession.cpp index 239163e86..281e834f0 100644 --- a/libethereum/PeerSession.cpp +++ b/libethereum/PeerSession.cpp @@ -86,15 +86,13 @@ bool PeerSession::interpret(RLP const& _r) clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "/" << m_networkId << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort; - if (m_server->m_peers.count(m_id)) - if (auto l = m_server->m_peers[m_id].lock()) - if (l.get() != this && l->isOpen()) - { - // Already connected. - cwarn << "Already have peer id" << m_id.abridged() << "at" << l->endpoint() << "rather than" << endpoint(); - disconnect(DuplicatePeer); - return false; - } + if (m_server->havePeer(m_id)) + { + // Already connected. + cwarn << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint(); + disconnect(DuplicatePeer); + return false; + } if (m_protocolVersion != PeerServer::protocolVersion() || m_networkId != m_server->networkId() || !m_id) { @@ -109,7 +107,8 @@ bool PeerSession::interpret(RLP const& _r) return false; } - m_server->m_peers[m_id] = shared_from_this(); + m_server->registerPeer(shared_from_this()); + startInitialSync(); // Grab trsansactions off them. { @@ -173,7 +172,7 @@ bool PeerSession::interpret(RLP const& _r) clogS(NetAllDetail) << "Checking: " << ep << "(" << toHex(id.ref().cropped(0, 4)) << ")"; // check that it's not us or one we already know: - if (id && (m_server->m_key.pub() == id || m_server->m_peers.count(id) || m_server->m_incomingPeers.count(id))) + if (id && (m_server->m_key.pub() == id || m_server->havePeer(id) || m_server->m_incomingPeers.count(id))) goto CONTINUE; // check that we're not already connected to addr: @@ -182,13 +181,6 @@ bool PeerSession::interpret(RLP const& _r) for (auto i: m_server->m_addresses) if (ep.address() == i && ep.port() == m_server->listenPort()) goto CONTINUE; - for (auto i: m_server->m_peers) - if (shared_ptr p = i.second.lock()) - { - clogS(NetAllDetail) << " ...against " << p->endpoint(); - if (p->m_socket.is_open() && p->endpoint() == ep) - goto CONTINUE; - } for (auto i: m_server->m_incomingPeers) if (i.second.first == ep) goto CONTINUE; @@ -492,14 +484,6 @@ void PeerSession::dropped() m_socket.close(); } catch (...) {} - - // Remove from peer server - for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) - if (i->second.lock().get() == this) - { - m_server->m_peers.erase(i); - break; - } } void PeerSession::disconnect(int _reason) diff --git a/libethereum/TransactionQueue.cpp b/libethereum/TransactionQueue.cpp index 9a8c4c20a..750243de2 100644 --- a/libethereum/TransactionQueue.cpp +++ b/libethereum/TransactionQueue.cpp @@ -31,9 +31,7 @@ bool TransactionQueue::import(bytesConstRef _block) { // Check if we already know this transaction. h256 h = sha3(_block); - - UpgradableGuard l(x_data); - if (m_data.count(h)) + if (m_known.count(h)) return false; try @@ -44,8 +42,7 @@ bool TransactionQueue::import(bytesConstRef _block) auto s = t.sender(); // If valid, append to blocks. - UpgradeGuard ul(l); - m_data[h] = _block.toBytes(); + m_current[h] = _block.toBytes(); } catch (InvalidTransactionFormat const& _e) { @@ -63,20 +60,36 @@ bool TransactionQueue::import(bytesConstRef _block) void TransactionQueue::setFuture(std::pair const& _t) { - UpgradableGuard l(x_data); - if (m_data.count(_t.first)) + if (m_current.count(_t.first)) { - UpgradeGuard ul(l); - m_data.erase(_t.first); + m_current.erase(_t.first); m_future.insert(make_pair(Transaction(_t.second).sender(), _t)); } } void TransactionQueue::noteGood(std::pair const& _t) { - WriteGuard l(x_data); auto r = m_future.equal_range(Transaction(_t.second).sender()); for (auto it = r.first; it != r.second; ++it) - m_data.insert(_t); + m_current.insert(it->second); m_future.erase(r.first, r.second); } + +void TransactionQueue::drop(h256 _txHash) +{ + WriteGuard l(m_lock); + if (!m_known.erase(_txHash)) + return; + + if (m_current.count(_txHash)) + m_current.erase(_txHash); + else + { + for (auto i = m_future.begin(); i != m_future.end(); ++i) + if (i->second.first == _txHash) + { + m_future.erase(i); + break; + } + } +} diff --git a/libethereum/TransactionQueue.h b/libethereum/TransactionQueue.h index d3ad354cc..2c6556a71 100644 --- a/libethereum/TransactionQueue.h +++ b/libethereum/TransactionQueue.h @@ -38,21 +38,22 @@ class BlockChain; class TransactionQueue { public: - bool attemptImport(bytesConstRef _tx) { try { import(_block); return true; } catch (...) { return false; } } - bool attemptImport(bytes const& _tx) { return attemptImport(&_block); } + bool attemptImport(bytesConstRef _tx) { try { import(_tx); return true; } catch (...) { return false; } } + bool attemptImport(bytes const& _tx) { return attemptImport(&_tx); } bool import(bytesConstRef _tx); - void drop(h256 _txHash) { WriteGuard l(x_data); m_data.erase(_txHash); } + void drop(h256 _txHash); - std::map transactions() const { ReadGuard l(x_data); return m_data; } + std::map transactions() const { ReadGuard l(m_lock); return m_current; } + std::pair items() const { ReadGuard l(m_lock); return std::make_pair(m_current.size(), m_future.size()); } void setFuture(std::pair const& _t); void noteGood(std::pair const& _t); private: - std::map m_data; ///< Map of SHA3(tx) to tx. - boost::shared_mutex x_data; - + mutable boost::shared_mutex m_lock; ///< General lock. + std::set m_known; ///< Hashes of transactions in both sets. + std::map m_current; ///< Map of SHA3(tx) to tx. std::multimap> m_future; ///< For transactions that have a future nonce; we map their sender address to the tx stuff, and insert once the sender has a valid TX. };