Browse Source

Merge pull request #1706 from subtly/discovery

Discovery protocol updates
cl-refactor
Gav Wood 10 years ago
parent
commit
c85b3f6493
  1. 28
      libp2p/Common.cpp
  2. 13
      libp2p/Common.h
  3. 101
      libp2p/Host.cpp
  4. 123
      libp2p/NodeTable.cpp
  5. 115
      libp2p/NodeTable.h
  6. 4
      libp2p/UDP.h
  7. 38
      test/libp2p/net.cpp
  8. 8
      test/libp2p/peer.cpp

28
libp2p/Common.cpp

@ -24,8 +24,9 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
const unsigned dev::p2p::c_protocolVersion = 3;
const unsigned dev::p2p::c_protocolVersion = 4;
const unsigned dev::p2p::c_defaultIPPort = 30303;
static_assert(dev::p2p::c_protocolVersion == 4, "Replace v3 compatbility with v4 compatibility before updating network version.");
const dev::p2p::NodeIPEndpoint dev::p2p::UnspecifiedNodeIPEndpoint = NodeIPEndpoint(bi::address(), 0, 0);
const dev::p2p::Node dev::p2p::UnspecifiedNode = dev::p2p::Node(NodeId(), UnspecifiedNodeIPEndpoint);
@ -144,6 +145,31 @@ std::string p2p::reasonOf(DisconnectReason _r)
}
}
void NodeIPEndpoint::streamRLP(RLPStream& _s, RLPAppend _append) const
{
if (_append == StreamList)
_s.appendList(3);
if (address.is_v4())
_s << bytesConstRef(&address.to_v4().to_bytes()[0], 4);
else if (address.is_v6())
_s << bytesConstRef(&address.to_v6().to_bytes()[0], 16);
else
_s << bytes();
_s << udpPort << tcpPort;
}
void NodeIPEndpoint::interpretRLP(RLP const& _r)
{
if (_r[0].size() == 4)
address = bi::address_v4(*(bi::address_v4::bytes_type*)_r[0].toBytes().data());
else if (_r[0].size() == 16)
address = bi::address_v6(*(bi::address_v6::bytes_type*)_r[0].toBytes().data());
else
address = bi::address();
udpPort = _r[1].toInt<uint16_t>();
tcpPort = _r[2].toInt<uint16_t>();
}
namespace dev {
std::ostream& operator<<(std::ostream& _out, dev::p2p::NodeIPEndpoint const& _ep)

13
libp2p/Common.h

@ -36,6 +36,7 @@
#include <libdevcrypto/Common.h>
#include <libdevcore/Log.h>
#include <libdevcore/Exceptions.h>
#include <libdevcore/RLP.h>
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
@ -162,10 +163,17 @@ using PeerSessionInfos = std::vector<PeerSessionInfo>;
*/
struct NodeIPEndpoint
{
enum RLPAppend
{
StreamList,
StreamInline
};
/// Setting true causes isAllowed to return true for all addresses. (Used by test fixtures)
static bool test_allowLocal;
NodeIPEndpoint(bi::address _addr, uint16_t _udp, uint16_t _tcp): address(_addr), udpPort(_udp), tcpPort(_tcp) {}
NodeIPEndpoint(RLP const& _r) { interpretRLP(_r); }
bi::address address;
uint16_t udpPort;
@ -177,11 +185,14 @@ struct NodeIPEndpoint
operator bool() const { return !address.is_unspecified() && udpPort > 0 && tcpPort > 0; }
bool isAllowed() const { return NodeIPEndpoint::test_allowLocal ? !address.is_unspecified() : isPublicAddress(address); }
void streamRLP(RLPStream& _s, RLPAppend _append = StreamList) const;
void interpretRLP(RLP const& _r);
};
struct Node
{
Node(Public _pubk, NodeIPEndpoint _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
Node(Public _pubk, NodeIPEndpoint const& _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
virtual NodeId const& address() const { return id; }
virtual Public const& publicKey() const { return id; }

101
libp2p/Host.cpp

@ -208,7 +208,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
// create session so disconnects are managed
auto ps = make_shared<Session>(this, _io, p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>()}));
if (protocolVersion < dev::p2p::c_protocolVersion)
if (protocolVersion < dev::p2p::c_protocolVersion - 1)
{
ps->disconnect(IncompatibleProtocol);
return;
@ -696,15 +696,16 @@ bytes Host::saveNetwork() const
int count = 0;
for (auto const& p: peers)
{
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
// todo: e2e ipv6 support
// todo: ipv6
if (!p.endpoint.address.is_v4())
continue;
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && p.endpoint.tcpPort > 0 && p.id != id() && (p.required || p.endpoint.isAllowed()))
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && !!p.endpoint && p.id != id() && (p.required || p.endpoint.isAllowed()))
{
network.appendList(10);
network << p.endpoint.address.to_v4().to_bytes() << p.endpoint.tcpPort << p.id << p.required
network.appendList(11);
p.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
network << p.id << p.required
<< chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count()
<< p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
@ -718,12 +719,9 @@ bytes Host::saveNetwork() const
state.sort();
for (auto const& entry: state)
{
network.appendList(3);
if (entry.endpoint.address.is_v4())
network << entry.endpoint.address.to_v4().to_bytes();
else
network << entry.endpoint.address.to_v6().to_bytes();
network << entry.endpoint.tcpPort << entry.id;
network.appendList(4);
entry.endpoint.streamRLP(network, NodeIPEndpoint::StreamInline);
network << entry.id;
count++;
}
}
@ -739,6 +737,9 @@ bytes Host::saveNetwork() const
void Host::restoreNetwork(bytesConstRef _b)
{
if (!_b.size())
return;
// nodes can only be added if network is added
if (!isStarted())
BOOST_THROW_EXCEPTION(NetworkStartRequired());
@ -748,7 +749,8 @@ void Host::restoreNetwork(bytesConstRef _b)
RecursiveGuard l(x_sessions);
RLP r(_b);
if (r.itemCount() > 0 && r[0].isInt() && r[0].toInt<unsigned>() == dev::p2p::c_protocolVersion)
unsigned fileVersion = r[0].toInt<unsigned>();
if (r.itemCount() > 0 && r[0].isInt() && fileVersion >= dev::p2p::c_protocolVersion - 1)
{
// r[0] = version
// r[1] = key
@ -756,30 +758,57 @@ void Host::restoreNetwork(bytesConstRef _b)
for (auto i: r[2])
{
if (i[0].itemCount() != 4)
// todo: ipv6
if (i[0].itemCount() != 4 && i[0].size() != 4)
continue;
// todo: ipv6, bi::address_v6(i[0].toArray<byte, 16>()
Node n((NodeId)i[2], NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
if (i.itemCount() == 3 && n.endpoint.isAllowed())
m_nodeTable->addNode(n, NodeTable::NodeRelation::Known);
else if (i.itemCount() == 10)
if (i.itemCount() == 4 || i.itemCount() == 11)
{
n.required = i[3].toInt<bool>();
if (!n.endpoint.isAllowed() && !n.required)
continue;
shared_ptr<Peer> p = make_shared<Peer>(n);
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_failedAttempts = i[6].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
p->m_score = (int)i[8].toInt<unsigned>();
p->m_rating = (int)i[9].toInt<unsigned>();
m_peers[p->id] = p;
if (p->required)
requirePeer(p->id, n.endpoint);
else
m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
Node n((NodeId)i[3], NodeIPEndpoint(i));
if (i.itemCount() == 4 && n.endpoint.isAllowed())
m_nodeTable->addNode(n);
else if (i.itemCount() == 11)
{
n.required = i[4].toInt<bool>();
if (!n.endpoint.isAllowed() && !n.required)
continue;
shared_ptr<Peer> p = make_shared<Peer>(n);
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[6].toInt<unsigned>()));
p->m_failedAttempts = i[7].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[8].toInt<unsigned>();
p->m_score = (int)i[9].toInt<unsigned>();
p->m_rating = (int)i[10].toInt<unsigned>();
m_peers[p->id] = p;
if (p->required)
requirePeer(p->id, n.endpoint);
else
m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
}
}
else if (i.itemCount() == 3 || i.itemCount() == 10)
{
Node n((NodeId)i[2], NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
if (i.itemCount() == 3 && n.endpoint.isAllowed())
m_nodeTable->addNode(n);
else if (i.itemCount() == 10)
{
n.required = i[3].toInt<bool>();
if (!n.endpoint.isAllowed() && !n.required)
continue;
shared_ptr<Peer> p = make_shared<Peer>(n);
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_failedAttempts = i[6].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
p->m_score = (int)i[8].toInt<unsigned>();
p->m_rating = (int)i[9].toInt<unsigned>();
m_peers[p->id] = p;
if (p->required)
requirePeer(p->id, n.endpoint);
else
m_nodeTable->addNode(*p.get(), NodeTable::NodeRelation::Known);
}
}
}
}
@ -788,7 +817,7 @@ void Host::restoreNetwork(bytesConstRef _b)
KeyPair Host::networkAlias(bytesConstRef _b)
{
RLP r(_b);
if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<unsigned>() == dev::p2p::c_protocolVersion)
if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<unsigned>() >= 3)
return move(KeyPair(move(Secret(r[1].toBytes()))));
else
return move(KeyPair::create());

123
libp2p/NodeTable.cpp

@ -75,12 +75,6 @@ void NodeTable::processEvents()
m_nodeEventHandler->processEvents();
}
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, NodeIPEndpoint const& _ep)
{
auto node = Node(_pubk, _ep);
return addNode(node);
}
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relation)
{
if (_relation == Known)
@ -92,13 +86,8 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
return ret;
}
// re-enable tcp checks when NAT hosts are handled by discover
// we handle when tcp endpoint is 0 below
if (_node.endpoint.address.to_string() == "0.0.0.0")
{
clog(NodeTableWarn) << "addNode Failed. Invalid UDP address" << LogTag::Url << "0.0.0.0" << "for" << _node.id;
if (!_node.endpoint)
return move(shared_ptr<NodeEntry>());
}
// ping address to recover nodeid if nodeid is empty
if (!_node.id)
@ -108,9 +97,7 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
Guard l(x_pubkDiscoverPings);
m_pubkDiscoverPings[_node.endpoint.address] = std::chrono::steady_clock::now();
}
PingNode p(_node.endpoint, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
ping(_node.endpoint);
return move(shared_ptr<NodeEntry>());
}
@ -123,9 +110,7 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
m_nodes[_node.id] = ret;
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint;
PingNode p(_node.endpoint, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
ping(_node.endpoint);
return ret;
}
@ -153,8 +138,10 @@ list<NodeEntry> NodeTable::snapshot() const
list<NodeEntry> ret;
Guard l(x_state);
for (auto s: m_state)
for (auto n: s.nodes)
ret.push_back(*n.lock());
for (auto np: s.nodes)
if (auto n = np.lock())
if (!!n)
ret.push_back(*n);
return move(ret);
}
@ -295,14 +282,14 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
vector<shared_ptr<NodeEntry>> ret;
for (auto& nodes: found)
for (auto n: nodes.second)
if (n->endpoint.isAllowed())
if (ret.size() < s_bucketSize && !!n->endpoint && n->endpoint.isAllowed())
ret.push_back(n);
return move(ret);
}
void NodeTable::ping(bi::udp::endpoint _to) const
void NodeTable::ping(NodeIPEndpoint _to) const
{
PingNode p(_to, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
PingNode p(m_node.endpoint, _to);
p.sign(m_secret);
m_socketPointer->send(p);
}
@ -467,12 +454,17 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
m_pubkDiscoverPings.erase(_from.address());
}
if (!haveNode(nodeid))
addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port()));
addNode(Node(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port())));
}
else
return; // unsolicited pong; don't note node as active
}
// update our endpoint address and UDP port
if ((!m_node.endpoint || !m_node.endpoint.isAllowed()) && isPublicAddress(in.destination.address))
m_node.endpoint.address = in.destination.address;
m_node.endpoint.udpPort = in.destination.udpPort;
clog(NodeTableConnect) << "PONG from " << nodeid << _from;
break;
}
@ -497,17 +489,22 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
}
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.nodes)
addNode(n.node, NodeIPEndpoint(bi::address::from_string(n.ipAddress), n.udpPort, n.udpPort));
for (auto n: in.neighbours)
addNode(Node(n.node, n.endpoint));
break;
}
case FindNode::type:
{
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
if (RLPXDatagramFace::secondsSinceEpoch() > in.ts)
{
clog(NodeTableTriviaSummary) << "Received expired FindNode from " << _from.address().to_string() << ":" << _from.port();
return;
}
vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
static unsigned const nlimit = (m_socketPointer->maxDatagramSize - 111) / 87;
static unsigned const nlimit = (m_socketPointer->maxDatagramSize - 109) / 90;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{
Neighbours out(_from, nearest, offset, nlimit);
@ -522,17 +519,29 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
case PingNode::type:
{
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
if (in.version != dev::p2p::c_protocolVersion)
if (in.version < dev::p2p::c_protocolVersion)
{
if (auto n = nodeEntry(nodeid))
dropNode(n);
return;
if (in.version == 3)
{
compat::Pong p(in.source);
p.echo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPointer->send(p);
}
else
return;
}
// TODO: Feedback if _from.address() != in.ipAddress
addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), in.tcpPort));
if (RLPXDatagramFace::secondsSinceEpoch() > in.ts)
{
clog(NodeTableTriviaSummary) << "Received expired PingNode from " << _from.address().to_string() << ":" << _from.port();
return;
}
Pong p(_from);
in.source.address = _from.address();
in.source.udpPort = _from.port();
addNode(Node(nodeid, in.source));
Pong p(in.source);
p.echo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPointer->send(p);
@ -608,26 +617,44 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
void PingNode::streamRLP(RLPStream& _s) const
{
_s.appendList(4);
_s << dev::p2p::c_protocolVersion << ipAddress << tcpPort << ts;
_s << dev::p2p::c_protocolVersion;
source.streamRLP(_s);
destination.streamRLP(_s);
_s << ts;
}
void PingNode::interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes);
if (r.itemCountStrict() == 3)
if (r.itemCountStrict() == 4 && r[0].isInt() && r[0].toInt<unsigned>(RLP::Strict) == dev::p2p::c_protocolVersion)
{
version = 2;
ipAddress = r[0].toString();
tcpPort = r[1].toInt<unsigned>(RLP::Strict);
ts = r[2].toInt<unsigned>(RLP::Strict);
}
else if (r.itemCountStrict() == 4)
{
version = r[0].toInt<unsigned>(RLP::Strict);
ipAddress = r[1].toString();
tcpPort = r[2].toInt<unsigned>(RLP::Strict);
ts = r[3].toInt<unsigned>(RLP::Strict);
version = dev::p2p::c_protocolVersion;
source.interpretRLP(r[1]);
destination.interpretRLP(r[2]);
ts = r[3].toInt<uint32_t>(RLP::Strict);
}
else
BOOST_THROW_EXCEPTION(InvalidRLP());
version = r[0].toInt<unsigned>(RLP::Strict);
}
void Pong::streamRLP(RLPStream& _s) const
{
_s.appendList(3);
destination.streamRLP(_s);
_s << echo << ts;
}
void Pong::interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes);
destination.interpretRLP(r[0]);
echo = (h256)r[1];
ts = r[2].toInt<uint32_t>();
}
void compat::Pong::interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes);
echo = (h256)r[0];
ts = r[1].toInt<uint32_t>();
}

115
libp2p/NodeTable.h

@ -100,23 +100,15 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
* NodeTable accepts a port for UDP and will listen to the port on all available
* interfaces.
*
*
* [Integration]
* @todo TCP endpoints
* @todo GC uniform 1/32 entires at 112500ms interval
*
* [Optimization]
* @todo serialize evictions per-bucket
* @todo store evictions in map, unit-test eviction logic
* @todo store root node in table
* @todo encapsulate discover into NetworkAlgorithm (task)
* @todo Pong to include ip:port where ping was received
* @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay)
* @todo cache Ping and FindSelf
*
* [Networking]
* @todo node-endpoint updates
* @todo TCP endpoints
* @todo eth/upnp/natpmp/stun/ice/etc for public-discovery
* @todo firewall
*
@ -131,7 +123,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
using TimePoint = std::chrono::steady_clock::time_point; ///< Steady time point.
using NodeIdTimePoint = std::pair<NodeId, TimePoint>;
using EvictionTimeout = std::pair<NodeIdTimePoint, NodeId>; ///< First NodeId (NodeIdTimePoint) may be evicted and replaced with second NodeId.
public:
enum NodeRelation { Unknown = 0, Known };
@ -148,9 +140,6 @@ public:
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored.
void processEvents();
/// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, NodeIPEndpoint const& _ep);
/// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen or NodeId is empty.
std::shared_ptr<NodeEntry> addNode(Node const& _node, NodeRelation _relation = NodeRelation::Unknown);
@ -206,7 +195,7 @@ private:
};
/// Used to ping endpoint.
void ping(bi::udp::endpoint _to) const;
void ping(NodeIPEndpoint _to) const;
/// Used ping known node. Used by node table when refreshing buckets and as part of eviction process (see evict).
void ping(NodeEntry* _n) const;
@ -301,30 +290,21 @@ struct InvalidRLP: public Exception {};
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond, then it is removed and the new
* node is inserted.
*
* RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses
*
* signature: Signature of message.
* ipAddress: Our IP address.
* port: Our port.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
struct PingNode: RLPXDatagram<PingNode>
{
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep) {}
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _ts = std::chrono::seconds(60)): RLPXDatagram<PingNode>(_ep), ipAddress(_src), tcpPort(_srcPort), ts(futureFromEpoch(_ts)) {}
/// Constructor used for sending PingNode.
PingNode(NodeIPEndpoint _src, NodeIPEndpoint _dest): RLPXDatagram<PingNode>(_dest), source(_src), destination(_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
/// Constructor used to create empty PingNode for parsing inbound packets.
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep), source(UnspecifiedNodeIPEndpoint), destination(UnspecifiedNodeIPEndpoint) {}
static const uint8_t type = 1;
unsigned version = 0;
std::string ipAddress;
// uint16_t udpPort;
uint16_t tcpPort;
unsigned ts;
NodeIPEndpoint source;
NodeIPEndpoint destination;
uint32_t ts = 0;
void streamRLP(RLPStream& _s) const override;
void interpretRLP(bytesConstRef _bytes) override;
@ -332,22 +312,20 @@ struct PingNode: RLPXDatagram<PingNode>
/**
* Pong packet: Sent in response to ping
*
* RLP Encoded Items: 2
* Minimum Encoded Size: 33 bytes
* Maximum Encoded Size: 33 bytes
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), ts(futureFromEpoch(std::chrono::seconds(60))) {}
Pong(bi::udp::endpoint const& _ep): RLPXDatagram<Pong>(_ep), destination(UnspecifiedNodeIPEndpoint) {}
Pong(NodeIPEndpoint const& _dest): RLPXDatagram<Pong>((bi::udp::endpoint)_dest), destination(_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 2;
NodeIPEndpoint destination;
h256 echo; ///< MCD of PingNode
unsigned ts;
uint32_t ts = 0;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << echo << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); echo = (h256)r[0]; ts = r[1].toInt<unsigned>(); }
void streamRLP(RLPStream& _s) const;
void interpretRLP(bytesConstRef _bytes);
};
/**
@ -365,58 +343,63 @@ struct Pong: RLPXDatagram<Pong>
struct FindNode: RLPXDatagram<FindNode>
{
FindNode(bi::udp::endpoint _ep): RLPXDatagram<FindNode>(_ep) {}
FindNode(bi::udp::endpoint _ep, NodeId _target, std::chrono::seconds _ts = std::chrono::seconds(60)): RLPXDatagram<FindNode>(_ep), target(_target), ts(futureFromEpoch(_ts)) {}
FindNode(bi::udp::endpoint _ep, NodeId _target): RLPXDatagram<FindNode>(_ep), target(_target), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 3;
h512 target;
unsigned ts;
uint32_t ts = 0;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash<h512>(); ts = r[1].toInt<unsigned>(); }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash<h512>(); ts = r[1].toInt<uint32_t>(); }
};
/**
* Node Packet: Multiple node packets are sent in response to FindNode.
*
* RLP Encoded Items: 2 (first item is list)
* Minimum Encoded Size: 10 bytes
* Node Packet: One or more node packets are sent in response to FindNode.
*/
struct Neighbours: RLPXDatagram<Neighbours>
{
struct Node
struct Neighbour
{
Node() = default;
Node(RLP const& _r) { interpretRLP(_r); }
std::string ipAddress;
uint16_t udpPort;
// uint16_t tcpPort;
Neighbour(Node const& _node): endpoint(_node.endpoint), node(_node.id) {}
Neighbour(RLP const& _r): endpoint(_r) { node = h512(_r[3].toBytes()); }
NodeIPEndpoint endpoint;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << udpPort << node; }
void interpretRLP(RLP const& _r) { ipAddress = _r[0].toString(); udpPort = _r[1].toInt<uint16_t>(); node = h512(_r[2].toBytes()); }
void streamRLP(RLPStream& _s) const { _s.appendList(4); endpoint.streamRLP(_s, NodeIPEndpoint::StreamInline); _s << node; }
};
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep), ts(futureFromEpoch(std::chrono::seconds(30))) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to), ts(futureFromEpoch(std::chrono::seconds(30)))
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep), ts(secondsSinceEpoch()) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to), ts(futureFromEpoch(std::chrono::seconds(60)))
{
auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
for (auto i = _offset; i < limit; i++)
{
Node node;
node.ipAddress = _nearest[i]->endpoint.address.to_string();
node.udpPort = _nearest[i]->endpoint.udpPort;
node.node = _nearest[i]->publicKey();
nodes.push_back(node);
}
neighbours.push_back(Neighbour(*_nearest[i]));
}
static const uint8_t type = 4;
std::vector<Node> nodes;
unsigned ts = 1;
std::vector<Neighbour> neighbours;
uint32_t ts = 0;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); ts = r[1].toInt<unsigned>(); }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(neighbours.size()); for (auto& n: neighbours) n.streamRLP(_s); _s << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) neighbours.push_back(Neighbour(n)); ts = r[1].toInt<uint32_t>(); }
};
namespace compat
{
/**
* Pong packet [compatability]: Sent in response to ping
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint const& _ep): RLPXDatagram<Pong>(_ep) {}
Pong(NodeIPEndpoint const& _dest): RLPXDatagram<Pong>((bi::udp::endpoint)_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 2;
h256 echo;
uint32_t ts = 0;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << echo << ts; }
void interpretRLP(bytesConstRef _bytes);
};
}
struct NodeTableWarn: public LogChannel { static const char* name(); static const int verbosity = 0; };
struct NodeTableNote: public LogChannel { static const char* name(); static const int verbosity = 1; };

4
libp2p/UDP.h

@ -61,8 +61,8 @@ protected:
*/
struct RLPXDatagramFace: public UDPDatagram
{
static uint64_t futureFromEpoch(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static uint32_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static uint32_t secondsSinceEpoch() { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now()).time_since_epoch()).count(); }
static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);
virtual uint8_t packetType() = 0;

38
test/libp2p/net.cpp

@ -82,7 +82,7 @@ struct TestNodeTable: public NodeTable
bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes)
{
ping(bi::udp::endpoint(ourIp, n.second));
ping(NodeIPEndpoint(ourIp, n.second, n.second));
this_thread::sleep_for(chrono::milliseconds(2));
}
}
@ -226,7 +226,7 @@ BOOST_AUTO_TEST_CASE(v2PingNodePacket)
PingNode p((bi::udp::endpoint()));
BOOST_REQUIRE_NO_THROW(p = PingNode::fromBytesConstRef(bi::udp::endpoint(), bytesConstRef(&s.out())));
BOOST_REQUIRE(p.version == 2);
BOOST_REQUIRE(p.version == 0);
}
BOOST_AUTO_TEST_CASE(neighboursPacketLength)
@ -235,8 +235,8 @@ BOOST_AUTO_TEST_CASE(neighboursPacketLength)
std::vector<std::pair<KeyPair,unsigned>> testNodes(TestNodeTable::createTestNodes(16));
bi::udp::endpoint to(boost::asio::ip::address::from_string("127.0.0.1"), 30000);
// hash(32), signature(65), overhead: packet(2), type(1), nodeList(2), ts(9),
static unsigned const nlimit = (1280 - 111) / 87;
// hash(32), signature(65), overhead: packetSz(3), type(1), nodeListSz(3), ts(5),
static unsigned const nlimit = (1280 - 109) / 90; // neighbour: 2 + 65 + 3 + 3 + 17
for (unsigned offset = 0; offset < testNodes.size(); offset += nlimit)
{
Neighbours out(to);
@ -244,11 +244,9 @@ BOOST_AUTO_TEST_CASE(neighboursPacketLength)
auto limit = nlimit ? std::min(testNodes.size(), (size_t)(offset + nlimit)) : testNodes.size();
for (auto i = offset; i < limit; i++)
{
Neighbours::Node node;
node.ipAddress = boost::asio::ip::address::from_string("200.200.200.200").to_string();
node.udpPort = testNodes[i].second;
node.node = testNodes[i].first.pub();
out.nodes.push_back(node);
Node n(testNodes[i].first.pub(), NodeIPEndpoint(boost::asio::ip::address::from_string("200.200.200.200"), testNodes[i].second, testNodes[i].second));
Neighbours::Neighbour neighbour(n);
out.neighbours.push_back(neighbour);
}
out.sign(k.sec());
@ -256,7 +254,7 @@ BOOST_AUTO_TEST_CASE(neighboursPacketLength)
}
}
BOOST_AUTO_TEST_CASE(test_neighbours_packet)
BOOST_AUTO_TEST_CASE(neighboursPacket)
{
KeyPair k = KeyPair::create();
std::vector<std::pair<KeyPair,unsigned>> testNodes(TestNodeTable::createTestNodes(16));
@ -265,11 +263,9 @@ BOOST_AUTO_TEST_CASE(test_neighbours_packet)
Neighbours out(to);
for (auto n: testNodes)
{
Neighbours::Node node;
node.ipAddress = boost::asio::ip::address::from_string("127.0.0.1").to_string();
node.udpPort = n.second;
node.node = n.first.pub();
out.nodes.push_back(node);
Node node(n.first.pub(), NodeIPEndpoint(boost::asio::ip::address::from_string("200.200.200.200"), n.second, n.second));
Neighbours::Neighbour neighbour(node);
out.neighbours.push_back(neighbour);
}
out.sign(k.sec());
@ -277,9 +273,9 @@ BOOST_AUTO_TEST_CASE(test_neighbours_packet)
bytesConstRef rlpBytes(packet.cropped(h256::size + Signature::size + 1));
Neighbours in = Neighbours::fromBytesConstRef(to, rlpBytes);
int count = 0;
for (auto n: in.nodes)
for (auto n: in.neighbours)
{
BOOST_REQUIRE_EQUAL(testNodes[count].second, n.udpPort);
BOOST_REQUIRE_EQUAL(testNodes[count].second, n.endpoint.udpPort);
BOOST_REQUIRE_EQUAL(testNodes[count].first.pub(), n.node);
BOOST_REQUIRE_EQUAL(sha3(testNodes[count].first.pub()), sha3(n.node));
count++;
@ -293,12 +289,6 @@ BOOST_AUTO_TEST_CASE(test_findnode_neighbours)
// into the same list of nearest nodes.
}
BOOST_AUTO_TEST_CASE(test_windows_template)
{
bi::udp::endpoint ep;
PingNode p(ep);
}
BOOST_AUTO_TEST_CASE(kademlia)
{
// Not yet a 'real' test.
@ -332,7 +322,7 @@ BOOST_AUTO_TEST_CASE(kademlia)
}
BOOST_AUTO_TEST_CASE(test_udp_once)
BOOST_AUTO_TEST_CASE(udpOnce)
{
UDPDatagram d(bi::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300), bytes({65,65,65,65}));
TestUDPSocket a; a.m_socket->connect(); a.start();

8
test/libp2p/peer.cpp

@ -51,6 +51,8 @@ BOOST_AUTO_TEST_CASE(host)
auto node2 = host2.id();
host2.start();
while (!host2.isStarted())
this_thread::sleep_for(chrono::milliseconds(20));
host1.addNode(node2, NodeIPEndpoint(bi::address::from_string("127.0.0.1"), host2prefs.listenPort, host2prefs.listenPort));
this_thread::sleep_for(chrono::seconds(3));
@ -72,7 +74,7 @@ BOOST_AUTO_TEST_CASE(networkConfig)
BOOST_REQUIRE(save.id() == restore.id());
}
BOOST_AUTO_TEST_CASE(save_nodes)
BOOST_AUTO_TEST_CASE(saveNodes)
{
std::list<Host*> hosts;
for (auto i:{0,1,2,3,4,5})
@ -111,8 +113,8 @@ BOOST_AUTO_TEST_CASE(save_nodes)
for (auto i: r[2])
{
BOOST_REQUIRE(i.itemCount() == 3 || i.itemCount() == 10);
BOOST_REQUIRE(i[0].itemCount() == 4 || i[0].itemCount() == 16);
BOOST_REQUIRE(i.itemCount() == 4 || i.itemCount() == 11);
BOOST_REQUIRE(i[0].size() == 4 || i[0].size() == 16);
}
}

Loading…
Cancel
Save