From 9e76b49578ef1088bfc0218d4ea9b8925e681425 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 1 Sep 2014 16:45:58 +0200 Subject: [PATCH] Slowing bashing code into shape. --- libethereum/Client.cpp | 5 +- libethereum/Client.h | 9 +- libethereum/CommonNet.h | 4 +- libethereum/EthereumHost.cpp | 84 +++++++++---------- libethereum/EthereumHost.h | 5 +- .../{EthereumSession.cpp => EthereumPeer.cpp} | 76 +++++++++-------- .../{EthereumSession.h => EthereumPeer.h} | 18 ++-- libethereumx/Ethereum.cpp | 8 +- libethereumx/Ethereum.h | 4 +- libethnet/Common.cpp | 47 ++++++++++- libethnet/Common.h | 35 +++++--- libethnet/PeerHost.cpp | 13 +-- libethnet/PeerHost.h | 5 +- libethnet/PeerSession.cpp | 18 ++-- libethnet/PeerSession.h | 13 ++- 15 files changed, 215 insertions(+), 129 deletions(-) rename libethereum/{EthereumSession.cpp => EthereumPeer.cpp} (76%) rename libethereum/{EthereumSession.h => EthereumPeer.h} (86%) diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index c62fe42de..c5bdf4c1a 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include "Defaults.h" #include "EthereumHost.h" using namespace std; @@ -213,7 +214,9 @@ void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHo try { - m_net.reset(new EthereumHost(m_clientVersion, m_bc, _networkId, _listenPort, _mode, _publicIP, _upnp)); + m_net.reset(new PeerHost(m_clientVersion, _listenPort, _publicIP, _upnp)); + if (_mode == NodeMode::Full) + m_net->registerCapability(new EthereumHost(m_bc, _networkId)); } catch (std::exception const&) { diff --git a/libethereum/Client.h b/libethereum/Client.h index 9499b0d4c..2848bd85a 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -28,9 +28,10 @@ #include #include #include +#include #include #include -#include +#include #include "BlockChain.h" #include "TransactionQueue.h" #include "State.h" @@ -51,6 +52,12 @@ enum ClientWorkState Deleted }; +enum class NodeMode +{ + Peer, + Full +}; + class VersionChecker { public: diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 0b848e2c3..1ab4ab2d3 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -41,9 +41,9 @@ class OverlayDB; class BlockChain; class TransactionQueue; class EthereumHost; -class EthereumSession; +class EthereumPeer; -enum PacketType +enum { StatusPacket = 0x10, GetTransactionsPacket, diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index d5d8e08cd..836f3f8f0 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -28,26 +28,27 @@ #include #include #include +#include #include #include #include "BlockChain.h" #include "TransactionQueue.h" #include "BlockQueue.h" -#include "EthereumSession.h" +#include "EthereumPeer.h" using namespace std; using namespace eth; EthereumHost::EthereumHost(BlockChain const& _ch, u256 _networkId): - m_chain(&_ch), - m_networkId(_networkId) + HostCapability(), + m_chain (&_ch), + m_networkId (_networkId) { } EthereumHost::~EthereumHost() { - for (auto i: host()->m_peers) - if (shared_ptr p = i.second.lock()) - p->giveUpOnFetch(); + for (auto const& i: peers()) + i->cap()->giveUpOnFetch(); } h256Set EthereumHost::neededBlocks(h256Set const& _exclude) @@ -84,7 +85,6 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq) for (auto const& i: _tq.transactions()) m_transactionsSent.insert(i.first); - m_lastPeersRequest = chrono::steady_clock::time_point::min(); return true; } return false; @@ -135,31 +135,30 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash m_incomingTransactions.clear(); // Send any new transactions. - Guard l(host()->x_peers); - for (auto j: host()->m_peers) - if (auto p = j.second.lock()) - { - bytes b; - uint n = 0; - for (auto const& i: _tq.transactions()) - if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions || resendAll) - { - b += i.second; - ++n; - m_transactionsSent.insert(i.first); - } - if (n) + for (auto const& p: peers()) + { + auto ep = p->cap(); + bytes b; + uint n = 0; + for (auto const& i: _tq.transactions()) + if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll) { - RLPStream ts; - EthereumSession::prep(ts); - ts.appendList(n + 1) << TransactionsPacket; - ts.appendRaw(b, n).swapOut(b); - seal(b); - p->send(&b); + b += i.second; + ++n; + m_transactionsSent.insert(i.first); } - p->m_knownTransactions.clear(); - p->m_requireTransactions = false; + if (n) + { + RLPStream ts; + EthereumPeer::prep(ts); + ts.appendList(n + 1) << TransactionsPacket; + ts.appendRaw(b, n).swapOut(b); + seal(b); + ep->send(&b); } + ep->m_knownTransactions.clear(); + ep->m_requireTransactions = false; + } } void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) @@ -184,7 +183,7 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) if (m_latestBlockSent != _currentHash) { RLPStream ts; - EthereumSession::prep(ts); + EthereumPeer::prep(ts); bytes bs; unsigned c = 0; for (auto h: m_chain->treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true)) @@ -198,19 +197,18 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) ts.swapOut(b); seal(b); - Guard l(host()->x_peers); - for (auto j: host()->m_peers) - if (auto p = j.second.lock()) - { - if (!p->m_knownBlocks.count(_currentHash)) - p->send(&b); - p->m_knownBlocks.clear(); - } + for (auto j: peers()) + { + auto p = j->cap(); + if (!p->m_knownBlocks.count(_currentHash)) + p->send(&b); + p->m_knownBlocks.clear(); + } m_latestBlockSent = _currentHash; } } -void EthereumHost::noteHaveChain(std::shared_ptr const& _from) +void EthereumHost::noteHaveChain(EthereumPeer* _from) { auto td = _from->m_totalDifficulty; @@ -235,10 +233,6 @@ void EthereumHost::noteHaveChain(std::shared_ptr const& _from) m_totalDifficultyOfNeeded = td; } - { - Guard l(host()->x_peers); - for (auto const& i: host()->m_peers) - if (shared_ptr p = i.second.lock()) - p->restartGettingChain(); - } + for (auto j: peers()) + j->cap()->restartGettingChain(); } diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index f172ea508..d2c28644b 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -32,13 +32,14 @@ #include #include #include "CommonNet.h" -#include "EthereumSession.h" +#include "EthereumPeer.h" namespace eth { class RLPStream; class TransactionQueue; +class BlockQueue; /** * @brief The EthereumHost class @@ -66,7 +67,7 @@ private: /// @returns true if we didn't have it. bool noteBlock(h256 _hash, bytesConstRef _data); /// Session has finished getting the chain of hashes. - void noteHaveChain(std::shared_ptr const& _who); + void noteHaveChain(EthereumPeer* _who); /// Called when the peer can no longer provide us with any needed blocks. void noteDoneBlocks(); diff --git a/libethereum/EthereumSession.cpp b/libethereum/EthereumPeer.cpp similarity index 76% rename from libethereum/EthereumSession.cpp rename to libethereum/EthereumPeer.cpp index 579aa5354..72de75e35 100644 --- a/libethereum/EthereumSession.cpp +++ b/libethereum/EthereumPeer.cpp @@ -14,24 +14,25 @@ You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ -/** @file EthereumSession.cpp +/** @file EthereumPeer.cpp * @author Gav Wood * @date 2014 */ -#include "EthereumSession.h" +#include "EthereumPeer.h" #include #include #include +#include #include "BlockChain.h" #include "EthereumHost.h" using namespace std; using namespace eth; -#define clogS(X) eth::LogOutputStream(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " +#define clogS(X) eth::LogOutputStream(false) << "| " << std::setw(2) << session()->id() << "] " -EthereumPeer::EthereumPeer(PeerSession* _s, HostCapability* _h): PeerCapability(_s, _h) +EthereumPeer::EthereumPeer(PeerSession* _s, HostCapabilityFace* _h): PeerCapability(_s, _h) { sendStatus(); } @@ -41,16 +42,21 @@ EthereumPeer::~EthereumPeer() giveUpOnFetch(); } +EthereumHost* EthereumPeer::host() const +{ + return static_cast(PeerCapability::hostCapability()); +} + void EthereumPeer::sendStatus() { RLPStream s; - m_session->prep(s); + prep(s); s.appendList(9) << StatusPacket - << hostCapability()->protocolVersion() - << hostCapability()->networkId() - << hostCapability()->m_chain->details().totalDifficulty - << hostCapability()->m_chain->currentHash() - << hostCapability()->m_chain->genesisHash(); + << host()->protocolVersion() + << host()->networkId() + << host()->m_chain->details().totalDifficulty + << host()->m_chain->currentHash() + << host()->m_chain->genesisHash(); sealAndSend(s); } @@ -64,11 +70,11 @@ void EthereumPeer::startInitialSync() sealAndSend(s); } - h256 c = m_server->m_chain->currentHash(); - uint n = m_server->m_chain->number(); - u256 td = max(m_server->m_chain->details().totalDifficulty, m_server->m_totalDifficultyOfNeeded); + h256 c = host()->m_chain->currentHash(); + uint n = host()->m_chain->number(); + u256 td = max(host()->m_chain->details().totalDifficulty, host()->m_totalDifficultyOfNeeded); - clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << m_server->m_chain->details().totalDifficulty << "," << m_server->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty; + clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << host()->m_chain->details().totalDifficulty << "," << host()->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty; if (td > m_totalDifficulty) return; // All good - we have the better chain. @@ -97,13 +103,13 @@ void EthereumPeer::giveUpOnFetch() clogS(NetNote) << "GIVE UP FETCH; can't get " << toString(m_askedBlocks); if (m_askedBlocks.size()) { - Guard l (m_server->x_blocksNeeded); - m_server->m_blocksNeeded.reserve(m_server->m_blocksNeeded.size() + m_askedBlocks.size()); + Guard l (host()->x_blocksNeeded); + host()->m_blocksNeeded.reserve(host()->m_blocksNeeded.size() + m_askedBlocks.size()); for (auto i: m_askedBlocks) { m_failedBlocks.insert(i); - m_server->m_blocksOnWay.erase(i); - m_server->m_blocksNeeded.push_back(i); + host()->m_blocksOnWay.erase(i); + host()->m_blocksNeeded.push_back(i); } m_askedBlocks.clear(); } @@ -123,11 +129,11 @@ bool EthereumPeer::interpret(RLP const& _r) clogS(NetMessageSummary) << "Status: " << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); - if (genesisHash != hostCapability()->m_chain->genesisHash()) + if (genesisHash != host()->m_chain->genesisHash()) disable("Invalid genesis hash"); - if (m_protocolVersion != hostCapability()->protocolVersion()) + if (m_protocolVersion != host()->protocolVersion()) disable("Invalid protocol version."); - if (m_networkId != hostCapability()->networkId() || !m_id) + if (m_networkId != host()->networkId()) disable("Invalid network identifier."); startInitialSync(); @@ -140,10 +146,10 @@ bool EthereumPeer::interpret(RLP const& _r) } case TransactionsPacket: clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << " entries)"; - m_rating += _r.itemCount() - 1; + addRating(_r.itemCount() - 1); for (unsigned i = 1; i < _r.itemCount(); ++i) { - m_server->m_incomingTransactions.push_back(_r[i].data().toBytes()); + host()->m_incomingTransactions.push_back(_r[i].data().toBytes()); m_knownTransactions.insert(sha3(_r[i].data())); } break; @@ -153,12 +159,12 @@ bool EthereumPeer::interpret(RLP const& _r) unsigned limit = _r[2].toInt(); clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries, " << later.abridged() << ")"; - unsigned c = min(m_server->m_chain->number(later), limit); + unsigned c = min(host()->m_chain->number(later), limit); RLPStream s; prep(s).appendList(1 + c).append(BlockHashesPacket); - h256 p = m_server->m_chain->details(later).parent; - for (unsigned i = 0; i < c; ++i, p = m_server->m_chain->details(p).parent) + h256 p = host()->m_chain->details(later).parent; + for (unsigned i = 0; i < c; ++i, p = host()->m_chain->details(p).parent) s << p; sealAndSend(s); break; @@ -168,15 +174,15 @@ bool EthereumPeer::interpret(RLP const& _r) clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << " entries)"; if (_r.itemCount() == 1) { - m_server->noteHaveChain(shared_from_this()); + host()->noteHaveChain(this); return true; } for (unsigned i = 1; i < _r.itemCount(); ++i) { auto h = _r[i].toHash(); - if (m_server->m_chain->details(h)) + if (host()->m_chain->details(h)) { - m_server->noteHaveChain(shared_from_this()); + host()->noteHaveChain(this); return true; } else @@ -197,7 +203,7 @@ bool EthereumPeer::interpret(RLP const& _r) unsigned n = 0; for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i) { - auto b = m_server->m_chain->block(_r[i].toHash()); + auto b = host()->m_chain->block(_r[i].toHash()); if (b.size()) { rlp += b; @@ -223,12 +229,12 @@ bool EthereumPeer::interpret(RLP const& _r) for (unsigned i = 1; i < _r.itemCount(); ++i) { auto h = BlockInfo::headerHash(_r[i].data()); - if (m_server->noteBlock(h, _r[i].data())) + if (host()->noteBlock(h, _r[i].data())) used++; m_askedBlocks.erase(h); m_knownBlocks.insert(h); } - m_rating += used; + addRating(used); unsigned knownParents = 0; unsigned unknownParents = 0; if (g_logVerbosity >= NetMessageSummary::verbosity) @@ -237,7 +243,7 @@ bool EthereumPeer::interpret(RLP const& _r) { auto h = BlockInfo::headerHash(_r[i].data()); BlockInfo bi(_r[i].data()); - if (!m_server->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) + if (!host()->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) { unknownParents++; clogS(NetAllDetail) << "Unknown parent " << bi.parentHash << " of block " << h; @@ -282,7 +288,7 @@ void EthereumPeer::ensureGettingChain() void EthereumPeer::continueGettingChain() { if (!m_askedBlocks.size()) - m_askedBlocks = m_server->neededBlocks(m_failedBlocks); + m_askedBlocks = host()->neededBlocks(m_failedBlocks); if (m_askedBlocks.size()) { @@ -296,6 +302,6 @@ void EthereumPeer::continueGettingChain() else { clogS(NetMessageSummary) << "No blocks left to get. Peer doesn't seem to have " << m_failedBlocks.size() << "of our needed blocks."; - m_server->noteDoneBlocks(); + host()->noteDoneBlocks(); } } diff --git a/libethereum/EthereumSession.h b/libethereum/EthereumPeer.h similarity index 86% rename from libethereum/EthereumSession.h rename to libethereum/EthereumPeer.h index 1b10c71fe..3b182b450 100644 --- a/libethereum/EthereumSession.h +++ b/libethereum/EthereumPeer.h @@ -14,7 +14,7 @@ You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ -/** @file EthereumSession.h +/** @file EthereumPeer.h * @author Gav Wood * @date 2014 */ @@ -28,13 +28,15 @@ #include #include #include +#include #include "CommonNet.h" -#include "EthereumHost.h" namespace eth { -class WhisperSession: public PeerSession +class HostCapabilityFace; + +class WhisperSession: public PeerCapability { public: WhisperSession(); @@ -43,11 +45,11 @@ public: static std::string name() { return "shh"; } private: - virtual bool interpret(RLP const&) {} + virtual bool interpret(RLP const&) { return false; } }; /** - * @brief The EthereumSession class + * @brief The EthereumPeer class * @todo Document fully. */ class EthereumPeer: public PeerCapability @@ -55,12 +57,12 @@ class EthereumPeer: public PeerCapability friend class EthereumHost; public: - EthereumPeer(PeerSession* _s, HostCapability* _h); + EthereumPeer(PeerSession* _s, HostCapabilityFace* _h); virtual ~EthereumPeer(); static std::string name() { return "eth"; } - EthereumHost* hostCapability() const { return static_cast(PeerCapability::hostCapability()); } + EthereumHost* host() const; private: virtual bool interpret(RLP const& _r); @@ -77,8 +79,6 @@ private: void giveUpOnFetch(); - EthereumHost* m_host; - uint m_protocolVersion; u256 m_networkId; diff --git a/libethereumx/Ethereum.cpp b/libethereumx/Ethereum.cpp index a7725abb5..c4cc29d87 100644 --- a/libethereumx/Ethereum.cpp +++ b/libethereumx/Ethereum.cpp @@ -38,11 +38,11 @@ void Ethereum::ensureReady() { m_client = unique_ptr(new Client("+ethereum+")); if (m_client) - startServer(); + startRPCServer(); } catch (DatabaseAlreadyOpen) { - startClient(); + connectToRPCServer(); } } @@ -55,11 +55,11 @@ bool Ethereum::connectionOpen() const return false; } -void Ethereum::startClient() +void Ethereum::connectToRPCServer() { } -void Ethereum::startServer() +void Ethereum::startRPCServer() { } diff --git a/libethereumx/Ethereum.h b/libethereumx/Ethereum.h index fb272c030..0966fd699 100644 --- a/libethereumx/Ethereum.h +++ b/libethereumx/Ethereum.h @@ -132,9 +132,9 @@ private: /// Check to see if the client/server connection is open. bool connectionOpen() const; /// Start the API client. - void startClient(); + void connectToRPCServer(); /// Start the API server. - void startServer(); + void startRPCServer(); std::unique_ptr m_client; diff --git a/libethnet/Common.cpp b/libethnet/Common.cpp index 6224f93ca..f0f0e4d2d 100644 --- a/libethnet/Common.cpp +++ b/libethnet/Common.cpp @@ -20,6 +20,10 @@ */ #include "Common.h" + +#include +#include "PeerSession.h" +#include "PeerHost.h" using namespace std; using namespace eth; @@ -61,6 +65,47 @@ std::string eth::reasonOf(DisconnectReason _r) void PeerCapability::disable(std::string const& _problem) { - clogS(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem; + clog(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem; m_enabled = false; } + +void HostCapabilityFace::seal(bytes& _b) +{ + m_host->seal(_b); +} + +std::vector > HostCapabilityFace::peers() const +{ + Guard l(m_host->x_peers); + std::vector > ret; + for (auto const& i: m_host->m_peers) + if (std::shared_ptr p = i.second.lock()) + if (p->m_capabilities.count(name())) + ret.push_back(p); + return ret; +} + +RLPStream& PeerCapability::prep(RLPStream& _s) +{ + return PeerSession::prep(_s); +} + +void PeerCapability::sealAndSend(RLPStream& _s) +{ + m_session->sealAndSend(_s); +} + +void PeerCapability::sendDestroy(bytes& _msg) +{ + m_session->sendDestroy(_msg); +} + +void PeerCapability::send(bytesConstRef _msg) +{ + m_session->send(_msg); +} + +void PeerCapability::addRating(unsigned _r) +{ + m_session->addRating(_r); +} diff --git a/libethnet/Common.h b/libethnet/Common.h index c70e42aaa..ccc066e88 100644 --- a/libethnet/Common.h +++ b/libethnet/Common.h @@ -38,6 +38,7 @@ namespace eth bool isPrivateAddress(bi::address _addressToCheck); class RLP; +class RLPStream; class PeerHost; class PeerSession; @@ -72,6 +73,7 @@ enum DisconnectReason TooManyPeers, DuplicatePeer, IncompatibleProtocol, + InvalidIdentity, ClientQuit, UserReason = 0x10 }; @@ -94,35 +96,41 @@ class PeerCapability; class HostCapabilityFace { friend class PeerHost; + template friend class HostCapability; + friend class PeerCapability; public: - HostCapabilityFace(PeerHost*) {} + HostCapabilityFace() {} virtual ~HostCapabilityFace() {} + PeerHost* host() const { return m_host; } + + std::vector > peers() const; + protected: virtual std::string name() const = 0; virtual PeerCapability* newPeerCapability(PeerSession* _s) = 0; virtual bool isInitialised() const = 0; + + void seal(bytes& _b); + +private: + PeerHost* m_host = nullptr; }; template class HostCapability: public HostCapabilityFace { public: - HostCapability(PeerHost* _h): m_host(_h) {} + HostCapability() {} virtual ~HostCapability() {} static std::string staticName() { return PeerCap::name(); } - PeerHost* host() const { return m_host; } - protected: virtual bool isInitialised() const = 0; virtual std::string name() const { return PeerCap::name(); } virtual PeerCapability* newPeerCapability(PeerSession* _s) { return new PeerCap(_s, this); } - -private: - PeerHost* m_host; }; class PeerCapability @@ -130,23 +138,30 @@ class PeerCapability friend class PeerSession; public: - PeerCapability(PeerSession* _s, HostCapability* _h): m_session(_s), m_host(_h) {} + PeerCapability(PeerSession* _s, HostCapabilityFace* _h): m_session(_s), m_host(_h) {} virtual ~PeerCapability() {} /// Must return the capability name. static std::string name() { return ""; } PeerSession* session() const { return m_session; } - HostCapability* hostCapability() const { return m_host; } + HostCapabilityFace* hostCapability() const { return m_host; } protected: virtual bool interpret(RLP const&) = 0; void disable(std::string const& _problem); + static RLPStream& prep(RLPStream& _s); + void sealAndSend(RLPStream& _s); + void sendDestroy(bytes& _msg); + void send(bytesConstRef _msg); + + void addRating(unsigned _r); + private: PeerSession* m_session; - HostCapability* m_host; + HostCapabilityFace* m_host; bool m_enabled = true; }; diff --git a/libethnet/PeerHost.cpp b/libethnet/PeerHost.cpp index b5627719f..1206eb0bf 100644 --- a/libethnet/PeerHost.cpp +++ b/libethnet/PeerHost.cpp @@ -62,6 +62,7 @@ PeerHost::PeerHost(std::string const& _clientVersion, unsigned short _port, stri populateAddresses(); determinePublic(_publicAddress, _upnp); ensureAccepting(); + m_lastPeersRequest = chrono::steady_clock::time_point::min(); clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4)); } @@ -78,6 +79,7 @@ PeerHost::PeerHost(std::string const& _clientVersion, string const& _publicAddre populateAddresses(); determinePublic(_publicAddress, _upnp); ensureAccepting(); + m_lastPeersRequest = chrono::steady_clock::time_point::min(); clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4)); } @@ -90,6 +92,7 @@ PeerHost::PeerHost(std::string const& _clientVersion): { // populate addresses. populateAddresses(); + m_lastPeersRequest = chrono::steady_clock::time_point::min(); clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4)); } @@ -103,15 +106,15 @@ unsigned PeerHost::protocolVersion() const return 0; } -void PeerHost::registerPeer(std::shared_ptr _s) +void PeerHost::registerPeer(std::shared_ptr _s, vector const& _caps) { { Guard l(x_peers); m_peers[_s->m_id] = _s; } - for (auto const& i: _s->m_caps) + for (auto const& i: _caps) if (haveCapability(i)) - _s->m_capabilities.push_back(shared_ptr(m_capabilities[i]->newPeerCapability(_s.get()))); + _s->m_capabilities[i] = shared_ptr(m_capabilities[i]->newPeerCapability(_s.get())); } void PeerHost::disconnectPeers() @@ -453,8 +456,8 @@ std::vector PeerHost::peers(bool _updatePing) const void PeerHost::process() { for (auto const& i: m_capabilities) - if (!i->isInitialised()) - return false; + if (!i.second->isInitialised()) + return; growPeers(); prunePeers(); m_ioService.poll(); diff --git a/libethnet/PeerHost.h b/libethnet/PeerHost.h index 00c5beb88..d56b512c7 100644 --- a/libethnet/PeerHost.h +++ b/libethnet/PeerHost.h @@ -48,6 +48,7 @@ class BlockQueue; class PeerHost { friend class PeerSession; + friend class HostCapabilityFace; public: /// Start server, listening for connections on the given port. @@ -67,7 +68,7 @@ public: unsigned protocolVersion() const; /// Register a peer-capability; all new peer connections will have this capability. - template void registerCapability() { m_capabilities[T::name()] = std::shared_ptr(new T(this)); } + template void registerCapability(T* _t) { _t->m_host = this; m_capabilities[T::name()] = std::shared_ptr(_t); } /// Connect to a peer explicitly. void connect(std::string const& _addr, unsigned short _port = 30303) noexcept; @@ -102,7 +103,7 @@ public: /// Deserialise the data and populate the set of known peers. void restorePeers(bytesConstRef _b); - void registerPeer(std::shared_ptr _s); + void registerPeer(std::shared_ptr _s, std::vector const& _caps); bool haveCapability(std::string const& _name) const { return m_capabilities.count(_name); } std::vector caps() const { std::vector ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } diff --git a/libethnet/PeerSession.cpp b/libethnet/PeerSession.cpp index dfa5d6195..d69100047 100644 --- a/libethnet/PeerSession.cpp +++ b/libethnet/PeerSession.cpp @@ -45,7 +45,7 @@ PeerSession::~PeerSession() { // Read-chain finished for one reason or another. for (auto& i: m_capabilities) - i.reset(); + i.second.reset(); try { @@ -76,11 +76,11 @@ bool PeerSession::interpret(RLP const& _r) { m_protocolVersion = _r[1].toInt(); auto clientVersion = _r[2].toString(); - m_caps = _r[3].toVector(); + auto caps = _r[3].toVector(); m_listenPort = _r[4].toInt(); m_id = _r[5].toHash(); - clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort; + clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << m_id.abridged() << showbase << hex << caps << dec << m_listenPort; if (m_server->havePeer(m_id)) { @@ -89,8 +89,12 @@ bool PeerSession::interpret(RLP const& _r) disconnect(DuplicatePeer); return false; } - - if (m_protocolVersion != m_server->protocolVersion() || !m_id) + if (!m_id) + { + disconnect(InvalidIdentity); + return false; + } + if (m_protocolVersion != m_server->protocolVersion()) { disconnect(IncompatibleProtocol); return false; @@ -103,7 +107,7 @@ bool PeerSession::interpret(RLP const& _r) return false; } - m_server->registerPeer(shared_from_this()); + m_server->registerPeer(shared_from_this(), caps); break; } case DisconnectPacket: @@ -180,7 +184,7 @@ bool PeerSession::interpret(RLP const& _r) break; default: for (auto const& i: m_capabilities) - if (i->m_enabled && i->interpret(_r)) + if (i.second->m_enabled && i.second->interpret(_r)) return true; return false; } diff --git a/libethnet/PeerSession.h b/libethnet/PeerSession.h index 5919bf11d..13ab6bb90 100644 --- a/libethnet/PeerSession.h +++ b/libethnet/PeerSession.h @@ -40,6 +40,7 @@ namespace eth class PeerSession: public std::enable_shared_from_this { friend class PeerHost; + friend class HostCapabilityFace; public: PeerSession(PeerHost* _server, bi::tcp::socket _socket, bi::address _peerAddress, unsigned short _peerPort = 0); @@ -52,13 +53,20 @@ public: bool isOpen() const { return m_socket.is_open(); } + unsigned id() const { return m_socket.native_handle(); } + bi::tcp::endpoint endpoint() const; ///< for other peers to connect to. + template + PeerCap* cap() const { try { return static_cast(m_capabilities.at(PeerCap::name())); } catch (...) { return nullptr; } } + static RLPStream& prep(RLPStream& _s); void sealAndSend(RLPStream& _s); void sendDestroy(bytes& _msg); void send(bytesConstRef _msg); + void addRating(unsigned _r) { m_rating += _r; } + private: void dropped(); void doRead(); @@ -77,7 +85,7 @@ private: std::recursive_mutex m_writeLock; std::deque m_writeQueue; - bi::tcp::socket m_socket; + mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle(). std::array m_data; PeerInfo m_info; Public m_id; @@ -85,7 +93,6 @@ private: bytes m_incoming; uint m_protocolVersion; unsigned short m_listenPort; ///< Port that the remote client is listening on for connections. Useful for giving to peers. - std::vector m_caps; std::chrono::steady_clock::time_point m_ping; std::chrono::steady_clock::time_point m_connect; @@ -93,7 +100,7 @@ private: uint m_rating; - std::vector> m_capabilities; + std::map> m_capabilities; bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand. };