Browse Source

handle new endpoint properties for ping

cl-refactor
subtly 10 years ago
parent
commit
50f5c96e9c
  1. 96
      libp2p/NodeTable.cpp
  2. 9
      libp2p/NodeTable.h
  3. 2
      test/libp2p/peer.cpp

96
libp2p/NodeTable.cpp

@ -75,21 +75,10 @@ void NodeTable::processEvents()
m_nodeEventHandler->processEvents();
}
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, NodeIPEndpoint const& _ep)
{
auto node = Node(_pubk, _ep);
return addNode(node);
}
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{
// re-enable tcp checks when NAT hosts are handled by discover
// we handle when tcp endpoint is 0 below
if (_node.endpoint.address.to_string() == "0.0.0.0")
{
clog(NodeTableWarn) << "addNode Failed. Invalid UDP address" << url << "0.0.0.0" << "for" << _node.id;
if (!_node.endpoint)
return move(shared_ptr<NodeEntry>());
}
// ping address to recover nodeid if nodeid is empty
if (!_node.id)
@ -229,22 +218,24 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
Guard l(x_state);
for (auto n: m_state[head].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
if (count < s_bucketSize && tail)
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
if (!!p->endpoint && p->endpoint.isAllowed())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
if (count < s_bucketSize && tail)
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
if (!!p->endpoint && p->endpoint.isAllowed())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
head++;
if (tail)
@ -256,12 +247,13 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
Guard l(x_state);
for (auto n: m_state[head].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
if (!!p->endpoint && p->endpoint.isAllowed())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
head++;
}
else
@ -270,19 +262,20 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
Guard l(x_state);
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
if (!!p->endpoint && p->endpoint.isAllowed())
{
if (count < s_bucketSize)
found[distance(_target, p->id)].push_back(p);
else
break;
}
tail--;
}
vector<shared_ptr<NodeEntry>> ret;
for (auto& nodes: found)
for (auto n: nodes.second)
if (n->endpoint.isAllowed())
if (ret.size() < s_bucketSize && n->endpoint.isAllowed())
ret.push_back(n);
return move(ret);
}
@ -320,8 +313,9 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
return;
shared_ptr<NodeEntry> node = nodeEntry(_pubk);
if (!!node && !node->pending)
if (!!node && !node->pending && !!node->endpoint)
{
// todo: drop in favor of ping/pong packets
clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port();
node->endpoint.address = _endpoint.address();
node->endpoint.udpPort = _endpoint.port();
@ -426,6 +420,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// TODO: check echo! (pending pings)
// whenever a pong is received, check if it's in m_evictions
Guard le(x_evictions);
bool evictionEntry = false;
@ -454,12 +450,22 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
m_pubkDiscoverPings.erase(_from.address());
}
if (!haveNode(nodeid))
addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port()));
addNode(Node(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port())));
}
else
return; // unsolicited pong; don't note node as active
}
// update our endpoint address and UDP port (with caution and iff appropriate)
if (false && !m_node.endpoint)
{
if (in.destination.address != m_node.endpoint.address)
m_node.endpoint.address = in.destination.address;
if (in.destination.udpPort != m_node.endpoint.udpPort)
m_node.endpoint.udpPort = in.destination.udpPort;
}
clog(NodeTableConnect) << "PONG from " << nodeid.abridged() << _from;
break;
}
@ -485,7 +491,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.neighbours)
addNode(n.node, n.endpoint);
addNode(Node(n.node, n.endpoint));
break;
}
@ -510,16 +516,12 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
if (in.version != dev::p2p::c_protocolVersion)
{
if (auto n = nodeEntry(nodeid))
if (n)
dropNode(n);
return;
}
// TODO: Feedback if _from.address() != in.ipAddress, or, _from.por() != in.source.udpPort
auto node = addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), in.source.tcpPort));
Pong p(node->endpoint);
if (in.source.address.is_unspecified())
in.source.address = _from.address();
addNode(Node(nodeid, in.source));
Pong p(NodeIPEndpoint(_from.address(), _from.port(), in.source.tcpPort));
p.echo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPointer->send(p);

9
libp2p/NodeTable.h

@ -146,10 +146,7 @@ public:
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored.
void processEvents();
/// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, NodeIPEndpoint const& _ep);
/// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen.
/// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen or NodeId is empty.
std::shared_ptr<NodeEntry> addNode(Node const& _node);
/// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table.
@ -321,8 +318,8 @@ struct PingNode: RLPXDatagram<PingNode>
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), destination(UnspecifiedNodeIPEndpoint) {}
Pong(NodeIPEndpoint _dest): RLPXDatagram<Pong>((bi::udp::endpoint)_dest), destination(_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
Pong(bi::udp::endpoint const& _ep): RLPXDatagram<Pong>(_ep), destination(UnspecifiedNodeIPEndpoint) {}
Pong(NodeIPEndpoint const& _dest): RLPXDatagram<Pong>((bi::udp::endpoint)_dest), destination(_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 2;

2
test/libp2p/peer.cpp

@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(host)
this_thread::sleep_for(chrono::milliseconds(20));
host1.addNode(node2, NodeIPEndpoint(bi::address::from_string("127.0.0.1"), host2prefs.listenPort, host2prefs.listenPort));
this_thread::sleep_for(chrono::seconds(10));
this_thread::sleep_for(chrono::seconds(3));
auto host1peerCount = host1.peerCount();
auto host2peerCount = host2.peerCount();

Loading…
Cancel
Save