
Pass 1 integrating node table. TBD: whether to store/relay cap info.

Branch: cl-refactor
Author: subtly (10 years ago)
Commit: 5436f90f04
7 changed files:

  1. libp2p/Host.cpp       (202 changes)
  2. libp2p/Host.h          (56 changes)
  3. libp2p/NodeTable.cpp   (19 changes)
  4. libp2p/NodeTable.h     (88 changes)
  5. libp2p/Session.cpp      (2 changes)
  6. libp2p/Session.h        (6 changes)
  7. libp2p/UDP.h            (2 changes)
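In miniature, what this pass wires up (see the Host.cpp hunks below): Host now owns a shared_ptr to the discovery table, created eagerly in its constructor and re-created in startedWorking() once the real UDP port is known. A toy outline with stand-in types, not the real libp2p classes:

#include <memory>

// Stand-ins for the real KeyPair/NodeTable; only the ownership pattern matters.
struct KeyPair {};

struct NodeTable
{
	explicit NodeTable(KeyPair const&, unsigned short _udpPort = 30303) {}
};

struct Host
{
	KeyPair m_key;
	std::shared_ptr<NodeTable> m_nodeTable = std::make_shared<NodeTable>(m_key);

	void startedWorking(unsigned short _listenPort)
	{
		// Mirror of the new startedWorking() logic: rebuild the table with the
		// actual listen port, defaulting to 30303 when we aren't listening.
		m_nodeTable.reset(new NodeTable(m_key, _listenPort ? _listenPort : 30303));
	}
};

int main()
{
	Host h;
	h.startedWorking(30300);
}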

libp2p/Host.cpp  (202 changes)

@@ -44,7 +44,8 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
 	m_ifAddresses(Network::getInterfaceAddresses()),
 	m_ioService(2),
 	m_tcp4Acceptor(m_ioService),
-	m_key(KeyPair::create())
+	m_key(KeyPair::create()),
+	m_nodeTable(new NodeTable(m_ioService, m_key))
 {
 	for (auto address: m_ifAddresses)
 		if (address.is_v4())
@@ -143,11 +144,12 @@ void Host::doneWorking()
 unsigned Host::protocolVersion() const
 {
-	return 2;
+	return 3;
 }
 
 void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
 {
+#warning integration: todo rework so this is an exception
 	if (!_s->m_node || !_s->m_node->id)
 	{
 		cwarn << "Attempting to register a peer without node information!";
@@ -180,7 +182,8 @@ void Host::seal(bytes& _b)
 	_b[7] = len & 0xff;
 }
 
-shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId)
+#warning integration: todo remove origin, ready, oldid. port to NodeTable. see Session.cpp#244,363
+shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId)
 {
 	RecursiveGuard l(x_peers);
 	if (_a.port() < 30300 || _a.port() > 30305)
@@ -192,11 +195,11 @@ shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId)
 		_a = bi::tcp::endpoint(_a.address(), 0);
 	}
 
-//	cnote << "Node:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]");
+//	cnote << "NodeInfo:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]");
 
 	// First check for another node with the same connection credentials, and put it in oldId if found.
 	if (!_oldId)
-		for (pair<h512, shared_ptr<Node>> const& n: m_nodes)
+		for (pair<h512, shared_ptr<NodeInfo>> const& n: m_nodes)
 			if (n.second->address == _a && n.second->id != _id)
 			{
 				_oldId = n.second->id;
@@ -217,7 +220,7 @@ shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId)
 		i = m_nodesList.size();
 		m_nodesList.push_back(_id);
 	}
-	m_nodes[_id] = make_shared<Node>();
+	m_nodes[_id] = make_shared<NodeInfo>();
 	m_nodes[_id]->id = _id;
 	m_nodes[_id]->index = i;
 	m_nodes[_id]->idOrigin = _o;
@@ -246,6 +249,7 @@ shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId)
 	return m_nodes[_id];
 }
 
+#warning integration: TBD caps in NodeTable/NodeEntry
 Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
 {
 	RecursiveGuard l(x_peers);
@@ -376,6 +380,7 @@ string Host::pocHost()
 	return "poc-" + strs[1] + ".ethdev.com";
 }
 
+#warning integration: todo remove all connect() w/ addNode makeRequired (this requires pubkey)
 void Host::connect(std::string const& _addr, unsigned short _port) noexcept
 {
 	if (!m_run)
@@ -428,13 +433,13 @@ void Host::connect(bi::tcp::endpoint const& _ep)
 	});
 }
 
-void Host::connect(std::shared_ptr<Node> const& _n)
+void Host::connect(std::shared_ptr<NodeInfo> const& _n)
 {
 	if (!m_run)
 		return;
 
 	// prevent concurrently connecting to a node; todo: better abstraction
-	Node *nptr = _n.get();
+	NodeInfo *nptr = _n.get();
 	{
 		Guard l(x_pendingNodeConns);
 		if (m_pendingNodeConns.count(nptr))
@@ -488,7 +493,7 @@ bool Host::havePeer(NodeId _id) const
 	return !!m_peers.count(_id);
 }
 
-unsigned Node::fallbackSeconds() const
+unsigned NodeInfo::fallbackSeconds() const
 {
 	switch (lastDisconnect)
 	{
@@ -510,85 +515,83 @@ unsigned Node::fallbackSeconds() const
 	}
 }
 
-bool Node::shouldReconnect() const
-{
-	return chrono::system_clock::now() > lastAttempted + chrono::seconds(fallbackSeconds());
-}
-
-void Host::growPeers()
-{
-	RecursiveGuard l(x_peers);
-	int morePeers = (int)m_idealPeerCount - m_peers.size();
-	if (morePeers > 0)
-	{
-		auto toTry = m_ready;
-		if (!m_netPrefs.localNetworking)
-			toTry -= m_private;
-		set<Node> ns;
-		for (auto i: toTry)
-			if (m_nodes[m_nodesList[i]]->shouldReconnect())
-				ns.insert(*m_nodes[m_nodesList[i]]);
-
-		if (ns.size())
-			for (Node const& i: ns)
-			{
-				connect(m_nodes[i.id]);
-				if (!--morePeers)
-					return;
-			}
-		else
-			for (auto const& i: m_peers)
-				if (auto p = i.second.lock())
-					p->ensureNodesRequested();
-	}
-}
-
-void Host::prunePeers()
-{
-	RecursiveGuard l(x_peers);
-	// We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
-	set<NodeId> dc;
-	for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
-		if (m_peers.size() - dc.size() > m_idealPeerCount)
-		{
-			// look for worst peer to kick off
-			// first work out how many are old enough to kick off.
-			shared_ptr<Session> worst;
-			unsigned agedPeers = 0;
-			for (auto i: m_peers)
-				if (!dc.count(i.first))
-					if (auto p = i.second.lock())
-						if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old))	// don't throw off new peers; peer-servers should never kick off other peer-servers.
-						{
-							++agedPeers;
-							if ((!worst || p->rating() < worst->rating() || (p->rating() == worst->rating() && p->m_connect > worst->m_connect)))	// kill older ones
-								worst = p;
-						}
-			if (!worst || agedPeers <= m_idealPeerCount)
-				break;
-			dc.insert(worst->id());
-			worst->disconnect(TooManyPeers);
-		}
-
-	// Remove dead peers from list.
-	for (auto i = m_peers.begin(); i != m_peers.end();)
-		if (i->second.lock().get())
-			++i;
-		else
-			i = m_peers.erase(i);
-}
+#warning integration: ---- grow/prunePeers
+#warning integration: todo grow/prune into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer.
+//void Host::growPeers()
+//{
+//	RecursiveGuard l(x_peers);
+//	int morePeers = (int)m_idealPeerCount - m_peers.size();
+//	if (morePeers > 0)
+//	{
+//		auto toTry = m_ready;
+//		if (!m_netPrefs.localNetworking)
+//			toTry -= m_private;
+//		set<NodeInfo> ns;
+//		for (auto i: toTry)
+//			if (m_nodes[m_nodesList[i]]->shouldReconnect())
+//				ns.insert(*m_nodes[m_nodesList[i]]);
+//
+//		if (ns.size())
+//			for (NodeInfo const& i: ns)
+//			{
+//				connect(m_nodes[i.id]);
+//				if (!--morePeers)
+//					return;
+//			}
+//		else
+//			for (auto const& i: m_peers)
+//				if (auto p = i.second.lock())
+//					p->ensureNodesRequested();
+//	}
+//}
+//
+//void Host::prunePeers()
+//{
+//	RecursiveGuard l(x_peers);
+//	// We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
+//	set<NodeId> dc;
+//	for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
+//		if (m_peers.size() - dc.size() > m_idealPeerCount)
+//		{
+//			// look for worst peer to kick off
+//			// first work out how many are old enough to kick off.
+//			shared_ptr<Session> worst;
+//			unsigned agedPeers = 0;
+//			for (auto i: m_peers)
+//				if (!dc.count(i.first))
+//					if (auto p = i.second.lock())
+//						if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old))	// don't throw off new peers; peer-servers should never kick off other peer-servers.
+//						{
+//							++agedPeers;
+//							if ((!worst || p->rating() < worst->rating() || (p->rating() == worst->rating() && p->m_connect > worst->m_connect)))	// kill older ones
+//								worst = p;
+//						}
+//			if (!worst || agedPeers <= m_idealPeerCount)
+//				break;
+//			dc.insert(worst->id());
+//			worst->disconnect(TooManyPeers);
+//		}
+//
+//	// Remove dead peers from list.
+//	for (auto i = m_peers.begin(); i != m_peers.end();)
+//		if (i->second.lock().get())
+//			++i;
+//		else
+//			i = m_peers.erase(i);
+//}
 
-PeerInfos Host::peers(bool _updatePing) const
+PeerInfos Host::peers() const
 {
 	if (!m_run)
 		return PeerInfos();
 
+#warning integration: ---- pingAll. It is called every 30secs via deadline timer.
 	RecursiveGuard l(x_peers);
-	if (_updatePing)
-	{
-		const_cast<Host*>(this)->pingAll();
-		this_thread::sleep_for(chrono::milliseconds(200));
-	}
+//	if (_updatePing)
+//	{
+//		const_cast<Host*>(this)->pingAll();
+//		this_thread::sleep_for(chrono::milliseconds(200));
+//	}
 	std::vector<PeerInfo> ret;
 	for (auto& i: m_peers)
 		if (auto j = i.second.lock())
@@ -610,13 +613,14 @@ void Host::run(boost::system::error_code const&)
 		return;
 	}
 
-	m_lastTick += c_timerInterval;
-	if (m_lastTick >= c_timerInterval * 10)
-	{
-		growPeers();
-		prunePeers();
-		m_lastTick = 0;
-	}
+#warning integration: ----
+//	m_lastTick += c_timerInterval;
+//	if (m_lastTick >= c_timerInterval * 10)
+//	{
+//		growPeers();
+//		prunePeers();
+//		m_lastTick = 0;
+//	}
 
 	if (m_hadNewNodes)
 	{
@@ -670,12 +674,19 @@ void Host::startedWorking()
 		if (m_listenPort > 0)
 			runAcceptor();
+
+#warning integration: ++++
+		if (!m_tcpPublic.address().is_unspecified())
+			m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic));
+		else
+			m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303));
 	}
 
-	// if m_public address is valid then add us to node list
-	// todo: abstract empty() and emplace logic
-	if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
-		noteNode(id(), m_tcpPublic, Origin::Perfect, false);
+#warning integration: ----
+//	// if m_public address is valid then add us to node list
+//	// todo: abstract empty() and emplace logic
+//	if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
+//		noteNode(id(), m_tcpPublic, Origin::Perfect, false);
 
 	clog(NetNote) << "Id:" << id().abridged();
@@ -697,6 +708,7 @@ void Host::pingAll()
 	m_lastPing = chrono::steady_clock::now();
 }
 
+#warning integration: todo save/restoreNodes
 bytes Host::saveNodes() const
 {
 	RLPStream nodes;
@@ -705,7 +717,7 @@ bytes Host::saveNodes() const
 	RecursiveGuard l(x_peers);
 	for (auto const& i: m_nodes)
 	{
-		Node const& n = *(i.second);
+		NodeInfo const& n = *(i.second);
 		// TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 &&
 		if (!n.dead && chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address()))
 		{

libp2p/Host.h  (56 changes)

@@ -34,6 +34,7 @@
 #include <libdevcore/Worker.h>
 #include <libdevcore/RangeMask.h>
 #include <libdevcrypto/Common.h>
+#include "NodeTable.h"
 #include "HostCapability.h"
 #include "Network.h"
 #include "Common.h"
@@ -59,29 +60,47 @@ enum class Origin
 	Perfect,
 };
 
-struct Node
+struct NodeInfo
 {
 	NodeId id;										///< Their id/public key.
 	unsigned index;									///< Index into m_nodesList
+
+	// p2p: move to NodeEndpoint
 	bi::tcp::endpoint address;						///< As reported from the node itself.
-	int score = 0;									///< All time cumulative.
-	int rating = 0;									///< Trending.
-	bool dead = false;								///< If true, we believe this node is permanently dead - forget all about it.
+
+	// p2p: This information is relevant to the network-stack, ex: firewall, rather than node itself
 	std::chrono::system_clock::time_point lastConnected;
 	std::chrono::system_clock::time_point lastAttempted;
 	unsigned failedAttempts = 0;
 	DisconnectReason lastDisconnect = NoDisconnect;	///< Reason for disconnect that happened last.
+
+	// p2p: just remove node
+	// p2p: revisit logic in see Session.cpp#210
+	bool dead = false;								///< If true, we believe this node is permanently dead - forget all about it.
+
+	// p2p: move to protocol-specific map
+	int score = 0;									///< All time cumulative.
+	int rating = 0;									///< Trending.
+
+	// p2p: remove
 	Origin idOrigin = Origin::Unknown;				///< How did we get to know this node's id?
 
+	// p2p: move to NodeEndpoint
 	int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); }
+	// p2p: move to NodeEndpoint
 	int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastAttempted).count(); }
+	// p2p: move to NodeEndpoint
 	unsigned fallbackSeconds() const;
-	bool shouldReconnect() const;
+	// p2p: move to NodeEndpoint
+	bool shouldReconnect() const { return std::chrono::system_clock::now() > lastAttempted + std::chrono::seconds(fallbackSeconds()); }
 
+	// p2p: This has two meanings now. It's possible UDP works but TPC is down (unable to punch hole).
+	// p2p: Rename to isConnect() and move to endpoint. revisit Session.cpp#245, MainWin.cpp#877
 	bool isOffline() const { return lastAttempted > lastConnected; }
-	bool operator<(Node const& _n) const
+
+	// p2p: Remove (in favor of lru eviction and sub-protocol ratings).
+	bool operator<(NodeInfo const& _n) const
 	{
 		if (isOffline() != _n.isOffline())
 			return isOffline();
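The reconnect gate made inline above is: dial again only once lastAttempted plus fallbackSeconds() has elapsed. A self-contained toy with an assumed flat 30-second fallback (the switch body of NodeInfo::fallbackSeconds() is not shown in this diff):

#include <cassert>
#include <chrono>

struct ToyNode
{
	std::chrono::system_clock::time_point lastAttempted;

	unsigned fallbackSeconds() const { return 30; }   // assumed flat back-off

	bool shouldReconnect() const
	{
		return std::chrono::system_clock::now() > lastAttempted + std::chrono::seconds(fallbackSeconds());
	}
};

int main()
{
	ToyNode n;
	n.lastAttempted = std::chrono::system_clock::now();
	assert(!n.shouldReconnect());                     // just tried: hold off
	n.lastAttempted -= std::chrono::seconds(31);      // pretend 31s have passed
	assert(n.shouldReconnect());                      // back-off elapsed: dial again
}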
@@ -101,7 +120,7 @@ struct Node
 	}
 };
 
-using Nodes = std::vector<Node>;
+using Nodes = std::vector<NodeInfo>;
 
 /**
  * @brief The Host class
@@ -111,7 +130,7 @@ class Host: public Worker
 {
 	friend class Session;
 	friend class HostCapabilityFace;
-	friend struct Node;
+	friend struct NodeInfo;
 
 public:
 	/// Start server, listening for connections on the given port.
@@ -134,7 +153,7 @@ public:
 	static std::string pocHost();
 	void connect(std::string const& _addr, unsigned short _port = 30303) noexcept;
 	void connect(bi::tcp::endpoint const& _ep);
-	void connect(std::shared_ptr<Node> const& _n);
+	void connect(std::shared_ptr<NodeInfo> const& _n);
 
 	/// @returns true iff we have a peer of the given id.
 	bool havePeer(NodeId _id) const;
@@ -142,8 +161,11 @@ public:
 	/// Set ideal number of peers.
 	void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
 
+	/// p2p: template?
+	void setIdealPeerCount(HostCapabilityFace* _cap, unsigned _n) { m_capIdealPeerCount[_cap->capDesc()] = _n; }
+
 	/// Get peer information.
-	PeerInfos peers(bool _updatePing = false) const;
+	PeerInfos peers() const;
 
 	/// Get number of peers connected; equivalent to, but faster than, peers().size().
 	size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); }
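The new overload records a per-capability target in m_capIdealPeerCount, keyed by the capability's CapDesc. A sketch with a (name, version) pair standing in for the real CapDesc; names and versions are illustrative:

#include <cassert>
#include <map>
#include <string>
#include <utility>

using CapDesc = std::pair<std::string, unsigned>;   // stand-in for the real CapDesc

int main()
{
	std::map<CapDesc, unsigned> capIdealPeerCount;
	capIdealPeerCount[{"eth", 1}] = 5;   // keep ~5 peers speaking eth v1
	capIdealPeerCount[{"shh", 1}] = 2;   // and ~2 speaking shh v1
	assert(capIdealPeerCount.at({"eth", 1}) == 5);
}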
@@ -178,7 +200,7 @@ public:
 	void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps);
 
-	std::shared_ptr<Node> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<Node>(); }
+	std::shared_ptr<NodeInfo> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<NodeInfo>(); }
 
 private:
 	/// Populate m_peerAddresses with available public addresses.
@@ -189,9 +211,6 @@ private:
 	void seal(bytes& _b);
 
-	void growPeers();
-	void prunePeers();
-
 	/// Called by Worker. Not thread-safe; to be called only by worker.
 	virtual void startedWorking();
 	/// Called by startedWorking. Not thread-safe; to be called only be Worker.
@@ -203,7 +222,7 @@ private:
 	/// Shutdown network. Not thread-safe; to be called only by worker.
 	virtual void doneWorking();
 
-	std::shared_ptr<Node> noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId = NodeId());
+	std::shared_ptr<NodeInfo> noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId = NodeId());
 	Nodes potentialPeers(RangeMask<unsigned> const& _known);
 
 	bool m_run = false;					///< Whether network is running.
@@ -224,13 +243,14 @@ private:
 	std::unique_ptr<boost::asio::deadline_timer> m_timer;	///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
 	static const unsigned c_timerInterval = 100;			///< Interval which m_timer is run when network is connected.
-	unsigned m_lastTick = 0;								///< Used by run() for scheduling; must not be mutated outside of run().
 
-	std::set<Node*> m_pendingNodeConns;			/// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptr<Node>const&).
+	std::set<NodeInfo*> m_pendingNodeConns;		/// Used only by connect(NodeInfo&) to limit concurrently connecting to same node. See connect(shared_ptr<NodeInfo>const&).
 	Mutex x_pendingNodeConns;
 
 	bi::tcp::endpoint m_tcpPublic;				///< Our public listening endpoint.
 	KeyPair m_key;								///< Our unique ID.
+	std::shared_ptr<NodeTable> m_nodeTable;		///< Node table (uses kademlia-like discovery).
+	std::map<CapDesc, unsigned> m_capIdealPeerCount;	///< Ideal peer count for capability.
 
 	bool m_hadNewNodes = false;
@@ -242,7 +262,7 @@ private:
 	/// Nodes to which we may connect (or to which we have connected).
 	/// TODO: does this need a lock?
-	std::map<NodeId, std::shared_ptr<Node> > m_nodes;
+	std::map<NodeId, std::shared_ptr<NodeInfo> > m_nodes;
 
 	/// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same.
 	std::vector<NodeId> m_nodesList;

libp2p/NodeTable.cpp  (19 changes)

@@ -24,12 +24,15 @@ using namespace std;
 using namespace dev;
 using namespace dev::p2p;
 
-NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort):
+NodeEntry::NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {}
+NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {}
+
+NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep):
 	m_node(Node(_alias.pub(), bi::udp::endpoint())),
 	m_secret(_alias.sec()),
-	m_socket(new NodeSocket(_io, *this, _listenPort)),
-	m_socketPtr(m_socket.get()),
 	m_io(_io),
+	m_socket(new NodeSocket(m_io, *this, _udp)),
+	m_socketPtr(m_socket.get()),
 	m_bucketRefreshTimer(m_io),
 	m_evictionCheckTimer(m_io)
 {
@@ -64,7 +67,7 @@ std::list<NodeId> NodeTable::nodes() const
 	return std::move(nodes);
 }
 
-list<NodeTable::NodeEntry> NodeTable::state() const
+list<NodeEntry> NodeTable::state() const
 {
 	list<NodeEntry> ret;
 	Guard l(x_state);
@@ -74,7 +77,7 @@ list<NodeEntry> NodeTable::state() const
 	return move(ret);
 }
 
-NodeTable::NodeEntry NodeTable::operator[](NodeId _id)
+NodeEntry NodeTable::operator[](NodeId _id)
 {
 	Guard l(x_nodes);
 	return *m_nodes[_id];
@@ -135,7 +138,7 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried)
 	});
 }
 
-std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
+std::vector<std::shared_ptr<NodeEntry>> NodeTable::findNearest(NodeId _target)
 {
 	// send s_alpha FindNode packets to nodes we know, closest to target
 	static unsigned lastBin = s_bins - 1;
@@ -259,7 +262,7 @@ void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
 		}
 	}
 
-	// todo: why is this necessary?
+	// todo: sometimes node is nullptr here
 	if (!!node)
 		noteNode(node);
 }
@@ -365,7 +368,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
 //			clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
 			FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
 
-			std::vector<std::shared_ptr<NodeTable::NodeEntry>> nearest = findNearest(in.target);
+			std::vector<std::shared_ptr<NodeEntry>> nearest = findNearest(in.target);
 			static unsigned const nlimit = (m_socketPtr->maxDatagramSize - 11) / 86;
 			for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
 			{

libp2p/NodeTable.h  (88 changes)

@@ -31,6 +31,41 @@ namespace dev
 namespace p2p
 {
 
+struct NodeDefaultEndpoint
+{
+	NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
+	NodeDefaultEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {}
+	NodeDefaultEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {}
+
+	bi::udp::endpoint udp;
+	bi::tcp::endpoint tcp;
+};
+
+struct Node
+{
+	Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {}
+	Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {}
+
+	virtual NodeId const& address() const { return id; }
+	virtual Public const& publicKey() const { return id; }
+
+	NodeId id;
+	NodeDefaultEndpoint endpoint;
+};
+
+/**
+ * NodeEntry
+ * @brief Entry in Node Table
+ */
+struct NodeEntry: public Node
+{
+	NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw);	//: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {}
+	NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);	//: Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {}
+
+	const unsigned distance;	///< Node's distance from _src (see constructor).
+};
+
 /**
  * NodeTable using S/Kademlia system for node discovery and preference.
  * untouched buckets are refreshed if they have not been touched within an hour
@@ -50,7 +85,7 @@ namespace p2p
  * @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay)
  * @todo std::shared_ptr<PingNode> m_cachedPingPacket;
  * @todo std::shared_ptr<FindNeighbors> m_cachedFindSelfPacket;
- * @todo store root node in table?
+ * @todo store self (root) in table? (potential bug. alt is to add to list when self is closest to target)
  *
  * [Networking]
  * @todo TCP endpoints
@@ -71,45 +106,8 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
 	using TimePoint = std::chrono::steady_clock::time_point;
 	using EvictionTimeout = std::pair<std::pair<NodeId,TimePoint>,NodeId>;	///< First NodeId may be evicted and replaced with second NodeId.
 
-	struct NodeDefaultEndpoint
-	{
-		NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
-		bi::udp::endpoint udp;
-	};
-
-	struct Node
-	{
-		Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {}
-		Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {}
-
-		virtual NodeId const& address() const { return id; }
-		virtual Public const& publicKey() const { return id; }
-
-		NodeId id;
-		NodeDefaultEndpoint endpoint;
-	};
-
-	/**
-	 * NodeEntry
-	 * @todo Type of id will become template parameter.
-	 */
-	struct NodeEntry: public Node
-	{
-		NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {}
-		NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {}
-
-		const unsigned distance;	///< Node's distance from _src (see constructor).
-	};
-
-	struct NodeBucket
-	{
-		unsigned distance;
-		TimePoint modified;
-		std::list<std::weak_ptr<NodeEntry>> nodes;
-	};
-
 public:
-	NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _port = 30300);
+	NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303, bi::tcp::endpoint _ep = bi::tcp::endpoint());
 	~NodeTable();
 
 	/// Constants for Kademlia, mostly derived from address space.
@@ -142,6 +140,13 @@ public:
 
 protected:
+	struct NodeBucket
+	{
+		unsigned distance;
+		TimePoint modified;
+		std::list<std::weak_ptr<NodeEntry>> nodes;
+	};
+
 	/// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds.
 	void doFindNode(NodeId _node, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
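NodeBucket::modified is what the hourly refresh rule in the class comment keys off: a bucket untouched for an hour is due for refresh. An illustrative scan (the real scheduling lives behind m_bucketRefreshTimer and is not part of this diff):

#include <chrono>
#include <cstdio>
#include <vector>

using TimePoint = std::chrono::steady_clock::time_point;

struct Bucket { unsigned distance; TimePoint modified; };

int main()
{
	auto now = std::chrono::steady_clock::now();
	std::vector<Bucket> buckets{{1, now}, {2, now - std::chrono::hours(2)}};
	for (auto const& b: buckets)
		if (now - b.modified > std::chrono::hours(1))
			std::printf("refresh bucket %u\n", b.distance);   // prints: refresh bucket 2
}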
@@ -194,9 +199,10 @@ protected:
 	Mutex x_evictions;
 	std::deque<EvictionTimeout> m_evictions;	///< Eviction timeouts.
 
+	ba::io_service& m_io;						///< Used by bucket refresh timer.
 	std::shared_ptr<NodeSocket> m_socket;		///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
 	NodeSocket* m_socketPtr;					///< Set to m_socket.get().
-	ba::io_service& m_io;						///< Used by bucket refresh timer.
+
 	boost::asio::deadline_timer m_bucketRefreshTimer;	///< Timer which schedules and enacts bucket refresh.
 	boost::asio::deadline_timer m_evictionCheckTimer;	///< Timer for handling node evictions.
 };
@@ -317,7 +323,7 @@ struct Neighbors: RLPXDatagram<Neighbors>
 	};
 
 	using RLPXDatagram<Neighbors>::RLPXDatagram;
-	Neighbors(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeTable::NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbors>(_to)
+	Neighbors(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbors>(_to)
 	{
 		auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
 		for (auto i = _offset; i < limit; i++)
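The _offset/_limit pair mirrors the chunking in onReceived(): the nearest-node list is split into packets of at most nlimit entries so each Neighbors datagram stays under the socket's maximum size. A worked example assuming maxDatagramSize is 1280 bytes (the 11-byte envelope and 86 bytes per entry are taken from the diff):

#include <algorithm>
#include <cstdio>
#include <vector>

int main()
{
	unsigned const maxDatagramSize = 1280;                 // assumption for illustration
	unsigned const nlimit = (maxDatagramSize - 11) / 86;   // == 14 entries per packet

	std::vector<int> nearest(30);                          // pretend 30 nearest nodes
	for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
	{
		auto limit = std::min(nearest.size(), (size_t)(offset + nlimit));
		std::printf("packet covers entries [%u, %zu)\n", offset, limit);
	}
	// -> [0, 14), [14, 28), [28, 30)
}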

libp2p/Session.cpp  (2 changes)

@@ -47,7 +47,7 @@ Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual)
 	m_info = PeerInfo({NodeId(), "?", m_manualEndpoint.address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
 }
 
-Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Node> const& _n, bool _force):
+Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n, bool _force):
 	m_server(_s),
 	m_socket(std::move(_socket)),
 	m_node(_n),

libp2p/Session.h  (6 changes)

@@ -39,7 +39,7 @@ namespace dev
 namespace p2p
 {
 
-struct Node;
+struct NodeInfo;
 
 /**
  * @brief The Session class
@@ -51,7 +51,7 @@ class Session: public std::enable_shared_from_this<Session>
 	friend class HostCapabilityFace;
 
 public:
-	Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<Node> const& _n, bool _force = false);
+	Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n, bool _force = false);
 	Session(Host* _server, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual);
 	virtual ~Session();
@@ -113,7 +113,7 @@ private:
 	PeerInfo m_info;						///< Dynamic information about this peer.
 	unsigned m_protocolVersion = 0;			///< The protocol version of the peer.
-	std::shared_ptr<Node> m_node;			///< The Node object. Might be null if we constructed using a bare address/port.
+	std::shared_ptr<NodeInfo> m_node;		///< The NodeInfo object. Might be null if we constructed using a bare address/port.
 	bi::tcp::endpoint m_manualEndpoint;		///< The endpoint as specified by the constructor.
 	bool m_force = false;					///< If true, ignore IDs being different. This could open you up to MitM attacks.
 	bool m_dropped = false;					///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.

libp2p/UDP.h  (2 changes)

@@ -57,8 +57,6 @@ protected:
 
 /**
  * @brief RLPX Datagram which can be signed.
- * @todo compact templates
- * @todo make data private/functional (see UDPDatagram)
  */
 struct RLPXDatagramFace: public UDPDatagram
 {
