Browse Source

Merge pull request #720 from ethereum/p2p-udp-nodetable

p2p udp nodetable
cl-refactor
Gav Wood 10 years ago
parent
commit
5cd87a25f5
  1. 6
      libdevcore/Common.h
  2. 46
      libp2p/Host.cpp
  3. 6
      libp2p/Host.h
  4. 2
      libp2p/Network.cpp
  5. 11
      libp2p/Network.h
  6. 467
      libp2p/NodeTable.cpp
  7. 349
      libp2p/NodeTable.h
  8. 2
      libp2p/Session.cpp
  9. 4
      libp2p/Session.h
  10. 54
      libp2p/UDP.cpp
  11. 265
      libp2p/UDP.h
  12. 4
      test/boostTest.cpp
  13. 21
      test/kademlia.cpp
  14. 214
      test/net.cpp
  15. 55
      test/network.cpp

6
libdevcore/Common.h

@ -36,7 +36,12 @@
#include <map>
#include <vector>
#include <set>
#pragma warning(push)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <boost/multiprecision/cpp_int.hpp>
#pragma warning(pop)
#pragma GCC diagnostic pop
#include "vector_ref.h"
#include "debugbreak.h"
@ -64,6 +69,7 @@ using u256 = boost::multiprecision::number<boost::multiprecision::cpp_int_backe
using s256 = boost::multiprecision::number<boost::multiprecision::cpp_int_backend<256, 256, boost::multiprecision::signed_magnitude, boost::multiprecision::unchecked, void>>;
using u160 = boost::multiprecision::number<boost::multiprecision::cpp_int_backend<160, 160, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void>>;
using s160 = boost::multiprecision::number<boost::multiprecision::cpp_int_backend<160, 160, boost::multiprecision::signed_magnitude, boost::multiprecision::unchecked, void>>;
using u512 = boost::multiprecision::number<boost::multiprecision::cpp_int_backend<512, 512, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void>>;
using u256s = std::vector<u256>;
using u160s = std::vector<u160>;
using u256Set = std::set<u256>;

46
libp2p/Host.cpp

@ -43,7 +43,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
m_netPrefs(_n),
m_ifAddresses(Network::getInterfaceAddresses()),
m_ioService(2),
m_acceptorV4(m_ioService),
m_tcp4Acceptor(m_ioService),
m_key(KeyPair::create())
{
for (auto address: m_ifAddresses)
@ -95,9 +95,9 @@ void Host::doneWorking()
m_ioService.reset();
// shutdown acceptor
m_acceptorV4.cancel();
if (m_acceptorV4.is_open())
m_acceptorV4.close();
m_tcp4Acceptor.cancel();
if (m_tcp4Acceptor.is_open())
m_tcp4Acceptor.close();
// There maybe an incoming connection which started but hasn't finished.
// Wait for acceptor to end itself instead of assuming it's complete.
@ -272,15 +272,15 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
// if user supplied address is a public address then we use it
// if user supplied address is private, and localnetworking is enabled, we use it
bi::address reqpublicaddr(bi::address(_publicAddress.empty() ? bi::address() : bi::address::from_string(_publicAddress)));
bi::tcp::endpoint reqpublic(reqpublicaddr, m_listenPort);
bool isprivate = isPrivateAddress(reqpublicaddr);
bool ispublic = !isprivate && !isLocalHostAddress(reqpublicaddr);
if (!reqpublicaddr.is_unspecified() && (ispublic || (isprivate && m_netPrefs.localNetworking)))
bi::address reqPublicAddr(bi::address(_publicAddress.empty() ? bi::address() : bi::address::from_string(_publicAddress)));
bi::tcp::endpoint reqPublic(reqPublicAddr, m_listenPort);
bool isprivate = isPrivateAddress(reqPublicAddr);
bool ispublic = !isprivate && !isLocalHostAddress(reqPublicAddr);
if (!reqPublicAddr.is_unspecified() && (ispublic || (isprivate && m_netPrefs.localNetworking)))
{
if (!m_peerAddresses.count(reqpublicaddr))
m_peerAddresses.insert(reqpublicaddr);
m_public = reqpublic;
if (!m_peerAddresses.count(reqPublicAddr))
m_peerAddresses.insert(reqPublicAddr);
m_tcpPublic = reqPublic;
return;
}
@ -288,7 +288,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
for (auto addr: m_peerAddresses)
if (addr.is_v4() && !isPrivateAddress(addr))
{
m_public = bi::tcp::endpoint(*m_peerAddresses.begin(), m_listenPort);
m_tcpPublic = bi::tcp::endpoint(*m_peerAddresses.begin(), m_listenPort);
return;
}
@ -301,23 +301,23 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
{
if (!m_peerAddresses.count(upnpep.address()))
m_peerAddresses.insert(upnpep.address());
m_public = upnpep;
m_tcpPublic = upnpep;
return;
}
}
// or if no address provided, use private ipv4 address if local networking is enabled
if (reqpublicaddr.is_unspecified())
if (reqPublicAddr.is_unspecified())
if (m_netPrefs.localNetworking)
for (auto addr: m_peerAddresses)
if (addr.is_v4() && isPrivateAddress(addr))
{
m_public = bi::tcp::endpoint(addr, m_listenPort);
m_tcpPublic = bi::tcp::endpoint(addr, m_listenPort);
return;
}
// otherwise address is unspecified
m_public = bi::tcp::endpoint(bi::address(), m_listenPort);
m_tcpPublic = bi::tcp::endpoint(bi::address(), m_listenPort);
}
void Host::runAcceptor()
@ -326,10 +326,10 @@ void Host::runAcceptor()
if (m_run && !m_accepting)
{
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_public << ")";
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
m_accepting = true;
m_socket.reset(new bi::tcp::socket(m_ioService));
m_acceptorV4.async_accept(*m_socket, [=](boost::system::error_code ec)
m_tcp4Acceptor.async_accept(*m_socket, [=](boost::system::error_code ec)
{
bool success = false;
if (!ec)
@ -656,7 +656,7 @@ void Host::startedWorking()
}
// try to open acceptor (todo: ipv6)
m_listenPort = Network::listen4(m_acceptorV4, m_netPrefs.listenPort);
m_listenPort = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs.listenPort);
// start capability threads
for (auto const& h: m_capabilities)
@ -674,8 +674,8 @@ void Host::startedWorking()
// if m_public address is valid then add us to node list
// todo: abstract empty() and emplace logic
if (!m_public.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
noteNode(id(), m_public, Origin::Perfect, false);
if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
noteNode(id(), m_tcpPublic, Origin::Perfect, false);
clog(NetNote) << "Id:" << id().abridged();
@ -739,7 +739,7 @@ void Host::restoreNodes(bytesConstRef _b)
{
auto oldId = id();
m_key = KeyPair(r[1].toHash<Secret>());
noteNode(id(), m_public, Origin::Perfect, false, oldId);
noteNode(id(), m_tcpPublic, Origin::Perfect, false, oldId);
for (auto i: r[2])
{

6
libp2p/Host.h

@ -152,7 +152,7 @@ public:
void pingAll();
/// Get the port we're listening on currently.
unsigned short listenPort() const { return m_public.port(); }
unsigned short listenPort() const { return m_tcpPublic.port(); }
/// Serialise the set of known peers.
bytes saveNodes() const;
@ -219,7 +219,7 @@ private:
int m_listenPort = -1; ///< What port are we listening on. -1 means binding failed or acceptor hasn't been initialized.
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_acceptorV4; ///< Listening acceptor.
bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor.
std::unique_ptr<bi::tcp::socket> m_socket; ///< Listening socket.
std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
@ -229,7 +229,7 @@ private:
std::set<Node*> m_pendingNodeConns; /// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptr<Node>const&).
Mutex x_pendingNodeConns;
bi::tcp::endpoint m_public; ///< Our public listening endpoint.
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.
bool m_hadNewNodes = false;

2
libp2p/Network.cpp

@ -111,7 +111,7 @@ std::vector<bi::address> Network::getInterfaceAddresses()
return std::move(addresses);
}
int Network::listen4(bi::tcp::acceptor& _acceptor, unsigned short _listenPort)
int Network::tcp4Listen(bi::tcp::acceptor& _acceptor, unsigned short _listenPort)
{
int retport = -1;
for (unsigned i = 0; i < 2; ++i)

11
libp2p/Network.h

@ -22,13 +22,18 @@
#pragma once
#include <memory>
#include <vector>
#include <deque>
#include <array>
#include <libdevcore/RLP.h>
#include <libdevcore/Guards.h>
#include "Common.h"
namespace ba = boost::asio;
namespace bi = ba::ip;
namespace dev
{
namespace p2p
{
@ -53,8 +58,8 @@ public:
static std::vector<bi::address> getInterfaceAddresses();
/// Try to bind and listen on _listenPort, else attempt net-allocated port.
static int listen4(bi::tcp::acceptor& _acceptor, unsigned short _listenPort);
static int tcp4Listen(bi::tcp::acceptor& _acceptor, unsigned short _listenPort);
/// Return public endpoint of upnp interface. If successful o_upnpifaddr will be a private interface address and endpoint will contain public address and port.
static bi::tcp::endpoint traverseNAT(std::vector<bi::address> const& _ifAddresses, unsigned short _listenPort, bi::address& o_upnpifaddr);
};

467
libp2p/NodeTable.cpp

@ -0,0 +1,467 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file NodeTable.cpp
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*/
#include "NodeTable.h"
using namespace std;
using namespace dev;
using namespace dev::p2p;
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort):
m_node(Node(_alias.pub(), bi::udp::endpoint())),
m_secret(_alias.sec()),
m_socket(new NodeSocket(_io, *this, _listenPort)),
m_socketPtr(m_socket.get()),
m_io(_io),
m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io)
{
for (unsigned i = 0; i < s_bins; i++)
{
m_state[i].distance = i;
m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
}
m_socketPtr->connect();
doRefreshBuckets(boost::system::error_code());
}
NodeTable::~NodeTable()
{
m_evictionCheckTimer.cancel();
m_bucketRefreshTimer.cancel();
m_socketPtr->disconnect();
}
void NodeTable::join()
{
doFindNode(m_node.id);
}
list<NodeId> NodeTable::nodes() const
{
list<NodeId> nodes;
Guard l(x_nodes);
for (auto& i: m_nodes)
nodes.push_back(i.second->id);
return move(nodes);
}
list<NodeTable::NodeEntry> NodeTable::state() const
{
list<NodeEntry> ret;
Guard l(x_state);
for (auto s: m_state)
for (auto n: s.nodes)
ret.push_back(*n.lock());
return move(ret);
}
NodeTable::NodeEntry NodeTable::operator[](NodeId _id)
{
Guard l(x_nodes);
return *m_nodes[_id];
}
void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const
{
FindNode p(_node.endpoint.udp, _target);
p.sign(m_secret);
m_socketPtr->send(p);
}
void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
{
if (!m_socketPtr->isOpen() || _round == s_maxSteps)
return;
if (_round == s_maxSteps)
{
clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds.";
return;
}
else if(!_round && !_tried)
// initialized _tried on first round
_tried.reset(new set<shared_ptr<NodeEntry>>());
auto nearest = findNearest(_node);
list<shared_ptr<NodeEntry>> tried;
for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)
if (!_tried->count(nearest[i]))
{
auto r = nearest[i];
tried.push_back(r);
FindNode p(r->endpoint.udp, _node);
p.sign(m_secret);
m_socketPtr->send(p);
}
if (tried.empty())
{
clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds.";
return;
}
while (!tried.empty())
{
_tried->insert(tried.front());
tried.pop_front();
}
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count()));
m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec)
{
if (_ec)
return;
doFindNode(_node, _round + 1, _tried);
});
}
vector<shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
{
// send s_alpha FindNode packets to nodes we know, closest to target
static unsigned lastBin = s_bins - 1;
unsigned head = dist(m_node.id, _target);
unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins;
map<unsigned, list<shared_ptr<NodeEntry>>> found;
unsigned count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d
if (head > 1 && tail != lastBin)
while (head != tail && head < s_bins && count < s_bucketSize)
{
Guard l(x_state);
for (auto n: m_state[head].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
else
break;
}
if (count < s_bucketSize && tail)
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
else
break;
}
head++;
if (tail)
tail--;
}
else if (head < 2)
while (head < s_bins && count < s_bucketSize)
{
Guard l(x_state);
for (auto n: m_state[head].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
else
break;
}
head++;
}
else
while (tail > 0 && count < s_bucketSize)
{
Guard l(x_state);
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
else
break;
}
tail--;
}
vector<shared_ptr<NodeEntry>> ret;
for (auto& nodes: found)
for (auto n: nodes.second)
ret.push_back(n);
return move(ret);
}
void NodeTable::ping(bi::udp::endpoint _to) const
{
PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPtr->send(p);
}
void NodeTable::ping(NodeEntry* _n) const
{
if (_n)
ping(_n->endpoint.udp);
}
void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _new)
{
if (!m_socketPtr->isOpen())
return;
Guard l(x_evictions);
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
if (m_evictions.size() == 1)
doCheckEvictions(boost::system::error_code());
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
ping(_leastSeen.get());
}
void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{
// Don't add ourself
if (_pubk == m_node.address())
return;
shared_ptr<NodeEntry> node;
{
Guard l(x_nodes);
auto n = m_nodes.find(_pubk);
if (n == m_nodes.end())
{
node.reset(new NodeEntry(m_node, _pubk, _endpoint));
m_nodes[_pubk] = node;
// clog(NodeTableMessageSummary) << "Adding node to cache: " << _pubk;
}
else
{
node = n->second;
// clog(NodeTableMessageSummary) << "Found node in cache: " << _pubk;
}
}
// todo: why is this necessary?
if (!!node)
noteNode(node);
}
void NodeTable::noteNode(shared_ptr<NodeEntry> _n)
{
shared_ptr<NodeEntry> contested;
{
NodeBucket& s = bucket(_n.get());
Guard l(x_state);
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n)
{
if (n.lock() == _n)
return true;
return false;
});
if (s.nodes.size() >= s_bucketSize)
{
contested = s.nodes.front().lock();
if (!contested)
{
s.nodes.pop_front();
s.nodes.push_back(_n);
}
}
else
s.nodes.push_back(_n);
}
if (contested)
evict(contested, _n);
}
void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
{
NodeBucket &s = bucket(_n.get());
{
Guard l(x_state);
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
}
Guard l(x_nodes);
m_nodes.erase(_n->id);
}
NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n)
{
return m_state[_n->distance - 1];
}
void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
{
// h256 + Signature + RLP (smallest possible packet is empty neighbours packet which is 3 bytes)
if (_packet.size() < h256::size + Signature::size + 3)
{
clog(NodeTableMessageSummary) << "Invalid Message size from " << _from.address().to_string() << ":" << _from.port();
return;
}
bytesConstRef signedBytes(_packet.cropped(h256::size, _packet.size() - h256::size));
h256 hashSigned(sha3(signedBytes));
if (!_packet.cropped(0, h256::size).contentsEqual(hashSigned.asBytes()))
{
clog(NodeTableMessageSummary) << "Invalid Message hash from " << _from.address().to_string() << ":" << _from.port();
return;
}
bytesConstRef rlpBytes(signedBytes.cropped(Signature::size, signedBytes.size() - Signature::size));
RLP rlp(rlpBytes);
unsigned itemCount = rlp.itemCount();
bytesConstRef sigBytes(_packet.cropped(h256::size, Signature::size));
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(rlpBytes)));
if (!nodeid)
{
clog(NodeTableMessageSummary) << "Invalid Message signature from " << _from.address().to_string() << ":" << _from.port();
return;
}
noteNode(nodeid, _from);
try {
switch (itemCount)
{
case 1:
{
// clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port();
Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, first check if it's in m_evictions
break;
}
case 2:
if (rlp[0].isList())
{
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port();
for (auto n: in.nodes)
noteNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port));
}
else
{
// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
vector<shared_ptr<NodeTable::NodeEntry>> nearest = findNearest(in.target);
static unsigned const nlimit = (m_socketPtr->maxDatagramSize - 11) / 86;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{
Neighbours out(_from, nearest, offset, nlimit);
out.sign(m_secret);
m_socketPtr->send(out);
}
}
break;
case 3:
{
// clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port();
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
Pong p(_from);
p.replyTo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPtr->send(p);
break;
}
default:
clog(NodeTableMessageSummary) << "Invalid Message received from " << _from.address().to_string() << ":" << _from.port();
return;
}
}
catch (...)
{
clog(NodeTableWarn) << "Exception processing message from " << _from.address().to_string() << ":" << _from.port();
}
}
void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{
if (_ec || !m_socketPtr->isOpen())
return;
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(c_evictionCheckInterval);
m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec)
{
if (_ec)
return;
bool evictionsRemain = false;
list<shared_ptr<NodeEntry>> drop;
{
Guard le(x_evictions);
Guard ln(x_nodes);
for (auto& e: m_evictions)
if (chrono::steady_clock::now() - e.first.second > c_reqTimeout)
if (auto n = m_nodes[e.second])
drop.push_back(n);
evictionsRemain = m_evictions.size() - drop.size() > 0;
}
drop.unique();
for (auto n: drop)
dropNode(n);
if (evictionsRemain)
doCheckEvictions(boost::system::error_code());
});
}
void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
{
if (_ec)
return;
clog(NodeTableNote) << "refreshing buckets";
bool connected = m_socketPtr->isOpen();
bool refreshed = false;
if (connected)
{
Guard l(x_state);
for (auto& d: m_state)
if (chrono::steady_clock::now() - d.modified > c_bucketRefresh)
while (!d.nodes.empty())
{
auto n = d.nodes.front();
if (auto p = n.lock())
{
refreshed = true;
ping(p.get());
break;
}
d.nodes.pop_front();
}
}
unsigned nextRefresh = connected ? (refreshed ? 200 : c_bucketRefresh.count()*1000) : 10000;
auto runcb = [this](boost::system::error_code const& error) -> void { doRefreshBuckets(error); };
m_bucketRefreshTimer.expires_from_now(boost::posix_time::milliseconds(nextRefresh));
m_bucketRefreshTimer.async_wait(runcb);
}

349
libp2p/NodeTable.h

@ -0,0 +1,349 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file NodeTable.h
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*/
#pragma once
#include <algorithm>
#include <boost/integer/static_log2.hpp>
#include <libdevcrypto/Common.h>
#include <libp2p/UDP.h>
namespace dev
{
namespace p2p
{
/**
* NodeTable using S/Kademlia system for node discovery and preference.
* untouched buckets are refreshed if they have not been touched within an hour
*
* Thread-safety is ensured by modifying NodeEntry details via
* shared_ptr replacement instead of mutating values.
*
* [Integration]
* @todo deadline-timer which maintains tcp/peer connections
* @todo restore nodes: affects refreshbuckets
* @todo TCP endpoints
* @todo makeRequired: don't try to evict node if node isRequired.
* @todo makeRequired: exclude bucket from refresh if we have node as peer.
*
* [Optimization]
* @todo encapsulate doFindNode into NetworkAlgorithm (task)
* @todo Pong to include ip:port where ping was received
* @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay)
* @todo std::shared_ptr<PingNode> m_cachedPingPacket;
* @todo std::shared_ptr<FindNeighbours> m_cachedFindSelfPacket;
* @todo store root node in table?
*
* [Networking]
* @todo TCP endpoints
* @todo eth/upnp/natpmp/stun/ice/etc for public-discovery
* @todo firewall
*
* [Protocol]
* @todo post-eviction pong
* @todo optimize knowledge at opposite edges; eg, s_bitsPerStep lookups. (Can be done via pointers to NodeBucket)
* @todo ^ s_bitsPerStep = 5; // Denoted by b in [Kademlia]. Bits by which address space is divided.
* @todo optimize (use tree for state and/or custom compare for cache)
* @todo reputation (aka universal siblings lists)
*/
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{
friend struct Neighbours;
using NodeSocket = UDPSocket<NodeTable, 1280>;
using TimePoint = std::chrono::steady_clock::time_point;
using EvictionTimeout = std::pair<std::pair<NodeId,TimePoint>,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId.
struct NodeDefaultEndpoint
{
NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
bi::udp::endpoint udp;
};
struct Node
{
Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {}
Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {}
virtual NodeId const& address() const { return id; }
virtual Public const& publicKey() const { return id; }
NodeId id;
NodeDefaultEndpoint endpoint;
};
/**
* NodeEntry
* @todo Type of id will become template parameter.
*/
struct NodeEntry: public Node
{
NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {}
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {}
const unsigned distance; ///< Node's distance from _src (see constructor).
};
struct NodeBucket
{
unsigned distance;
TimePoint modified;
std::list<std::weak_ptr<NodeEntry>> nodes;
};
public:
NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _port = 30300);
~NodeTable();
/// Constants for Kademlia, mostly derived from address space.
static unsigned const s_addressByteSize = sizeof(NodeId); ///< Size of address type in bytes.
static unsigned const s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia].
static unsigned const s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us).
static unsigned const s_maxSteps = boost::static_log2<s_bits>::value; ///< Max iterations of discovery. (doFindNode)
/// Chosen constants
static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
/// Intervals
boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
void join();
NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
std::list<NodeId> nodes() const;
std::list<NodeEntry> state() const;
NodeEntry operator[](NodeId _id);
protected:
/// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds.
void doFindNode(NodeId _node, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Returns nodes nearest to target.
std::vector<std::shared_ptr<NodeEntry>> findNearest(NodeId _target);
void ping(bi::udp::endpoint _to) const;
void ping(NodeEntry* _n) const;
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);
void noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
void noteNode(std::shared_ptr<NodeEntry> _n);
void dropNode(std::shared_ptr<NodeEntry> _n);
NodeBucket& bucket(NodeEntry const* _n);
/// General Network Events
void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet);
void onDisconnected(UDPSocketFace*) {};
/// Tasks
void doCheckEvictions(boost::system::error_code const& _ec);
void doRefreshBuckets(boost::system::error_code const& _ec);
#ifndef BOOST_AUTO_TEST_SUITE
private:
#else
protected:
#endif
/// Sends FindNeighbor packet. See doFindNode.
void requestNeighbours(NodeEntry const& _node, NodeId _target) const;
Node m_node; ///< This node.
Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const.
std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< NodeId -> Node table (most common lookup path)
mutable Mutex x_state;
std::array<NodeBucket, s_bins> m_state; ///< State table of binned nodes.
Mutex x_evictions;
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
NodeSocket* m_socketPtr; ///< Set to m_socket.get().
ba::io_service& m_io; ///< Used by bucket refresh timer.
boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh.
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.
};
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
{
_out << _nodeTable.root().address() << "\t" << "0\t" << _nodeTable.root().endpoint.udp.address() << ":" << _nodeTable.root().endpoint.udp.port() << std::endl;
auto s = _nodeTable.state();
for (auto n: s)
_out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl;
return _out;
}
/**
* Ping packet: Check if node is alive.
* PingNode is cached and regenerated after expiration - t, where t is timeout.
*
* RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses
*
* signature: Signature of message.
* ipAddress: Our IP address.
* port: Our port.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond then it is removed and the new
* node is inserted.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
struct PingNode: RLPXDatagram<PingNode>
{
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep) {}
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram<PingNode>(_ep), ipAddress(_src), port(_srcPort), expiration(futureFromEpoch(_expiration)) {}
std::string ipAddress;
unsigned port;
unsigned expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = r[1].toInt<unsigned>(); expiration = r[2].toInt<unsigned>(); }
};
/**
* Pong packet: response to ping
*
* RLP Encoded Items: 1
* Minimum Encoded Size: 33 bytes
* Maximum Encoded Size: 33 bytes
*
* @todo expiration
* @todo value of replyTo
* @todo create from PingNode (reqs RLPXDatagram verify flag)
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep) {}
h256 replyTo; // hash of rlp of PingNode
unsigned expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(1); _s << replyTo; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); replyTo = (h256)r[0]; }
};
/**
* FindNode Packet: Request k-nodes, closest to the target.
* FindNode is cached and regenerated after expiration - t, where t is timeout.
* FindNode implicitly results in finding neighbours of a given node.
*
* RLP Encoded Items: 2
* Minimum Encoded Size: 21 bytes
* Maximum Encoded Size: 30 bytes
*
* target: NodeId of node. The responding node will send back nodes closest to the target.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
*/
struct FindNode: RLPXDatagram<FindNode>
{
FindNode(bi::udp::endpoint _ep): RLPXDatagram<FindNode>(_ep) {}
FindNode(bi::udp::endpoint _ep, NodeId _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram<FindNode>(_ep), target(_target), expiration(futureFromEpoch(_expiration)) {}
h512 target;
unsigned expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash<h512>(); expiration = r[1].toInt<unsigned>(); }
};
/**
* Node Packet: Multiple node packets are sent in response to FindNode.
*
* RLP Encoded Items: 2 (first item is list)
* Minimum Encoded Size: 10 bytes
*
* @todo nonce: Should be replaced with expiration.
*/
struct Neighbours: RLPXDatagram<Neighbours>
{
struct Node
{
Node() = default;
Node(RLP const& _r) { interpretRLP(_r); }
std::string ipAddress;
unsigned port;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; }
void interpretRLP(RLP const& _r) { ipAddress = _r[0].toString(); port = _r[1].toInt<unsigned>(); node = h512(_r[2].toBytes()); }
};
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeTable::NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to)
{
auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
for (auto i = _offset; i < limit; i++)
{
Node node;
node.ipAddress = _nearest[i]->endpoint.udp.address().to_string();
node.port = _nearest[i]->endpoint.udp.port();
node.node = _nearest[i]->publicKey();
nodes.push_back(node);
}
}
std::list<Node> nodes;
unsigned expiration = 1;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); expiration = r[1].toInt<unsigned>(); }
};
struct NodeTableWarn: public LogChannel { static const char* name() { return "!P!"; } static const int verbosity = 0; };
struct NodeTableNote: public LogChannel { static const char* name() { return "*P*"; } static const int verbosity = 1; };
struct NodeTableMessageSummary: public LogChannel { static const char* name() { return "-P-"; } static const int verbosity = 2; };
struct NodeTableConnect: public LogChannel { static const char* name() { return "+P+"; } static const int verbosity = 10; };
struct NodeTableMessageDetail: public LogChannel { static const char* name() { return "=P="; } static const int verbosity = 5; };
struct NodeTableTriviaSummary: public LogChannel { static const char* name() { return "-P-"; } static const int verbosity = 10; };
struct NodeTableTriviaDetail: public LogChannel { static const char* name() { return "=P="; } static const int verbosity = 11; };
struct NodeTableAllDetail: public LogChannel { static const char* name() { return "=P="; } static const int verbosity = 13; };
struct NodeTableEgress: public LogChannel { static const char* name() { return ">>P"; } static const int verbosity = 14; };
struct NodeTableIngress: public LogChannel { static const char* name() { return "<<P"; } static const int verbosity = 15; };
}
}

2
libp2p/Session.cpp

@ -522,7 +522,7 @@ void Session::start()
<< m_server->protocolVersion()
<< m_server->m_clientVersion
<< m_server->caps()
<< m_server->m_public.port()
<< m_server->m_tcpPublic.port()
<< m_server->id();
sealAndSend(s);
ping();

4
libp2p/Session.h

@ -107,8 +107,8 @@ private:
mutable bi::tcp::socket m_socket; ///< Socket for the peer's connection. Mutable to ask for native_handle().
Mutex x_writeQueue; ///< Mutex for the write queue.
std::deque<bytes> m_writeQueue; ///< The write queue.
std::array<byte, 65536> m_data; ///< Data buffer for the write queue.
bytes m_incoming; ///< The incoming read queue of bytes.
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data.
bytes m_incoming; ///< Read buffer for ingress bytes.
PeerInfo m_info; ///< Dynamic information about this peer.

54
libp2p/UDP.cpp

@ -0,0 +1,54 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file UDP.cpp
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*/
#include "UDP.h"
using namespace dev;
using namespace dev::p2p;
h256 RLPXDatagramFace::sign(Secret const& _k)
{
RLPStream rlpstream;
streamRLP(rlpstream);
bytes rlpBytes(rlpstream.out());
bytesConstRef rlp(&rlpBytes);
h256 hash(dev::sha3(rlp));
Signature sig = dev::sign(_k, hash);
data.resize(h256::size + Signature::size + rlp.size());
bytesConstRef packetHash(&data[0], h256::size);
bytesConstRef signedPayload(&data[h256::size], Signature::size + rlp.size());
bytesConstRef payloadSig(&data[h256::size], Signature::size);
bytesConstRef payload(&data[h256::size + Signature::size], rlp.size());
sig.ref().copyTo(payloadSig);
rlp.copyTo(payload);
dev::sha3(signedPayload).ref().copyTo(packetHash);
return std::move(hash);
};
Public RLPXDatagramFace::authenticate(bytesConstRef _sig, bytesConstRef _rlp)
{
Signature const& sig = *(Signature const*)_sig.data();
return std::move(dev::recover(sig, sha3(_rlp)));
};

265
libp2p/UDP.h

@ -0,0 +1,265 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file UDP.h
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*/
#pragma once
#include <atomic>
#include <memory>
#include <vector>
#include <deque>
#include <array>
#include <libdevcore/Guards.h>
#include <libdevcrypto/Common.h>
#include <libdevcrypto/SHA3.h>
#include <libdevcore/RLP.h>
#include "Common.h"
namespace ba = boost::asio;
namespace bi = ba::ip;
namespace dev
{
namespace p2p
{
/**
* UDP Datagram
* @todo make data protected/functional
*/
class UDPDatagram
{
public:
UDPDatagram(bi::udp::endpoint const& _ep): locus(_ep) {}
UDPDatagram(bi::udp::endpoint const& _ep, bytes _data): data(_data), locus(_ep) {}
bi::udp::endpoint const& endpoint() const { return locus; }
bytes data;
protected:
bi::udp::endpoint locus;
};
/**
* @brief RLPX Datagram which can be signed.
* @todo compact templates
* @todo make data private/functional (see UDPDatagram)
*/
struct RLPXDatagramFace: public UDPDatagram
{
static uint64_t futureFromEpoch(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);
RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
virtual h256 sign(Secret const& _from);
virtual void streamRLP(RLPStream&) const =0;
virtual void interpretRLP(bytesConstRef _bytes) =0;
};
template <class T>
struct RLPXDatagram: public RLPXDatagramFace
{
RLPXDatagram(bi::udp::endpoint const& _ep): RLPXDatagramFace(_ep) {}
static T fromBytesConstRef(bi::udp::endpoint const& _ep, bytesConstRef _bytes) { T t(_ep); t.interpretRLP(_bytes); return std::move(t); }
};
/**
* @brief Interface which UDPSocket will implement.
*/
struct UDPSocketFace
{
virtual bool send(UDPDatagram const& _msg) = 0;
virtual void disconnect() = 0;
};
/**
* @brief Interface which a UDPSocket's owner must implement.
*/
struct UDPSocketEvents
{
virtual void onDisconnected(UDPSocketFace*) {};
virtual void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packetData) = 0;
};
/**
* @brief UDP Interface
* Handler must implement UDPSocketEvents.
*
* @todo multiple endpoints (we cannot advertise 0.0.0.0)
* @todo decouple deque from UDPDatagram and add ref() to datagram for fire&forget
*/
template <typename Handler, unsigned MaxDatagramSize>
class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, MaxDatagramSize>>
{
public:
enum { maxDatagramSize = MaxDatagramSize };
static_assert(maxDatagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes");
UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); };
virtual ~UDPSocket() { disconnect(); }
/// Socket will begin listening for and delivering packets
void connect();
/// Send datagram.
bool send(UDPDatagram const& _datagram);
/// Returns if socket is open.
bool isOpen() { return !m_closed; }
/// Disconnect socket.
void disconnect() { disconnectWithError(boost::asio::error::connection_reset); }
protected:
void doRead();
void doWrite();
void disconnectWithError(boost::system::error_code _ec);
std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError.
std::atomic<bool> m_closed; ///< Connection availability.
UDPSocketEvents& m_host; ///< Interface which owns this socket.
bi::udp::endpoint m_endpoint; ///< Endpoint which we listen to.
Mutex x_sendQ;
std::deque<UDPDatagram> m_sendQ; ///< Queue for egress data.
std::array<byte, maxDatagramSize> m_recvData; ///< Buffer for ingress data.
bi::udp::endpoint m_recvEndpoint; ///< Endpoint data was received from.
bi::udp::socket m_socket; ///< Boost asio udp socket.
Mutex x_socketError; ///< Mutex for error which can be set from host or IO thread.
boost::system::error_code m_socketError; ///< Set when shut down due to error.
};
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::connect()
{
bool expect = false;
if (!m_started.compare_exchange_strong(expect, true))
return;
m_socket.open(bi::udp::v4());
m_socket.bind(m_endpoint);
// clear write queue so reconnect doesn't send stale messages
Guard l(x_sendQ);
m_sendQ.clear();
m_closed = false;
doRead();
}
template <typename Handler, unsigned MaxDatagramSize>
bool UDPSocket<Handler, MaxDatagramSize>::send(UDPDatagram const& _datagram)
{
if (m_closed)
return false;
Guard l(x_sendQ);
m_sendQ.push_back(_datagram);
if (m_sendQ.size() == 1)
doWrite();
return true;
}
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::doRead()
{
if (m_closed)
return;
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
{
if (_ec)
return disconnectWithError(_ec);
assert(_len);
m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));
doRead();
});
}
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::doWrite()
{
if (m_closed)
return;
const UDPDatagram& datagram = m_sendQ[0];
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.endpoint(), [this, self](boost::system::error_code _ec, std::size_t)
{
if (_ec)
return disconnectWithError(_ec);
else
{
Guard l(x_sendQ);
m_sendQ.pop_front();
if (m_sendQ.empty())
return;
}
doWrite();
});
}
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::disconnectWithError(boost::system::error_code _ec)
{
// If !started and already stopped, shutdown has already occured. (EOF or Operation canceled)
if (!m_started && m_closed && !m_socket.is_open() /* todo: veirfy this logic*/)
return;
assert(_ec);
{
// disconnect-operation following prior non-zero errors are ignored
Guard l(x_socketError);
if (m_socketError != boost::system::error_code())
return;
m_socketError = _ec;
}
// TODO: (if non-zero error) schedule high-priority writes
// prevent concurrent disconnect
bool expected = true;
if (!m_started.compare_exchange_strong(expected, false))
return;
// set m_closed to true to prevent undeliverable egress messages
bool wasClosed = m_closed;
m_closed = true;
// close sockets
boost::system::error_code ec;
m_socket.shutdown(bi::udp::socket::shutdown_both, ec);
m_socket.close();
// socket never started if it never left stopped-state (pre-handshake)
if (wasClosed)
return;
m_host.onDisconnected(this);
}
}
}

4
test/boostTest.cpp

@ -21,4 +21,8 @@
*/
#define BOOST_TEST_MODULE EthereumTests
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <boost/test/included/unit_test.hpp>
#pragma warning(pop)
#pragma GCC diagnostic pop

21
test/kademlia.cpp

@ -0,0 +1,21 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file kademlia.cpp
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*/

214
test/net.cpp

@ -0,0 +1,214 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file net.cpp
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*/
#include <boost/test/unit_test.hpp>
#include <libdevcore/Worker.h>
#include <libdevcrypto/Common.h>
#include <libp2p/UDP.h>
#include <libp2p/NodeTable.h>
using namespace std;
using namespace dev;
using namespace dev::p2p;
namespace ba = boost::asio;
namespace bi = ba::ip;
BOOST_AUTO_TEST_SUITE(p2p)
/**
* Only used for testing. Not useful beyond tests.
*/
class TestHost: public Worker
{
public:
TestHost(): Worker("test",0), m_io() {};
virtual ~TestHost() { m_io.stop(); stopWorking(); }
void start() { startWorking(); }
void doWork() { m_io.run(); }
void doneWorking() { m_io.reset(); m_io.poll(); m_io.reset(); }
protected:
ba::io_service m_io;
};
struct TestNodeTable: public NodeTable
{
/// Constructor
TestNodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _port = 30300): NodeTable(_io, _alias, _port) {}
static std::vector<std::pair<KeyPair,unsigned>> createTestNodes(unsigned _count)
{
std::vector<std::pair<KeyPair,unsigned>> ret;
asserts(_count < 1000);
static uint16_t s_basePort = 30500;
ret.clear();
for (unsigned i = 0; i < _count; i++)
{
KeyPair k = KeyPair::create();
ret.push_back(make_pair(k,s_basePort+i));
}
return std::move(ret);
}
void pingTestNodes(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
{
bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes)
{
ping(bi::udp::endpoint(ourIp, n.second));
this_thread::sleep_for(chrono::milliseconds(2));
}
}
void populateTestNodes(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes, size_t _count = 0)
{
if (!_count)
_count = _testNodes.size();
bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes)
if (_count--)
noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
else
break;
}
void reset()
{
Guard l(x_state);
for (auto& n: m_state) n.nodes.clear();
}
};
/**
* Only used for testing. Not useful beyond tests.
*/
struct TestNodeTableHost: public TestHost
{
TestNodeTableHost(unsigned _count = 8): m_alias(KeyPair::create()), nodeTable(new TestNodeTable(m_io, m_alias)), testNodes(TestNodeTable::createTestNodes(_count)) {};
~TestNodeTableHost() { m_io.stop(); stopWorking(); }
void setup() { for (auto n: testNodes) nodeTables.push_back(make_shared<TestNodeTable>(m_io,n.first,n.second)); }
void pingAll() { for (auto& t: nodeTables) t->pingTestNodes(testNodes); }
void populateAll(size_t _count = 0) { for (auto& t: nodeTables) t->populateTestNodes(testNodes, _count); }
void populate(size_t _count = 0) { nodeTable->populateTestNodes(testNodes, _count); }
KeyPair m_alias;
shared_ptr<TestNodeTable> nodeTable;
std::vector<std::pair<KeyPair,unsigned>> testNodes; // keypair and port
std::vector<shared_ptr<TestNodeTable>> nodeTables;
};
class TestUDPSocket: UDPSocketEvents, public TestHost
{
public:
TestUDPSocket(): m_socket(new UDPSocket<TestUDPSocket, 1024>(m_io, *this, 30300)) {}
void onDisconnected(UDPSocketFace*) {};
void onReceived(UDPSocketFace*, bi::udp::endpoint const&, bytesConstRef _packet) { if (_packet.toString() == "AAAA") success = true; }
shared_ptr<UDPSocket<TestUDPSocket, 1024>> m_socket;
bool success = false;
};
BOOST_AUTO_TEST_CASE(test_neighbours_packet)
{
KeyPair k = KeyPair::create();
std::vector<std::pair<KeyPair,unsigned>> testNodes(TestNodeTable::createTestNodes(16));
bi::udp::endpoint to(boost::asio::ip::address::from_string("127.0.0.1"), 30000);
Neighbours out(to);
for (auto n: testNodes)
{
Neighbours::Node node;
node.ipAddress = boost::asio::ip::address::from_string("127.0.0.1").to_string();
node.port = n.second;
node.node = n.first.pub();
out.nodes.push_back(node);
}
out.sign(k.sec());
bytesConstRef packet(out.data.data(), out.data.size());
bytesConstRef rlpBytes(packet.cropped(97, packet.size() - 97));
Neighbours in = Neighbours::fromBytesConstRef(to, rlpBytes);
int count = 0;
for (auto n: in.nodes)
{
BOOST_REQUIRE_EQUAL(testNodes[count].second, n.port);
BOOST_REQUIRE_EQUAL(testNodes[count].first.pub(), n.node);
BOOST_REQUIRE_EQUAL(sha3(testNodes[count].first.pub()), sha3(n.node));
count++;
}
}
BOOST_AUTO_TEST_CASE(test_findnode_neighbours)
{
// Executing findNode should result in a list which is serialized
// into Neighbours packet. Neighbours packet should then be deserialized
// into the same list of nearest nodes.
}
BOOST_AUTO_TEST_CASE(test_windows_template)
{
bi::udp::endpoint ep;
PingNode p(ep);
}
BOOST_AUTO_TEST_CASE(kademlia)
{
// Not yet a 'real' test.
TestNodeTableHost node(8);
node.start();
node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for
node.setup();
node.populate();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populateAll();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->reset();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(1);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->join();
this_thread::sleep_for(chrono::milliseconds(2000));
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
}
BOOST_AUTO_TEST_CASE(test_udp_once)
{
UDPDatagram d(bi::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300), bytes({65,65,65,65}));
TestUDPSocket a; a.m_socket->connect(); a.start();
a.m_socket->send(d);
this_thread::sleep_for(chrono::seconds(1));
BOOST_REQUIRE_EQUAL(true, a.success);
}
BOOST_AUTO_TEST_SUITE_END()

55
test/network.cpp

@ -1,55 +0,0 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file network.cpp
* @author Marko Simovic <markobarko@gmail.com>
* @date 2014
* Basic networking tests
*/
#include <boost/test/unit_test.hpp>
#include <boost/filesystem/operations.hpp>
#include <libethereum/Client.h>
#include <libethereum/BlockChain.h>
#include <libethereum/EthereumHost.h>
#include "TestHelper.h"
using namespace std;
using namespace dev;
using namespace dev::eth;
// Disabled since tests shouldn't block (not the worst offender, but timeout should be reduced anyway).
/*
BOOST_AUTO_TEST_CASE(listen_port_busy)
{
short port = 20000;
//make use of the port ahead of our client
ba::io_service ioService;
bi::tcp::endpoint endPoint(bi::tcp::v4(), port);
bi::tcp::acceptor acceptor(ioService, endPoint);
acceptor.listen(10);
//prepare client and try to listen on same, used, port
Client c1("TestClient1", KeyPair::create().address(),
(boost::filesystem::temp_directory_path() / boost::filesystem::unique_path()).string());
c1.startNetwork(port);
BOOST_REQUIRE(c1.haveNetwork());
BOOST_REQUIRE(c1.peerServer()->listenPort() != 0);
BOOST_REQUIRE(c1.peerServer()->listenPort() != port);
}
*/
Loading…
Cancel
Save