diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 19df5ee2e..9ceac334f 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -176,14 +176,11 @@ unsigned Host::protocolVersion() const void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) { - /// Get or create Peer - shared_ptr p; - p = m_peers[_id]; - if (!p) - { - p.reset(new Peer()); // this maybe redundant + shared_ptr p(new Peer()); + if (m_peers.count(_id)) + p = m_peers[_id]; + else p->id = _id; - } p->m_lastDisconnect = NoDisconnect; if (p->isOffline()) p->m_lastConnected = std::chrono::system_clock::now(); @@ -276,9 +273,9 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) connect(p); } } - else if (_e == NodeEntryRemoved) + else if (_e == NodeEntryDropped) { - clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n; + clog(NetNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n; RecursiveGuard l(x_sessions); m_peers.erase(_n); diff --git a/libp2p/Host.h b/libp2p/Host.h index 219316c58..ff27807a7 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -113,6 +113,7 @@ public: CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } + /// Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity. void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort); /// Set ideal number of peers. diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 08bb0624a..dbd0eb06d 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -70,24 +70,24 @@ shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint shared_ptr NodeTable::addNode(Node const& _node) { + // TODO: Filter private addresses (After requirePeer() is added to Host and tests and Neighbors are updated) +// if (isPrivateAddress(_node.endpoint.udp.address()) || isPrivateAddress(_node.endpoint.tcp.address())) +// return move(shared_ptr()); + // ping address if nodeid is empty if (!_node.id) { + m_pubkDiscoverPings[m_node.endpoint.udp.address()] = std::chrono::steady_clock::now(); PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); p.sign(m_secret); m_socketPointer->send(p); - shared_ptr n; - return move(n); + return move(shared_ptr()); } - Guard l(x_nodes); - if (m_nodes.count(_node.id)) { -// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. -// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0) -// // Update address if the node if we now have a public IP for it. -// m_server->m_peers[id]->address = ep; - return m_nodes[_node.id]; + Guard ln(x_nodes); + if (m_nodes.count(_node.id)) + return m_nodes[_node.id]; } shared_ptr ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); @@ -95,12 +95,6 @@ shared_ptr NodeTable::addNode(Node const& _node) PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); p.sign(m_secret); m_socketPointer->send(p); - - // TODO p2p: rename to p2p.nodes.pending, add p2p.nodes.add event (when pong is received) - clog(NodeTableUpdate) << "p2p.nodes.add " << _node.id.abridged(); - if (m_nodeEventHandler) - m_nodeEventHandler->appendEvent(_node.id, NodeEntryAdded); - return ret; } @@ -135,7 +129,6 @@ list NodeTable::snapshot() const Node NodeTable::node(NodeId const& _id) { - // TODO p2p: eloquent copy operator Guard l(x_nodes); if (m_nodes.count(_id)) { @@ -149,7 +142,7 @@ Node NodeTable::node(NodeId const& _id) shared_ptr NodeTable::nodeEntry(NodeId _id) { Guard l(x_nodes); - return m_nodes.count(_id) ? move(m_nodes[_id]) : move(shared_ptr()); + return m_nodes.count(_id) ? m_nodes[_id] : shared_ptr(); } void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr>> _tried) @@ -191,7 +184,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr _leastSeen, shared_ptr _n m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); if (m_evictions.size() == 1) doCheckEvictions(boost::system::error_code()); - - m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); } ping(_leastSeen.get()); } @@ -307,23 +298,26 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en { if (_pubk == m_node.address()) return; - - clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port(); - shared_ptr node(addNode(_pubk, _endpoint, bi::tcp::endpoint(_endpoint.address(), _endpoint.port()))); - - // TODO p2p: old bug (maybe gone now) sometimes node is nullptr here - if (!!node) + shared_ptr node = nodeEntry(_pubk); + if (!!node && !node->pending) { + clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port(); + + // update udp endpoint + node->endpoint.udp.address(_endpoint.address()); + node->endpoint.udp.port(_endpoint.port()); + shared_ptr contested; { Guard l(x_state); NodeBucket& s = bucket_UNSAFE(node.get()); - s.nodes.remove_if([&node](weak_ptr n) + bool removed = false; + s.nodes.remove_if([&node, &removed](weak_ptr n) { if (n.lock() == node) - return true; - return false; + removed = true; + return removed; }); if (s.nodes.size() >= s_bucketSize) @@ -335,12 +329,18 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en s.nodes.pop_front(); s.nodes.push_back(node); s.touch(); + + if (!removed) + m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded); } } else { s.nodes.push_back(node); s.touch(); + + if (!removed) + m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded); } } @@ -351,19 +351,17 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en void NodeTable::dropNode(shared_ptr _n) { + // remove from nodetable { Guard l(x_state); NodeBucket& s = bucket_UNSAFE(_n.get()); s.nodes.remove_if([&_n](weak_ptr n) { return n.lock() == _n; }); } - { - Guard l(x_nodes); - m_nodes.erase(_n->id); - } + // notify host clog(NodeTableUpdate) << "p2p.nodes.drop " << _n->id.abridged(); if (m_nodeEventHandler) - m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved); + m_nodeEventHandler->appendEvent(_n->id, NodeEntryDropped); } NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n) @@ -401,9 +399,6 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes } unsigned packetType = signedBytes[0]; - if (packetType && packetType < 4) - noteActiveNode(nodeid, _from); - bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1)); RLP rlp(rlpBytes); try { @@ -411,37 +406,50 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes { case Pong::type: { -// clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port(); Pong in = Pong::fromBytesConstRef(_from, rlpBytes); // whenever a pong is received, check if it's in m_evictions Guard le(x_evictions); + bool evictionEntry = false; for (auto it = m_evictions.begin(); it != m_evictions.end(); it++) if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now()) { + evictionEntry = true; if (auto n = nodeEntry(it->second)) dropNode(n); - if (auto n = node(it->first.first)) - addNode(n); + if (auto n = nodeEntry(it->first.first)) + if (m_nodeEventHandler && n->pending) + n->pending = false; it = m_evictions.erase(it); } + + // if not, check if it's known/pending or a pubk discovery ping + if (!evictionEntry) + { + if (auto n = nodeEntry(nodeid)) + if (n->pending) + n->pending = false; + } + else if (m_pubkDiscoverPings.count(_from.address())) + m_pubkDiscoverPings.erase(_from.address()); + else + return; // unsolicited pong; don't note node as active + break; } case Neighbours::type: { Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes); -// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port(); for (auto n: in.nodes) - noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port)); + addNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port), bi::tcp::endpoint(bi::address::from_string(n.ipAddress), n.port)); break; } case FindNode::type: { -// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port(); FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes); vector> nearest = nearestNodeEntries(in.target); @@ -450,6 +458,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes { Neighbours out(_from, nearest, offset, nlimit); out.sign(m_secret); + if (out.data.size() > 1280) + clog(NetWarn) << "Sending truncated datagram, size: " << out.data.size(); m_socketPointer->send(out); } break; @@ -457,8 +467,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes case PingNode::type: { -// clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port(); PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes); + addNode(nodeid, _from, bi::tcp::endpoint(bi::address::from_string(in.ipAddress), in.port)); Pong p(_from); p.echo = sha3(rlpBytes); @@ -471,6 +481,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port(); return; } + + noteActiveNode(nodeid, _from); } catch (...) { diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 9717b77a3..a97326501 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -44,11 +44,13 @@ struct NodeEntry: public Node NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); unsigned const distance; ///< Node's distance (xor of _src as integer). + + bool pending = true; ///< Node will be ignored until Pong is received }; enum NodeTableEventType { NodeEntryAdded, - NodeEntryRemoved + NodeEntryDropped }; class NodeTable; class NodeTableEventHandler @@ -101,11 +103,10 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable) * NodeTable accepts a port for UDP and will listen to the port on all available * interfaces. * + * * [Integration] - * @todo restore nodes: affects refreshbuckets * @todo TCP endpoints - * @todo makeRequired: don't try to evict node if node isRequired. - * @todo makeRequired: exclude bucket from refresh if we have node as peer. + * @todo GC uniform 1/32 entires at 112500ms interval * * [Optimization] * @todo serialize evictions per-bucket @@ -140,16 +141,16 @@ public: /// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable. static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } - /// Set event handler for NodeEntryAdded and NodeEntryRemoved events. + /// Set event handler for NodeEntryAdded and NodeEntryDropped events. void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); } - /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. + /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored. void processEvents(); - /// Add node. Node will be pinged if it's not already known. + /// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown. std::shared_ptr addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp); - /// Add node. Node will be pinged if it's not already known. + /// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen. std::shared_ptr addNode(Node const& _node); /// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table. @@ -225,7 +226,7 @@ private: /// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away. void evict(std::shared_ptr _leastSeen, std::shared_ptr _new); - /// Called whenever activity is received from an unknown node in order to maintain node table. + /// Called whenever activity is received from a node in order to maintain node table. void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); /// Used to drop node when timeout occurs or when evict() result is to keep previous node. @@ -265,6 +266,9 @@ private: Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required. std::deque m_evictions; ///< Eviction timeouts. + + Mutex x_pubkDiscoverPings; ///< LOCK x_nodes first if both x_nodes and x_pubkDiscoverPings locks are required. + std::map m_pubkDiscoverPings; ///< List of pending pings where node entry wasn't created due to unkown pubk. ba::io_service& m_io; ///< Used by bucket refresh timer. std::shared_ptr m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.