
refactor constants to be determined at compile time. added RLPXDatagram; constructors and updates to datagram constructors. docs and logging. WIP tests and host/harness for NodeTable.

Branch: cl-refactor
subtly committed 10 years ago
Parent commit: da5acab432
  1. libp2p/NodeTable.cpp (91 lines changed)
  2. libp2p/NodeTable.h (162 lines changed)
  3. libp2p/UDP.cpp (8 lines changed)
  4. libp2p/UDP.h (46 lines changed)
  5. test/net.cpp (71 lines changed)

libp2p/NodeTable.cpp (91 lines changed)

@@ -24,18 +24,19 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
NodeTable::NodeTable(ba::io_service& _io):
m_node(NodeEntry(Address(), Public(), bi::udp::endpoint())),
m_socket(new nodeSocket(_io, *this, 30300)),
m_socketPtr(m_socket.get()),
m_io(_io),
m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io)
{
for (unsigned i = 0; i < s_bins; i++)
m_state[i].distance = i, m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
doRefreshBuckets(boost::system::error_code());
}
NodeTable::NodeTable(ba::io_service& _io, uint16_t _port):
m_node(Node(Address(), Public(), bi::udp::endpoint())),
m_socket(new nodeSocket(_io, *this, _port)),
m_socketPtr(m_socket.get()),
m_io(_io),
m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io)
{
for (unsigned i = 0; i < s_bins; i++)
m_state[i].distance = i, m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
doRefreshBuckets(boost::system::error_code());
m_socketPtr->connect();
}
NodeTable::~NodeTable()
{
@@ -44,7 +45,7 @@ NodeTable::~NodeTable()
m_socketPtr->disconnect();
}
void NodeTable::join()
{
doFindNode(m_node.id);
}
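For orientation, a minimal usage sketch built only from the constructor and join() shown in this diff (the port and io_service wiring are assumptions; it mirrors the commented-out kademlia test at the bottom of test/net.cpp and needs the libp2p headers to build):

#include <boost/asio.hpp>
#include <memory>
#include <libp2p/NodeTable.h>

int main()
{
    boost::asio::io_service io;
    // NodeTable is enable_shared_from_this, so it must live in a shared_ptr.
    auto table = std::make_shared<dev::p2p::NodeTable>(io, 30300);
    table->join(); // bootstraps by searching for our own id (doFindNode(m_node.id))
    io.run();      // services the UDP socket and the refresh/eviction timers
}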
@@ -66,17 +67,14 @@ NodeTable::NodeEntry NodeTable::operator[](Address _id)
void NodeTable::requestNeighbors(NodeEntry const& _node, Address _target) const
{
FindNeighbors p;
p.target = _target;
p.to = _node.endpoint.udp;
p.seal(m_secret);
FindNode p(_node.endpoint.udp, _target);
p.sign(m_secret);
m_socketPtr->send(p);
}
void NodeTable::doFindNode(Address _node, unsigned _round, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried)
{
if (!m_socketPtr->isOpen() || _round == 7)
if (!m_socketPtr->isOpen() || _round == s_maxSteps)
return;
auto nearest = findNearest(_node);
@@ -84,20 +82,27 @@ void NodeTable::doFindNode(Address _node, unsigned _round, std::shared_ptr<std::
for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)
if (!_tried->count(nearest[i]))
{
tried.push_back(nearest[i]);
requestNeighbors(*nearest[i], _node);
auto r = nearest[i];
tried.push_back(r);
FindNode p(r->endpoint.udp, _node);
p.sign(m_secret);
m_socketPtr->send(p);
}
else
continue;
while (auto n = tried.front())
if (tried.empty())
{
_tried->insert(n);
clog(NodeTableWarn) << "Terminating doFindNode after " << _round << " rounds.";
return;
}
while (!tried.empty())
{
_tried->insert(tried.front());
tried.pop_front();
}
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_findTimout));
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count()));
m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec)
{
if (_ec)
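The loop above picks at most s_alpha nearest not-yet-tried nodes per round, sends each a signed FindNode, and terminates early when no fresh candidate remains. A standalone sketch of that per-round selection, with ints standing in for NodeEntry (an assumption for brevity):

#include <cassert>
#include <list>
#include <memory>
#include <set>
#include <vector>

constexpr unsigned s_alpha = 3; // concurrent requests per round, as in [Kademlia]

// Returns the nodes queried this round; an empty result ends the search.
std::list<std::shared_ptr<int>> findNodeRound(
    std::vector<std::shared_ptr<int>> const& nearest,
    std::set<std::shared_ptr<int>>& tried)
{
    std::list<std::shared_ptr<int>> round;
    for (unsigned i = 0; i < nearest.size() && round.size() < s_alpha; i++)
        if (!tried.count(nearest[i]))
            round.push_back(nearest[i]); // the real code signs and sends FindNode here
    for (auto const& n : round)
        tried.insert(n);                 // so later rounds skip these nodes
    return round;
}

int main()
{
    std::vector<std::shared_ptr<int>> nearest{
        std::make_shared<int>(1), std::make_shared<int>(2)};
    std::set<std::shared_ptr<int>> tried;
    assert(findNodeRound(nearest, tried).size() == 2);
    assert(findNodeRound(nearest, tried).empty()); // nothing untried: terminate
}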
@@ -108,15 +113,16 @@ void NodeTable::doFindNode(Address _node, unsigned _round, std::shared_ptr<std::
std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Address _target)
{
// send s_alpha FindNeighbors packets to nodes we know, closest to target
// send s_alpha FindNode packets to nodes we know, closest to target
unsigned head = dist(m_node.id, _target);
unsigned tail = (head - 1) % (s_bits - 1);
unsigned tail = (head - 1) % s_bins;
// todo: optimize with tree
std::map<unsigned, std::list<std::shared_ptr<NodeEntry>>> found;
unsigned count = 0;
// if d is 0, we only look forward; if d is the last bucket, we only look backward; otherwise, spread outward from d
#pragma warning TODO: This should probably be s_bins instead of s_bits.
if (head != 0 && tail != s_bits)
while (head != tail && count < s_bucketSize)
{
@@ -140,7 +146,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
break;
}
head++;
tail = (tail - 1) % (s_bits - 1);
tail = (tail - 1) % s_bins;
}
else if (head == 0)
while (head < s_bucketSize && count < s_bucketSize)
@@ -156,7 +162,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
}
head--;
}
else if (tail == s_bits - 1)
else if (tail == s_bins)
while (tail > 0 && count < s_bucketSize)
{
Guard l(x_state);
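The walk starts at the bucket whose index is dist(m_node.id, _target) and spreads outward in both directions until s_bucketSize candidates are collected. A simplified standalone sketch (assumptions: 8 int buckets, no locking, and none of the modular wraparound that the #pragma warning TODO above calls into question):

#include <vector>

constexpr unsigned s_bins = 8;       // toy address space; the real value is s_bits - 1
constexpr unsigned s_bucketSize = 4; // k in [Kademlia]

std::vector<int> findNearest(std::vector<int> const (&buckets)[s_bins], unsigned d)
{
    std::vector<int> found;
    int head = (int)d, tail = (int)d - 1;
    // Spread outward from bucket d, alternating above (head) and below (tail).
    while ((head < (int)s_bins || tail >= 0) && found.size() < s_bucketSize)
    {
        if (head < (int)s_bins)
            for (int n : buckets[head])
                if (found.size() < s_bucketSize)
                    found.push_back(n);
        if (tail >= 0)
            for (int n : buckets[tail])
                if (found.size() < s_bucketSize)
                    found.push_back(n);
        ++head, --tail;
    }
    return found;
}

int main()
{
    std::vector<int> buckets[s_bins] = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}};
    return findNearest(buckets, 3).size() == s_bucketSize ? 0 : 1;
}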
@@ -178,21 +184,17 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
return std::move(ret);
}
void NodeTable::ping(bi::address _address, unsigned _port) const
void NodeTable::ping(bi::udp::endpoint _to) const
{
PingNode p;
string ip = m_node.endpoint.udp.address().to_string();
p.ipAddress = asBytes(ip);
p.port = m_node.endpoint.udp.port();
// p.expiration;
p.seal(m_secret);
PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPtr->send(p);
}
void NodeTable::ping(NodeEntry* _n) const
{
if (_n && _n->endpoint.udp.address().is_v4())
ping(_n->endpoint.udp.address(), _n->endpoint.udp.port());
if (_n)
ping(_n->endpoint.udp);
}
void NodeTable::evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new)
@@ -279,6 +281,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
RLP rlp(_packet);
clog(NodeTableNote) << "Received message from " << _from.address().to_string() << ":" << _from.port();
// whenever a pong is received, first check if it's in m_evictions, if so, remove it
Guard l(x_evictions);
@@ -289,8 +292,8 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
if (_ec || !m_socketPtr->isOpen())
return;
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_evictionCheckInterval));
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_evictionCheckInterval));
m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec)
{
if (_ec)
@@ -301,7 +304,7 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{
Guard l(x_evictions);
for (auto& e: m_evictions)
if (chrono::steady_clock::now() - e.first.second > chrono::milliseconds(s_pingTimeout))
if (chrono::steady_clock::now() - e.first.second > c_reqTimeout)
{
Guard l(x_nodes);
drop.push_back(m_nodes[e.second]);
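A standalone sketch of the timeout sweep above: any eviction entry older than c_reqTimeout is collected for dropping (ints stand in for node ids, an assumption for brevity):

#include <chrono>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;
std::chrono::milliseconds const c_reqTimeout(300);

std::vector<int> expiredEvictions(
    std::vector<std::pair<Clock::time_point, int>> const& evictions)
{
    std::vector<int> drop;
    for (auto const& e : evictions)
        if (Clock::now() - e.first > c_reqTimeout)
            drop.push_back(e.second); // real code then drops the node from its bucket
    return drop;
}

int main()
{
    std::vector<std::pair<Clock::time_point, int>> evictions{
        {Clock::now() - std::chrono::seconds(1), 7}, // stale: will be dropped
        {Clock::now(), 8}};                          // fresh: kept
    return expiredEvictions(evictions).size() == 1 ? 0 : 1;
}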
@@ -319,7 +322,7 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
{
cout << "refreshing buckets" << endl;
clog(NodeTableNote) << "refreshing buckets";
if (_ec)
return;
@@ -329,7 +332,7 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
{
Guard l(x_state);
for (auto& d: m_state)
if (chrono::steady_clock::now() - d.modified > chrono::seconds(s_bucketRefresh))
if (chrono::steady_clock::now() - d.modified > c_bucketRefresh)
while (!d.nodes.empty())
{
auto n = d.nodes.front();
@@ -343,7 +346,7 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
}
}
unsigned nextRefresh = connected ? (refreshed ? 200 : s_bucketRefresh*1000) : 10000;
unsigned nextRefresh = connected ? (refreshed ? 200 : c_bucketRefresh.count()*1000) : 10000;
auto runcb = [this](boost::system::error_code const& error) -> void { doRefreshBuckets(error); };
m_bucketRefreshTimer.expires_from_now(boost::posix_time::milliseconds(nextRefresh));
m_bucketRefreshTimer.async_wait(runcb);
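The scheduling rule in the last hunk condenses to a single expression; a standalone sketch with the values taken from the diff:

#include <chrono>

// 200 ms if a stale bucket was just refreshed, the full c_bucketRefresh period
// if connected and idle, 10 s while the socket is down.
unsigned nextRefreshMs(bool connected, bool refreshed)
{
    std::chrono::seconds const c_bucketRefresh(3600);
    return connected ? (refreshed ? 200u : (unsigned)c_bucketRefresh.count() * 1000u)
                     : 10000u;
}

int main()
{
    return nextRefreshMs(true, true) == 200 && nextRefreshMs(false, false) == 10000 ? 0 : 1;
}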

libp2p/NodeTable.h (162 lines changed)

@@ -21,6 +21,7 @@
#pragma once
#include <boost/integer/static_log2.hpp>
#include <libdevcrypto/Common.h>
#include <libp2p/UDP.h>
@@ -42,51 +43,58 @@ namespace p2p
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond then it is removed and the new
* node is inserted.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
struct PingNode: RLPDatagram
struct PingNode: RLPXDatagram
{
bytes ipAddress;
using RLPXDatagram::RLPXDatagram;
PingNode(bi::udp::endpoint _to, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram(_to), ipAddress(_src), port(_srcPort), expiration(fromNow(_expiration)) {}
std::string ipAddress;
uint16_t port;
uint64_t expiration;
Signature signature;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; }
};
struct Pong: RLPDatagram
struct Pong: RLPXDatagram
{
// todo: weak-signed pong
Address from;
uint64_t replyTo; /// expiration from PingNode
using RLPXDatagram::RLPXDatagram;
h256 replyTo; /// TBD
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << from << replyTo; }
void streamRLP(RLPStream& _s) const { _s.appendList(1); _s << replyTo; }
};
/**
* FindNeighbors Packet: Request k-nodes, closest to the target.
* FindNeighbors is cached and regenerated after expiration - t, where t is timeout.
* FindNode Packet: Request k-nodes, closest to the target.
* FindNode is cached and regenerated after expiration - t, where t is timeout.
* FindNode implicitly results in finding neighbors of a given node.
*
* signature: Signature of message.
* target: Address of NodeId. The responding node will send back nodes closest to the target.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
*/
struct FindNeighbors: RLPDatagram
struct FindNode: RLPXDatagram
{
using RLPXDatagram::RLPXDatagram;
FindNode(bi::udp::endpoint _to, Address _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram(_to), target(_target), expiration(fromNow(_expiration)) {}
h160 target;
uint64_t expiration;
Signature signature;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; }
};
/**
* Node Packet: Multiple node packets are sent in response to FindNeighbors.
* Node Packet: Multiple node packets are sent in response to FindNode.
*/
struct Neighbors: RLPDatagram
struct Neighbors: RLPXDatagram
{
using RLPXDatagram::RLPXDatagram;
struct Node
{
bytes ipAddress;
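For quick reference, the payloads the new streamRLP bodies emit (the signature is not part of the list; RLPXDatagram::sign prepends it to the raw datagram, see libp2p/UDP.cpp below):

// PingNode : list of 3 [ ipAddress, port, expiration ]
// Pong     : list of 1 [ replyTo ]
// FindNode : list of 2 [ target, expiration ]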
@@ -110,31 +118,32 @@ struct Neighbors: RLPDatagram
* Thread-safety is ensured by modifying NodeEntry details via
* shared_ptr replacement instead of mutating values.
*
* @todo don't try to evict node if node isRequired. (support for makeRequired)
* @todo optimize (use tree for state (or set w/custom compare for cache))
* [Interface]
* @todo constructor support for m_node, m_secret
* @todo use s_bitsPerStep for find and refresh/ping
* @todo exclude bucket from refresh if we have node as peer
* @todo don't try to evict node if node isRequired. (support for makeRequired)
* @todo exclude bucket from refresh if we have node as peer (support for makeRequired)
* @todo restore nodes
* @todo std::shared_ptr<PingNode> m_cachedPingPacket;
* @todo std::shared_ptr<FindNeighbors> m_cachedFindSelfPacket;
*
* [Networking]
* @todo use eth/stun/ice/whatever for public-discovery
*
* [Protocol]
* @todo optimize knowledge at opposite edges; eg, s_bitsPerStep lookups. (Can be done via pointers to NodeBucket)
* @todo ^ s_bitsPerStep = 5; // Denoted by b in [Kademlia]. Bits by which address space is divided.
* @todo optimize (use tree for state and/or custom compare for cache)
* @todo reputation (aka universal siblings lists)
* @todo dht (aka siblings)
*
* [Maintenance]
* @todo pretty logs
*/
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{
using nodeSocket = UDPSocket<NodeTable, 1024>;
using nodeSocket = UDPSocket<NodeTable, 1280>;
using timePoint = std::chrono::steady_clock::time_point;
static unsigned const s_bucketSize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
// const unsigned s_bitsPerStep = 5; // @todo Denoted by b in [Kademlia]. Bits by which address space will be divided for find responses.
static unsigned const s_alpha = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNeighbors requests.
const unsigned s_findTimout = 300; // How long to wait between find queries.
// const unsigned s_siblings = 5; // @todo Denoted by s in [S/Kademlia]. User-defined by sub-protocols.
const unsigned s_bucketRefresh = 3600; // Refresh interval prevents bucket from becoming stale. [Kademlia]
static unsigned const s_bits = 8 * Address::size; // Denoted by n.
static unsigned const s_bins = s_bits - 1; //
const unsigned s_evictionCheckInterval = 75; // Interval by which eviction timeouts are checked.
const unsigned s_pingTimeout = 500;
public:
static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
using EvictionTimeout = std::pair<std::pair<Address,timePoint>,Address>;
struct NodeDefaultEndpoint
{
@@ -142,16 +151,29 @@ public:
bi::udp::endpoint udp;
};
struct NodeEntry
struct Node
{
NodeEntry(Address _id, Public _pubk, bi::udp::endpoint _udp): id(_id), pubk(_pubk), endpoint(NodeDefaultEndpoint(_udp)), distance(0) {}
NodeEntry(NodeEntry _src, Address _id, Public _pubk, bi::udp::endpoint _udp): id(_id), pubk(_pubk), endpoint(NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_id)) {}
NodeEntry(NodeEntry _src, Address _id, Public _pubk, NodeDefaultEndpoint _gw): id(_id), pubk(_pubk), endpoint(_gw), distance(dist(_src.id,_id)) {}
Node(Address _id, Public _pubk, NodeDefaultEndpoint _udp): id(_id), pubk(_pubk), endpoint(_udp) {}
Node(Address _id, Public _pubk, bi::udp::endpoint _udp): Node(_id, _pubk, NodeDefaultEndpoint(_udp)) {}
virtual Address const& address() const { return id; }
virtual Public const& publicKey() const { return pubk; }
Address id;
Public pubk;
NodeDefaultEndpoint endpoint; ///< How we've previously connected to this node. (must match node's reported endpoint)
const unsigned distance;
timePoint activePing;
NodeDefaultEndpoint endpoint;
};
/**
* NodeEntry
* @todo Type of id will become template parameter.
*/
struct NodeEntry: public Node
{
NodeEntry(Node _src, Address _id, Public _pubk, NodeDefaultEndpoint _gw): Node(_id, _pubk, _gw), distance(dist(_src.id,_id)) {}
NodeEntry(Node _src, Address _id, Public _pubk, bi::udp::endpoint _udp): Node(_id, _pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_id)) {}
const unsigned distance; ///< Node's distance from _src (see constructor).
};
struct NodeBucket
@@ -161,9 +183,30 @@ public:
std::list<std::weak_ptr<NodeEntry>> nodes;
};
using EvictionTimeout = std::pair<std::pair<Address,timePoint>,Address>;
public:
/// Constants for Kademlia, mostly derived from address space.
static constexpr unsigned s_addressByteSize = sizeof(NodeEntry::id); ///< Size of address type in bytes.
static constexpr unsigned s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia].
static constexpr unsigned s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us).
static constexpr unsigned s_maxSteps = boost::static_log2<s_bits>::value; ///< Max iterations of discovery. (doFindNode)
/// Chosen constants
static constexpr unsigned s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
static constexpr unsigned s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
static constexpr uint16_t s_defaultPort = 30300; ///< Default port to listen on.
/// Intervals
static constexpr unsigned s_evictionCheckInterval = 75; ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
NodeTable(ba::io_service& _io);
NodeTable(ba::io_service& _io, uint16_t _port = s_defaultPort);
~NodeTable();
void join();
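With Address at 20 bytes, s_bits is 160 and boost::static_log2<160>::value is 7, which is exactly the magic number the doFindNode change above replaces with s_maxSteps. A standalone check, shrunk to 32-bit ids (an assumption for brevity):

#include <boost/integer/static_log2.hpp>
#include <cassert>
#include <cstdint>

constexpr unsigned s_bits = 32; // 8 * sizeof(id); 160 in the real table
constexpr unsigned s_maxSteps = boost::static_log2<s_bits>::value;
static_assert(s_maxSteps == 5, "log2(32) == 5; for 160-bit ids this is 7");

// Same log2-distance as NodeTable::dist: index of the highest set bit of a ^ b.
unsigned dist(uint32_t _a, uint32_t _b)
{
    uint32_t d = _a ^ _b;
    unsigned ret = 0;
    while (d >>= 1)
        ++ret;
    return ret;
}

int main()
{
    assert(dist(0, 1) == 0);            // ids differing only in the lowest bit
    assert(dist(0, 0x80000000u) == 31); // ids differing in the highest bit
}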
@@ -173,14 +216,13 @@ public:
NodeEntry operator[](Address _id);
protected:
void requestNeighbors(NodeEntry const& _node, Address _target) const;
/// Sends requests to other nodes requesting nodes "near" to us in order to populate node table such that connected nodes form centrality.
/// Repeatedly sends s_alpha concurrent FindNode requests to the nodes nearest the target, asking each for nodes nearest the target, for up to s_maxSteps rounds.
void doFindNode(Address _node, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Returns nodes nearest to target.
std::vector<std::shared_ptr<NodeEntry>> findNearest(Address _target);
void ping(bi::address _address, unsigned _port) const;
void ping(bi::udp::endpoint _to) const;
void ping(NodeEntry* _n) const;
@@ -194,16 +236,27 @@ protected:
NodeBucket const& bucket(NodeEntry* _n) const;
/// Network Events
void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet);
void onDisconnected(UDPSocketFace*) {};
/// Tasks
void doCheckEvictions(boost::system::error_code const& _ec);
void doRefreshBuckets(boost::system::error_code const& _ec);
#ifndef BOOST_AUTO_TEST_SUITE
private:
NodeEntry m_node; ///< This node.
#else
protected:
#endif
/// Sends a signed FindNode packet to _node, requesting neighbors closest to _target.
void requestNeighbors(NodeEntry const& _node, Address _target) const;
Node m_node; ///< This node.
Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const.
@@ -215,12 +268,15 @@ private:
Mutex x_evictions;
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
std::shared_ptr<nodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
nodeSocket* m_socketPtr; ///< Set to m_socket.get().
ba::io_service& m_io; ///< Used by bucket refresh timer.
boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh.
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.
};
struct NodeTableWarn: public LogChannel { static const char* name() { return "!P!"; } static const int verbosity = 0; };
struct NodeTableNote: public LogChannel { static const char* name() { return "*P*"; } static const int verbosity = 1; };
}
}

libp2p/UDP.cpp (8 lines changed)

@@ -23,13 +23,15 @@
using namespace dev;
using namespace dev::p2p;
void RLPDatagram::seal(Secret const& _k)
h256 RLPXDatagram::sign(Secret const& _k)
{
RLPStream packet;
streamRLP(packet);
bytes b(packet.out());
Signature sig = dev::sign(_k, dev::sha3(b));
h256 h(dev::sha3(b));
Signature sig = dev::sign(_k, h);
data.resize(data.size() + Signature::size);
sig.ref().copyTo(&data);
memcpy(data.data()+sizeof(Signature),b.data(),b.size());
memcpy(data.data() + sizeof(Signature), b.data(), b.size());
return std::move(h);
}
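sign() serializes the payload, signs its sha3 hash, and lays the datagram out as the signature followed by the RLP bytes. A standalone sketch of that framing with the crypto stubbed out (stubSha3/stubSign are stand-ins for dev::sha3/dev::sign, not real APIs):

#include <cstdint>
#include <cstring>
#include <vector>

using bytes = std::vector<uint8_t>;

bytes stubSha3(bytes const&) { return bytes(32, 0); } // 32-byte digest stand-in
bytes stubSign(bytes const&) { return bytes(65, 0); } // 65-byte signature stand-in

// Datagram layout: signature || rlp(payload).
bytes frame(bytes const& rlpPayload)
{
    bytes sig = stubSign(stubSha3(rlpPayload));
    bytes out(sig.size() + rlpPayload.size());
    std::memcpy(out.data(), sig.data(), sig.size());
    std::memcpy(out.data() + sig.size(), rlpPayload.data(), rlpPayload.size());
    return out;
}

int main()
{
    bytes payload{0xc2, 0x80, 0x80}; // arbitrary example bytes
    return frame(payload).size() == 65 + payload.size() ? 0 : 1;
}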

libp2p/UDP.h (46 lines changed)

@@ -38,27 +38,49 @@ namespace dev
namespace p2p
{
struct UDPDatagram
/**
* UDP Datagram
* @todo make data private
*/
class UDPDatagram
{
UDPDatagram() = default;
UDPDatagram(bi::udp::endpoint _ep, bytes _data): to(_ep), data(std::move(_data)) {}
bi::udp::endpoint to;
public:
UDPDatagram(bi::udp::endpoint _ep): locus(_ep) {}
UDPDatagram(bi::udp::endpoint _ep, bytes _data): data(_data), locus(_ep) {}
bi::udp::endpoint const& endpoint() const { return locus; }
bytes data;
protected:
bi::udp::endpoint locus;
};
struct RLPDatagram: UDPDatagram
/**
* @brief RLPX Datagram which can be signed.
*/
struct RLPXDatagram: public UDPDatagram
{
virtual void seal(Secret const& _k);
protected:
virtual void streamRLP(RLPStream& _s) const {};
static uint64_t fromNow(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t fromNow(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
RLPXDatagram(bi::udp::endpoint _ep): UDPDatagram(_ep) {}
virtual h256 sign(Secret const& _from);
virtual void streamRLP(RLPStream&) const = 0;
Signature signature;
};
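fromNow() turns a relative timeout into an absolute expiration stamp, milliseconds since the system_clock epoch. A standalone sketch using the same duration math:

#include <chrono>
#include <cstdint>
#include <iostream>

uint64_t fromNow(std::chrono::seconds _sec)
{
    using namespace std::chrono;
    return duration_cast<milliseconds>(
        (system_clock::now() + _sec).time_since_epoch()).count();
}

int main()
{
    // A PingNode built with the default 60 s expiration would carry this stamp:
    std::cout << fromNow(std::chrono::seconds(60)) << "\n";
}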
/**
* @brief Interface which UDPSocket will implement.
*/
struct UDPSocketFace
{
virtual bool send(UDPDatagram const& _msg) = 0;
virtual void disconnect() = 0;
};
/**
* @brief Interface which a UDPSocket's owner must implement.
*/
struct UDPSocketEvents
{
virtual void onDisconnected(UDPSocketFace*) {};
@@ -68,6 +90,9 @@ struct UDPSocketEvents
/**
* @brief UDP Interface
* Handler must implement UDPSocketEvents.
*
* @todo multiple endpoints (we cannot advertise 0.0.0.0)
* @todo decouple deque from UDPDatagram and add ref() to datagram for fire&forget
*/
template <typename Handler, unsigned MaxDatagramSize>
class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, MaxDatagramSize>>
@@ -76,7 +101,6 @@ public:
static constexpr unsigned maxDatagramSize = MaxDatagramSize;
static_assert(maxDatagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes");
/// Construct open socket to endpoint.
UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); };
virtual ~UDPSocket() { disconnect(); }
@@ -142,6 +166,7 @@ bool UDPSocket<Handler,MaxDatagramSize>::send(UDPDatagram const& _datagram)
Guard l(x_sendQ);
sendQ.push_back(_datagram);
clog(NoteChannel) << "qued datagram";
if (sendQ.size() == 1)
doWrite();
@@ -169,7 +194,7 @@ void UDPSocket<Handler,MaxDatagramSize>::doWrite()
{
const UDPDatagram& datagram = sendQ[0];
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.to, [this, self](boost::system::error_code _ec, std::size_t)
m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.endpoint(), [this, self](boost::system::error_code _ec, std::size_t)
{
if (_ec)
return disconnectWithError(_ec);
@@ -180,6 +205,7 @@ void UDPSocket<Handler,MaxDatagramSize>::doWrite()
if (sendQ.empty())
return;
}
clog(NoteChannel) << "sent datagram";
doWrite();
});
}
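send() only kicks doWrite() when the queue transitions from empty to one element; each completion handler pops the head and keeps draining until the queue is empty again. An asio-free sketch of that single-writer pattern (the synchronous drain stands in for the async completion chain, an assumption for brevity):

#include <deque>
#include <mutex>
#include <string>

struct WriteQueue
{
    std::mutex x;
    std::deque<std::string> q; // stands in for the UDPDatagram deque

    void send(std::string d)
    {
        std::lock_guard<std::mutex> l(x);
        q.push_back(std::move(d));
        if (q.size() == 1) // queue was empty: this caller starts the writer
            drain();
    }

    void drain() // the real doWrite() re-enters from its async completion handler
    {
        while (!q.empty())
        {
            // async_send_to(q.front(), ...) would go here
            q.pop_front();
        }
    }
};

int main()
{
    WriteQueue w;
    w.send("AAAA"); // starts the writer
    w.send("BBBB"); // would only enqueue if the writer were still busy
    return w.q.empty() ? 0 : 1;
}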

test/net.cpp (71 lines changed)

@@ -30,6 +30,8 @@ using namespace dev::p2p;
namespace ba = boost::asio;
namespace bi = ba::ip;
BOOST_AUTO_TEST_SUITE(p2p)
/**
* Only used for testing. Not useful beyond tests.
*/
@@ -37,7 +39,7 @@ class TestHost: public Worker
{
public:
TestHost(): Worker("test",0), m_io() {};
~TestHost() { m_io.stop(); stopWorking(); }
virtual ~TestHost() { m_io.stop(); stopWorking(); }
void start() { startWorking(); }
void doWork() { m_io.run(); }
@@ -45,28 +47,62 @@ protected:
ba::io_service m_io;
};
struct TestNodeTable: public NodeTable
{
void generateTestNodes(int _count = 10)
{
asserts(_count < 1000);
static uint16_t s_basePort = 30500;
m_testNodes.clear();
for (auto i = 0; i < _count; i++)
m_testNodes.push_back(make_pair(KeyPair::create(),s_basePort++));
}
std::vector<std::pair<KeyPair,unsigned>> m_testNodes; // keypair and port
/// Constructor
using NodeTable::NodeTable;
void setup()
{
/// Phase 1 test: populate with pings
/// Phase 2 test: pre-populate *expected* ping-responses, send pings
bi::address ourIp = bi::address::from_string("127.0.0.1");
uint16_t ourPort = 30300;
bi::udp::endpoint ourEndpoint(ourIp, ourPort);
generateTestNodes();
for (auto& n: m_testNodes)
ping(bi::udp::endpoint(ourIp, n.second));
// wait 1ms between each send
// send PingNode for each s_bootstrapNodes
// wait until nodecount is s_testNodes.count()
}
void reset()
{
Guard l(x_state);
for (auto& n: m_state) n.nodes.clear();
}
};
/**
* Only used for testing. Not useful beyond tests.
*/
class TestNodeHost: public TestHost
struct TestNodeTableHost: public TestHost
{
public:
TestNodeHost(): m_nodes(m_io) {};
~TestNodeHost() { m_io.stop(); stopWorking(); }
void start() { startWorking(); }
void doWork() { m_io.run(); }
NodeTable m_nodes;
TestNodeTableHost(): nodeTable(new TestNodeTable(m_io)) {};
shared_ptr<TestNodeTable> nodeTable;
};
class TestUDPSocket: UDPSocketEvents, public TestHost
{
public:
TestUDPSocket(): m_socket(new UDPSocket<TestUDPSocket, 1024>(m_io, *this, 30300)) {}
~TestUDPSocket() { m_io.stop(); stopWorking(); }
void start() { startWorking(); }
void doWork() { m_io.run(); }
void onDisconnected(UDPSocketFace*) {};
void onReceived(UDPSocketFace*, bi::udp::endpoint const&, bytesConstRef _packet) { if (_packet.toString() == "AAAA") success = true; }
@@ -75,12 +111,13 @@ public:
bool success = false;
};
BOOST_AUTO_TEST_SUITE(p2p)
BOOST_AUTO_TEST_CASE(kademlia)
{
TestNodeHost nodeHost;
// TestNodeTableHost node;
// node.start();
// node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for
// node.nodeTable->setup();
// sleep(1);
}
BOOST_AUTO_TEST_CASE(test_txrx_one)
