From 72449d349aa85883de61d4f65a11fd6276927d61 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 7 Oct 2014 19:49:15 +0200 Subject: [PATCH] PoC-7: Dynamic message IDs. --- libethereum/CommonNet.h | 5 +++-- libethereum/EthereumHost.cpp | 20 +++++++------------- libethereum/EthereumPeer.cpp | 25 ++++++++++++------------- libethereum/EthereumPeer.h | 9 +++++++-- libp2p/Capability.cpp | 14 +++++++++++--- libp2p/Capability.h | 14 ++++++++------ libp2p/Host.cpp | 8 ++++++-- libp2p/HostCapability.h | 8 ++++++-- libp2p/Session.cpp | 27 ++++++++++++++++----------- libp2p/Session.h | 1 + libwhisper/Common.h | 5 +++-- libwhisper/WhisperPeer.cpp | 13 +++++-------- libwhisper/WhisperPeer.h | 5 +++-- 13 files changed, 88 insertions(+), 66 deletions(-) diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 15b05bff3..7e4821bb4 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -50,9 +50,9 @@ class TransactionQueue; class EthereumHost; class EthereumPeer; -enum EthereumPacket +enum { - StatusPacket = 0x10, + StatusPacket = 0, GetTransactionsPacket, TransactionsPacket, GetBlockHashesPacket, @@ -60,6 +60,7 @@ enum EthereumPacket GetBlocksPacket, BlocksPacket, NewBlockPacket, + PacketCount }; enum class Asking diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index c5c3e90cd..bc5f11936 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -177,11 +177,8 @@ void EthereumHost::maintainTransactions(h256 _currentHash) if (n || ep->m_requireTransactions) { RLPStream ts; - EthereumPeer::prep(ts); - ts.appendList(n + 1) << TransactionsPacket; - ts.appendRaw(b, n).swapOut(b); - seal(b); - ep->send(&b); + ep->prep(ts, TransactionsPacket, n).appendRaw(b, n); + ep->sealAndSend(ts); } ep->m_requireTransactions = false; } @@ -195,24 +192,21 @@ void EthereumHost::maintainBlocks(h256 _currentHash) // TODO: clean up h256s hs; hs.push_back(_currentHash); - RLPStream ts; - EthereumPeer::prep(ts); bytes bs; for (auto h: hs) bs += m_chain.block(h); clog(NetMessageSummary) << "Sending" << hs.size() << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; - ts.appendList(1 + hs.size()).append(BlocksPacket).appendRaw(bs, hs.size()); - bytes b; - ts.swapOut(b); - seal(b); - for (auto j: peers()) { auto p = j->cap(); + + RLPStream ts; + p->prep(ts, BlocksPacket, hs.size()).appendRaw(bs, hs.size()); + Guard l(p->x_knownBlocks); if (!p->m_knownBlocks.count(_currentHash)) - p->send(&b); + p->sealAndSend(ts); p->m_knownBlocks.clear(); } m_latestBlockSent = _currentHash; diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 45e850de6..875f86f06 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -36,8 +36,8 @@ using namespace p2p; #define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << session()->socketId() << "] " -EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h): - Capability(_s, _h), +EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): + Capability(_s, _h, _i), m_sub(host()->m_man) { transition(Asking::State); @@ -80,13 +80,12 @@ void EthereumPeer::transition(Asking _a, bool _force) clogS(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); RLPStream s; - prep(s); if (_a == Asking::State) { if (m_asking == Asking::Nothing) { setAsking(Asking::State, false); - s.appendList(6) << StatusPacket + prep(s, StatusPacket, 5) << host()->protocolVersion() << host()->networkId() << host()->m_chain.details().totalDifficulty @@ -108,7 +107,7 @@ void EthereumPeer::transition(Asking _a, bool _force) resetNeedsSyncing(); setAsking(_a, true); - s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk; + prep(s, GetBlockHashesPacket, 2) << m_syncingLatestHash << c_maxHashesAsk; m_syncingNeededBlocks = h256s(1, m_syncingLatestHash); sealAndSend(s); return; @@ -119,7 +118,7 @@ void EthereumPeer::transition(Asking _a, bool _force) clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; setAsking(_a, true); - s.appendList(3) << GetBlockHashesPacket << m_syncingNeededBlocks.back() << c_maxHashesAsk; + prep(s, GetBlockHashesPacket, 2) << m_syncingNeededBlocks.back() << c_maxHashesAsk; sealAndSend(s); return; } @@ -154,7 +153,7 @@ void EthereumPeer::transition(Asking _a, bool _force) auto blocks = m_sub.nextFetch(c_maxBlocksAsk); if (blocks.size()) { - s.appendList(blocks.size() + 1) << GetBlocksPacket; + prep(s, GetBlocksPacket, blocks.size()); for (auto const& i: blocks) s << i; sealAndSend(s); @@ -284,9 +283,9 @@ void EthereumPeer::attemptSync() } } -bool EthereumPeer::interpret(RLP const& _r) +bool EthereumPeer::interpret(unsigned _id, RLP const& _r) { - switch (_r[0].toInt()) + switch (_id) { case StatusPacket: { @@ -314,8 +313,7 @@ bool EthereumPeer::interpret(RLP const& _r) { // Grab transactions off them. RLPStream s; - prep(s).appendList(1); - s << GetTransactionsPacket; + prep(s, GetTransactionsPacket); sealAndSend(s); transition(Asking::Nothing); } @@ -350,7 +348,7 @@ bool EthereumPeer::interpret(RLP const& _r) unsigned c = min(host()->m_chain.number(later), limit); RLPStream s; - prep(s).appendList(1 + c).append(BlockHashesPacket); + prep(s, BlockHashesPacket, c); h256 p = host()->m_chain.details(later).parent; for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) s << p; @@ -402,7 +400,8 @@ bool EthereumPeer::interpret(RLP const& _r) } } RLPStream s; - sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n)); + prep(s, BlocksPacket, n).appendRaw(rlp, n); + sealAndSend(s); break; } case BlocksPacket: diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index bad12d3b7..71bfc544f 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -50,7 +50,7 @@ class EthereumPeer: public p2p::Capability public: /// Basic constructor. - EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h); + EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i); /// Basic destructor. virtual ~EthereumPeer(); @@ -61,12 +61,17 @@ public: /// What is our version? static u256 version() { return c_protocolVersion; } + /// How many message types do we have? + static unsigned messageCount() { return PacketCount; } + /// What is the ethereum subprotocol host object. EthereumHost* host() const; private: + using p2p::Capability::sealAndSend; + /// Interpret an incoming message. - virtual bool interpret(RLP const& _r); + virtual bool interpret(unsigned _id, RLP const& _r); /// Transition state in a particular direction. void transition(Asking _wantState, bool _force = false); diff --git a/libp2p/Capability.cpp b/libp2p/Capability.cpp index 5ca674120..3ca91d4ca 100644 --- a/libp2p/Capability.cpp +++ b/libp2p/Capability.cpp @@ -21,20 +21,28 @@ #include "Capability.h" +#include #include "Session.h" using namespace std; using namespace dev; using namespace dev::p2p; +#define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << session()->socketId() << "] " + +Capability::Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset): m_session(_s), m_host(_h), m_idOffset(_idOffset) +{ + clogS(NetConnect) << "New session for capability" << m_host->name() << "; idOffset:" << m_idOffset; +} + void Capability::disable(std::string const& _problem) { - clog(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem; + clogS(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem; m_enabled = false; } -RLPStream& Capability::prep(RLPStream& _s) +RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args) { - return Session::prep(_s); + return Session::prep(_s).appendList(_args + 1).append(_id + m_idOffset); } void Capability::sealAndSend(RLPStream& _s) diff --git a/libp2p/Capability.h b/libp2p/Capability.h index 00cccaeef..bd2ab7a95 100644 --- a/libp2p/Capability.h +++ b/libp2p/Capability.h @@ -34,22 +34,23 @@ class Capability friend class Session; public: - Capability(Session* _s, HostCapabilityFace* _h): m_session(_s), m_host(_h) {} + Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset); virtual ~Capability() {} - /// Must return the capability name. - static std::string name() { return ""; } + // Implement these in the derived class. +/* static std::string name() { return ""; } static u256 version() { return 0; } - + static unsigned messageCount() { return 0; } +*/ Session* session() const { return m_session; } HostCapabilityFace* hostCapability() const { return m_host; } protected: - virtual bool interpret(RLP const&) = 0; + virtual bool interpret(unsigned _id, RLP const&) = 0; void disable(std::string const& _problem); - static RLPStream& prep(RLPStream& _s); + RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0); void sealAndSend(RLPStream& _s); void sendDestroy(bytes& _msg); void send(bytesConstRef _msg); @@ -60,6 +61,7 @@ private: Session* m_session; HostCapabilityFace* m_host; bool m_enabled = true; + unsigned m_idOffset; }; } diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index d13cbf4f9..baa7ca6e9 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -152,9 +152,13 @@ void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) Guard l(x_peers); m_peers[_s->m_id] = _s; } + unsigned o = (unsigned)UserPacket; for (auto const& i: _caps) if (haveCapability(i)) - _s->m_capabilities[i] = shared_ptr(m_capabilities[i]->newPeerCapability(_s.get())); + { + _s->m_capabilities[i] = shared_ptr(m_capabilities[i]->newPeerCapability(_s.get(), o)); + o += m_capabilities[i]->messageCount(); + } } void Host::disconnectPeers() @@ -458,7 +462,7 @@ void Host::growPeers() { RLPStream s; bytes b; - (Session::prep(s).appendList(1) << GetPeersPacket).swapOut(b); + Session::prep(s, GetPeersPacket).swapOut(b); seal(b); for (auto const& i: m_peers) if (auto p = i.second.lock()) diff --git a/libp2p/HostCapability.h b/libp2p/HostCapability.h index f07900034..da454860a 100644 --- a/libp2p/HostCapability.h +++ b/libp2p/HostCapability.h @@ -36,6 +36,7 @@ class HostCapabilityFace friend class Host; template friend class HostCapability; friend class Capability; + friend class Session; public: HostCapabilityFace() {} @@ -49,7 +50,8 @@ protected: virtual std::string name() const = 0; virtual u256 version() const = 0; CapDesc capDesc() const { return std::make_pair(name(), version()); } - virtual Capability* newPeerCapability(Session* _s) = 0; + virtual unsigned messageCount() const = 0; + virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) = 0; virtual void onStarting() {} virtual void onStopping() {} @@ -69,11 +71,13 @@ public: static std::string staticName() { return PeerCap::name(); } static u256 staticVersion() { return PeerCap::version(); } + static unsigned staticMessageCount() { return PeerCap::messageCount(); } protected: virtual std::string name() const { return PeerCap::name(); } virtual u256 version() const { return PeerCap::version(); } - virtual Capability* newPeerCapability(Session* _s) { return new PeerCap(_s, this); } + virtual unsigned messageCount() const { return PeerCap::messageCount(); } + virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) { return new PeerCap(_s, this, _idOffset); } }; } diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 71780ff6b..0b939882f 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -72,7 +72,7 @@ bi::tcp::endpoint Session::endpoint() const bool Session::interpret(RLP const& _r) { clogS(NetRight) << _r; - switch (_r[0].toInt()) + switch ((PacketType)_r[0].toInt()) { case HelloPacket: { @@ -130,7 +130,7 @@ bool Session::interpret(RLP const& _r) { clogS(NetTriviaSummary) << "Ping"; RLPStream s; - sealAndSend(prep(s).appendList(1) << PongPacket); + sealAndSend(prep(s, PongPacket)); break; } case PongPacket: @@ -142,8 +142,7 @@ bool Session::interpret(RLP const& _r) clogS(NetTriviaSummary) << "GetPeers"; auto peers = m_server->potentialPeers(); RLPStream s; - prep(s).appendList(peers.size() + 1); - s << PeersPacket; + prep(s, PeersPacket, peers.size()); for (auto i: peers) { clogS(NetTriviaDetail) << "Sending peer " << i.first.abridged() << i.second; @@ -185,25 +184,33 @@ bool Session::interpret(RLP const& _r) } break; default: + { + auto id = _r[0].toInt(); for (auto const& i: m_capabilities) - if (i.second->m_enabled && i.second->interpret(_r)) + if (i.second->m_enabled && id >= i.second->m_idOffset && id - i.second->m_idOffset < i.second->hostCapability()->messageCount() && i.second->interpret(id - i.second->m_idOffset, _r)) return true; return false; } + } return true; } void Session::ping() { RLPStream s; - sealAndSend(prep(s).appendList(1) << PingPacket); + sealAndSend(prep(s, PingPacket)); m_ping = std::chrono::steady_clock::now(); } void Session::getPeers() { RLPStream s; - sealAndSend(prep(s).appendList(1) << GetPeersPacket); + sealAndSend(prep(s, GetPeersPacket)); +} + +RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args) +{ + return prep(_s).appendList(_args + 1).append((unsigned)_id); } RLPStream& Session::prep(RLPStream& _s) @@ -323,8 +330,7 @@ void Session::disconnect(int _reason) if (m_disconnect == chrono::steady_clock::time_point::max()) { RLPStream s; - prep(s); - s.appendList(2) << DisconnectPacket << _reason; + prep(s, DisconnectPacket, 1) << _reason; sealAndSend(s); m_disconnect = chrono::steady_clock::now(); } @@ -336,8 +342,7 @@ void Session::disconnect(int _reason) void Session::start() { RLPStream s; - prep(s); - s.appendList(6) << HelloPacket + prep(s, HelloPacket, 5) << m_server->protocolVersion() << m_server->m_clientVersion << m_server->caps() diff --git a/libp2p/Session.h b/libp2p/Session.h index 9b04154ee..0498ede9b 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -64,6 +64,7 @@ public: template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } + static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0); static RLPStream& prep(RLPStream& _s); void sealAndSend(RLPStream& _s); void sendDestroy(bytes& _msg); diff --git a/libwhisper/Common.h b/libwhisper/Common.h index ba4285f2b..251a089b7 100644 --- a/libwhisper/Common.h +++ b/libwhisper/Common.h @@ -51,10 +51,11 @@ class Whisper; enum WhisperPacket { - StatusPacket = 0x20, + StatusPacket = 0, MessagesPacket, AddFilterPacket, - RemoveFilterPacket + RemoveFilterPacket, + PacketCount }; } diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index e92e2cac3..25bd58271 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -30,11 +30,10 @@ using namespace dev::p2p; using namespace dev::shh; #define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << session()->socketId() << "] " -WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h): Capability(_s, _h) +WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): Capability(_s, _h, _i) { RLPStream s; - prep(s); - sealAndSend(s.appendList(2) << StatusPacket << host()->protocolVersion()); + sealAndSend(prep(s, StatusPacket, 1) << host()->protocolVersion()); } WhisperPeer::~WhisperPeer() @@ -46,9 +45,9 @@ WhisperHost* WhisperPeer::host() const return static_cast(Capability::hostCapability()); } -bool WhisperPeer::interpret(RLP const& _r) +bool WhisperPeer::interpret(unsigned _id, RLP const& _r) { - switch (_r[0].toInt()) + switch (_id) { case StatusPacket: { @@ -95,9 +94,7 @@ void WhisperPeer::sendMessages() if (n) { RLPStream s; - prep(s); - s.appendList(n + 1) << MessagesPacket; - s.appendRaw(amalg.out(), n); + prep(s, MessagesPacket, n).appendRaw(amalg.out(), n); sealAndSend(s); } else diff --git a/libwhisper/WhisperPeer.h b/libwhisper/WhisperPeer.h index ae20cae68..3da246562 100644 --- a/libwhisper/WhisperPeer.h +++ b/libwhisper/WhisperPeer.h @@ -49,16 +49,17 @@ class WhisperPeer: public Capability friend class WhisperHost; public: - WhisperPeer(Session* _s, HostCapabilityFace* _h); + WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i); virtual ~WhisperPeer(); static std::string name() { return "shh"; } static u256 version() { return 1; } + static unsigned messageCount() { return PacketCount; } WhisperHost* host() const; private: - virtual bool interpret(RLP const&); + virtual bool interpret(unsigned _id, RLP const&); void sendMessages();