From 46692bcf3235b054a6a1870e7c46ece81b04b239 Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 18 Mar 2015 15:07:41 +0100 Subject: [PATCH 1/4] track pings where pubk is unknown so pongs are properly handled correct mutex usage notify Host of new node only when it is inserted into table temporarily double discovery timeout until concatenated packets are supported remove duplicate eviction-timeout entry update node's udp endpoint when node is noted as active (fixes bug which duplicates NodeEntry) don't note pending nodes as active, don't add pending nodes to table change NodeEntryRemoved to NodeEntryDroppped (Remove will be permanent) note active node after packet is processed instead of before do not respond to unsolicited packets store node pubk and tcp endpoint when Ping is received fix bug in Host causing empty Peer shared-ptr to be created --- libp2p/Host.cpp | 15 +++---- libp2p/Host.h | 1 + libp2p/NodeTable.cpp | 100 ++++++++++++++++++++++++------------------- libp2p/NodeTable.h | 22 ++++++---- 4 files changed, 76 insertions(+), 62 deletions(-) diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 19df5ee2e..9ceac334f 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -176,14 +176,11 @@ unsigned Host::protocolVersion() const void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) { - /// Get or create Peer - shared_ptr p; - p = m_peers[_id]; - if (!p) - { - p.reset(new Peer()); // this maybe redundant + shared_ptr p(new Peer()); + if (m_peers.count(_id)) + p = m_peers[_id]; + else p->id = _id; - } p->m_lastDisconnect = NoDisconnect; if (p->isOffline()) p->m_lastConnected = std::chrono::system_clock::now(); @@ -276,9 +273,9 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) connect(p); } } - else if (_e == NodeEntryRemoved) + else if (_e == NodeEntryDropped) { - clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n; + clog(NetNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n; RecursiveGuard l(x_sessions); m_peers.erase(_n); diff --git a/libp2p/Host.h b/libp2p/Host.h index 219316c58..ff27807a7 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -113,6 +113,7 @@ public: CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } + /// Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity. void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort); /// Set ideal number of peers. diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 08bb0624a..dbd0eb06d 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -70,24 +70,24 @@ shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint shared_ptr NodeTable::addNode(Node const& _node) { + // TODO: Filter private addresses (After requirePeer() is added to Host and tests and Neighbors are updated) +// if (isPrivateAddress(_node.endpoint.udp.address()) || isPrivateAddress(_node.endpoint.tcp.address())) +// return move(shared_ptr()); + // ping address if nodeid is empty if (!_node.id) { + m_pubkDiscoverPings[m_node.endpoint.udp.address()] = std::chrono::steady_clock::now(); PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); p.sign(m_secret); m_socketPointer->send(p); - shared_ptr n; - return move(n); + return move(shared_ptr()); } - Guard l(x_nodes); - if (m_nodes.count(_node.id)) { -// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. -// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0) -// // Update address if the node if we now have a public IP for it. -// m_server->m_peers[id]->address = ep; - return m_nodes[_node.id]; + Guard ln(x_nodes); + if (m_nodes.count(_node.id)) + return m_nodes[_node.id]; } shared_ptr ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); @@ -95,12 +95,6 @@ shared_ptr NodeTable::addNode(Node const& _node) PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); p.sign(m_secret); m_socketPointer->send(p); - - // TODO p2p: rename to p2p.nodes.pending, add p2p.nodes.add event (when pong is received) - clog(NodeTableUpdate) << "p2p.nodes.add " << _node.id.abridged(); - if (m_nodeEventHandler) - m_nodeEventHandler->appendEvent(_node.id, NodeEntryAdded); - return ret; } @@ -135,7 +129,6 @@ list NodeTable::snapshot() const Node NodeTable::node(NodeId const& _id) { - // TODO p2p: eloquent copy operator Guard l(x_nodes); if (m_nodes.count(_id)) { @@ -149,7 +142,7 @@ Node NodeTable::node(NodeId const& _id) shared_ptr NodeTable::nodeEntry(NodeId _id) { Guard l(x_nodes); - return m_nodes.count(_id) ? move(m_nodes[_id]) : move(shared_ptr()); + return m_nodes.count(_id) ? m_nodes[_id] : shared_ptr(); } void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr>> _tried) @@ -191,7 +184,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr _leastSeen, shared_ptr _n m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); if (m_evictions.size() == 1) doCheckEvictions(boost::system::error_code()); - - m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); } ping(_leastSeen.get()); } @@ -307,23 +298,26 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en { if (_pubk == m_node.address()) return; - - clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port(); - shared_ptr node(addNode(_pubk, _endpoint, bi::tcp::endpoint(_endpoint.address(), _endpoint.port()))); - - // TODO p2p: old bug (maybe gone now) sometimes node is nullptr here - if (!!node) + shared_ptr node = nodeEntry(_pubk); + if (!!node && !node->pending) { + clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port(); + + // update udp endpoint + node->endpoint.udp.address(_endpoint.address()); + node->endpoint.udp.port(_endpoint.port()); + shared_ptr contested; { Guard l(x_state); NodeBucket& s = bucket_UNSAFE(node.get()); - s.nodes.remove_if([&node](weak_ptr n) + bool removed = false; + s.nodes.remove_if([&node, &removed](weak_ptr n) { if (n.lock() == node) - return true; - return false; + removed = true; + return removed; }); if (s.nodes.size() >= s_bucketSize) @@ -335,12 +329,18 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en s.nodes.pop_front(); s.nodes.push_back(node); s.touch(); + + if (!removed) + m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded); } } else { s.nodes.push_back(node); s.touch(); + + if (!removed) + m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded); } } @@ -351,19 +351,17 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en void NodeTable::dropNode(shared_ptr _n) { + // remove from nodetable { Guard l(x_state); NodeBucket& s = bucket_UNSAFE(_n.get()); s.nodes.remove_if([&_n](weak_ptr n) { return n.lock() == _n; }); } - { - Guard l(x_nodes); - m_nodes.erase(_n->id); - } + // notify host clog(NodeTableUpdate) << "p2p.nodes.drop " << _n->id.abridged(); if (m_nodeEventHandler) - m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved); + m_nodeEventHandler->appendEvent(_n->id, NodeEntryDropped); } NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n) @@ -401,9 +399,6 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes } unsigned packetType = signedBytes[0]; - if (packetType && packetType < 4) - noteActiveNode(nodeid, _from); - bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1)); RLP rlp(rlpBytes); try { @@ -411,37 +406,50 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes { case Pong::type: { -// clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port(); Pong in = Pong::fromBytesConstRef(_from, rlpBytes); // whenever a pong is received, check if it's in m_evictions Guard le(x_evictions); + bool evictionEntry = false; for (auto it = m_evictions.begin(); it != m_evictions.end(); it++) if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now()) { + evictionEntry = true; if (auto n = nodeEntry(it->second)) dropNode(n); - if (auto n = node(it->first.first)) - addNode(n); + if (auto n = nodeEntry(it->first.first)) + if (m_nodeEventHandler && n->pending) + n->pending = false; it = m_evictions.erase(it); } + + // if not, check if it's known/pending or a pubk discovery ping + if (!evictionEntry) + { + if (auto n = nodeEntry(nodeid)) + if (n->pending) + n->pending = false; + } + else if (m_pubkDiscoverPings.count(_from.address())) + m_pubkDiscoverPings.erase(_from.address()); + else + return; // unsolicited pong; don't note node as active + break; } case Neighbours::type: { Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes); -// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port(); for (auto n: in.nodes) - noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port)); + addNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port), bi::tcp::endpoint(bi::address::from_string(n.ipAddress), n.port)); break; } case FindNode::type: { -// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port(); FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes); vector> nearest = nearestNodeEntries(in.target); @@ -450,6 +458,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes { Neighbours out(_from, nearest, offset, nlimit); out.sign(m_secret); + if (out.data.size() > 1280) + clog(NetWarn) << "Sending truncated datagram, size: " << out.data.size(); m_socketPointer->send(out); } break; @@ -457,8 +467,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes case PingNode::type: { -// clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port(); PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes); + addNode(nodeid, _from, bi::tcp::endpoint(bi::address::from_string(in.ipAddress), in.port)); Pong p(_from); p.echo = sha3(rlpBytes); @@ -471,6 +481,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port(); return; } + + noteActiveNode(nodeid, _from); } catch (...) { diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 9717b77a3..a97326501 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -44,11 +44,13 @@ struct NodeEntry: public Node NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); unsigned const distance; ///< Node's distance (xor of _src as integer). + + bool pending = true; ///< Node will be ignored until Pong is received }; enum NodeTableEventType { NodeEntryAdded, - NodeEntryRemoved + NodeEntryDropped }; class NodeTable; class NodeTableEventHandler @@ -101,11 +103,10 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable) * NodeTable accepts a port for UDP and will listen to the port on all available * interfaces. * + * * [Integration] - * @todo restore nodes: affects refreshbuckets * @todo TCP endpoints - * @todo makeRequired: don't try to evict node if node isRequired. - * @todo makeRequired: exclude bucket from refresh if we have node as peer. + * @todo GC uniform 1/32 entires at 112500ms interval * * [Optimization] * @todo serialize evictions per-bucket @@ -140,16 +141,16 @@ public: /// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable. static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } - /// Set event handler for NodeEntryAdded and NodeEntryRemoved events. + /// Set event handler for NodeEntryAdded and NodeEntryDropped events. void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); } - /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. + /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored. void processEvents(); - /// Add node. Node will be pinged if it's not already known. + /// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown. std::shared_ptr addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp); - /// Add node. Node will be pinged if it's not already known. + /// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen. std::shared_ptr addNode(Node const& _node); /// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table. @@ -225,7 +226,7 @@ private: /// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away. void evict(std::shared_ptr _leastSeen, std::shared_ptr _new); - /// Called whenever activity is received from an unknown node in order to maintain node table. + /// Called whenever activity is received from a node in order to maintain node table. void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); /// Used to drop node when timeout occurs or when evict() result is to keep previous node. @@ -265,6 +266,9 @@ private: Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required. std::deque m_evictions; ///< Eviction timeouts. + + Mutex x_pubkDiscoverPings; ///< LOCK x_nodes first if both x_nodes and x_pubkDiscoverPings locks are required. + std::map m_pubkDiscoverPings; ///< List of pending pings where node entry wasn't created due to unkown pubk. ba::io_service& m_io; ///< Used by bucket refresh timer. std::shared_ptr m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. From 29cce7b80d6a5651a99ad39398c46e7258c65c2a Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 18 Mar 2015 15:10:52 +0100 Subject: [PATCH 2/4] remove commented code --- libp2p/NodeTable.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index dbd0eb06d..fa00e9ce2 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -70,10 +70,6 @@ shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint shared_ptr NodeTable::addNode(Node const& _node) { - // TODO: Filter private addresses (After requirePeer() is added to Host and tests and Neighbors are updated) -// if (isPrivateAddress(_node.endpoint.udp.address()) || isPrivateAddress(_node.endpoint.tcp.address())) -// return move(shared_ptr()); - // ping address if nodeid is empty if (!_node.id) { From f5e03f5550c99c060c514359022b0caf1b7b4cca Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 18 Mar 2015 17:14:37 +0100 Subject: [PATCH 3/4] code review --- libp2p/Host.cpp | 11 +++++++---- libp2p/NodeTable.cpp | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 9ceac334f..1d54f4afe 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -176,11 +176,14 @@ unsigned Host::protocolVersion() const void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) { - shared_ptr p(new Peer()); - if (m_peers.count(_id)) - p = m_peers[_id]; - else + shared_ptr p; + if (!m_peers.count(_id)) + { + p.reset(new Peer()); p->id = _id; + } + else + p = m_peers[_id]; p->m_lastDisconnect = NoDisconnect; if (p->isOffline()) p->m_lastConnected = std::chrono::system_clock::now(); diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index fa00e9ce2..47aefe63c 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -309,7 +309,7 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en Guard l(x_state); NodeBucket& s = bucket_UNSAFE(node.get()); bool removed = false; - s.nodes.remove_if([&node, &removed](weak_ptr n) + s.nodes.remove_if([&node, &removed](weak_ptr const& n) { if (n.lock() == node) removed = true; From eaa5908a3dcba5a69eb7254255ebeb0e152582fa Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 18 Mar 2015 17:22:02 +0100 Subject: [PATCH 4/4] code review. collapse if statement. --- libp2p/NodeTable.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 47aefe63c..ccfbeaba4 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -425,8 +425,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes if (!evictionEntry) { if (auto n = nodeEntry(nodeid)) - if (n->pending) - n->pending = false; + n->pending = false; } else if (m_pubkDiscoverPings.count(_from.address())) m_pubkDiscoverPings.erase(_from.address());