Browse Source

track pings where pubk is unknown so pongs are properly handled

correct mutex usage
notify Host of new node only when it is inserted into table
temporarily double discovery timeout until concatenated packets are supported
remove duplicate eviction-timeout entry
update node's udp endpoint when node is noted as active (fixes bug which duplicates NodeEntry)
don't note pending nodes as active, don't add pending nodes to table
change NodeEntryRemoved to NodeEntryDroppped (Remove will be permanent)
note active node after packet is processed instead of before
do not respond to unsolicited packets
store node pubk and tcp endpoint when Ping is received
fix bug in Host causing empty Peer shared-ptr to be created
cl-refactor
subtly 10 years ago
parent
commit
46692bcf32
  1. 15
      libp2p/Host.cpp
  2. 1
      libp2p/Host.h
  3. 100
      libp2p/NodeTable.cpp
  4. 22
      libp2p/NodeTable.h

15
libp2p/Host.cpp

@ -176,14 +176,11 @@ unsigned Host::protocolVersion() const
void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint)
{
/// Get or create Peer
shared_ptr<Peer> p;
p = m_peers[_id];
if (!p)
{
p.reset(new Peer()); // this maybe redundant
shared_ptr<Peer> p(new Peer());
if (m_peers.count(_id))
p = m_peers[_id];
else
p->id = _id;
}
p->m_lastDisconnect = NoDisconnect;
if (p->isOffline())
p->m_lastConnected = std::chrono::system_clock::now();
@ -276,9 +273,9 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
connect(p);
}
}
else if (_e == NodeEntryRemoved)
else if (_e == NodeEntryDropped)
{
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n;
clog(NetNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
RecursiveGuard l(x_sessions);
m_peers.erase(_n);

1
libp2p/Host.h

@ -113,6 +113,7 @@ public:
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
/// Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity.
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort);
/// Set ideal number of peers.

100
libp2p/NodeTable.cpp

@ -70,24 +70,24 @@ shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{
// TODO: Filter private addresses (After requirePeer() is added to Host and tests and Neighbors are updated)
// if (isPrivateAddress(_node.endpoint.udp.address()) || isPrivateAddress(_node.endpoint.tcp.address()))
// return move(shared_ptr<NodeEntry>());
// ping address if nodeid is empty
if (!_node.id)
{
m_pubkDiscoverPings[m_node.endpoint.udp.address()] = std::chrono::steady_clock::now();
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPointer->send(p);
shared_ptr<NodeEntry> n;
return move(n);
return move(shared_ptr<NodeEntry>());
}
Guard l(x_nodes);
if (m_nodes.count(_node.id))
{
// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0)
// // Update address if the node if we now have a public IP for it.
// m_server->m_peers[id]->address = ep;
return m_nodes[_node.id];
Guard ln(x_nodes);
if (m_nodes.count(_node.id))
return m_nodes[_node.id];
}
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
@ -95,12 +95,6 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPointer->send(p);
// TODO p2p: rename to p2p.nodes.pending, add p2p.nodes.add event (when pong is received)
clog(NodeTableUpdate) << "p2p.nodes.add " << _node.id.abridged();
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_node.id, NodeEntryAdded);
return ret;
}
@ -135,7 +129,6 @@ list<NodeEntry> NodeTable::snapshot() const
Node NodeTable::node(NodeId const& _id)
{
// TODO p2p: eloquent copy operator
Guard l(x_nodes);
if (m_nodes.count(_id))
{
@ -149,7 +142,7 @@ Node NodeTable::node(NodeId const& _id)
shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeId _id)
{
Guard l(x_nodes);
return m_nodes.count(_id) ? move(m_nodes[_id]) : move(shared_ptr<NodeEntry>());
return m_nodes.count(_id) ? m_nodes[_id] : shared_ptr<NodeEntry>();
}
void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
@ -191,7 +184,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
}
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count()));
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count() * 2));
m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec)
{
if (_ec)
@ -297,8 +290,6 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
if (m_evictions.size() == 1)
doCheckEvictions(boost::system::error_code());
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
}
ping(_leastSeen.get());
}
@ -307,23 +298,26 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
{
if (_pubk == m_node.address())
return;
clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port();
shared_ptr<NodeEntry> node(addNode(_pubk, _endpoint, bi::tcp::endpoint(_endpoint.address(), _endpoint.port())));
// TODO p2p: old bug (maybe gone now) sometimes node is nullptr here
if (!!node)
shared_ptr<NodeEntry> node = nodeEntry(_pubk);
if (!!node && !node->pending)
{
clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port();
// update udp endpoint
node->endpoint.udp.address(_endpoint.address());
node->endpoint.udp.port(_endpoint.port());
shared_ptr<NodeEntry> contested;
{
Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(node.get());
s.nodes.remove_if([&node](weak_ptr<NodeEntry> n)
bool removed = false;
s.nodes.remove_if([&node, &removed](weak_ptr<NodeEntry> n)
{
if (n.lock() == node)
return true;
return false;
removed = true;
return removed;
});
if (s.nodes.size() >= s_bucketSize)
@ -335,12 +329,18 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
s.nodes.pop_front();
s.nodes.push_back(node);
s.touch();
if (!removed)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
}
}
else
{
s.nodes.push_back(node);
s.touch();
if (!removed)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
}
}
@ -351,19 +351,17 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
{
// remove from nodetable
{
Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(_n.get());
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
}
{
Guard l(x_nodes);
m_nodes.erase(_n->id);
}
// notify host
clog(NodeTableUpdate) << "p2p.nodes.drop " << _n->id.abridged();
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved);
m_nodeEventHandler->appendEvent(_n->id, NodeEntryDropped);
}
NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n)
@ -401,9 +399,6 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
}
unsigned packetType = signedBytes[0];
if (packetType && packetType < 4)
noteActiveNode(nodeid, _from);
bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1));
RLP rlp(rlpBytes);
try {
@ -411,37 +406,50 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
case Pong::type:
{
// clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port();
Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, check if it's in m_evictions
Guard le(x_evictions);
bool evictionEntry = false;
for (auto it = m_evictions.begin(); it != m_evictions.end(); it++)
if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now())
{
evictionEntry = true;
if (auto n = nodeEntry(it->second))
dropNode(n);
if (auto n = node(it->first.first))
addNode(n);
if (auto n = nodeEntry(it->first.first))
if (m_nodeEventHandler && n->pending)
n->pending = false;
it = m_evictions.erase(it);
}
// if not, check if it's known/pending or a pubk discovery ping
if (!evictionEntry)
{
if (auto n = nodeEntry(nodeid))
if (n->pending)
n->pending = false;
}
else if (m_pubkDiscoverPings.count(_from.address()))
m_pubkDiscoverPings.erase(_from.address());
else
return; // unsolicited pong; don't note node as active
break;
}
case Neighbours::type:
{
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port();
for (auto n: in.nodes)
noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port));
addNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port), bi::tcp::endpoint(bi::address::from_string(n.ipAddress), n.port));
break;
}
case FindNode::type:
{
// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
@ -450,6 +458,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
Neighbours out(_from, nearest, offset, nlimit);
out.sign(m_secret);
if (out.data.size() > 1280)
clog(NetWarn) << "Sending truncated datagram, size: " << out.data.size();
m_socketPointer->send(out);
}
break;
@ -457,8 +467,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
case PingNode::type:
{
// clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port();
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
addNode(nodeid, _from, bi::tcp::endpoint(bi::address::from_string(in.ipAddress), in.port));
Pong p(_from);
p.echo = sha3(rlpBytes);
@ -471,6 +481,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port();
return;
}
noteActiveNode(nodeid, _from);
}
catch (...)
{

22
libp2p/NodeTable.h

@ -44,11 +44,13 @@ struct NodeEntry: public Node
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);
unsigned const distance; ///< Node's distance (xor of _src as integer).
bool pending = true; ///< Node will be ignored until Pong is received
};
enum NodeTableEventType {
NodeEntryAdded,
NodeEntryRemoved
NodeEntryDropped
};
class NodeTable;
class NodeTableEventHandler
@ -101,11 +103,10 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
* NodeTable accepts a port for UDP and will listen to the port on all available
* interfaces.
*
*
* [Integration]
* @todo restore nodes: affects refreshbuckets
* @todo TCP endpoints
* @todo makeRequired: don't try to evict node if node isRequired.
* @todo makeRequired: exclude bucket from refresh if we have node as peer.
* @todo GC uniform 1/32 entires at 112500ms interval
*
* [Optimization]
* @todo serialize evictions per-bucket
@ -140,16 +141,16 @@ public:
/// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable.
static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Set event handler for NodeEntryAdded and NodeEntryRemoved events.
/// Set event handler for NodeEntryAdded and NodeEntryDropped events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored.
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored.
void processEvents();
/// Add node. Node will be pinged if it's not already known.
/// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp);
/// Add node. Node will be pinged if it's not already known.
/// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen.
std::shared_ptr<NodeEntry> addNode(Node const& _node);
/// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table.
@ -225,7 +226,7 @@ private:
/// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away.
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);
/// Called whenever activity is received from an unknown node in order to maintain node table.
/// Called whenever activity is received from a node in order to maintain node table.
void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
/// Used to drop node when timeout occurs or when evict() result is to keep previous node.
@ -265,6 +266,9 @@ private:
Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required.
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
Mutex x_pubkDiscoverPings; ///< LOCK x_nodes first if both x_nodes and x_pubkDiscoverPings locks are required.
std::map<bi::address, TimePoint> m_pubkDiscoverPings; ///< List of pending pings where node entry wasn't created due to unkown pubk.
ba::io_service& m_io; ///< Used by bucket refresh timer.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.

Loading…
Cancel
Save