Browse Source

Merge pull request #1347 from ethereum/p2p

p2p: Bug fixes and critical discovery improvements
cl-refactor
Gav Wood 10 years ago
parent
commit
29da3c0a6c
  1. 12
      libp2p/Host.cpp
  2. 1
      libp2p/Host.h
  3. 93
      libp2p/NodeTable.cpp
  4. 22
      libp2p/NodeTable.h

12
libp2p/Host.cpp

@ -177,14 +177,14 @@ unsigned Host::protocolVersion() const
void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint)
{ {
/// Get or create Peer
shared_ptr<Peer> p; shared_ptr<Peer> p;
p = m_peers[_id]; if (!m_peers.count(_id))
if (!p)
{ {
p.reset(new Peer()); // this maybe redundant p.reset(new Peer());
p->id = _id; p->id = _id;
} }
else
p = m_peers[_id];
p->m_lastDisconnect = NoDisconnect; p->m_lastDisconnect = NoDisconnect;
if (p->isOffline()) if (p->isOffline())
p->m_lastConnected = std::chrono::system_clock::now(); p->m_lastConnected = std::chrono::system_clock::now();
@ -277,9 +277,9 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
connect(p); connect(p);
} }
} }
else if (_e == NodeEntryRemoved) else if (_e == NodeEntryDropped)
{ {
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n; clog(NetNote) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
RecursiveGuard l(x_sessions); RecursiveGuard l(x_sessions);
m_peers.erase(_n); m_peers.erase(_n);

1
libp2p/Host.h

@ -113,6 +113,7 @@ public:
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
/// Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity.
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort); void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort);
/// Set ideal number of peers. /// Set ideal number of peers.

93
libp2p/NodeTable.cpp

@ -73,21 +73,17 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
// ping address if nodeid is empty // ping address if nodeid is empty
if (!_node.id) if (!_node.id)
{ {
m_pubkDiscoverPings[m_node.endpoint.udp.address()] = std::chrono::steady_clock::now();
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret); p.sign(m_secret);
m_socketPointer->send(p); m_socketPointer->send(p);
shared_ptr<NodeEntry> n; return move(shared_ptr<NodeEntry>());
return move(n);
} }
Guard l(x_nodes);
if (m_nodes.count(_node.id))
{ {
// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. Guard ln(x_nodes);
// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0) if (m_nodes.count(_node.id))
// // Update address if the node if we now have a public IP for it. return m_nodes[_node.id];
// m_server->m_peers[id]->address = ep;
return m_nodes[_node.id];
} }
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
@ -95,12 +91,6 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret); p.sign(m_secret);
m_socketPointer->send(p); m_socketPointer->send(p);
// TODO p2p: rename to p2p.nodes.pending, add p2p.nodes.add event (when pong is received)
clog(NodeTableUpdate) << "p2p.nodes.add " << _node.id.abridged();
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_node.id, NodeEntryAdded);
return ret; return ret;
} }
@ -135,7 +125,6 @@ list<NodeEntry> NodeTable::snapshot() const
Node NodeTable::node(NodeId const& _id) Node NodeTable::node(NodeId const& _id)
{ {
// TODO p2p: eloquent copy operator
Guard l(x_nodes); Guard l(x_nodes);
if (m_nodes.count(_id)) if (m_nodes.count(_id))
{ {
@ -149,7 +138,7 @@ Node NodeTable::node(NodeId const& _id)
shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeId _id) shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeId _id)
{ {
Guard l(x_nodes); Guard l(x_nodes);
return m_nodes.count(_id) ? move(m_nodes[_id]) : move(shared_ptr<NodeEntry>()); return m_nodes.count(_id) ? m_nodes[_id] : shared_ptr<NodeEntry>();
} }
void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried) void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
@ -191,7 +180,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
} }
auto self(shared_from_this()); auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count())); m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count() * 2));
m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec) m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec)
{ {
if (_ec) if (_ec)
@ -297,8 +286,6 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
if (m_evictions.size() == 1) if (m_evictions.size() == 1)
doCheckEvictions(boost::system::error_code()); doCheckEvictions(boost::system::error_code());
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
} }
ping(_leastSeen.get()); ping(_leastSeen.get());
} }
@ -308,22 +295,25 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
if (_pubk == m_node.address()) if (_pubk == m_node.address())
return; return;
clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port(); shared_ptr<NodeEntry> node = nodeEntry(_pubk);
if (!!node && !node->pending)
{
clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port();
shared_ptr<NodeEntry> node(addNode(_pubk, _endpoint, bi::tcp::endpoint(_endpoint.address(), _endpoint.port()))); // update udp endpoint
node->endpoint.udp.address(_endpoint.address());
node->endpoint.udp.port(_endpoint.port());
// TODO p2p: old bug (maybe gone now) sometimes node is nullptr here
if (!!node)
{
shared_ptr<NodeEntry> contested; shared_ptr<NodeEntry> contested;
{ {
Guard l(x_state); Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(node.get()); NodeBucket& s = bucket_UNSAFE(node.get());
s.nodes.remove_if([&node](weak_ptr<NodeEntry> n) bool removed = false;
s.nodes.remove_if([&node, &removed](weak_ptr<NodeEntry> const& n)
{ {
if (n.lock() == node) if (n.lock() == node)
return true; removed = true;
return false; return removed;
}); });
if (s.nodes.size() >= s_bucketSize) if (s.nodes.size() >= s_bucketSize)
@ -335,12 +325,18 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
s.nodes.pop_front(); s.nodes.pop_front();
s.nodes.push_back(node); s.nodes.push_back(node);
s.touch(); s.touch();
if (!removed)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
} }
} }
else else
{ {
s.nodes.push_back(node); s.nodes.push_back(node);
s.touch(); s.touch();
if (!removed)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
} }
} }
@ -351,19 +347,17 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
void NodeTable::dropNode(shared_ptr<NodeEntry> _n) void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
{ {
// remove from nodetable
{ {
Guard l(x_state); Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(_n.get()); NodeBucket& s = bucket_UNSAFE(_n.get());
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; }); s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
} }
{
Guard l(x_nodes);
m_nodes.erase(_n->id);
}
// notify host
clog(NodeTableUpdate) << "p2p.nodes.drop " << _n->id.abridged(); clog(NodeTableUpdate) << "p2p.nodes.drop " << _n->id.abridged();
if (m_nodeEventHandler) if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved); m_nodeEventHandler->appendEvent(_n->id, NodeEntryDropped);
} }
NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n) NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n)
@ -401,9 +395,6 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
} }
unsigned packetType = signedBytes[0]; unsigned packetType = signedBytes[0];
if (packetType && packetType < 4)
noteActiveNode(nodeid, _from);
bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1)); bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1));
RLP rlp(rlpBytes); RLP rlp(rlpBytes);
try { try {
@ -411,37 +402,49 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{ {
case Pong::type: case Pong::type:
{ {
// clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port();
Pong in = Pong::fromBytesConstRef(_from, rlpBytes); Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, check if it's in m_evictions // whenever a pong is received, check if it's in m_evictions
Guard le(x_evictions); Guard le(x_evictions);
bool evictionEntry = false;
for (auto it = m_evictions.begin(); it != m_evictions.end(); it++) for (auto it = m_evictions.begin(); it != m_evictions.end(); it++)
if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now()) if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now())
{ {
evictionEntry = true;
if (auto n = nodeEntry(it->second)) if (auto n = nodeEntry(it->second))
dropNode(n); dropNode(n);
if (auto n = node(it->first.first)) if (auto n = nodeEntry(it->first.first))
addNode(n); if (m_nodeEventHandler && n->pending)
n->pending = false;
it = m_evictions.erase(it); it = m_evictions.erase(it);
} }
// if not, check if it's known/pending or a pubk discovery ping
if (!evictionEntry)
{
if (auto n = nodeEntry(nodeid))
n->pending = false;
}
else if (m_pubkDiscoverPings.count(_from.address()))
m_pubkDiscoverPings.erase(_from.address());
else
return; // unsolicited pong; don't note node as active
break; break;
} }
case Neighbours::type: case Neighbours::type:
{ {
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes); Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port();
for (auto n: in.nodes) for (auto n: in.nodes)
noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port)); addNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port), bi::tcp::endpoint(bi::address::from_string(n.ipAddress), n.port));
break; break;
} }
case FindNode::type: case FindNode::type:
{ {
// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes); FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target); vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
@ -450,6 +453,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{ {
Neighbours out(_from, nearest, offset, nlimit); Neighbours out(_from, nearest, offset, nlimit);
out.sign(m_secret); out.sign(m_secret);
if (out.data.size() > 1280)
clog(NetWarn) << "Sending truncated datagram, size: " << out.data.size();
m_socketPointer->send(out); m_socketPointer->send(out);
} }
break; break;
@ -457,8 +462,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
case PingNode::type: case PingNode::type:
{ {
// clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port();
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes); PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
addNode(nodeid, _from, bi::tcp::endpoint(bi::address::from_string(in.ipAddress), in.port));
Pong p(_from); Pong p(_from);
p.echo = sha3(rlpBytes); p.echo = sha3(rlpBytes);
@ -471,6 +476,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port(); clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port();
return; return;
} }
noteActiveNode(nodeid, _from);
} }
catch (...) catch (...)
{ {

22
libp2p/NodeTable.h

@ -44,11 +44,13 @@ struct NodeEntry: public Node
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);
unsigned const distance; ///< Node's distance (xor of _src as integer). unsigned const distance; ///< Node's distance (xor of _src as integer).
bool pending = true; ///< Node will be ignored until Pong is received
}; };
enum NodeTableEventType { enum NodeTableEventType {
NodeEntryAdded, NodeEntryAdded,
NodeEntryRemoved NodeEntryDropped
}; };
class NodeTable; class NodeTable;
class NodeTableEventHandler class NodeTableEventHandler
@ -101,11 +103,10 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
* NodeTable accepts a port for UDP and will listen to the port on all available * NodeTable accepts a port for UDP and will listen to the port on all available
* interfaces. * interfaces.
* *
*
* [Integration] * [Integration]
* @todo restore nodes: affects refreshbuckets
* @todo TCP endpoints * @todo TCP endpoints
* @todo makeRequired: don't try to evict node if node isRequired. * @todo GC uniform 1/32 entires at 112500ms interval
* @todo makeRequired: exclude bucket from refresh if we have node as peer.
* *
* [Optimization] * [Optimization]
* @todo serialize evictions per-bucket * @todo serialize evictions per-bucket
@ -140,16 +141,16 @@ public:
/// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable. /// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable.
static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Set event handler for NodeEntryAdded and NodeEntryRemoved events. /// Set event handler for NodeEntryAdded and NodeEntryDropped events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); } void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored.
void processEvents(); void processEvents();
/// Add node. Node will be pinged if it's not already known. /// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp); std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp);
/// Add node. Node will be pinged if it's not already known. /// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen.
std::shared_ptr<NodeEntry> addNode(Node const& _node); std::shared_ptr<NodeEntry> addNode(Node const& _node);
/// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table. /// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table.
@ -225,7 +226,7 @@ private:
/// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away. /// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away.
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new); void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);
/// Called whenever activity is received from an unknown node in order to maintain node table. /// Called whenever activity is received from a node in order to maintain node table.
void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
/// Used to drop node when timeout occurs or when evict() result is to keep previous node. /// Used to drop node when timeout occurs or when evict() result is to keep previous node.
@ -266,6 +267,9 @@ private:
Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required. Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required.
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts. std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
Mutex x_pubkDiscoverPings; ///< LOCK x_nodes first if both x_nodes and x_pubkDiscoverPings locks are required.
std::map<bi::address, TimePoint> m_pubkDiscoverPings; ///< List of pending pings where node entry wasn't created due to unkown pubk.
ba::io_service& m_io; ///< Used by bucket refresh timer. ba::io_service& m_io; ///< Used by bucket refresh timer.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe. NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe.

Loading…
Cancel
Save