Browse Source

PoC-7: Dynamic message IDs.

cl-refactor
Gav Wood 11 years ago
parent
commit
72449d349a
  1. 5
      libethereum/CommonNet.h
  2. 20
      libethereum/EthereumHost.cpp
  3. 25
      libethereum/EthereumPeer.cpp
  4. 9
      libethereum/EthereumPeer.h
  5. 14
      libp2p/Capability.cpp
  6. 14
      libp2p/Capability.h
  7. 8
      libp2p/Host.cpp
  8. 8
      libp2p/HostCapability.h
  9. 27
      libp2p/Session.cpp
  10. 1
      libp2p/Session.h
  11. 5
      libwhisper/Common.h
  12. 13
      libwhisper/WhisperPeer.cpp
  13. 5
      libwhisper/WhisperPeer.h

5
libethereum/CommonNet.h

@ -50,9 +50,9 @@ class TransactionQueue;
class EthereumHost; class EthereumHost;
class EthereumPeer; class EthereumPeer;
enum EthereumPacket enum
{ {
StatusPacket = 0x10, StatusPacket = 0,
GetTransactionsPacket, GetTransactionsPacket,
TransactionsPacket, TransactionsPacket,
GetBlockHashesPacket, GetBlockHashesPacket,
@ -60,6 +60,7 @@ enum EthereumPacket
GetBlocksPacket, GetBlocksPacket,
BlocksPacket, BlocksPacket,
NewBlockPacket, NewBlockPacket,
PacketCount
}; };
enum class Asking enum class Asking

20
libethereum/EthereumHost.cpp

@ -177,11 +177,8 @@ void EthereumHost::maintainTransactions(h256 _currentHash)
if (n || ep->m_requireTransactions) if (n || ep->m_requireTransactions)
{ {
RLPStream ts; RLPStream ts;
EthereumPeer::prep(ts); ep->prep(ts, TransactionsPacket, n).appendRaw(b, n);
ts.appendList(n + 1) << TransactionsPacket; ep->sealAndSend(ts);
ts.appendRaw(b, n).swapOut(b);
seal(b);
ep->send(&b);
} }
ep->m_requireTransactions = false; ep->m_requireTransactions = false;
} }
@ -195,24 +192,21 @@ void EthereumHost::maintainBlocks(h256 _currentHash)
// TODO: clean up // TODO: clean up
h256s hs; h256s hs;
hs.push_back(_currentHash); hs.push_back(_currentHash);
RLPStream ts;
EthereumPeer::prep(ts);
bytes bs; bytes bs;
for (auto h: hs) for (auto h: hs)
bs += m_chain.block(h); bs += m_chain.block(h);
clog(NetMessageSummary) << "Sending" << hs.size() << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; clog(NetMessageSummary) << "Sending" << hs.size() << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
ts.appendList(1 + hs.size()).append(BlocksPacket).appendRaw(bs, hs.size());
bytes b;
ts.swapOut(b);
seal(b);
for (auto j: peers()) for (auto j: peers())
{ {
auto p = j->cap<EthereumPeer>(); auto p = j->cap<EthereumPeer>();
RLPStream ts;
p->prep(ts, BlocksPacket, hs.size()).appendRaw(bs, hs.size());
Guard l(p->x_knownBlocks); Guard l(p->x_knownBlocks);
if (!p->m_knownBlocks.count(_currentHash)) if (!p->m_knownBlocks.count(_currentHash))
p->send(&b); p->sealAndSend(ts);
p->m_knownBlocks.clear(); p->m_knownBlocks.clear();
} }
m_latestBlockSent = _currentHash; m_latestBlockSent = _currentHash;

25
libethereum/EthereumPeer.cpp

@ -36,8 +36,8 @@ using namespace p2p;
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h): EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i):
Capability(_s, _h), Capability(_s, _h, _i),
m_sub(host()->m_man) m_sub(host()->m_man)
{ {
transition(Asking::State); transition(Asking::State);
@ -80,13 +80,12 @@ void EthereumPeer::transition(Asking _a, bool _force)
clogS(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); clogS(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : "");
RLPStream s; RLPStream s;
prep(s);
if (_a == Asking::State) if (_a == Asking::State)
{ {
if (m_asking == Asking::Nothing) if (m_asking == Asking::Nothing)
{ {
setAsking(Asking::State, false); setAsking(Asking::State, false);
s.appendList(6) << StatusPacket prep(s, StatusPacket, 5)
<< host()->protocolVersion() << host()->protocolVersion()
<< host()->networkId() << host()->networkId()
<< host()->m_chain.details().totalDifficulty << host()->m_chain.details().totalDifficulty
@ -108,7 +107,7 @@ void EthereumPeer::transition(Asking _a, bool _force)
resetNeedsSyncing(); resetNeedsSyncing();
setAsking(_a, true); setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk; prep(s, GetBlockHashesPacket, 2) << m_syncingLatestHash << c_maxHashesAsk;
m_syncingNeededBlocks = h256s(1, m_syncingLatestHash); m_syncingNeededBlocks = h256s(1, m_syncingLatestHash);
sealAndSend(s); sealAndSend(s);
return; return;
@ -119,7 +118,7 @@ void EthereumPeer::transition(Asking _a, bool _force)
clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setAsking(_a, true); setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingNeededBlocks.back() << c_maxHashesAsk; prep(s, GetBlockHashesPacket, 2) << m_syncingNeededBlocks.back() << c_maxHashesAsk;
sealAndSend(s); sealAndSend(s);
return; return;
} }
@ -154,7 +153,7 @@ void EthereumPeer::transition(Asking _a, bool _force)
auto blocks = m_sub.nextFetch(c_maxBlocksAsk); auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (blocks.size()) if (blocks.size())
{ {
s.appendList(blocks.size() + 1) << GetBlocksPacket; prep(s, GetBlocksPacket, blocks.size());
for (auto const& i: blocks) for (auto const& i: blocks)
s << i; s << i;
sealAndSend(s); sealAndSend(s);
@ -284,9 +283,9 @@ void EthereumPeer::attemptSync()
} }
} }
bool EthereumPeer::interpret(RLP const& _r) bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
{ {
switch (_r[0].toInt<unsigned>()) switch (_id)
{ {
case StatusPacket: case StatusPacket:
{ {
@ -314,8 +313,7 @@ bool EthereumPeer::interpret(RLP const& _r)
{ {
// Grab transactions off them. // Grab transactions off them.
RLPStream s; RLPStream s;
prep(s).appendList(1); prep(s, GetTransactionsPacket);
s << GetTransactionsPacket;
sealAndSend(s); sealAndSend(s);
transition(Asking::Nothing); transition(Asking::Nothing);
} }
@ -350,7 +348,7 @@ bool EthereumPeer::interpret(RLP const& _r)
unsigned c = min<unsigned>(host()->m_chain.number(later), limit); unsigned c = min<unsigned>(host()->m_chain.number(later), limit);
RLPStream s; RLPStream s;
prep(s).appendList(1 + c).append(BlockHashesPacket); prep(s, BlockHashesPacket, c);
h256 p = host()->m_chain.details(later).parent; h256 p = host()->m_chain.details(later).parent;
for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent)
s << p; s << p;
@ -402,7 +400,8 @@ bool EthereumPeer::interpret(RLP const& _r)
} }
} }
RLPStream s; RLPStream s;
sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n)); prep(s, BlocksPacket, n).appendRaw(rlp, n);
sealAndSend(s);
break; break;
} }
case BlocksPacket: case BlocksPacket:

9
libethereum/EthereumPeer.h

@ -50,7 +50,7 @@ class EthereumPeer: public p2p::Capability
public: public:
/// Basic constructor. /// Basic constructor.
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h); EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i);
/// Basic destructor. /// Basic destructor.
virtual ~EthereumPeer(); virtual ~EthereumPeer();
@ -61,12 +61,17 @@ public:
/// What is our version? /// What is our version?
static u256 version() { return c_protocolVersion; } static u256 version() { return c_protocolVersion; }
/// How many message types do we have?
static unsigned messageCount() { return PacketCount; }
/// What is the ethereum subprotocol host object. /// What is the ethereum subprotocol host object.
EthereumHost* host() const; EthereumHost* host() const;
private: private:
using p2p::Capability::sealAndSend;
/// Interpret an incoming message. /// Interpret an incoming message.
virtual bool interpret(RLP const& _r); virtual bool interpret(unsigned _id, RLP const& _r);
/// Transition state in a particular direction. /// Transition state in a particular direction.
void transition(Asking _wantState, bool _force = false); void transition(Asking _wantState, bool _force = false);

14
libp2p/Capability.cpp

@ -21,20 +21,28 @@
#include "Capability.h" #include "Capability.h"
#include <libdevcore/Log.h>
#include "Session.h" #include "Session.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
Capability::Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset): m_session(_s), m_host(_h), m_idOffset(_idOffset)
{
clogS(NetConnect) << "New session for capability" << m_host->name() << "; idOffset:" << m_idOffset;
}
void Capability::disable(std::string const& _problem) void Capability::disable(std::string const& _problem)
{ {
clog(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem; clogS(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem;
m_enabled = false; m_enabled = false;
} }
RLPStream& Capability::prep(RLPStream& _s) RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args)
{ {
return Session::prep(_s); return Session::prep(_s).appendList(_args + 1).append(_id + m_idOffset);
} }
void Capability::sealAndSend(RLPStream& _s) void Capability::sealAndSend(RLPStream& _s)

14
libp2p/Capability.h

@ -34,22 +34,23 @@ class Capability
friend class Session; friend class Session;
public: public:
Capability(Session* _s, HostCapabilityFace* _h): m_session(_s), m_host(_h) {} Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset);
virtual ~Capability() {} virtual ~Capability() {}
/// Must return the capability name. // Implement these in the derived class.
static std::string name() { return ""; } /* static std::string name() { return ""; }
static u256 version() { return 0; } static u256 version() { return 0; }
static unsigned messageCount() { return 0; }
*/
Session* session() const { return m_session; } Session* session() const { return m_session; }
HostCapabilityFace* hostCapability() const { return m_host; } HostCapabilityFace* hostCapability() const { return m_host; }
protected: protected:
virtual bool interpret(RLP const&) = 0; virtual bool interpret(unsigned _id, RLP const&) = 0;
void disable(std::string const& _problem); void disable(std::string const& _problem);
static RLPStream& prep(RLPStream& _s); RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg); void sendDestroy(bytes& _msg);
void send(bytesConstRef _msg); void send(bytesConstRef _msg);
@ -60,6 +61,7 @@ private:
Session* m_session; Session* m_session;
HostCapabilityFace* m_host; HostCapabilityFace* m_host;
bool m_enabled = true; bool m_enabled = true;
unsigned m_idOffset;
}; };
} }

8
libp2p/Host.cpp

@ -152,9 +152,13 @@ void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
Guard l(x_peers); Guard l(x_peers);
m_peers[_s->m_id] = _s; m_peers[_s->m_id] = _s;
} }
unsigned o = (unsigned)UserPacket;
for (auto const& i: _caps) for (auto const& i: _caps)
if (haveCapability(i)) if (haveCapability(i))
_s->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(_s.get())); {
_s->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(_s.get(), o));
o += m_capabilities[i]->messageCount();
}
} }
void Host::disconnectPeers() void Host::disconnectPeers()
@ -458,7 +462,7 @@ void Host::growPeers()
{ {
RLPStream s; RLPStream s;
bytes b; bytes b;
(Session::prep(s).appendList(1) << GetPeersPacket).swapOut(b); Session::prep(s, GetPeersPacket).swapOut(b);
seal(b); seal(b);
for (auto const& i: m_peers) for (auto const& i: m_peers)
if (auto p = i.second.lock()) if (auto p = i.second.lock())

8
libp2p/HostCapability.h

@ -36,6 +36,7 @@ class HostCapabilityFace
friend class Host; friend class Host;
template <class T> friend class HostCapability; template <class T> friend class HostCapability;
friend class Capability; friend class Capability;
friend class Session;
public: public:
HostCapabilityFace() {} HostCapabilityFace() {}
@ -49,7 +50,8 @@ protected:
virtual std::string name() const = 0; virtual std::string name() const = 0;
virtual u256 version() const = 0; virtual u256 version() const = 0;
CapDesc capDesc() const { return std::make_pair(name(), version()); } CapDesc capDesc() const { return std::make_pair(name(), version()); }
virtual Capability* newPeerCapability(Session* _s) = 0; virtual unsigned messageCount() const = 0;
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) = 0;
virtual void onStarting() {} virtual void onStarting() {}
virtual void onStopping() {} virtual void onStopping() {}
@ -69,11 +71,13 @@ public:
static std::string staticName() { return PeerCap::name(); } static std::string staticName() { return PeerCap::name(); }
static u256 staticVersion() { return PeerCap::version(); } static u256 staticVersion() { return PeerCap::version(); }
static unsigned staticMessageCount() { return PeerCap::messageCount(); }
protected: protected:
virtual std::string name() const { return PeerCap::name(); } virtual std::string name() const { return PeerCap::name(); }
virtual u256 version() const { return PeerCap::version(); } virtual u256 version() const { return PeerCap::version(); }
virtual Capability* newPeerCapability(Session* _s) { return new PeerCap(_s, this); } virtual unsigned messageCount() const { return PeerCap::messageCount(); }
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) { return new PeerCap(_s, this, _idOffset); }
}; };
} }

27
libp2p/Session.cpp

@ -72,7 +72,7 @@ bi::tcp::endpoint Session::endpoint() const
bool Session::interpret(RLP const& _r) bool Session::interpret(RLP const& _r)
{ {
clogS(NetRight) << _r; clogS(NetRight) << _r;
switch (_r[0].toInt<unsigned>()) switch ((PacketType)_r[0].toInt<unsigned>())
{ {
case HelloPacket: case HelloPacket:
{ {
@ -130,7 +130,7 @@ bool Session::interpret(RLP const& _r)
{ {
clogS(NetTriviaSummary) << "Ping"; clogS(NetTriviaSummary) << "Ping";
RLPStream s; RLPStream s;
sealAndSend(prep(s).appendList(1) << PongPacket); sealAndSend(prep(s, PongPacket));
break; break;
} }
case PongPacket: case PongPacket:
@ -142,8 +142,7 @@ bool Session::interpret(RLP const& _r)
clogS(NetTriviaSummary) << "GetPeers"; clogS(NetTriviaSummary) << "GetPeers";
auto peers = m_server->potentialPeers(); auto peers = m_server->potentialPeers();
RLPStream s; RLPStream s;
prep(s).appendList(peers.size() + 1); prep(s, PeersPacket, peers.size());
s << PeersPacket;
for (auto i: peers) for (auto i: peers)
{ {
clogS(NetTriviaDetail) << "Sending peer " << i.first.abridged() << i.second; clogS(NetTriviaDetail) << "Sending peer " << i.first.abridged() << i.second;
@ -185,25 +184,33 @@ bool Session::interpret(RLP const& _r)
} }
break; break;
default: default:
{
auto id = _r[0].toInt<unsigned>();
for (auto const& i: m_capabilities) for (auto const& i: m_capabilities)
if (i.second->m_enabled && i.second->interpret(_r)) if (i.second->m_enabled && id >= i.second->m_idOffset && id - i.second->m_idOffset < i.second->hostCapability()->messageCount() && i.second->interpret(id - i.second->m_idOffset, _r))
return true; return true;
return false; return false;
} }
}
return true; return true;
} }
void Session::ping() void Session::ping()
{ {
RLPStream s; RLPStream s;
sealAndSend(prep(s).appendList(1) << PingPacket); sealAndSend(prep(s, PingPacket));
m_ping = std::chrono::steady_clock::now(); m_ping = std::chrono::steady_clock::now();
} }
void Session::getPeers() void Session::getPeers()
{ {
RLPStream s; RLPStream s;
sealAndSend(prep(s).appendList(1) << GetPeersPacket); sealAndSend(prep(s, GetPeersPacket));
}
RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args)
{
return prep(_s).appendList(_args + 1).append((unsigned)_id);
} }
RLPStream& Session::prep(RLPStream& _s) RLPStream& Session::prep(RLPStream& _s)
@ -323,8 +330,7 @@ void Session::disconnect(int _reason)
if (m_disconnect == chrono::steady_clock::time_point::max()) if (m_disconnect == chrono::steady_clock::time_point::max())
{ {
RLPStream s; RLPStream s;
prep(s); prep(s, DisconnectPacket, 1) << _reason;
s.appendList(2) << DisconnectPacket << _reason;
sealAndSend(s); sealAndSend(s);
m_disconnect = chrono::steady_clock::now(); m_disconnect = chrono::steady_clock::now();
} }
@ -336,8 +342,7 @@ void Session::disconnect(int _reason)
void Session::start() void Session::start()
{ {
RLPStream s; RLPStream s;
prep(s); prep(s, HelloPacket, 5)
s.appendList(6) << HelloPacket
<< m_server->protocolVersion() << m_server->protocolVersion()
<< m_server->m_clientVersion << m_server->m_clientVersion
<< m_server->caps() << m_server->caps()

1
libp2p/Session.h

@ -64,6 +64,7 @@ public:
template <class PeerCap> template <class PeerCap>
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
static RLPStream& prep(RLPStream& _s); static RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg); void sendDestroy(bytes& _msg);

5
libwhisper/Common.h

@ -51,10 +51,11 @@ class Whisper;
enum WhisperPacket enum WhisperPacket
{ {
StatusPacket = 0x20, StatusPacket = 0,
MessagesPacket, MessagesPacket,
AddFilterPacket, AddFilterPacket,
RemoveFilterPacket RemoveFilterPacket,
PacketCount
}; };
} }

13
libwhisper/WhisperPeer.cpp

@ -30,11 +30,10 @@ using namespace dev::p2p;
using namespace dev::shh; using namespace dev::shh;
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h): Capability(_s, _h) WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): Capability(_s, _h, _i)
{ {
RLPStream s; RLPStream s;
prep(s); sealAndSend(prep(s, StatusPacket, 1) << host()->protocolVersion());
sealAndSend(s.appendList(2) << StatusPacket << host()->protocolVersion());
} }
WhisperPeer::~WhisperPeer() WhisperPeer::~WhisperPeer()
@ -46,9 +45,9 @@ WhisperHost* WhisperPeer::host() const
return static_cast<WhisperHost*>(Capability::hostCapability()); return static_cast<WhisperHost*>(Capability::hostCapability());
} }
bool WhisperPeer::interpret(RLP const& _r) bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
{ {
switch (_r[0].toInt<unsigned>()) switch (_id)
{ {
case StatusPacket: case StatusPacket:
{ {
@ -95,9 +94,7 @@ void WhisperPeer::sendMessages()
if (n) if (n)
{ {
RLPStream s; RLPStream s;
prep(s); prep(s, MessagesPacket, n).appendRaw(amalg.out(), n);
s.appendList(n + 1) << MessagesPacket;
s.appendRaw(amalg.out(), n);
sealAndSend(s); sealAndSend(s);
} }
else else

5
libwhisper/WhisperPeer.h

@ -49,16 +49,17 @@ class WhisperPeer: public Capability
friend class WhisperHost; friend class WhisperHost;
public: public:
WhisperPeer(Session* _s, HostCapabilityFace* _h); WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i);
virtual ~WhisperPeer(); virtual ~WhisperPeer();
static std::string name() { return "shh"; } static std::string name() { return "shh"; }
static u256 version() { return 1; } static u256 version() { return 1; }
static unsigned messageCount() { return PacketCount; }
WhisperHost* host() const; WhisperHost* host() const;
private: private:
virtual bool interpret(RLP const&); virtual bool interpret(unsigned _id, RLP const&);
void sendMessages(); void sendMessages();

Loading…
Cancel
Save