diff --git a/libp2p/Common.h b/libp2p/Common.h index 19cf447ce..219ec804b 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -169,6 +169,7 @@ struct Node NodeIPEndpoint endpoint; /// If true, node will not be removed from Node list. + // TODO: p2p implement bool required = false; virtual operator bool() const { return (bool)id; } diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index fc86cf06b..25a095ae6 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -53,7 +53,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, byte m_ifAddresses(Network::getInterfaceAddresses()), m_ioService(2), m_tcp4Acceptor(m_ioService), - m_alias(getNetworkAlias(_restoreNetwork)), + m_alias(networkAlias(_restoreNetwork)), m_lastPing(chrono::time_point::min()) { for (auto address: m_ifAddresses) @@ -182,7 +182,7 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) { clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n; - auto n = (*m_nodeTable)[_n]; + auto n = m_nodeTable->node(_n); if (n) { RecursiveGuard l(x_sessions); @@ -195,7 +195,7 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) } p->endpoint.tcp = n.endpoint.tcp; - // TODO: Implement similar to doFindNode. Attempt connecting to nodes + // TODO: Implement similar to discover. Attempt connecting to nodes // until ideal peer count is reached; if all nodes are tried, // repeat. Notably, this is an integrated process such that // when onNodeTableEvent occurs we should also update +/- @@ -442,9 +442,9 @@ void Host::connect(std::shared_ptr const& _p) Peer *nptr = _p.get(); { Guard l(x_pendingNodeConns); - if (m_pendingNodeConns.count(nptr)) + if (m_pendingPeerConns.count(nptr)) return; - m_pendingNodeConns.insert(nptr); + m_pendingPeerConns.insert(nptr); } clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged(); @@ -454,25 +454,25 @@ void Host::connect(std::shared_ptr const& _p) if (ec) { clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "(" << ec.message() << ")"; - _p->lastDisconnect = TCPError; - _p->lastAttempted = std::chrono::system_clock::now(); + _p->m_lastDisconnect = TCPError; + _p->m_lastAttempted = std::chrono::system_clock::now(); } else { clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint(); - _p->lastConnected = std::chrono::system_clock::now(); + _p->m_lastConnected = std::chrono::system_clock::now(); auto ps = make_shared(this, std::move(*s), _p); ps->start(); } delete s; Guard l(x_pendingNodeConns); - m_pendingNodeConns.erase(nptr); + m_pendingPeerConns.erase(nptr); }); } -PeerSessionInfos Host::peers() const +PeerSessionInfos Host::peerSessionInfo() const { if (!m_run) return PeerSessionInfos(); @@ -622,7 +622,7 @@ bytes Host::saveNetwork() const // TODO: alpha: Figure out why it ever shares these ports.//p.address.port() >= 30300 && p.address.port() <= 30305 && // TODO: alpha: if/how to save private addresses // Only save peers which have connected within 2 days, with properly-advertised port and public IP address - if (chrono::system_clock::now() - p.lastConnected < chrono::seconds(3600 * 48) && p.peerEndpoint().port() > 0 && p.peerEndpoint().port() < /*49152*/32768 && p.id != id() && !isPrivateAddress(p.peerEndpoint().address())) + if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && p.peerEndpoint().port() > 0 && p.peerEndpoint().port() < /*49152*/32768 && p.id != id() && !isPrivateAddress(p.peerEndpoint().address())) { network.appendList(10); if (p.peerEndpoint().address().is_v4()) @@ -631,15 +631,15 @@ bytes Host::saveNetwork() const network << p.peerEndpoint().address().to_v6().to_bytes(); // TODO: alpha: replace 0 with trust-state of node network << p.peerEndpoint().port() << p.id << 0 - << chrono::duration_cast(p.lastConnected.time_since_epoch()).count() - << chrono::duration_cast(p.lastAttempted.time_since_epoch()).count() - << p.failedAttempts << (unsigned)p.lastDisconnect << p.score << p.rating; + << chrono::duration_cast(p.m_lastConnected.time_since_epoch()).count() + << chrono::duration_cast(p.m_lastAttempted.time_since_epoch()).count() + << p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating; count++; } } } - auto state = m_nodeTable->state(); + auto state = m_nodeTable->snapshot(); state.sort(); for (auto const& s: state) { @@ -693,12 +693,12 @@ void Host::restoreNetwork(bytesConstRef _b) { shared_ptr p = make_shared(); p->id = id; - p->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt())); - p->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt())); - p->failedAttempts = i[6].toInt(); - p->lastDisconnect = (DisconnectReason)i[7].toInt(); - p->score = (int)i[8].toInt(); - p->rating = (int)i[9].toInt(); + p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt())); + p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt())); + p->m_failedAttempts = i[6].toInt(); + p->m_lastDisconnect = (DisconnectReason)i[7].toInt(); + p->m_score = (int)i[8].toInt(); + p->m_rating = (int)i[9].toInt(); p->endpoint.tcp = tcp; p->endpoint.udp = udp; m_peers[p->id] = p; @@ -708,7 +708,7 @@ void Host::restoreNetwork(bytesConstRef _b) } } -KeyPair Host::getNetworkAlias(bytesConstRef _b) +KeyPair Host::networkAlias(bytesConstRef _b) { RLP r(_b); if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt() == 1) diff --git a/libp2p/Host.h b/libp2p/Host.h index cd45f3268..b24f1343c 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -79,37 +79,37 @@ public: bool isOffline() const { return !m_session.lock(); } bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; } - - int score = 0; ///< All time cumulative. - int rating = 0; ///< Trending. + + int m_score = 0; ///< All time cumulative. + int m_rating = 0; ///< Trending. /// Network Availability - std::chrono::system_clock::time_point lastConnected; - std::chrono::system_clock::time_point lastAttempted; - unsigned failedAttempts = 0; - DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last. + std::chrono::system_clock::time_point m_lastConnected; + std::chrono::system_clock::time_point m_lastAttempted; + unsigned m_failedAttempts = 0; + DisconnectReason m_lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last. virtual bool operator<(Peer const& _p) const { if (isOffline() != _p.isOffline()) return isOffline(); else if (isOffline()) - if (lastAttempted == _p.lastAttempted) - return failedAttempts < _p.failedAttempts; + if (m_lastAttempted == _p.m_lastAttempted) + return m_failedAttempts < _p.m_failedAttempts; else - return lastAttempted < _p.lastAttempted; + return m_lastAttempted < _p.m_lastAttempted; else - if (score == _p.score) - if (rating == _p.rating) - if (failedAttempts == _p.failedAttempts) + if (m_score == _p.m_score) + if (m_rating == _p.m_rating) + if (m_failedAttempts == _p.m_failedAttempts) return id < _p.id; else - return failedAttempts < _p.failedAttempts; + return m_failedAttempts < _p.m_failedAttempts; else - return rating < _p.rating; + return m_rating < _p.m_rating; else - return score < _p.score; + return m_score < _p.m_score; } protected: @@ -121,9 +121,14 @@ using Peers = std::vector; class HostNodeTableHandler: public NodeTableEventHandler { - friend class Host; +public: HostNodeTableHandler(Host& _host); + + Host const& host() const { return m_host; } + +private: virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e); + Host& m_host; }; @@ -181,7 +186,7 @@ public: void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } /// Get peer information. - PeerSessionInfos peers() const; + PeerSessionInfos peerSessionInfo() const; /// Get number of peers connected. size_t peerCount() const; @@ -196,7 +201,7 @@ public: bytes saveNetwork() const; // TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information. - Peers nodes() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; } + Peers getPeers() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; } void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); } @@ -252,7 +257,7 @@ private: virtual void doneWorking(); /// Get or create host identifier (KeyPair). - static KeyPair getNetworkAlias(bytesConstRef _b); + static KeyPair networkAlias(bytesConstRef _b); bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes. @@ -274,7 +279,7 @@ private: std::unique_ptr m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. - std::set m_pendingNodeConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). + std::set m_pendingPeerConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). Mutex x_pendingNodeConns; bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint. diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index dffa0933b..186ece5e2 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -24,15 +24,15 @@ using namespace std; using namespace dev; using namespace dev::p2p; -NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {} -NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {} +NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::distance(_src.id,_pubk)) {} +NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::distance(_src.id,_pubk)) {} NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp): m_node(Node(_alias.pub(), bi::udp::endpoint())), m_secret(_alias.sec()), m_io(_io), m_socket(new NodeSocket(m_io, *this, _udp)), - m_socketPtr(m_socket.get()), + m_socketPointer(m_socket.get()), m_bucketRefreshTimer(m_io), m_evictionCheckTimer(m_io) { @@ -42,15 +42,18 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp): m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1); } - m_socketPtr->connect(); + m_socketPointer->connect(); doRefreshBuckets(boost::system::error_code()); } NodeTable::~NodeTable() { + // Cancel scheduled tasks to ensure. m_evictionCheckTimer.cancel(); m_bucketRefreshTimer.cancel(); - m_socketPtr->disconnect(); + + // Disconnect socket so that deallocation is safe. + m_socketPointer->disconnect(); } void NodeTable::processEvents() @@ -87,14 +90,14 @@ shared_ptr NodeTable::addNode(Node const& _node) m_nodes[_node.id] = ret; PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); p.sign(m_secret); - m_socketPtr->send(p); + m_socketPointer->send(p); } return move(ret); } -void NodeTable::join() +void NodeTable::discover() { - doFindNode(m_node.id); + discover(m_node.id); } list NodeTable::nodes() const @@ -106,7 +109,7 @@ list NodeTable::nodes() const return move(nodes); } -list NodeTable::state() const +list NodeTable::snapshot() const { list ret; Guard l(x_state); @@ -116,42 +119,35 @@ list NodeTable::state() const return move(ret); } -Node NodeTable::operator[](NodeId _id) +Node NodeTable::node(NodeId _id) { Guard l(x_nodes); auto n = m_nodes[_id]; return !!n ? *n : Node(); } -shared_ptr NodeTable::getNodeEntry(NodeId _id) +shared_ptr NodeTable::nodeEntry(NodeId _id) { Guard l(x_nodes); auto n = m_nodes[_id]; return !!n ? move(n) : move(shared_ptr()); } -void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const +void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr>> _tried) { - FindNode p(_node.endpoint.udp, _target); - p.sign(m_secret); - m_socketPtr->send(p); -} - -void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr>> _tried) -{ - if (!m_socketPtr->isOpen() || _round == s_maxSteps) + if (!m_socketPointer->isOpen() || _round == s_maxSteps) return; if (_round == s_maxSteps) { - clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds."; + clog(NodeTableNote) << "Terminating discover after " << _round << " rounds."; return; } else if(!_round && !_tried) // initialized _tried on first round _tried.reset(new set>()); - auto nearest = findNearest(_node); + auto nearest = nearestNodeEntries(_node); list> tried; for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++) if (!_tried->count(nearest[i])) @@ -160,12 +156,12 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptrendpoint.udp, _node); p.sign(m_secret); - m_socketPtr->send(p); + m_socketPointer->send(p); } if (tried.empty()) { - clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds."; + clog(NodeTableNote) << "Terminating discover after " << _round << " rounds."; return; } @@ -181,15 +177,15 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr> NodeTable::findNearest(NodeId _target) +vector> NodeTable::nearestNodeEntries(NodeId _target) { // send s_alpha FindNode packets to nodes we know, closest to target static unsigned lastBin = s_bins - 1; - unsigned head = dist(m_node.id, _target); + unsigned head = distance(m_node.id, _target); unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins; map>> found; @@ -204,7 +200,7 @@ vector> NodeTable::findNearest(NodeId _target) if (auto p = n.lock()) { if (count < s_bucketSize) - found[dist(_target, p->id)].push_back(p); + found[distance(_target, p->id)].push_back(p); else break; } @@ -214,7 +210,7 @@ vector> NodeTable::findNearest(NodeId _target) if (auto p = n.lock()) { if (count < s_bucketSize) - found[dist(_target, p->id)].push_back(p); + found[distance(_target, p->id)].push_back(p); else break; } @@ -231,7 +227,7 @@ vector> NodeTable::findNearest(NodeId _target) if (auto p = n.lock()) { if (count < s_bucketSize) - found[dist(_target, p->id)].push_back(p); + found[distance(_target, p->id)].push_back(p); else break; } @@ -245,7 +241,7 @@ vector> NodeTable::findNearest(NodeId _target) if (auto p = n.lock()) { if (count < s_bucketSize) - found[dist(_target, p->id)].push_back(p); + found[distance(_target, p->id)].push_back(p); else break; } @@ -263,7 +259,7 @@ void NodeTable::ping(bi::udp::endpoint _to) const { PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); p.sign(m_secret); - m_socketPtr->send(p); + m_socketPointer->send(p); } void NodeTable::ping(NodeEntry* _n) const @@ -274,65 +270,64 @@ void NodeTable::ping(NodeEntry* _n) const void NodeTable::evict(shared_ptr _leastSeen, shared_ptr _new) { - if (!m_socketPtr->isOpen()) + if (!m_socketPointer->isOpen()) return; - Guard l(x_evictions); - m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); - if (m_evictions.size() == 1) - doCheckEvictions(boost::system::error_code()); - - m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); + { + Guard l(x_evictions); + m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); + if (m_evictions.size() == 1) + doCheckEvictions(boost::system::error_code()); + + m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); + } ping(_leastSeen.get()); } -void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) +void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) { if (_pubk == m_node.address()) return; shared_ptr node(addNode(_pubk, _endpoint)); - // todo: sometimes node is nullptr here + // TODO p2p: old bug (maybe gone now) sometimes node is nullptr here if (!!node) - noteNode(node); -} - -void NodeTable::noteNode(shared_ptr _n) -{ - shared_ptr contested; { - NodeBucket& s = bucket(_n.get()); - Guard l(x_state); - s.nodes.remove_if([&_n](weak_ptr n) - { - if (n.lock() == _n) - return true; - return false; - }); - - if (s.nodes.size() >= s_bucketSize) + shared_ptr contested; { - contested = s.nodes.front().lock(); - if (!contested) + Guard l(x_state); + NodeBucket& s = bucket_UNSAFE(node.get()); + s.nodes.remove_if([&node](weak_ptr n) + { + if (n.lock() == node) + return true; + return false; + }); + + if (s.nodes.size() >= s_bucketSize) { - s.nodes.pop_front(); - s.nodes.push_back(_n); + contested = s.nodes.front().lock(); + if (!contested) + { + s.nodes.pop_front(); + s.nodes.push_back(node); + } } + else + s.nodes.push_back(node); } - else - s.nodes.push_back(_n); + + if (contested) + evict(contested, node); } - - if (contested) - evict(contested, _n); } void NodeTable::dropNode(shared_ptr _n) { - NodeBucket &s = bucket(_n.get()); { Guard l(x_state); + NodeBucket& s = bucket_UNSAFE(_n.get()); s.nodes.remove_if([&_n](weak_ptr n) { return n.lock() == _n; }); } { @@ -345,7 +340,7 @@ void NodeTable::dropNode(shared_ptr _n) m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved); } -NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n) +NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n) { return m_state[_n->distance - 1]; } @@ -381,7 +376,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes unsigned packetType = signedBytes[0]; if (packetType && packetType < 4) - noteNode(nodeid, _from); + noteActiveNode(nodeid, _from); bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1)); RLP rlp(rlpBytes); @@ -398,10 +393,10 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes for (auto it = m_evictions.begin(); it != m_evictions.end(); it++) if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now()) { - if (auto n = getNodeEntry(it->second)) + if (auto n = nodeEntry(it->second)) dropNode(n); - if (auto n = (*this)[it->first.first]) + if (auto n = node(it->first.first)) addNode(n); it = m_evictions.erase(it); @@ -414,7 +409,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes); // clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port(); for (auto n: in.nodes) - noteNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port)); + noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port)); break; } @@ -423,13 +418,13 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes // clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port(); FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes); - vector> nearest = findNearest(in.target); - static unsigned const nlimit = (m_socketPtr->maxDatagramSize - 11) / 86; + vector> nearest = nearestNodeEntries(in.target); + static unsigned const nlimit = (m_socketPointer->maxDatagramSize - 11) / 86; for (unsigned offset = 0; offset < nearest.size(); offset += nlimit) { Neighbours out(_from, nearest, offset, nlimit); out.sign(m_secret); - m_socketPtr->send(out); + m_socketPointer->send(out); } break; } @@ -442,7 +437,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes Pong p(_from); p.echo = sha3(rlpBytes); p.sign(m_secret); - m_socketPtr->send(p); + m_socketPointer->send(p); break; } @@ -459,7 +454,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) { - if (_ec || !m_socketPtr->isOpen()) + if (_ec || !m_socketPointer->isOpen()) return; auto self(shared_from_this()); @@ -472,8 +467,8 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) bool evictionsRemain = false; list> drop; { - Guard le(x_evictions); Guard ln(x_nodes); + Guard le(x_evictions); for (auto& e: m_evictions) if (chrono::steady_clock::now() - e.first.second > c_reqTimeout) if (auto n = m_nodes[e.second]) @@ -496,7 +491,7 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec) return; clog(NodeTableNote) << "refreshing buckets"; - bool connected = m_socketPtr->isOpen(); + bool connected = m_socketPointer->isOpen(); bool refreshed = false; if (connected) { diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index e2ba06f4b..72f7800f5 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -53,7 +53,7 @@ class NodeTableEventHandler { friend class NodeTable; public: - virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e) =0; + virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e) = 0; protected: /// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable. @@ -65,7 +65,8 @@ protected: if (!m_nodeEventHandler.size()) return; m_nodeEventHandler.unique(); - for (auto const& n: m_nodeEventHandler) events.push_back(std::make_pair(n,m_events[n])); + for (auto const& n: m_nodeEventHandler) + events.push_back(std::make_pair(n,m_events[n])); m_nodeEventHandler.clear(); m_events.clear(); } @@ -80,10 +81,17 @@ protected: std::list m_nodeEventHandler; std::map m_events; }; + +class NodeTable; +inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable); /** * NodeTable using modified kademlia for node discovery and preference. - * untouched buckets are refreshed if they have not been touched within an hour + * Node table requires an IO service, creates a socket for incoming + * UDP messages and implements a kademlia-like protocol. Node requests and + * responses are used to build a node table which can be queried to + * obtain a list of potential nodes to connect to, and, passes events to + * Host whenever a node is added or removed to/from the table. * * Thread-safety is ensured by modifying NodeEntry details via * shared_ptr replacement instead of mutating values. @@ -101,7 +109,7 @@ protected: * @todo serialize evictions per-bucket * @todo store evictions in map, unit-test eviction logic * @todo store root node in table - * @todo encapsulate doFindNode into NetworkAlgorithm (task) + * @todo encapsulate discover into NetworkAlgorithm (task) * @todo Pong to include ip:port where ping was received * @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay) * @todo cache Ping and FindSelf @@ -117,33 +125,17 @@ protected: */ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this { + friend std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable); using NodeSocket = UDPSocket; using TimePoint = std::chrono::steady_clock::time_point; - using EvictionTimeout = std::pair,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId. + using EvictionTimeout = std::pair, NodeId>; ///< First NodeId may be evicted and replaced with second NodeId. public: NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303); ~NodeTable(); - /// Constants for Kademlia, derived from address space. - - static unsigned const s_addressByteSize = sizeof(NodeId); ///< Size of address type in bytes. - static unsigned const s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia]. - static unsigned const s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us). - static unsigned const s_maxSteps = boost::static_log2::value; ///< Max iterations of discovery. (doFindNode) - - /// Chosen constants - - static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. - static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. - - /// Intervals - - boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked. - std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations). - std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia] - - static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } + /// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable. + static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } /// Set event handler for NodeEntryAdded and NodeEntryRemoved events. void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); } @@ -157,18 +149,49 @@ public: /// Add node. Node will be pinged if it's not already known. std::shared_ptr addNode(Node const& _node); - void join(); + /// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table. + void discover(); - NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); } + /// Returns list of node ids active in node table. std::list nodes() const; - unsigned size() const { return m_nodes.size(); } - std::list state() const; + /// Returns node count. + unsigned count() const { return m_nodes.size(); } + + /// Returns snapshot of table. + std::list snapshot() const; + + /// Returns true if node id is in node table. bool haveNode(NodeId _id) { Guard l(x_nodes); return m_nodes.count(_id); } - Node operator[](NodeId _id); - std::shared_ptr getNodeEntry(NodeId _id); + /// Returns the Node to the corresponding node id or the empty Node if that id is not found. + Node node(NodeId _id); + +#ifndef BOOST_AUTO_TEST_SUITE +private: +#else protected: +#endif + + /// Constants for Kademlia, derived from address space. + + static unsigned const s_addressByteSize = sizeof(NodeId); ///< Size of address type in bytes. + static unsigned const s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia]. + static unsigned const s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us). + static unsigned const s_maxSteps = boost::static_log2::value; ///< Max iterations of discovery. (discover) + + /// Chosen constants + + static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. + static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. + + /// Intervals + + /* todo: replace boost::posix_time; change constants to upper camelcase */ + boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked. + std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations). + std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia] + struct NodeBucket { unsigned distance; @@ -176,81 +199,95 @@ protected: std::list> nodes; }; - /// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds. - void doFindNode(NodeId _node, unsigned _round = 0, std::shared_ptr>> _tried = std::shared_ptr>>()); - - /// Returns nodes nearest to target. - std::vector> findNearest(NodeId _target); - + /// Used to ping endpoint. void ping(bi::udp::endpoint _to) const; + /// Used ping known node. Used by node table when refreshing buckets and as part of eviction process (see evict). void ping(NodeEntry* _n) const; - void evict(std::shared_ptr _leastSeen, std::shared_ptr _new); + /// Returns center node entry which describes this node and used with dist() to calculate xor metric for node table nodes. + NodeEntry center() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); } + + /// Used by asynchronous operations to return NodeEntry which is active and managed by node table. + std::shared_ptr nodeEntry(NodeId _id); - void noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); + /// Used to discovery nodes on network which are close to the given target. + /// Sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds. + void discover(NodeId _target, unsigned _round = 0, std::shared_ptr>> _tried = std::shared_ptr>>()); + + /// Returns nodes from node table which are closest to target. + std::vector> nearestNodeEntries(NodeId _target); - void noteNode(std::shared_ptr _n); + /// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away. + void evict(std::shared_ptr _leastSeen, std::shared_ptr _new); + /// Called whenever activity is received from an unknown node in order to maintain node table. + void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); + + /// Used to drop node when timeout occurs or when evict() result is to keep previous node. void dropNode(std::shared_ptr _n); - NodeBucket& bucket(NodeEntry const* _n); + /// Returns references to bucket which corresponds to distance of node id. + /// @warning Only use the return reference locked x_state mutex. + // TODO p2p: Remove this method after removing offset-by-one functionality. + NodeBucket& bucket_UNSAFE(NodeEntry const* _n); /// General Network Events + /// Called by m_socket when packet is received. void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet); - void onDisconnected(UDPSocketFace*) {}; + /// Called by m_socket when socket is disconnected. + void onDisconnected(UDPSocketFace*) {} /// Tasks + /// Called by evict() to ensure eviction check is scheduled to run and terminates when no evictions remain. Asynchronous. void doCheckEvictions(boost::system::error_code const& _ec); - - void doRefreshBuckets(boost::system::error_code const& _ec); -#ifndef BOOST_AUTO_TEST_SUITE -private: -#else -protected: -#endif - /// Sends FindNeighbor packet. See doFindNode. - void requestNeighbours(NodeEntry const& _node, NodeId _target) const; + /// Purges and pings nodes for any buckets which haven't been touched for c_bucketRefresh seconds. + void doRefreshBuckets(boost::system::error_code const& _ec); - std::unique_ptr m_nodeEventHandler; ///< Event handler for node events. + std::unique_ptr m_nodeEventHandler; ///< Event handler for node events. Node m_node; ///< This node. Secret m_secret; ///< This nodes secret key. - mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const. + mutable Mutex x_nodes; ///< LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const. std::map> m_nodes; ///< Nodes - mutable Mutex x_state; + mutable Mutex x_state; ///< LOCK x_state first if both x_nodes and x_state locks are required. std::array m_state; ///< State of p2p node network. - Mutex x_evictions; + Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required. std::deque m_evictions; ///< Eviction timeouts. ba::io_service& m_io; ///< Used by bucket refresh timer. std::shared_ptr m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. - NodeSocket* m_socketPtr; ///< Set to m_socket.get(). + NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe. boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh. boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions. }; - + inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable) { - _out << _nodeTable.root().address() << "\t" << "0\t" << _nodeTable.root().endpoint.udp.address() << ":" << _nodeTable.root().endpoint.udp.port() << std::endl; - auto s = _nodeTable.state(); + _out << _nodeTable.center().address() << "\t" << "0\t" << _nodeTable.center().endpoint.udp.address() << ":" << _nodeTable.center().endpoint.udp.port() << std::endl; + auto s = _nodeTable.snapshot(); for (auto n: s) _out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl; return _out; } /** - * Ping packet: Check if node is alive. + * Ping packet: Sent to check if node is alive. * PingNode is cached and regenerated after expiration - t, where t is timeout. * + * Ping is used to implement evict. When a new node is seen for + * a given bucket which is full, the least-responsive node is pinged. + * If the pinged node doesn't respond, then it is removed and the new + * node is inserted. + * * RLP Encoded Items: 3 * Minimum Encoded Size: 18 bytes * Maximum Encoded Size: bytes // todo after u128 addresses @@ -260,11 +297,6 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable) * port: Our port. * expiration: Triggers regeneration of packet. May also provide control over synchronization. * - * Ping is used to implement evict. When a new node is seen for - * a given bucket which is full, the least-responsive node is pinged. - * If the pinged node doesn't respond then it is removed and the new - * node is inserted. - * * @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint) * */ @@ -285,7 +317,7 @@ struct PingNode: RLPXDatagram }; /** - * Pong packet: response to ping + * Pong packet: Sent in response to ping * * RLP Encoded Items: 2 * Minimum Encoded Size: 33 bytes @@ -365,7 +397,7 @@ struct Neighbours: RLPXDatagram } static const uint8_t type = 4; - std::list nodes; + std::vector nodes; unsigned expiration = 1; void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << expiration; } diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 4a6621d09..1d1d69b7d 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -48,7 +48,7 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& Session::~Session() { - m_peer->lastConnected = m_peer->lastAttempted - chrono::seconds(1); + m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1); // Read-chain finished for one reason or another. for (auto& i: m_capabilities) @@ -75,14 +75,14 @@ void Session::addRating(unsigned _r) { if (m_peer) { - m_peer->rating += _r; - m_peer->score += _r; + m_peer->m_rating += _r; + m_peer->m_score += _r; } } int Session::rating() const { - return m_peer->rating; + return m_peer->m_rating; } template vector randomSelection(vector const& _t, unsigned _n) @@ -206,7 +206,7 @@ bool Session::interpret(RLP const& _r) } if (m_peer->isOffline()) - m_peer->lastConnected = chrono::system_clock::now(); + m_peer->m_lastConnected = chrono::system_clock::now(); if (m_protocolVersion != m_server->protocolVersion()) { @@ -446,13 +446,13 @@ void Session::drop(DisconnectReason _reason) if (m_peer) { - if (_reason != m_peer->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) - m_peer->failedAttempts = 0; - m_peer->lastDisconnect = _reason; + if (_reason != m_peer->m_lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) + m_peer->m_failedAttempts = 0; + m_peer->m_lastDisconnect = _reason; if (_reason == BadProtocol) { - m_peer->rating /= 2; - m_peer->score /= 2; + m_peer->m_rating /= 2; + m_peer->m_score /= 2; } } m_dropped = true; diff --git a/libp2p/UDP.h b/libp2p/UDP.h index bf9a6a372..5c3b9362f 100644 --- a/libp2p/UDP.h +++ b/libp2p/UDP.h @@ -64,12 +64,12 @@ struct RLPXDatagramFace: public UDPDatagram static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); } static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp); - virtual uint8_t packetType() =0; + virtual uint8_t packetType() = 0; RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {} virtual h256 sign(Secret const& _from); - virtual void streamRLP(RLPStream&) const =0; - virtual void interpretRLP(bytesConstRef _bytes) =0; + virtual void streamRLP(RLPStream&) const = 0; + virtual void interpretRLP(bytesConstRef _bytes) = 0; }; template diff --git a/libwebthree/WebThree.cpp b/libwebthree/WebThree.cpp index 7daec9693..3075108d1 100644 --- a/libwebthree/WebThree.cpp +++ b/libwebthree/WebThree.cpp @@ -77,7 +77,7 @@ void WebThreeDirect::setNetworkPreferences(p2p::NetworkPreferences const& _n) std::vector WebThreeDirect::peers() { - return m_net.peers(); + return m_net.peerSessionInfo(); } size_t WebThreeDirect::peerCount() const diff --git a/libwebthree/WebThree.h b/libwebthree/WebThree.h index f99eed5cf..1bce0820c 100644 --- a/libwebthree/WebThree.h +++ b/libwebthree/WebThree.h @@ -145,7 +145,7 @@ public: p2p::NodeId id() const override { return m_net.id(); } /// Gets the nodes. - p2p::Peers nodes() const override { return m_net.nodes(); } + p2p::Peers nodes() const override { return m_net.getPeers(); } /// Start the network subsystem. void startNetwork() override { m_net.start(); } diff --git a/test/net.cpp b/test/net.cpp index 7f30ed03b..5a7e56d23 100644 --- a/test/net.cpp +++ b/test/net.cpp @@ -87,7 +87,7 @@ struct TestNodeTable: public NodeTable bi::address ourIp = bi::address::from_string("127.0.0.1"); for (auto& n: _testNodes) if (_count--) - noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second)); + noteActiveNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second)); else break; } @@ -182,7 +182,7 @@ BOOST_AUTO_TEST_CASE(kademlia) // Not yet a 'real' test. TestNodeTableHost node(8); node.start(); - node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for + node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for node.setup(); node.populate(); clog << "NodeTable:\n" << *node.nodeTable.get() << endl; @@ -199,11 +199,11 @@ BOOST_AUTO_TEST_CASE(kademlia) node.populate(1); clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - node.nodeTable->join(); + node.nodeTable->discover(); this_thread::sleep_for(chrono::milliseconds(2000)); clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - BOOST_REQUIRE_EQUAL(node.nodeTable->size(), 8); + BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8); auto netNodes = node.nodeTable->nodes(); netNodes.sort(); diff --git a/test/whisperTopic.cpp b/test/whisperTopic.cpp index 1dcc6b9d6..e59a3f289 100644 --- a/test/whisperTopic.cpp +++ b/test/whisperTopic.cpp @@ -111,7 +111,7 @@ BOOST_AUTO_TEST_CASE(forwarding) setThreadName("listener"); // Host must be configured not to share peers. - Host ph("Listner", NetworkPreferences(50303, "", false, true)); + Host ph("Listner", NetworkPreferences(30303, "", false, true)); ph.setIdealPeerCount(0); auto wh = ph.registerCapability(new WhisperHost()); ph.start(); @@ -145,7 +145,7 @@ BOOST_AUTO_TEST_CASE(forwarding) this_thread::sleep_for(chrono::milliseconds(50)); // Host must be configured not to share peers. - Host ph("Forwarder", NetworkPreferences(50305, "", false, true)); + Host ph("Forwarder", NetworkPreferences(30305, "", false, true)); ph.setIdealPeerCount(0); auto wh = ph.registerCapability(new WhisperHost()); this_thread::sleep_for(chrono::milliseconds(500)); @@ -153,7 +153,7 @@ BOOST_AUTO_TEST_CASE(forwarding) fwderid = ph.id(); this_thread::sleep_for(chrono::milliseconds(500)); - ph.addNode(phid, "127.0.0.1", 50303, 50303); + ph.addNode(phid, "127.0.0.1", 30303, 30303); startedForwarder = true; @@ -174,13 +174,13 @@ BOOST_AUTO_TEST_CASE(forwarding) while (!startedForwarder) this_thread::sleep_for(chrono::milliseconds(50)); - Host ph("Sender", NetworkPreferences(50300, "", false, true)); + Host ph("Sender", NetworkPreferences(30300, "", false, true)); ph.setIdealPeerCount(0); shared_ptr wh = ph.registerCapability(new WhisperHost()); this_thread::sleep_for(chrono::milliseconds(500)); ph.start(); this_thread::sleep_for(chrono::milliseconds(500)); - ph.addNode(fwderid, "127.0.0.1", 50305, 50305); + ph.addNode(fwderid, "127.0.0.1", 30305, 30305); KeyPair us = KeyPair::create(); wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); @@ -210,14 +210,14 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) setThreadName("forwarder"); // Host must be configured not to share peers. - Host ph("Forwarder", NetworkPreferences(50305, "", false, true)); + Host ph("Forwarder", NetworkPreferences(30305, "", false, true)); ph.setIdealPeerCount(0); auto wh = ph.registerCapability(new WhisperHost()); this_thread::sleep_for(chrono::milliseconds(500)); ph.start(); this_thread::sleep_for(chrono::milliseconds(500)); -// ph.addNode("127.0.0.1", 50303, 50303); +// ph.addNode("127.0.0.1", 30303, 30303); startedForwarder = true; @@ -239,13 +239,13 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) this_thread::sleep_for(chrono::milliseconds(50)); { - Host ph("Sender", NetworkPreferences(50300, "", false, true)); + Host ph("Sender", NetworkPreferences(30300, "", false, true)); ph.setIdealPeerCount(0); shared_ptr wh = ph.registerCapability(new WhisperHost()); this_thread::sleep_for(chrono::milliseconds(500)); ph.start(); this_thread::sleep_for(chrono::milliseconds(500)); -// ph.addNode("127.0.0.1", 50305, 50305); +// ph.addNode("127.0.0.1", 30305, 30305); KeyPair us = KeyPair::create(); wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); @@ -253,13 +253,13 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) } { - Host ph("Listener", NetworkPreferences(50300, "", false, true)); + Host ph("Listener", NetworkPreferences(30300, "", false, true)); ph.setIdealPeerCount(0); shared_ptr wh = ph.registerCapability(new WhisperHost()); this_thread::sleep_for(chrono::milliseconds(500)); ph.start(); this_thread::sleep_for(chrono::milliseconds(500)); -// ph.addNode("127.0.0.1", 50305, 50305); +// ph.addNode("127.0.0.1", 30305, 30305); /// Only interested in odd packets auto w = wh->installWatch(BuildTopicMask("test"));