Browse Source

updates for code-review

cl-refactor
subtly 10 years ago
parent
commit
e9538b23c8
  1. 1
      libp2p/Common.h
  2. 44
      libp2p/Host.cpp
  3. 47
      libp2p/Host.h
  4. 153
      libp2p/NodeTable.cpp
  5. 164
      libp2p/NodeTable.h
  6. 20
      libp2p/Session.cpp
  7. 6
      libp2p/UDP.h
  8. 2
      libwebthree/WebThree.cpp
  9. 2
      libwebthree/WebThree.h
  10. 8
      test/net.cpp
  11. 22
      test/whisperTopic.cpp

1
libp2p/Common.h

@ -169,6 +169,7 @@ struct Node
NodeIPEndpoint endpoint; NodeIPEndpoint endpoint;
/// If true, node will not be removed from Node list. /// If true, node will not be removed from Node list.
// TODO: p2p implement
bool required = false; bool required = false;
virtual operator bool() const { return (bool)id; } virtual operator bool() const { return (bool)id; }

44
libp2p/Host.cpp

@ -53,7 +53,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, byte
m_ifAddresses(Network::getInterfaceAddresses()), m_ifAddresses(Network::getInterfaceAddresses()),
m_ioService(2), m_ioService(2),
m_tcp4Acceptor(m_ioService), m_tcp4Acceptor(m_ioService),
m_alias(getNetworkAlias(_restoreNetwork)), m_alias(networkAlias(_restoreNetwork)),
m_lastPing(chrono::time_point<chrono::steady_clock>::min()) m_lastPing(chrono::time_point<chrono::steady_clock>::min())
{ {
for (auto address: m_ifAddresses) for (auto address: m_ifAddresses)
@ -182,7 +182,7 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
{ {
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n; clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
auto n = (*m_nodeTable)[_n]; auto n = m_nodeTable->node(_n);
if (n) if (n)
{ {
RecursiveGuard l(x_sessions); RecursiveGuard l(x_sessions);
@ -195,7 +195,7 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
} }
p->endpoint.tcp = n.endpoint.tcp; p->endpoint.tcp = n.endpoint.tcp;
// TODO: Implement similar to doFindNode. Attempt connecting to nodes // TODO: Implement similar to discover. Attempt connecting to nodes
// until ideal peer count is reached; if all nodes are tried, // until ideal peer count is reached; if all nodes are tried,
// repeat. Notably, this is an integrated process such that // repeat. Notably, this is an integrated process such that
// when onNodeTableEvent occurs we should also update +/- // when onNodeTableEvent occurs we should also update +/-
@ -442,9 +442,9 @@ void Host::connect(std::shared_ptr<Peer> const& _p)
Peer *nptr = _p.get(); Peer *nptr = _p.get();
{ {
Guard l(x_pendingNodeConns); Guard l(x_pendingNodeConns);
if (m_pendingNodeConns.count(nptr)) if (m_pendingPeerConns.count(nptr))
return; return;
m_pendingNodeConns.insert(nptr); m_pendingPeerConns.insert(nptr);
} }
clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged(); clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged();
@ -454,25 +454,25 @@ void Host::connect(std::shared_ptr<Peer> const& _p)
if (ec) if (ec)
{ {
clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "(" << ec.message() << ")"; clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "(" << ec.message() << ")";
_p->lastDisconnect = TCPError; _p->m_lastDisconnect = TCPError;
_p->lastAttempted = std::chrono::system_clock::now(); _p->m_lastAttempted = std::chrono::system_clock::now();
} }
else else
{ {
clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint(); clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint();
_p->lastConnected = std::chrono::system_clock::now(); _p->m_lastConnected = std::chrono::system_clock::now();
auto ps = make_shared<Session>(this, std::move(*s), _p); auto ps = make_shared<Session>(this, std::move(*s), _p);
ps->start(); ps->start();
} }
delete s; delete s;
Guard l(x_pendingNodeConns); Guard l(x_pendingNodeConns);
m_pendingNodeConns.erase(nptr); m_pendingPeerConns.erase(nptr);
}); });
} }
PeerSessionInfos Host::peers() const PeerSessionInfos Host::peerSessionInfo() const
{ {
if (!m_run) if (!m_run)
return PeerSessionInfos(); return PeerSessionInfos();
@ -622,7 +622,7 @@ bytes Host::saveNetwork() const
// TODO: alpha: Figure out why it ever shares these ports.//p.address.port() >= 30300 && p.address.port() <= 30305 && // TODO: alpha: Figure out why it ever shares these ports.//p.address.port() >= 30300 && p.address.port() <= 30305 &&
// TODO: alpha: if/how to save private addresses // TODO: alpha: if/how to save private addresses
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address // Only save peers which have connected within 2 days, with properly-advertised port and public IP address
if (chrono::system_clock::now() - p.lastConnected < chrono::seconds(3600 * 48) && p.peerEndpoint().port() > 0 && p.peerEndpoint().port() < /*49152*/32768 && p.id != id() && !isPrivateAddress(p.peerEndpoint().address())) if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && p.peerEndpoint().port() > 0 && p.peerEndpoint().port() < /*49152*/32768 && p.id != id() && !isPrivateAddress(p.peerEndpoint().address()))
{ {
network.appendList(10); network.appendList(10);
if (p.peerEndpoint().address().is_v4()) if (p.peerEndpoint().address().is_v4())
@ -631,15 +631,15 @@ bytes Host::saveNetwork() const
network << p.peerEndpoint().address().to_v6().to_bytes(); network << p.peerEndpoint().address().to_v6().to_bytes();
// TODO: alpha: replace 0 with trust-state of node // TODO: alpha: replace 0 with trust-state of node
network << p.peerEndpoint().port() << p.id << 0 network << p.peerEndpoint().port() << p.id << 0
<< chrono::duration_cast<chrono::seconds>(p.lastConnected.time_since_epoch()).count() << chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(p.lastAttempted.time_since_epoch()).count() << chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count()
<< p.failedAttempts << (unsigned)p.lastDisconnect << p.score << p.rating; << p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
count++; count++;
} }
} }
} }
auto state = m_nodeTable->state(); auto state = m_nodeTable->snapshot();
state.sort(); state.sort();
for (auto const& s: state) for (auto const& s: state)
{ {
@ -693,12 +693,12 @@ void Host::restoreNetwork(bytesConstRef _b)
{ {
shared_ptr<Peer> p = make_shared<Peer>(); shared_ptr<Peer> p = make_shared<Peer>();
p->id = id; p->id = id;
p->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>())); p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
p->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>())); p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->failedAttempts = i[6].toInt<unsigned>(); p->m_failedAttempts = i[6].toInt<unsigned>();
p->lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>(); p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
p->score = (int)i[8].toInt<unsigned>(); p->m_score = (int)i[8].toInt<unsigned>();
p->rating = (int)i[9].toInt<unsigned>(); p->m_rating = (int)i[9].toInt<unsigned>();
p->endpoint.tcp = tcp; p->endpoint.tcp = tcp;
p->endpoint.udp = udp; p->endpoint.udp = udp;
m_peers[p->id] = p; m_peers[p->id] = p;
@ -708,7 +708,7 @@ void Host::restoreNetwork(bytesConstRef _b)
} }
} }
KeyPair Host::getNetworkAlias(bytesConstRef _b) KeyPair Host::networkAlias(bytesConstRef _b)
{ {
RLP r(_b); RLP r(_b);
if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<int>() == 1) if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<int>() == 1)

47
libp2p/Host.h

@ -79,37 +79,37 @@ public:
bool isOffline() const { return !m_session.lock(); } bool isOffline() const { return !m_session.lock(); }
bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; } bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; }
int score = 0; ///< All time cumulative. int m_score = 0; ///< All time cumulative.
int rating = 0; ///< Trending. int m_rating = 0; ///< Trending.
/// Network Availability /// Network Availability
std::chrono::system_clock::time_point lastConnected; std::chrono::system_clock::time_point m_lastConnected;
std::chrono::system_clock::time_point lastAttempted; std::chrono::system_clock::time_point m_lastAttempted;
unsigned failedAttempts = 0; unsigned m_failedAttempts = 0;
DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last. DisconnectReason m_lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last.
virtual bool operator<(Peer const& _p) const virtual bool operator<(Peer const& _p) const
{ {
if (isOffline() != _p.isOffline()) if (isOffline() != _p.isOffline())
return isOffline(); return isOffline();
else if (isOffline()) else if (isOffline())
if (lastAttempted == _p.lastAttempted) if (m_lastAttempted == _p.m_lastAttempted)
return failedAttempts < _p.failedAttempts; return m_failedAttempts < _p.m_failedAttempts;
else else
return lastAttempted < _p.lastAttempted; return m_lastAttempted < _p.m_lastAttempted;
else else
if (score == _p.score) if (m_score == _p.m_score)
if (rating == _p.rating) if (m_rating == _p.m_rating)
if (failedAttempts == _p.failedAttempts) if (m_failedAttempts == _p.m_failedAttempts)
return id < _p.id; return id < _p.id;
else else
return failedAttempts < _p.failedAttempts; return m_failedAttempts < _p.m_failedAttempts;
else else
return rating < _p.rating; return m_rating < _p.m_rating;
else else
return score < _p.score; return m_score < _p.m_score;
} }
protected: protected:
@ -121,9 +121,14 @@ using Peers = std::vector<Peer>;
class HostNodeTableHandler: public NodeTableEventHandler class HostNodeTableHandler: public NodeTableEventHandler
{ {
friend class Host; public:
HostNodeTableHandler(Host& _host); HostNodeTableHandler(Host& _host);
Host const& host() const { return m_host; }
private:
virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e); virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e);
Host& m_host; Host& m_host;
}; };
@ -181,7 +186,7 @@ public:
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
/// Get peer information. /// Get peer information.
PeerSessionInfos peers() const; PeerSessionInfos peerSessionInfo() const;
/// Get number of peers connected. /// Get number of peers connected.
size_t peerCount() const; size_t peerCount() const;
@ -196,7 +201,7 @@ public:
bytes saveNetwork() const; bytes saveNetwork() const;
// TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information. // TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
Peers nodes() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; } Peers getPeers() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); } void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); }
@ -252,7 +257,7 @@ private:
virtual void doneWorking(); virtual void doneWorking();
/// Get or create host identifier (KeyPair). /// Get or create host identifier (KeyPair).
static KeyPair getNetworkAlias(bytesConstRef _b); static KeyPair networkAlias(bytesConstRef _b);
bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes. bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes.
@ -274,7 +279,7 @@ private:
std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected.
std::set<Peer*> m_pendingNodeConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&). std::set<Peer*> m_pendingPeerConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&).
Mutex x_pendingNodeConns; Mutex x_pendingNodeConns;
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint. bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.

153
libp2p/NodeTable.cpp

@ -24,15 +24,15 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {} NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::distance(_src.id,_pubk)) {}
NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {} NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::distance(_src.id,_pubk)) {}
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp): NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp):
m_node(Node(_alias.pub(), bi::udp::endpoint())), m_node(Node(_alias.pub(), bi::udp::endpoint())),
m_secret(_alias.sec()), m_secret(_alias.sec()),
m_io(_io), m_io(_io),
m_socket(new NodeSocket(m_io, *this, _udp)), m_socket(new NodeSocket(m_io, *this, _udp)),
m_socketPtr(m_socket.get()), m_socketPointer(m_socket.get()),
m_bucketRefreshTimer(m_io), m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io) m_evictionCheckTimer(m_io)
{ {
@ -42,15 +42,18 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp):
m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1); m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
} }
m_socketPtr->connect(); m_socketPointer->connect();
doRefreshBuckets(boost::system::error_code()); doRefreshBuckets(boost::system::error_code());
} }
NodeTable::~NodeTable() NodeTable::~NodeTable()
{ {
// Cancel scheduled tasks to ensure.
m_evictionCheckTimer.cancel(); m_evictionCheckTimer.cancel();
m_bucketRefreshTimer.cancel(); m_bucketRefreshTimer.cancel();
m_socketPtr->disconnect();
// Disconnect socket so that deallocation is safe.
m_socketPointer->disconnect();
} }
void NodeTable::processEvents() void NodeTable::processEvents()
@ -87,14 +90,14 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
m_nodes[_node.id] = ret; m_nodes[_node.id] = ret;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret); p.sign(m_secret);
m_socketPtr->send(p); m_socketPointer->send(p);
} }
return move(ret); return move(ret);
} }
void NodeTable::join() void NodeTable::discover()
{ {
doFindNode(m_node.id); discover(m_node.id);
} }
list<NodeId> NodeTable::nodes() const list<NodeId> NodeTable::nodes() const
@ -106,7 +109,7 @@ list<NodeId> NodeTable::nodes() const
return move(nodes); return move(nodes);
} }
list<NodeEntry> NodeTable::state() const list<NodeEntry> NodeTable::snapshot() const
{ {
list<NodeEntry> ret; list<NodeEntry> ret;
Guard l(x_state); Guard l(x_state);
@ -116,42 +119,35 @@ list<NodeEntry> NodeTable::state() const
return move(ret); return move(ret);
} }
Node NodeTable::operator[](NodeId _id) Node NodeTable::node(NodeId _id)
{ {
Guard l(x_nodes); Guard l(x_nodes);
auto n = m_nodes[_id]; auto n = m_nodes[_id];
return !!n ? *n : Node(); return !!n ? *n : Node();
} }
shared_ptr<NodeEntry> NodeTable::getNodeEntry(NodeId _id) shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeId _id)
{ {
Guard l(x_nodes); Guard l(x_nodes);
auto n = m_nodes[_id]; auto n = m_nodes[_id];
return !!n ? move(n) : move(shared_ptr<NodeEntry>()); return !!n ? move(n) : move(shared_ptr<NodeEntry>());
} }
void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
{ {
FindNode p(_node.endpoint.udp, _target); if (!m_socketPointer->isOpen() || _round == s_maxSteps)
p.sign(m_secret);
m_socketPtr->send(p);
}
void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
{
if (!m_socketPtr->isOpen() || _round == s_maxSteps)
return; return;
if (_round == s_maxSteps) if (_round == s_maxSteps)
{ {
clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds."; clog(NodeTableNote) << "Terminating discover after " << _round << " rounds.";
return; return;
} }
else if(!_round && !_tried) else if(!_round && !_tried)
// initialized _tried on first round // initialized _tried on first round
_tried.reset(new set<shared_ptr<NodeEntry>>()); _tried.reset(new set<shared_ptr<NodeEntry>>());
auto nearest = findNearest(_node); auto nearest = nearestNodeEntries(_node);
list<shared_ptr<NodeEntry>> tried; list<shared_ptr<NodeEntry>> tried;
for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++) for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)
if (!_tried->count(nearest[i])) if (!_tried->count(nearest[i]))
@ -160,12 +156,12 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_
tried.push_back(r); tried.push_back(r);
FindNode p(r->endpoint.udp, _node); FindNode p(r->endpoint.udp, _node);
p.sign(m_secret); p.sign(m_secret);
m_socketPtr->send(p); m_socketPointer->send(p);
} }
if (tried.empty()) if (tried.empty())
{ {
clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds."; clog(NodeTableNote) << "Terminating discover after " << _round << " rounds.";
return; return;
} }
@ -181,15 +177,15 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_
{ {
if (_ec) if (_ec)
return; return;
doFindNode(_node, _round + 1, _tried); discover(_node, _round + 1, _tried);
}); });
} }
vector<shared_ptr<NodeEntry>> NodeTable::findNearest(NodeId _target) vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
{ {
// send s_alpha FindNode packets to nodes we know, closest to target // send s_alpha FindNode packets to nodes we know, closest to target
static unsigned lastBin = s_bins - 1; static unsigned lastBin = s_bins - 1;
unsigned head = dist(m_node.id, _target); unsigned head = distance(m_node.id, _target);
unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins; unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins;
map<unsigned, list<shared_ptr<NodeEntry>>> found; map<unsigned, list<shared_ptr<NodeEntry>>> found;
@ -204,7 +200,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p); found[distance(_target, p->id)].push_back(p);
else else
break; break;
} }
@ -214,7 +210,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p); found[distance(_target, p->id)].push_back(p);
else else
break; break;
} }
@ -231,7 +227,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p); found[distance(_target, p->id)].push_back(p);
else else
break; break;
} }
@ -245,7 +241,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p); found[distance(_target, p->id)].push_back(p);
else else
break; break;
} }
@ -263,7 +259,7 @@ void NodeTable::ping(bi::udp::endpoint _to) const
{ {
PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret); p.sign(m_secret);
m_socketPtr->send(p); m_socketPointer->send(p);
} }
void NodeTable::ping(NodeEntry* _n) const void NodeTable::ping(NodeEntry* _n) const
@ -274,65 +270,64 @@ void NodeTable::ping(NodeEntry* _n) const
void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _new) void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _new)
{ {
if (!m_socketPtr->isOpen()) if (!m_socketPointer->isOpen())
return; return;
Guard l(x_evictions); {
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); Guard l(x_evictions);
if (m_evictions.size() == 1) m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
doCheckEvictions(boost::system::error_code()); if (m_evictions.size() == 1)
doCheckEvictions(boost::system::error_code());
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
}
ping(_leastSeen.get()); ping(_leastSeen.get());
} }
void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{ {
if (_pubk == m_node.address()) if (_pubk == m_node.address())
return; return;
shared_ptr<NodeEntry> node(addNode(_pubk, _endpoint)); shared_ptr<NodeEntry> node(addNode(_pubk, _endpoint));
// todo: sometimes node is nullptr here // TODO p2p: old bug (maybe gone now) sometimes node is nullptr here
if (!!node) if (!!node)
noteNode(node);
}
void NodeTable::noteNode(shared_ptr<NodeEntry> _n)
{
shared_ptr<NodeEntry> contested;
{ {
NodeBucket& s = bucket(_n.get()); shared_ptr<NodeEntry> contested;
Guard l(x_state);
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n)
{
if (n.lock() == _n)
return true;
return false;
});
if (s.nodes.size() >= s_bucketSize)
{ {
contested = s.nodes.front().lock(); Guard l(x_state);
if (!contested) NodeBucket& s = bucket_UNSAFE(node.get());
s.nodes.remove_if([&node](weak_ptr<NodeEntry> n)
{
if (n.lock() == node)
return true;
return false;
});
if (s.nodes.size() >= s_bucketSize)
{ {
s.nodes.pop_front(); contested = s.nodes.front().lock();
s.nodes.push_back(_n); if (!contested)
{
s.nodes.pop_front();
s.nodes.push_back(node);
}
} }
else
s.nodes.push_back(node);
} }
else
s.nodes.push_back(_n); if (contested)
evict(contested, node);
} }
if (contested)
evict(contested, _n);
} }
void NodeTable::dropNode(shared_ptr<NodeEntry> _n) void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
{ {
NodeBucket &s = bucket(_n.get());
{ {
Guard l(x_state); Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(_n.get());
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; }); s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
} }
{ {
@ -345,7 +340,7 @@ void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved); m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved);
} }
NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n) NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n)
{ {
return m_state[_n->distance - 1]; return m_state[_n->distance - 1];
} }
@ -381,7 +376,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
unsigned packetType = signedBytes[0]; unsigned packetType = signedBytes[0];
if (packetType && packetType < 4) if (packetType && packetType < 4)
noteNode(nodeid, _from); noteActiveNode(nodeid, _from);
bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1)); bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1));
RLP rlp(rlpBytes); RLP rlp(rlpBytes);
@ -398,10 +393,10 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
for (auto it = m_evictions.begin(); it != m_evictions.end(); it++) for (auto it = m_evictions.begin(); it != m_evictions.end(); it++)
if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now()) if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now())
{ {
if (auto n = getNodeEntry(it->second)) if (auto n = nodeEntry(it->second))
dropNode(n); dropNode(n);
if (auto n = (*this)[it->first.first]) if (auto n = node(it->first.first))
addNode(n); addNode(n);
it = m_evictions.erase(it); it = m_evictions.erase(it);
@ -414,7 +409,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes); Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port(); // clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port();
for (auto n: in.nodes) for (auto n: in.nodes)
noteNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port)); noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port));
break; break;
} }
@ -423,13 +418,13 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port(); // clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes); FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
vector<shared_ptr<NodeEntry>> nearest = findNearest(in.target); vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
static unsigned const nlimit = (m_socketPtr->maxDatagramSize - 11) / 86; static unsigned const nlimit = (m_socketPointer->maxDatagramSize - 11) / 86;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit) for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{ {
Neighbours out(_from, nearest, offset, nlimit); Neighbours out(_from, nearest, offset, nlimit);
out.sign(m_secret); out.sign(m_secret);
m_socketPtr->send(out); m_socketPointer->send(out);
} }
break; break;
} }
@ -442,7 +437,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
Pong p(_from); Pong p(_from);
p.echo = sha3(rlpBytes); p.echo = sha3(rlpBytes);
p.sign(m_secret); p.sign(m_secret);
m_socketPtr->send(p); m_socketPointer->send(p);
break; break;
} }
@ -459,7 +454,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{ {
if (_ec || !m_socketPtr->isOpen()) if (_ec || !m_socketPointer->isOpen())
return; return;
auto self(shared_from_this()); auto self(shared_from_this());
@ -472,8 +467,8 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
bool evictionsRemain = false; bool evictionsRemain = false;
list<shared_ptr<NodeEntry>> drop; list<shared_ptr<NodeEntry>> drop;
{ {
Guard le(x_evictions);
Guard ln(x_nodes); Guard ln(x_nodes);
Guard le(x_evictions);
for (auto& e: m_evictions) for (auto& e: m_evictions)
if (chrono::steady_clock::now() - e.first.second > c_reqTimeout) if (chrono::steady_clock::now() - e.first.second > c_reqTimeout)
if (auto n = m_nodes[e.second]) if (auto n = m_nodes[e.second])
@ -496,7 +491,7 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
return; return;
clog(NodeTableNote) << "refreshing buckets"; clog(NodeTableNote) << "refreshing buckets";
bool connected = m_socketPtr->isOpen(); bool connected = m_socketPointer->isOpen();
bool refreshed = false; bool refreshed = false;
if (connected) if (connected)
{ {

164
libp2p/NodeTable.h

@ -53,7 +53,7 @@ class NodeTableEventHandler
{ {
friend class NodeTable; friend class NodeTable;
public: public:
virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e) =0; virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e) = 0;
protected: protected:
/// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable. /// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable.
@ -65,7 +65,8 @@ protected:
if (!m_nodeEventHandler.size()) if (!m_nodeEventHandler.size())
return; return;
m_nodeEventHandler.unique(); m_nodeEventHandler.unique();
for (auto const& n: m_nodeEventHandler) events.push_back(std::make_pair(n,m_events[n])); for (auto const& n: m_nodeEventHandler)
events.push_back(std::make_pair(n,m_events[n]));
m_nodeEventHandler.clear(); m_nodeEventHandler.clear();
m_events.clear(); m_events.clear();
} }
@ -80,10 +81,17 @@ protected:
std::list<NodeId> m_nodeEventHandler; std::list<NodeId> m_nodeEventHandler;
std::map<NodeId, NodeTableEventType> m_events; std::map<NodeId, NodeTableEventType> m_events;
}; };
class NodeTable;
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable);
/** /**
* NodeTable using modified kademlia for node discovery and preference. * NodeTable using modified kademlia for node discovery and preference.
* untouched buckets are refreshed if they have not been touched within an hour * Node table requires an IO service, creates a socket for incoming
* UDP messages and implements a kademlia-like protocol. Node requests and
* responses are used to build a node table which can be queried to
* obtain a list of potential nodes to connect to, and, passes events to
* Host whenever a node is added or removed to/from the table.
* *
* Thread-safety is ensured by modifying NodeEntry details via * Thread-safety is ensured by modifying NodeEntry details via
* shared_ptr replacement instead of mutating values. * shared_ptr replacement instead of mutating values.
@ -101,7 +109,7 @@ protected:
* @todo serialize evictions per-bucket * @todo serialize evictions per-bucket
* @todo store evictions in map, unit-test eviction logic * @todo store evictions in map, unit-test eviction logic
* @todo store root node in table * @todo store root node in table
* @todo encapsulate doFindNode into NetworkAlgorithm (task) * @todo encapsulate discover into NetworkAlgorithm (task)
* @todo Pong to include ip:port where ping was received * @todo Pong to include ip:port where ping was received
* @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay) * @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay)
* @todo cache Ping and FindSelf * @todo cache Ping and FindSelf
@ -117,33 +125,17 @@ protected:
*/ */
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable> class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{ {
friend std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable);
using NodeSocket = UDPSocket<NodeTable, 1280>; using NodeSocket = UDPSocket<NodeTable, 1280>;
using TimePoint = std::chrono::steady_clock::time_point; using TimePoint = std::chrono::steady_clock::time_point;
using EvictionTimeout = std::pair<std::pair<NodeId,TimePoint>,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId. using EvictionTimeout = std::pair<std::pair<NodeId, TimePoint>, NodeId>; ///< First NodeId may be evicted and replaced with second NodeId.
public: public:
NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303); NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303);
~NodeTable(); ~NodeTable();
/// Constants for Kademlia, derived from address space. /// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable.
static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
static unsigned const s_addressByteSize = sizeof(NodeId); ///< Size of address type in bytes.
static unsigned const s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia].
static unsigned const s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us).
static unsigned const s_maxSteps = boost::static_log2<s_bits>::value; ///< Max iterations of discovery. (doFindNode)
/// Chosen constants
static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
/// Intervals
boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Set event handler for NodeEntryAdded and NodeEntryRemoved events. /// Set event handler for NodeEntryAdded and NodeEntryRemoved events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); } void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }
@ -157,18 +149,49 @@ public:
/// Add node. Node will be pinged if it's not already known. /// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Node const& _node); std::shared_ptr<NodeEntry> addNode(Node const& _node);
void join(); /// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table.
void discover();
NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); } /// Returns list of node ids active in node table.
std::list<NodeId> nodes() const; std::list<NodeId> nodes() const;
unsigned size() const { return m_nodes.size(); }
std::list<NodeEntry> state() const;
/// Returns node count.
unsigned count() const { return m_nodes.size(); }
/// Returns snapshot of table.
std::list<NodeEntry> snapshot() const;
/// Returns true if node id is in node table.
bool haveNode(NodeId _id) { Guard l(x_nodes); return m_nodes.count(_id); } bool haveNode(NodeId _id) { Guard l(x_nodes); return m_nodes.count(_id); }
Node operator[](NodeId _id);
std::shared_ptr<NodeEntry> getNodeEntry(NodeId _id);
/// Returns the Node to the corresponding node id or the empty Node if that id is not found.
Node node(NodeId _id);
#ifndef BOOST_AUTO_TEST_SUITE
private:
#else
protected: protected:
#endif
/// Constants for Kademlia, derived from address space.
static unsigned const s_addressByteSize = sizeof(NodeId); ///< Size of address type in bytes.
static unsigned const s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia].
static unsigned const s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us).
static unsigned const s_maxSteps = boost::static_log2<s_bits>::value; ///< Max iterations of discovery. (discover)
/// Chosen constants
static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
/// Intervals
/* todo: replace boost::posix_time; change constants to upper camelcase */
boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
struct NodeBucket struct NodeBucket
{ {
unsigned distance; unsigned distance;
@ -176,81 +199,95 @@ protected:
std::list<std::weak_ptr<NodeEntry>> nodes; std::list<std::weak_ptr<NodeEntry>> nodes;
}; };
/// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds. /// Used to ping endpoint.
void doFindNode(NodeId _node, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Returns nodes nearest to target.
std::vector<std::shared_ptr<NodeEntry>> findNearest(NodeId _target);
void ping(bi::udp::endpoint _to) const; void ping(bi::udp::endpoint _to) const;
/// Used ping known node. Used by node table when refreshing buckets and as part of eviction process (see evict).
void ping(NodeEntry* _n) const; void ping(NodeEntry* _n) const;
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new); /// Returns center node entry which describes this node and used with dist() to calculate xor metric for node table nodes.
NodeEntry center() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
/// Used by asynchronous operations to return NodeEntry which is active and managed by node table.
std::shared_ptr<NodeEntry> nodeEntry(NodeId _id);
void noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); /// Used to discovery nodes on network which are close to the given target.
/// Sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds.
void discover(NodeId _target, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Returns nodes from node table which are closest to target.
std::vector<std::shared_ptr<NodeEntry>> nearestNodeEntries(NodeId _target);
void noteNode(std::shared_ptr<NodeEntry> _n); /// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away.
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);
/// Called whenever activity is received from an unknown node in order to maintain node table.
void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
/// Used to drop node when timeout occurs or when evict() result is to keep previous node.
void dropNode(std::shared_ptr<NodeEntry> _n); void dropNode(std::shared_ptr<NodeEntry> _n);
NodeBucket& bucket(NodeEntry const* _n); /// Returns references to bucket which corresponds to distance of node id.
/// @warning Only use the return reference locked x_state mutex.
// TODO p2p: Remove this method after removing offset-by-one functionality.
NodeBucket& bucket_UNSAFE(NodeEntry const* _n);
/// General Network Events /// General Network Events
/// Called by m_socket when packet is received.
void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet); void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet);
void onDisconnected(UDPSocketFace*) {}; /// Called by m_socket when socket is disconnected.
void onDisconnected(UDPSocketFace*) {}
/// Tasks /// Tasks
/// Called by evict() to ensure eviction check is scheduled to run and terminates when no evictions remain. Asynchronous.
void doCheckEvictions(boost::system::error_code const& _ec); void doCheckEvictions(boost::system::error_code const& _ec);
void doRefreshBuckets(boost::system::error_code const& _ec);
#ifndef BOOST_AUTO_TEST_SUITE /// Purges and pings nodes for any buckets which haven't been touched for c_bucketRefresh seconds.
private: void doRefreshBuckets(boost::system::error_code const& _ec);
#else
protected:
#endif
/// Sends FindNeighbor packet. See doFindNode.
void requestNeighbours(NodeEntry const& _node, NodeId _target) const;
std::unique_ptr<NodeTableEventHandler> m_nodeEventHandler; ///< Event handler for node events. std::unique_ptr<NodeTableEventHandler> m_nodeEventHandler; ///< Event handler for node events.
Node m_node; ///< This node. Node m_node; ///< This node.
Secret m_secret; ///< This nodes secret key. Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const. mutable Mutex x_nodes; ///< LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const.
std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< Nodes std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< Nodes
mutable Mutex x_state; mutable Mutex x_state; ///< LOCK x_state first if both x_nodes and x_state locks are required.
std::array<NodeBucket, s_bins> m_state; ///< State of p2p node network. std::array<NodeBucket, s_bins> m_state; ///< State of p2p node network.
Mutex x_evictions; Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required.
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts. std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
ba::io_service& m_io; ///< Used by bucket refresh timer. ba::io_service& m_io; ///< Used by bucket refresh timer.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
NodeSocket* m_socketPtr; ///< Set to m_socket.get(). NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe.
boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh. boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh.
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions. boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.
}; };
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable) inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
{ {
_out << _nodeTable.root().address() << "\t" << "0\t" << _nodeTable.root().endpoint.udp.address() << ":" << _nodeTable.root().endpoint.udp.port() << std::endl; _out << _nodeTable.center().address() << "\t" << "0\t" << _nodeTable.center().endpoint.udp.address() << ":" << _nodeTable.center().endpoint.udp.port() << std::endl;
auto s = _nodeTable.state(); auto s = _nodeTable.snapshot();
for (auto n: s) for (auto n: s)
_out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl; _out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl;
return _out; return _out;
} }
/** /**
* Ping packet: Check if node is alive. * Ping packet: Sent to check if node is alive.
* PingNode is cached and regenerated after expiration - t, where t is timeout. * PingNode is cached and regenerated after expiration - t, where t is timeout.
* *
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond, then it is removed and the new
* node is inserted.
*
* RLP Encoded Items: 3 * RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes * Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses * Maximum Encoded Size: bytes // todo after u128 addresses
@ -260,11 +297,6 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
* port: Our port. * port: Our port.
* expiration: Triggers regeneration of packet. May also provide control over synchronization. * expiration: Triggers regeneration of packet. May also provide control over synchronization.
* *
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond then it is removed and the new
* node is inserted.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint) * @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
* *
*/ */
@ -285,7 +317,7 @@ struct PingNode: RLPXDatagram<PingNode>
}; };
/** /**
* Pong packet: response to ping * Pong packet: Sent in response to ping
* *
* RLP Encoded Items: 2 * RLP Encoded Items: 2
* Minimum Encoded Size: 33 bytes * Minimum Encoded Size: 33 bytes
@ -365,7 +397,7 @@ struct Neighbours: RLPXDatagram<Neighbours>
} }
static const uint8_t type = 4; static const uint8_t type = 4;
std::list<Node> nodes; std::vector<Node> nodes;
unsigned expiration = 1; unsigned expiration = 1;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << expiration; } void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << expiration; }

20
libp2p/Session.cpp

@ -48,7 +48,7 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Peer> const&
Session::~Session() Session::~Session()
{ {
m_peer->lastConnected = m_peer->lastAttempted - chrono::seconds(1); m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1);
// Read-chain finished for one reason or another. // Read-chain finished for one reason or another.
for (auto& i: m_capabilities) for (auto& i: m_capabilities)
@ -75,14 +75,14 @@ void Session::addRating(unsigned _r)
{ {
if (m_peer) if (m_peer)
{ {
m_peer->rating += _r; m_peer->m_rating += _r;
m_peer->score += _r; m_peer->m_score += _r;
} }
} }
int Session::rating() const int Session::rating() const
{ {
return m_peer->rating; return m_peer->m_rating;
} }
template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n) template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
@ -206,7 +206,7 @@ bool Session::interpret(RLP const& _r)
} }
if (m_peer->isOffline()) if (m_peer->isOffline())
m_peer->lastConnected = chrono::system_clock::now(); m_peer->m_lastConnected = chrono::system_clock::now();
if (m_protocolVersion != m_server->protocolVersion()) if (m_protocolVersion != m_server->protocolVersion())
{ {
@ -446,13 +446,13 @@ void Session::drop(DisconnectReason _reason)
if (m_peer) if (m_peer)
{ {
if (_reason != m_peer->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) if (_reason != m_peer->m_lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested)
m_peer->failedAttempts = 0; m_peer->m_failedAttempts = 0;
m_peer->lastDisconnect = _reason; m_peer->m_lastDisconnect = _reason;
if (_reason == BadProtocol) if (_reason == BadProtocol)
{ {
m_peer->rating /= 2; m_peer->m_rating /= 2;
m_peer->score /= 2; m_peer->m_score /= 2;
} }
} }
m_dropped = true; m_dropped = true;

6
libp2p/UDP.h

@ -64,12 +64,12 @@ struct RLPXDatagramFace: public UDPDatagram
static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); } static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp); static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);
virtual uint8_t packetType() =0; virtual uint8_t packetType() = 0;
RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {} RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
virtual h256 sign(Secret const& _from); virtual h256 sign(Secret const& _from);
virtual void streamRLP(RLPStream&) const =0; virtual void streamRLP(RLPStream&) const = 0;
virtual void interpretRLP(bytesConstRef _bytes) =0; virtual void interpretRLP(bytesConstRef _bytes) = 0;
}; };
template <class T> template <class T>

2
libwebthree/WebThree.cpp

@ -77,7 +77,7 @@ void WebThreeDirect::setNetworkPreferences(p2p::NetworkPreferences const& _n)
std::vector<PeerSessionInfo> WebThreeDirect::peers() std::vector<PeerSessionInfo> WebThreeDirect::peers()
{ {
return m_net.peers(); return m_net.peerSessionInfo();
} }
size_t WebThreeDirect::peerCount() const size_t WebThreeDirect::peerCount() const

2
libwebthree/WebThree.h

@ -145,7 +145,7 @@ public:
p2p::NodeId id() const override { return m_net.id(); } p2p::NodeId id() const override { return m_net.id(); }
/// Gets the nodes. /// Gets the nodes.
p2p::Peers nodes() const override { return m_net.nodes(); } p2p::Peers nodes() const override { return m_net.getPeers(); }
/// Start the network subsystem. /// Start the network subsystem.
void startNetwork() override { m_net.start(); } void startNetwork() override { m_net.start(); }

8
test/net.cpp

@ -87,7 +87,7 @@ struct TestNodeTable: public NodeTable
bi::address ourIp = bi::address::from_string("127.0.0.1"); bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes) for (auto& n: _testNodes)
if (_count--) if (_count--)
noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second)); noteActiveNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
else else
break; break;
} }
@ -182,7 +182,7 @@ BOOST_AUTO_TEST_CASE(kademlia)
// Not yet a 'real' test. // Not yet a 'real' test.
TestNodeTableHost node(8); TestNodeTableHost node(8);
node.start(); node.start();
node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for
node.setup(); node.setup();
node.populate(); node.populate();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
@ -199,11 +199,11 @@ BOOST_AUTO_TEST_CASE(kademlia)
node.populate(1); node.populate(1);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->join(); node.nodeTable->discover();
this_thread::sleep_for(chrono::milliseconds(2000)); this_thread::sleep_for(chrono::milliseconds(2000));
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
BOOST_REQUIRE_EQUAL(node.nodeTable->size(), 8); BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);
auto netNodes = node.nodeTable->nodes(); auto netNodes = node.nodeTable->nodes();
netNodes.sort(); netNodes.sort();

22
test/whisperTopic.cpp

@ -111,7 +111,7 @@ BOOST_AUTO_TEST_CASE(forwarding)
setThreadName("listener"); setThreadName("listener");
// Host must be configured not to share peers. // Host must be configured not to share peers.
Host ph("Listner", NetworkPreferences(50303, "", false, true)); Host ph("Listner", NetworkPreferences(30303, "", false, true));
ph.setIdealPeerCount(0); ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost()); auto wh = ph.registerCapability(new WhisperHost());
ph.start(); ph.start();
@ -145,7 +145,7 @@ BOOST_AUTO_TEST_CASE(forwarding)
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
// Host must be configured not to share peers. // Host must be configured not to share peers.
Host ph("Forwarder", NetworkPreferences(50305, "", false, true)); Host ph("Forwarder", NetworkPreferences(30305, "", false, true));
ph.setIdealPeerCount(0); ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost()); auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
@ -153,7 +153,7 @@ BOOST_AUTO_TEST_CASE(forwarding)
fwderid = ph.id(); fwderid = ph.id();
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.addNode(phid, "127.0.0.1", 50303, 50303); ph.addNode(phid, "127.0.0.1", 30303, 30303);
startedForwarder = true; startedForwarder = true;
@ -174,13 +174,13 @@ BOOST_AUTO_TEST_CASE(forwarding)
while (!startedForwarder) while (!startedForwarder)
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
Host ph("Sender", NetworkPreferences(50300, "", false, true)); Host ph("Sender", NetworkPreferences(30300, "", false, true));
ph.setIdealPeerCount(0); ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost()); shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.start(); ph.start();
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.addNode(fwderid, "127.0.0.1", 50305, 50305); ph.addNode(fwderid, "127.0.0.1", 30305, 30305);
KeyPair us = KeyPair::create(); KeyPair us = KeyPair::create();
wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
@ -210,14 +210,14 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
setThreadName("forwarder"); setThreadName("forwarder");
// Host must be configured not to share peers. // Host must be configured not to share peers.
Host ph("Forwarder", NetworkPreferences(50305, "", false, true)); Host ph("Forwarder", NetworkPreferences(30305, "", false, true));
ph.setIdealPeerCount(0); ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost()); auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.start(); ph.start();
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
// ph.addNode("127.0.0.1", 50303, 50303); // ph.addNode("127.0.0.1", 30303, 30303);
startedForwarder = true; startedForwarder = true;
@ -239,13 +239,13 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
{ {
Host ph("Sender", NetworkPreferences(50300, "", false, true)); Host ph("Sender", NetworkPreferences(30300, "", false, true));
ph.setIdealPeerCount(0); ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost()); shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.start(); ph.start();
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
// ph.addNode("127.0.0.1", 50305, 50305); // ph.addNode("127.0.0.1", 30305, 30305);
KeyPair us = KeyPair::create(); KeyPair us = KeyPair::create();
wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
@ -253,13 +253,13 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
} }
{ {
Host ph("Listener", NetworkPreferences(50300, "", false, true)); Host ph("Listener", NetworkPreferences(30300, "", false, true));
ph.setIdealPeerCount(0); ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost()); shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.start(); ph.start();
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
// ph.addNode("127.0.0.1", 50305, 50305); // ph.addNode("127.0.0.1", 30305, 30305);
/// Only interested in odd packets /// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test")); auto w = wh->installWatch(BuildTopicMask("test"));

Loading…
Cancel
Save