From 5436f90f04d1ec6a81f1fe90f57a8385e9e8635d Mon Sep 17 00:00:00 2001 From: subtly Date: Mon, 5 Jan 2015 00:49:16 +0100 Subject: [PATCH] Pass 1 integrating node table. TBD: whether to store/relay cap info. --- libp2p/Host.cpp | 202 +++++++++++++++++++++++-------------------- libp2p/Host.h | 66 +++++++++----- libp2p/NodeTable.cpp | 19 ++-- libp2p/NodeTable.h | 92 +++++++++++--------- libp2p/Session.cpp | 2 +- libp2p/Session.h | 6 +- libp2p/UDP.h | 2 - 7 files changed, 214 insertions(+), 175 deletions(-) diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 4a99ac90f..73fabe7f2 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -44,7 +44,8 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool m_ifAddresses(Network::getInterfaceAddresses()), m_ioService(2), m_tcp4Acceptor(m_ioService), - m_key(KeyPair::create()) + m_key(KeyPair::create()), + m_nodeTable(new NodeTable(m_ioService, m_key)) { for (auto address: m_ifAddresses) if (address.is_v4()) @@ -143,11 +144,12 @@ void Host::doneWorking() unsigned Host::protocolVersion() const { - return 2; + return 3; } void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) { +#warning integration: todo rework so this is an exception if (!_s->m_node || !_s->m_node->id) { cwarn << "Attempting to register a peer without node information!"; @@ -180,7 +182,8 @@ void Host::seal(bytes& _b) _b[7] = len & 0xff; } -shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId) +#warning integration: todo remove origin, ready, oldid. port to NodeTable. see Session.cpp#244,363 +shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId) { RecursiveGuard l(x_peers); if (_a.port() < 30300 || _a.port() > 30305) @@ -192,11 +195,11 @@ shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, boo _a = bi::tcp::endpoint(_a.address(), 0); } -// cnote << "Node:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]"); +// cnote << "NodeInfo:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]"); // First check for another node with the same connection credentials, and put it in oldId if found. if (!_oldId) - for (pair> const& n: m_nodes) + for (pair> const& n: m_nodes) if (n.second->address == _a && n.second->id != _id) { _oldId = n.second->id; @@ -217,7 +220,7 @@ shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, boo i = m_nodesList.size(); m_nodesList.push_back(_id); } - m_nodes[_id] = make_shared(); + m_nodes[_id] = make_shared(); m_nodes[_id]->id = _id; m_nodes[_id]->index = i; m_nodes[_id]->idOrigin = _o; @@ -246,6 +249,7 @@ shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, boo return m_nodes[_id]; } +#warning integration: TBD caps in NodeTable/NodeEntry Nodes Host::potentialPeers(RangeMask const& _known) { RecursiveGuard l(x_peers); @@ -376,6 +380,7 @@ string Host::pocHost() return "poc-" + strs[1] + ".ethdev.com"; } +#warning integration: todo remove all connect() w/ addNode makeRequired (this requires pubkey) void Host::connect(std::string const& _addr, unsigned short _port) noexcept { if (!m_run) @@ -428,13 +433,13 @@ void Host::connect(bi::tcp::endpoint const& _ep) }); } -void Host::connect(std::shared_ptr const& _n) +void Host::connect(std::shared_ptr const& _n) { if (!m_run) return; // prevent concurrently connecting to a node; todo: better abstraction - Node *nptr = _n.get(); + NodeInfo *nptr = _n.get(); { Guard l(x_pendingNodeConns); if (m_pendingNodeConns.count(nptr)) @@ -488,7 +493,7 @@ bool Host::havePeer(NodeId _id) const return !!m_peers.count(_id); } -unsigned Node::fallbackSeconds() const +unsigned NodeInfo::fallbackSeconds() const { switch (lastDisconnect) { @@ -510,85 +515,83 @@ unsigned Node::fallbackSeconds() const } } -bool Node::shouldReconnect() const -{ - return chrono::system_clock::now() > lastAttempted + chrono::seconds(fallbackSeconds()); -} - -void Host::growPeers() -{ - RecursiveGuard l(x_peers); - int morePeers = (int)m_idealPeerCount - m_peers.size(); - if (morePeers > 0) - { - auto toTry = m_ready; - if (!m_netPrefs.localNetworking) - toTry -= m_private; - set ns; - for (auto i: toTry) - if (m_nodes[m_nodesList[i]]->shouldReconnect()) - ns.insert(*m_nodes[m_nodesList[i]]); - - if (ns.size()) - for (Node const& i: ns) - { - connect(m_nodes[i.id]); - if (!--morePeers) - return; - } - else - for (auto const& i: m_peers) - if (auto p = i.second.lock()) - p->ensureNodesRequested(); - } -} - -void Host::prunePeers() -{ - RecursiveGuard l(x_peers); - // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. - set dc; - for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2) - if (m_peers.size() - dc.size() > m_idealPeerCount) - { - // look for worst peer to kick off - // first work out how many are old enough to kick off. - shared_ptr worst; - unsigned agedPeers = 0; - for (auto i: m_peers) - if (!dc.count(i.first)) - if (auto p = i.second.lock()) - if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. - { - ++agedPeers; - if ((!worst || p->rating() < worst->rating() || (p->rating() == worst->rating() && p->m_connect > worst->m_connect))) // kill older ones - worst = p; - } - if (!worst || agedPeers <= m_idealPeerCount) - break; - dc.insert(worst->id()); - worst->disconnect(TooManyPeers); - } - - // Remove dead peers from list. - for (auto i = m_peers.begin(); i != m_peers.end();) - if (i->second.lock().get()) - ++i; - else - i = m_peers.erase(i); -} - -PeerInfos Host::peers(bool _updatePing) const +#warning integration: ---- grow/prunePeers +#warning integration: todo grow/prune into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer. +//void Host::growPeers() +//{ +// RecursiveGuard l(x_peers); +// int morePeers = (int)m_idealPeerCount - m_peers.size(); +// if (morePeers > 0) +// { +// auto toTry = m_ready; +// if (!m_netPrefs.localNetworking) +// toTry -= m_private; +// set ns; +// for (auto i: toTry) +// if (m_nodes[m_nodesList[i]]->shouldReconnect()) +// ns.insert(*m_nodes[m_nodesList[i]]); +// +// if (ns.size()) +// for (NodeInfo const& i: ns) +// { +// connect(m_nodes[i.id]); +// if (!--morePeers) +// return; +// } +// else +// for (auto const& i: m_peers) +// if (auto p = i.second.lock()) +// p->ensureNodesRequested(); +// } +//} +// +//void Host::prunePeers() +//{ +// RecursiveGuard l(x_peers); +// // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. +// set dc; +// for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2) +// if (m_peers.size() - dc.size() > m_idealPeerCount) +// { +// // look for worst peer to kick off +// // first work out how many are old enough to kick off. +// shared_ptr worst; +// unsigned agedPeers = 0; +// for (auto i: m_peers) +// if (!dc.count(i.first)) +// if (auto p = i.second.lock()) +// if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. +// { +// ++agedPeers; +// if ((!worst || p->rating() < worst->rating() || (p->rating() == worst->rating() && p->m_connect > worst->m_connect))) // kill older ones +// worst = p; +// } +// if (!worst || agedPeers <= m_idealPeerCount) +// break; +// dc.insert(worst->id()); +// worst->disconnect(TooManyPeers); +// } +// +// // Remove dead peers from list. +// for (auto i = m_peers.begin(); i != m_peers.end();) +// if (i->second.lock().get()) +// ++i; +// else +// i = m_peers.erase(i); +//} + +PeerInfos Host::peers() const { if (!m_run) return PeerInfos(); +#warning integration: ---- pingAll. It is called every 30secs via deadline timer. RecursiveGuard l(x_peers); - if (_updatePing) - { - const_cast(this)->pingAll(); - this_thread::sleep_for(chrono::milliseconds(200)); - } +// if (_updatePing) +// { +// const_cast(this)->pingAll(); +// this_thread::sleep_for(chrono::milliseconds(200)); +// } std::vector ret; for (auto& i: m_peers) if (auto j = i.second.lock()) @@ -610,13 +613,14 @@ void Host::run(boost::system::error_code const&) return; } - m_lastTick += c_timerInterval; - if (m_lastTick >= c_timerInterval * 10) - { - growPeers(); - prunePeers(); - m_lastTick = 0; - } +#warning integration: ---- +// m_lastTick += c_timerInterval; +// if (m_lastTick >= c_timerInterval * 10) +// { +// growPeers(); +// prunePeers(); +// m_lastTick = 0; +// } if (m_hadNewNodes) { @@ -670,12 +674,19 @@ void Host::startedWorking() if (m_listenPort > 0) runAcceptor(); + +#warning integration: ++++ + if (!m_tcpPublic.address().is_unspecified()) + m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic)); + else + m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303)); } - // if m_public address is valid then add us to node list - // todo: abstract empty() and emplace logic - if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id())) - noteNode(id(), m_tcpPublic, Origin::Perfect, false); +#warning integration: ---- +// // if m_public address is valid then add us to node list +// // todo: abstract empty() and emplace logic +// if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id())) +// noteNode(id(), m_tcpPublic, Origin::Perfect, false); clog(NetNote) << "Id:" << id().abridged(); @@ -697,6 +708,7 @@ void Host::pingAll() m_lastPing = chrono::steady_clock::now(); } +#warning integration: todo save/restoreNodes bytes Host::saveNodes() const { RLPStream nodes; @@ -705,7 +717,7 @@ bytes Host::saveNodes() const RecursiveGuard l(x_peers); for (auto const& i: m_nodes) { - Node const& n = *(i.second); + NodeInfo const& n = *(i.second); // TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 && if (!n.dead && chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address())) { diff --git a/libp2p/Host.h b/libp2p/Host.h index 8ed25f2ae..9600c723b 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -34,6 +34,7 @@ #include #include #include +#include "NodeTable.h" #include "HostCapability.h" #include "Network.h" #include "Common.h" @@ -59,29 +60,47 @@ enum class Origin Perfect, }; -struct Node +struct NodeInfo { NodeId id; ///< Their id/public key. unsigned index; ///< Index into m_nodesList + + // p2p: move to NodeEndpoint bi::tcp::endpoint address; ///< As reported from the node itself. - int score = 0; ///< All time cumulative. - int rating = 0; ///< Trending. - bool dead = false; ///< If true, we believe this node is permanently dead - forget all about it. + + // p2p: This information is relevant to the network-stack, ex: firewall, rather than node itself std::chrono::system_clock::time_point lastConnected; std::chrono::system_clock::time_point lastAttempted; unsigned failedAttempts = 0; DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last. + // p2p: just remove node + // p2p: revisit logic in see Session.cpp#210 + bool dead = false; ///< If true, we believe this node is permanently dead - forget all about it. + + // p2p: move to protocol-specific map + int score = 0; ///< All time cumulative. + int rating = 0; ///< Trending. + + // p2p: remove Origin idOrigin = Origin::Unknown; ///< How did we get to know this node's id? + // p2p: move to NodeEndpoint int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast(std::chrono::system_clock::now() - lastConnected).count(); } + // p2p: move to NodeEndpoint int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast(std::chrono::system_clock::now() - lastAttempted).count(); } + // p2p: move to NodeEndpoint unsigned fallbackSeconds() const; - bool shouldReconnect() const; + // p2p: move to NodeEndpoint + bool shouldReconnect() const { return std::chrono::system_clock::now() > lastAttempted + std::chrono::seconds(fallbackSeconds()); } + // p2p: This has two meanings now. It's possible UDP works but TPC is down (unable to punch hole). + // p2p: Rename to isConnect() and move to endpoint. revisit Session.cpp#245, MainWin.cpp#877 bool isOffline() const { return lastAttempted > lastConnected; } - bool operator<(Node const& _n) const + + // p2p: Remove (in favor of lru eviction and sub-protocol ratings). + bool operator<(NodeInfo const& _n) const { if (isOffline() != _n.isOffline()) return isOffline(); @@ -101,7 +120,7 @@ struct Node } }; -using Nodes = std::vector; +using Nodes = std::vector; /** * @brief The Host class @@ -111,7 +130,7 @@ class Host: public Worker { friend class Session; friend class HostCapabilityFace; - friend struct Node; + friend struct NodeInfo; public: /// Start server, listening for connections on the given port. @@ -134,16 +153,19 @@ public: static std::string pocHost(); void connect(std::string const& _addr, unsigned short _port = 30303) noexcept; void connect(bi::tcp::endpoint const& _ep); - void connect(std::shared_ptr const& _n); + void connect(std::shared_ptr const& _n); /// @returns true iff we have a peer of the given id. bool havePeer(NodeId _id) const; /// Set ideal number of peers. void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } + + /// p2p: template? + void setIdealPeerCount(HostCapabilityFace* _cap, unsigned _n) { m_capIdealPeerCount[_cap->capDesc()] = _n; } /// Get peer information. - PeerInfos peers(bool _updatePing = false) const; + PeerInfos peers() const; /// Get number of peers connected; equivalent to, but faster than, peers().size(). size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); } @@ -178,7 +200,7 @@ public: void registerPeer(std::shared_ptr _s, CapDescs const& _caps); - std::shared_ptr node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr(); } + std::shared_ptr node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr(); } private: /// Populate m_peerAddresses with available public addresses. @@ -189,9 +211,6 @@ private: void seal(bytes& _b); - void growPeers(); - void prunePeers(); - /// Called by Worker. Not thread-safe; to be called only by worker. virtual void startedWorking(); /// Called by startedWorking. Not thread-safe; to be called only be Worker. @@ -203,34 +222,35 @@ private: /// Shutdown network. Not thread-safe; to be called only by worker. virtual void doneWorking(); - std::shared_ptr noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId = NodeId()); + std::shared_ptr noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId = NodeId()); Nodes potentialPeers(RangeMask const& _known); bool m_run = false; ///< Whether network is running. - std::mutex x_runTimer; ///< Start/stop mutex. + std::mutex x_runTimer; ///< Start/stop mutex. std::string m_clientVersion; ///< Our version string. NetworkPreferences m_netPrefs; ///< Network settings. /// Interface addresses (private, public) - std::vector m_ifAddresses; ///< Interface addresses. + std::vector m_ifAddresses; ///< Interface addresses. int m_listenPort = -1; ///< What port are we listening on. -1 means binding failed or acceptor hasn't been initialized. - ba::io_service m_ioService; ///< IOService for network stuff. - bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor. + ba::io_service m_ioService; ///< IOService for network stuff. + bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor. std::unique_ptr m_socket; ///< Listening socket. std::unique_ptr m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. - unsigned m_lastTick = 0; ///< Used by run() for scheduling; must not be mutated outside of run(). - std::set m_pendingNodeConns; /// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). + std::set m_pendingNodeConns; /// Used only by connect(NodeInfo&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). Mutex x_pendingNodeConns; bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint. - KeyPair m_key; ///< Our unique ID. + KeyPair m_key; ///< Our unique ID. + std::shared_ptr m_nodeTable; ///< Node table (uses kademlia-like discovery). + std::map m_capIdealPeerCount; ///< Ideal peer count for capability. bool m_hadNewNodes = false; @@ -242,7 +262,7 @@ private: /// Nodes to which we may connect (or to which we have connected). /// TODO: does this need a lock? - std::map > m_nodes; + std::map > m_nodes; /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same. std::vector m_nodesList; diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 20f9f5fdf..54c8c9f35 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -24,12 +24,15 @@ using namespace std; using namespace dev; using namespace dev::p2p; -NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort): +NodeEntry::NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {} +NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {} + +NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep): m_node(Node(_alias.pub(), bi::udp::endpoint())), m_secret(_alias.sec()), - m_socket(new NodeSocket(_io, *this, _listenPort)), - m_socketPtr(m_socket.get()), m_io(_io), + m_socket(new NodeSocket(m_io, *this, _udp)), + m_socketPtr(m_socket.get()), m_bucketRefreshTimer(m_io), m_evictionCheckTimer(m_io) { @@ -64,7 +67,7 @@ std::list NodeTable::nodes() const return std::move(nodes); } -list NodeTable::state() const +list NodeTable::state() const { list ret; Guard l(x_state); @@ -74,7 +77,7 @@ list NodeTable::state() const return move(ret); } -NodeTable::NodeEntry NodeTable::operator[](NodeId _id) +NodeEntry NodeTable::operator[](NodeId _id) { Guard l(x_nodes); return *m_nodes[_id]; @@ -135,7 +138,7 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, std::shared_ptr> NodeTable::findNearest(NodeId _target) +std::vector> NodeTable::findNearest(NodeId _target) { // send s_alpha FindNode packets to nodes we know, closest to target static unsigned lastBin = s_bins - 1; @@ -259,7 +262,7 @@ void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint } } - // todo: why is this necessary? + // todo: sometimes node is nullptr here if (!!node) noteNode(node); } @@ -365,7 +368,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes // clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port(); FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes); - std::vector> nearest = findNearest(in.target); + std::vector> nearest = findNearest(in.target); static unsigned const nlimit = (m_socketPtr->maxDatagramSize - 11) / 86; for (unsigned offset = 0; offset < nearest.size(); offset += nlimit) { diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 5466a3ae3..e3503b0b9 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -31,6 +31,41 @@ namespace dev namespace p2p { +struct NodeDefaultEndpoint +{ + NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {} + NodeDefaultEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {} + NodeDefaultEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {} + + bi::udp::endpoint udp; + bi::tcp::endpoint tcp; +}; + +struct Node +{ + Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {} + Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {} + + virtual NodeId const& address() const { return id; } + virtual Public const& publicKey() const { return id; } + + NodeId id; + NodeDefaultEndpoint endpoint; +}; + + +/** + * NodeEntry + * @brief Entry in Node Table + */ +struct NodeEntry: public Node +{ + NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw); //: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {} + NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); //: Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {} + + const unsigned distance; ///< Node's distance from _src (see constructor). +}; + /** * NodeTable using S/Kademlia system for node discovery and preference. * untouched buckets are refreshed if they have not been touched within an hour @@ -50,7 +85,7 @@ namespace p2p * @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay) * @todo std::shared_ptr m_cachedPingPacket; * @todo std::shared_ptr m_cachedFindSelfPacket; - * @todo store root node in table? + * @todo store self (root) in table? (potential bug. alt is to add to list when self is closest to target) * * [Networking] * @todo TCP endpoints @@ -70,46 +105,9 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this using NodeSocket = UDPSocket; using TimePoint = std::chrono::steady_clock::time_point; using EvictionTimeout = std::pair,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId. - - struct NodeDefaultEndpoint - { - NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {} - bi::udp::endpoint udp; - }; - - struct Node - { - Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {} - Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {} - - virtual NodeId const& address() const { return id; } - virtual Public const& publicKey() const { return id; } - - NodeId id; - NodeDefaultEndpoint endpoint; - }; - - /** - * NodeEntry - * @todo Type of id will become template parameter. - */ - struct NodeEntry: public Node - { - NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {} - NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {} - - const unsigned distance; ///< Node's distance from _src (see constructor). - }; - - struct NodeBucket - { - unsigned distance; - TimePoint modified; - std::list> nodes; - }; public: - NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _port = 30300); + NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303, bi::tcp::endpoint _ep = bi::tcp::endpoint()); ~NodeTable(); /// Constants for Kademlia, mostly derived from address space. @@ -121,7 +119,7 @@ public: /// Chosen constants - static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. + static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. /// Intervals @@ -142,6 +140,13 @@ public: protected: + struct NodeBucket + { + unsigned distance; + TimePoint modified; + std::list> nodes; + }; + /// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds. void doFindNode(NodeId _node, unsigned _round = 0, std::shared_ptr>> _tried = std::shared_ptr>>()); @@ -193,10 +198,11 @@ protected: Mutex x_evictions; std::deque m_evictions; ///< Eviction timeouts. - + + ba::io_service& m_io; ///< Used by bucket refresh timer. std::shared_ptr m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. NodeSocket* m_socketPtr; ///< Set to m_socket.get(). - ba::io_service& m_io; ///< Used by bucket refresh timer. + boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh. boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions. }; @@ -317,7 +323,7 @@ struct Neighbors: RLPXDatagram }; using RLPXDatagram::RLPXDatagram; - Neighbors(bi::udp::endpoint _to, std::vector> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram(_to) + Neighbors(bi::udp::endpoint _to, std::vector> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram(_to) { auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size(); for (auto i = _offset; i < limit; i++) diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 3a0dcf1cf..9a0dd2d2e 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -47,7 +47,7 @@ Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _ma m_info = PeerInfo({NodeId(), "?", m_manualEndpoint.address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}); } -Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n, bool _force): +Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n, bool _force): m_server(_s), m_socket(std::move(_socket)), m_node(_n), diff --git a/libp2p/Session.h b/libp2p/Session.h index cabef2cbf..869eca96e 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -39,7 +39,7 @@ namespace dev namespace p2p { -struct Node; +struct NodeInfo; /** * @brief The Session class @@ -51,7 +51,7 @@ class Session: public std::enable_shared_from_this friend class HostCapabilityFace; public: - Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr const& _n, bool _force = false); + Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr const& _n, bool _force = false); Session(Host* _server, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual); virtual ~Session(); @@ -113,7 +113,7 @@ private: PeerInfo m_info; ///< Dynamic information about this peer. unsigned m_protocolVersion = 0; ///< The protocol version of the peer. - std::shared_ptr m_node; ///< The Node object. Might be null if we constructed using a bare address/port. + std::shared_ptr m_node; ///< The NodeInfo object. Might be null if we constructed using a bare address/port. bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor. bool m_force = false; ///< If true, ignore IDs being different. This could open you up to MitM attacks. bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in. diff --git a/libp2p/UDP.h b/libp2p/UDP.h index ac4afb0b1..8e732d3ff 100644 --- a/libp2p/UDP.h +++ b/libp2p/UDP.h @@ -57,8 +57,6 @@ protected: /** * @brief RLPX Datagram which can be signed. - * @todo compact templates - * @todo make data private/functional (see UDPDatagram) */ struct RLPXDatagramFace: public UDPDatagram {