Browse Source

basic implementation of packets

cl-refactor
subtly 10 years ago
parent
commit
c06054f63e
  1. 31
      libp2p/NodeTable.cpp
  2. 240
      libp2p/NodeTable.h
  3. 13
      libp2p/UDP.cpp
  4. 35
      libp2p/UDP.h
  5. 6
      test/net.cpp

31
libp2p/NodeTable.cpp

@ -287,17 +287,22 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
}
/// 3 items is PingNode, 2 items w/no lists is FindNode, 2 items w/first item as list is Neighbors, 1 item is Pong
RLP rlp(bytesConstRef(_packet.cropped(65, _packet.size() - 65)));
bytesConstRef rlpBytes(_packet.cropped(65, _packet.size() - 65));
RLP rlp(rlpBytes);
unsigned itemCount = rlp.itemCount();
// bytesConstRef sig(_packet.cropped(0, 65)); // verify signature (deferred)
try {
switch (itemCount) {
case 1:
{
clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port();
Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, first check if it's in m_evictions, if so, remove it
Guard l(x_evictions);
// otherwise check if we're expecting a pong. if we weren't, blacklist IP for 300 seconds
break;
}
@ -305,19 +310,32 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
if (rlp[0].isList())
{
clog(NodeTableMessageSummary) << "Received Neighbors from " << _from.address().to_string() << ":" << _from.port();
Neighbors in = Neighbors::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.nodes)
noteNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port));
}
else
{
clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
std::vector<std::shared_ptr<NodeTable::NodeEntry>> nearest = findNearest(in.target);
Neighbors out(_from, nearest);
out.sign(m_secret);
m_socketPtr->send(out);
}
break;
case 3:
{
clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port();
// todo: if we know the node, send a pong. otherwise ignore him.
// let's send a pong!
// todo: if we know the node, reply, otherwise ignore.
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
Pong p(_from);
p.replyTo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPtr->send(p);
break;
}
@ -326,6 +344,11 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
return;
}
}
catch (...)
{
}
}
void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{

240
libp2p/NodeTable.h

@ -30,118 +30,6 @@ namespace dev
namespace p2p
{
/**
* Ping packet: Check if node is alive.
* PingNode is cached and regenerated after expiration - t, where t is timeout.
*
* RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses
*
* signature: Signature of message.
* ipAddress: Our IP address.
* port: Our port.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond then it is removed and the new
* node is inserted.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
struct PingNode: public RLPXDatagram
{
friend class NodeTable;
using RLPXDatagram::RLPXDatagram;
PingNode(bi::udp::endpoint _to, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram(_to), ipAddress(_src), port(_srcPort), expiration(fromNow(_expiration)) {}
std::string ipAddress; // rlp encoded bytes min: 16
uint16_t port; // rlp encoded bytes min: 1 max: 3
uint64_t expiration; // rlp encoded bytes min: 1 max: 9
protected:
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = (std::string)r[0]; port = (uint16_t)r[1]; expiration = (uint64_t)r[2]; }
};
/**
* Pong packet: response to ping
*
* RLP Encoded Items: 1
* Minimum Encoded Size: 33 bytes
* Maximum Encoded Size: 33 bytes
*
* @todo value of replyTo
*/
struct Pong: RLPXDatagram
{
friend class NodeTable;
using RLPXDatagram::RLPXDatagram;
h256 replyTo;
void streamRLP(RLPStream& _s) const { _s.appendList(1); _s << replyTo; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); replyTo = (h256)r[0]; }
};
/**
* FindNode Packet: Request k-nodes, closest to the target.
* FindNode is cached and regenerated after expiration - t, where t is timeout.
* FindNode implicitly results in finding neighbors of a given node.
*
* RLP Encoded Items: 2
* Minimum Encoded Size: 21 bytes
* Maximum Encoded Size: 30 bytes
*
* target: Address of node. The responding node will send back nodes closest to the target.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
*/
struct FindNode: RLPXDatagram
{
friend class NodeTable;
using RLPXDatagram::RLPXDatagram;
FindNode(bi::udp::endpoint _to, Address _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram(_to), target(_target), expiration(fromNow(_expiration)) {}
h160 target;
uint64_t expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = (h160)r[0]; expiration = (uint64_t)r[1]; }
};
/**
* Node Packet: Multiple node packets are sent in response to FindNode.
*
* RLP Encoded Items: 2 (first item is list)
* Minimum Encoded Size: 10 bytes
*
*/
struct Neighbors: RLPXDatagram
{
friend class NodeTable;
using RLPXDatagram::RLPXDatagram;
struct Node
{
Node() = default;
Node(bytesConstRef _bytes) { interpretRLP(_bytes); }
std::string ipAddress;
uint16_t port;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = (uint16_t)r[1]; node = (h512)r[2]; }
};
std::list<Node> nodes;
h256 nonce;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << nonce; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n.toBytesConstRef())); nonce = (h256)r[1]; }
};
/**
* NodeTable using S/Kademlia system for node discovery and preference.
* untouched buckets are refreshed if they have not been touched within an hour
@ -158,20 +46,20 @@ struct Neighbors: RLPXDatagram
* @todo std::shared_ptr<FindNeighbors> m_cachedFindSelfPacket;
*
* [Networking]
* @todo use eth/stun/ice/whatever for public-discovery
* @todo use eth/upnp/natpmp/stun/ice/etc for public-discovery
* @todo firewall
*
* [Protocol]
* @todo ping newly added nodes for eviction
* @todo optimize knowledge at opposite edges; eg, s_bitsPerStep lookups. (Can be done via pointers to NodeBucket)
* @todo ^ s_bitsPerStep = 5; // Denoted by b in [Kademlia]. Bits by which address space is divided.
* @todo optimize (use tree for state and/or custom compare for cache)
* @todo reputation (aka universal siblings lists)
* @todo dht (aka siblings)
*
* [Maintenance]
* @todo pretty logs
*/
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{
friend struct Neighbors;
using NodeSocket = UDPSocket<NodeTable, 1280>;
using TimePoint = std::chrono::steady_clock::time_point;
using EvictionTimeout = std::pair<std::pair<Address,TimePoint>,Address>;
@ -306,6 +194,126 @@ protected:
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.
};
/**
* Ping packet: Check if node is alive.
* PingNode is cached and regenerated after expiration - t, where t is timeout.
*
* RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses
*
* signature: Signature of message.
* ipAddress: Our IP address.
* port: Our port.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond then it is removed and the new
* node is inserted.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
struct PingNode: RLPXDatagram<PingNode>
{
using RLPXDatagram::RLPXDatagram;
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram(_ep), ipAddress(_src), port(_srcPort), expiration(futureFromEpoch(_expiration)) {}
std::string ipAddress; // rlp encoded bytes min: 16
uint16_t port; // rlp encoded bytes min: 1 max: 3
uint64_t expiration; // rlp encoded bytes min: 1 max: 9
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = (std::string)r[0]; port = (uint16_t)r[1]; expiration = (uint64_t)r[2]; }
};
/**
* Pong packet: response to ping
*
* RLP Encoded Items: 1
* Minimum Encoded Size: 33 bytes
* Maximum Encoded Size: 33 bytes
*
* @todo value of replyTo
* @todo create from PingNode (reqs RLPXDatagram verify flag)
*/
struct Pong: RLPXDatagram<Pong>
{
using RLPXDatagram::RLPXDatagram;
h256 replyTo; // hash of rlp of PingNode
void streamRLP(RLPStream& _s) const { _s.appendList(1); _s << replyTo; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); replyTo = (h256)r[0]; }
};
/**
* FindNode Packet: Request k-nodes, closest to the target.
* FindNode is cached and regenerated after expiration - t, where t is timeout.
* FindNode implicitly results in finding neighbors of a given node.
*
* RLP Encoded Items: 2
* Minimum Encoded Size: 21 bytes
* Maximum Encoded Size: 30 bytes
*
* target: Address of node. The responding node will send back nodes closest to the target.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
*/
struct FindNode: RLPXDatagram<FindNode>
{
using RLPXDatagram::RLPXDatagram;
FindNode(bi::udp::endpoint _ep, Address _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram(_ep), target(_target), expiration(futureFromEpoch(_expiration)) {}
h160 target;
uint64_t expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = (h160)r[0]; expiration = (uint64_t)r[1]; }
};
/**
* Node Packet: Multiple node packets are sent in response to FindNode.
*
* RLP Encoded Items: 2 (first item is list)
* Minimum Encoded Size: 10 bytes
*
* @todo nonce
*/
struct Neighbors: RLPXDatagram<Neighbors>
{
struct Node
{
Node() = default;
Node(bytesConstRef _bytes) { interpretRLP(_bytes); }
std::string ipAddress;
uint16_t port;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = (uint16_t)r[1]; node = (h512)r[2]; }
};
using RLPXDatagram::RLPXDatagram;
Neighbors(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeTable::NodeEntry>> const& _nearest): RLPXDatagram(_to), nonce(h256())
{
for (auto& n: _nearest)
{
Node node;
node.ipAddress = n->endpoint.udp.address().to_string();
node.port = n->endpoint.udp.port();
node.node = n->publicKey();
nodes.push_back(node);
}
}
std::list<Node> nodes;
h256 nonce;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << nonce; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n.toBytesConstRef())); nonce = (h256)r[1]; }
};
struct NodeTableWarn: public LogChannel { static const char* name() { return "!P!"; } static const int verbosity = 0; };
struct NodeTableNote: public LogChannel { static const char* name() { return "*P*"; } static const int verbosity = 1; };
struct NodeTableMessageSummary: public LogChannel { static const char* name() { return "-P-"; } static const int verbosity = 2; };

13
libp2p/UDP.cpp

@ -22,16 +22,3 @@
#include "UDP.h"
using namespace dev;
using namespace dev::p2p;
h256 RLPXDatagram::sign(Secret const& _k)
{
RLPStream packet;
streamRLP(packet);
bytes b(packet.out());
h256 h(dev::sha3(b));
Signature sig = dev::sign(_k, h);
data.resize(b.size() + Signature::size);
sig.ref().copyTo(&data);
memcpy(data.data() + sizeof(Signature), b.data(), b.size());
return std::move(h);
}

35
libp2p/UDP.h

@ -40,13 +40,13 @@ namespace p2p
/**
* UDP Datagram
* @todo make data private
* @todo make data protected/functional
*/
class UDPDatagram
{
public:
UDPDatagram(bi::udp::endpoint _ep): locus(_ep) {}
UDPDatagram(bi::udp::endpoint _ep, bytes _data): data(_data), locus(_ep) {}
UDPDatagram(bi::udp::endpoint const& _ep): locus(_ep) {}
UDPDatagram(bi::udp::endpoint const& _ep, bytes _data): data(_data), locus(_ep) {}
bi::udp::endpoint const& endpoint() const { return locus; }
bytes data;
@ -56,21 +56,42 @@ protected:
/**
* @brief RLPX Datagram which can be signed.
* @todo compact templates
* @todo make data private/functional (see UDPDatagram)
* @todo valid=true/false (based on signature)
*/
template <class T>
struct RLPXDatagram: public UDPDatagram
{
static uint64_t fromNow(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t fromNow(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static T fromBytesConstRef(bi::udp::endpoint const& _ep, bytesConstRef _bytes) { T t(_ep); t.interpretRLP(_bytes); return std::move(t); }
static uint64_t futureFromEpoch(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
RLPXDatagram(bi::udp::endpoint _ep): UDPDatagram(_ep) {}
RLPXDatagram(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
virtual h256 sign(Secret const& _from);
protected:
virtual void streamRLP(RLPStream&) const =0;
virtual void interpretRLP(bytesConstRef _bytes) =0;
protected:
Signature signature;
};
template <class T>
h256 RLPXDatagram<T>::sign(Secret const& _k)
{
RLPStream packet;
streamRLP(packet);
bytes b(packet.out());
h256 h(dev::sha3(b));
Signature sig = dev::sign(_k, h);
data.resize(b.size() + Signature::size);
sig.ref().copyTo(&data);
memcpy(data.data() + sizeof(Signature), b.data(), b.size());
return std::move(h);
}
/**
* @brief Interface which UDPSocket will implement.
*/

6
test/net.cpp

@ -54,7 +54,6 @@ struct TestNodeTable: public NodeTable
void setup(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
{
/// Phase 1 test: populate with pings
/// Phase 2 test: pre-populate *expected* ping-responses, send pings
bi::address ourIp = bi::address::from_string("127.0.0.1");
@ -63,11 +62,6 @@ struct TestNodeTable: public NodeTable
for (auto& n: _testNodes)
ping(bi::udp::endpoint(ourIp, n.second));
// wait 1ms between each send
// send PingNode for each s_bootstrapNodes
// wait until nodecount is s_testNodes.count()
}
void reset()

Loading…
Cancel
Save