
Remove unused code paths with confidence. Rename key/identity to alias. Inherit Peer from Node and update Host/Session to use Node::endpoint instead of the previous Peer::address.

cl-refactor · subtly, 10 years ago · commit d7e3065f97
Changed files:

1. alethzero/MainWin.cpp (2)
2. libp2p/Common.h (47)
3. libp2p/Host.cpp (89)
4. libp2p/Host.h (110)
5. libp2p/NodeTable.h (40)
6. libp2p/Session.cpp (17)
7. libp2p/Session.h (6)
8. libwebthree/WebThree.h (4)
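
In outline, the commit replaces the free-standing PeerInfo struct with a Peer class that inherits from Node, which now lives in libp2p/Common.h together with NodeIPEndpoint; the connection address is read from the inherited Node::endpoint rather than a separate Peer::address field. A condensed, compilable sketch of the resulting shape (members abridged; NodeId is reduced here to a stand-in type — the authoritative definitions are in the diffs below):

    #include <array>
    #include <boost/asio.hpp>

    namespace bi = boost::asio::ip;
    using NodeId = std::array<unsigned char, 64>;   // stand-in for the real h512 NodeId

    struct NodeIPEndpoint          // moved from NodeTable.h into Common.h
    {
        bi::udp::endpoint udp;     // discovery endpoint
        bi::tcp::endpoint tcp;     // peer (TCP) endpoint
    };

    struct Node                    // moved from NodeTable.h into Common.h
    {
        NodeId id;                 // public key doubles as node identity
        NodeIPEndpoint endpoint;   // replaces the old PeerInfo::address field
    };

    class Peer: public Node        // previously `struct PeerInfo`, unrelated to Node
    {
    public:
        // Host and Session now reach the TCP address via the inherited endpoint.
        bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; }
    };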

alethzero/MainWin.cpp (2)

@@ -942,7 +942,7 @@ void Main::refreshNetwork()
 			.arg(QString::fromStdString(i.id.abridged())));
 	auto ns = web3()->nodes();
-	for (p2p::PeerInfo const& i: ns)
+	for (p2p::Peer const& i: ns)
 		ui->nodes->insertItem(clients.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8")
 			.arg(QString::fromStdString(i.id.abridged()))
 			.arg(QString::fromStdString(toString(i.address)))

libp2p/Common.h (47)

@@ -16,9 +16,10 @@
  */
 /** @file Common.h
  * @author Gav Wood <i@gavwood.com>
+ * @author Alex Leverington <nessence@gmail.com>
  * @date 2014
  *
- * Miscellanea required for the Host/Session classes.
+ * Miscellanea required for the Host/Session/NodeTable classes.
  */

 #pragma once

@@ -29,9 +30,8 @@
 #include <boost/asio.hpp>
 #include <boost/asio/ip/tcp.hpp>
 #include <chrono>
-#include <libdevcore/Common.h>
+#include <libdevcrypto/Common.h>
 #include <libdevcore/Log.h>
-#include <libdevcore/FixedHash.h>

 namespace ba = boost::asio;
 namespace bi = boost::asio::ip;

@@ -116,6 +116,11 @@ typedef std::pair<std::string, u256> CapDesc;
 typedef std::set<CapDesc> CapDescSet;
 typedef std::vector<CapDesc> CapDescs;

+/*
+ * Used by Host to pass negotiated information about a connection to a
+ * new Peer Session; PeerSessionInfo is then maintained by Session and can
+ * be queried for point-in-time status information via Host.
+ */
 struct PeerSessionInfo
 {
     NodeId id;

@@ -130,5 +135,41 @@ struct PeerSessionInfo
 using PeerSessionInfos = std::vector<PeerSessionInfo>;

+/**
+ * @brief IPv4,UDP/TCP endpoints.
+ */
+struct NodeIPEndpoint
+{
+    NodeIPEndpoint(): udp(bi::udp::endpoint()), tcp(bi::tcp::endpoint()) {}
+    NodeIPEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
+    NodeIPEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {}
+    NodeIPEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {}
+
+    bi::udp::endpoint udp;
+    bi::tcp::endpoint tcp;
+
+    operator bool() const { return udp.address().is_unspecified() && tcp.address().is_unspecified(); }
+};
+
+struct Node
+{
+    Node(): endpoint(NodeIPEndpoint()) {};
+    Node(Public _pubk, NodeIPEndpoint _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
+    Node(Public _pubk, bi::udp::endpoint _udp, bool _required = false): Node(_pubk, NodeIPEndpoint(_udp), _required) {}
+
+    virtual NodeId const& address() const { return id; }
+    virtual Public const& publicKey() const { return id; }
+
+    NodeId id;
+
+    /// Endpoints by which we expect to reach node.
+    NodeIPEndpoint endpoint;
+
+    /// If true, node will not be removed from Node list.
+    bool required = false;
+
+    operator bool() const { return (bool)id; }
+};
+
 }
 }
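
With NodeIPEndpoint and Node exported from Common.h, a node is described by pairing a UDP discovery endpoint with a TCP peer endpoint. A minimal usage sketch against the post-commit header; the address and ports are illustrative values, and Host::addNode performs the equivalent construction in the next file:

    #include <libp2p/Common.h>

    using namespace dev::p2p;

    int main()
    {
        // Illustrative endpoint only; real addresses/ports come from config or discovery.
        bi::address addr = bi::address::from_string("127.0.0.1");

        Node n;                                  // default: zero id, unspecified endpoints
        n.endpoint = NodeIPEndpoint(bi::udp::endpoint(addr, 30303),   // discovery (UDP)
                                    bi::tcp::endpoint(addr, 30303));  // peer connection (TCP)

        bool hasIdentity = (bool)n;              // false here: operator bool() keys off a non-zero id
        (void)hasIdentity;
        return 0;
    }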

libp2p/Host.cpp (89)

@@ -52,7 +52,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
     m_ifAddresses(Network::getInterfaceAddresses()),
     m_ioService(2),
     m_tcp4Acceptor(m_ioService),
-    m_key(move(getHostIdentifier()))
+    m_alias(move(getHostIdentifier()))
 {
     for (auto address: m_ifAddresses)
         if (address.is_v4())

@@ -186,12 +186,20 @@ void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e)
     auto p = m_peers[_n];
     if (!p)
     {
-        m_peers[_n] = make_shared<PeerInfo>();
+        m_peers[_n] = make_shared<Peer>();
         p = m_peers[_n];
         p->id = _n;
     }
-    p->address = n.endpoint.tcp;
+    p->endpoint.tcp = n.endpoint.tcp;

+    // TODO: Implement similar to doFindNode. Attempt connecting to nodes
+    // until ideal peer count is reached; if all nodes are tried,
+    // repeat. Notably, this is an integrated process such that
+    // when onNodeTableEvent occurs we should also update +/-
+    // the list of nodes to be tried. Thus:
+    // 1) externalize connection attempts
+    // 2) attempt copies potentialPeer list
+    // 3) replace this logic w/maintenance of potentialPeers
     if (peerCount() < m_idealPeerCount)
         connect(p);
 }

@@ -334,14 +342,14 @@ void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId)
         clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint();
     } catch (...){}

-    shared_ptr<PeerInfo> p;
+    shared_ptr<Peer> p;
     if (_nodeId)
         p = m_peers[_nodeId];

     if (!p)
     {
-        p = make_shared<PeerInfo>();
-        p->address.address(_socket->remote_endpoint().address());
+        p = make_shared<Peer>();
+        p->endpoint.tcp.address(_socket->remote_endpoint().address());
     }

     auto ps = std::make_shared<Session>(this, std::move(*_socket), p);

@@ -357,6 +365,8 @@ string Host::pocHost()
 void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPeerPort, unsigned short _udpNodePort)
 {
     if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305)
         cwarn << "Non-standard port being recorded: " << _tcpPeerPort;

@@ -383,7 +393,7 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short
     addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort))));
 }

-void Host::connect(std::shared_ptr<PeerInfo> const& _n)
+void Host::connect(std::shared_ptr<Peer> const& _n)
 {
     if (!m_run)
         return;

@@ -401,7 +411,7 @@ void Host::connect(std::shared_ptr<PeerInfo> const& _n)
     }

     // prevent concurrently connecting to a node
-    PeerInfo *nptr = _n.get();
+    Peer *nptr = _n.get();
     {
         Guard l(x_pendingNodeConns);
         if (m_pendingNodeConns.count(nptr))

@@ -409,22 +419,19 @@ void Host::connect(std::shared_ptr<PeerInfo> const& _n)
         m_pendingNodeConns.insert(nptr);
     }

-    clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged();
-    _n->lastAttempted = std::chrono::system_clock::now();
-    _n->failedAttempts++;
+    clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->peerEndpoint() << "from" << id().abridged();
     bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
-    s->async_connect(_n->address, [=](boost::system::error_code const& ec)
+    s->async_connect(_n->peerEndpoint(), [=](boost::system::error_code const& ec)
     {
         if (ec)
         {
-            clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")";
+            clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->peerEndpoint() << "(" << ec.message() << ")";
             _n->lastDisconnect = TCPError;
             _n->lastAttempted = std::chrono::system_clock::now();
         }
         else
         {
-            clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address;
+            clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->peerEndpoint();
             _n->lastConnected = std::chrono::system_clock::now();
             auto ps = make_shared<Session>(this, std::move(*s), _n);

@@ -437,31 +444,6 @@ void Host::connect(std::shared_ptr<PeerInfo> const& _n)
     });
 }

-unsigned PeerInfo::fallbackSeconds() const
-{
-    switch (lastDisconnect)
-    {
-    case BadProtocol:
-        return 30 * (failedAttempts + 1);
-    case UselessPeer:
-    case TooManyPeers:
-    case ClientQuit:
-        return 15 * (failedAttempts + 1);
-    case NoDisconnect:
-        return 0;
-    default:
-        if (failedAttempts < 5)
-            return failedAttempts * 5;
-        else if (failedAttempts < 15)
-            return 25 + (failedAttempts - 5) * 10;
-        else
-            return 25 + 100 + (failedAttempts - 15) * 20;
-    }
-}
-
-// TODO: P2P rebuild nodetable when localNetworking is enabled/disabled
-// TODO: P2P implement 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer.
 PeerSessionInfos Host::peers() const
 {
     if (!m_run)

@@ -538,11 +520,13 @@ void Host::startedWorking()
         if (!m_tcpPublic.address().is_unspecified())
             // TODO: add m_tcpPublic endpoint; sort out endpoint stuff for nodetable
-            m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort));
+            m_nodeTable.reset(new NodeTable(m_ioService, m_alias, m_listenPort));
         else
-            m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303));
+            m_nodeTable.reset(new NodeTable(m_ioService, m_alias, m_listenPort > 0 ? m_listenPort : 30303));
         m_nodeTable->setEventHandler(new HostNodeTableHandler(*this));
     }
+    else
+        clog(NetNote) << "p2p.start.notice id:" << id().abridged() << "Invalid listen-port. Node Table Disabled.";

     clog(NetNote) << "p2p.started id:" << id().abridged();

@@ -578,16 +562,16 @@ bytes Host::saveNodes() const
     RecursiveGuard l(x_sessions);
     for (auto const& i: m_peers)
     {
-        PeerInfo const& n = *(i.second);
+        Peer const& n = *(i.second);
         // TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 &&
-        if (chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address()))
+        if (chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.peerEndpoint().port() > 0 && n.peerEndpoint().port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.peerEndpoint().address()))
         {
             nodes.appendList(10);
-            if (n.address.address().is_v4())
-                nodes << n.address.address().to_v4().to_bytes();
+            if (n.peerEndpoint().address().is_v4())
+                nodes << n.peerEndpoint().address().to_v4().to_bytes();
             else
-                nodes << n.address.address().to_v6().to_bytes();
-            nodes << n.address.port() << n.id /* << (int)n.idOrigin */ << 0
+                nodes << n.peerEndpoint().address().to_v6().to_bytes();
+            nodes << n.peerEndpoint().port() << n.id /* << (int)n.idOrigin */ << 0
                 << chrono::duration_cast<chrono::seconds>(n.lastConnected.time_since_epoch()).count()
                 << chrono::duration_cast<chrono::seconds>(n.lastAttempted.time_since_epoch()).count()
                 << n.failedAttempts << (unsigned)n.lastDisconnect << n.score << n.rating;

@@ -596,11 +580,12 @@ bytes Host::saveNodes() const
         }
     }
     RLPStream ret(3);
-    ret << 0 << m_key.secret();
+    ret << 0 << m_alias.secret();
     ret.appendList(count).appendRaw(nodes.out(), count);
     return ret.out();
 }

+// TODO: p2p Import-ant
 void Host::restoreNodes(bytesConstRef _b)
 {
     RecursiveGuard l(x_sessions);

@@ -610,7 +595,7 @@ void Host::restoreNodes(bytesConstRef _b)
     {
     case 0:
     {
-        m_key = KeyPair(r[1].toHash<Secret>());
+        m_alias = KeyPair(r[1].toHash<Secret>());
         // noteNode(id(), m_tcpPublic);

         for (auto i: r[2])

@@ -623,7 +608,7 @@ void Host::restoreNodes(bytesConstRef _b)
             auto id = (NodeId)i[2];
             if (!m_peers.count(id))
             {
-////            auto o = (Origin)i[3].toInt<int>();
+                // TODO: p2p Important :)
 //              auto n = noteNode(id, ep);
 //              n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
 //              n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));

libp2p/Host.h (110)

@@ -51,58 +51,50 @@ namespace p2p
 class Host;

-struct PeerInfo
-{
-    NodeId id;                  ///< Their id/public key.
-
-    // p2p: move to NodeIPEndpoint
-    bi::tcp::endpoint address;  ///< As reported from the node itself.
-
-    // p2p: This information is relevant to the network-stack, ex: firewall, rather than node itself
-    std::chrono::system_clock::time_point lastConnected;
-    std::chrono::system_clock::time_point lastAttempted;
-    unsigned failedAttempts = 0;
-    DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last.
-
-    // p2p: move to protocol-specific map
-    int score = 0;              ///< All time cumulative.
-    int rating = 0;             ///< Trending.
-
-    // p2p: move to NodeIPEndpoint
-    int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); }
-    // p2p: move to NodeIPEndpoint
-    int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastAttempted).count(); }
-    // p2p: move to NodeIPEndpoint
-    unsigned fallbackSeconds() const;
-    // p2p: move to NodeIPEndpoint
-    bool shouldReconnect() const { return std::chrono::system_clock::now() > lastAttempted + std::chrono::seconds(fallbackSeconds()); }
-
-    // p2p: This has two meanings now. It's possible UDP works but TPC is down or vice-versa.
-    bool isOffline() const { return lastAttempted > lastConnected; }
-
-    // p2p: Remove (in favor of lr-seen eviction and ratings).
-    bool operator<(PeerInfo const& _n) const
-    {
-        if (isOffline() != _n.isOffline())
-            return isOffline();
-        else if (isOffline())
-            if (lastAttempted == _n.lastAttempted)
-                return failedAttempts < _n.failedAttempts;
-            else
-                return lastAttempted < _n.lastAttempted;
-        else
-            if (score == _n.score)
-                if (rating == _n.rating)
-                    return failedAttempts < _n.failedAttempts;
-                else
-                    return rating < _n.rating;
-            else
-                return score < _n.score;
-    }
-};
-
-using Nodes = std::vector<PeerInfo>;
+/**
+ * @brief Representation of connectivity state and all other pertinent Peer metadata.
+ * A Peer represents connectivity between two nodes, which in this case, are the host
+ * and remote nodes.
+ *
+ * State information necessary for loading network topology is maintained by NodeTable.
+ *
+ * @todo Implement 'bool required'
+ * @todo reputation: Move score, rating to capability-specific map (&& remove friend class)
+ * @todo reputation: implement via origin-tagged events
+ * @todo Populate metadata upon construction; save when destroyed.
+ * @todo Metadata for peers needs to be handled via a storage backend.
+ *       Specifically, peers can be utilized in a variety of
+ *       many-to-many relationships while also needing to modify shared instances of
+ *       those peers. Modifying these properties via a storage backend alleviates
+ *       Host of the responsibility. (&& remove save/restoreNodes)
+ * @todo reimplement recording of historical session information on per-transport basis
+ * @todo rebuild nodetable when localNetworking is enabled/disabled
+ */
+class Peer: public Node
+{
+    friend class Session;   /// Allows Session to update score and rating.
+    friend class Host;      /// For Host: saveNodes(), restoreNodes()
+
+public:
+    bool isOffline() const { return !m_session.lock(); }
+
+    bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; }
+
+protected:
+    /// Used by isOffline() and (todo) for peer to emit session information.
+    std::weak_ptr<Session> m_session;
+
+    int score = 0;          ///< All time cumulative.
+    int rating = 0;         ///< Trending.
+
+    /// Network Availability
+
+    std::chrono::system_clock::time_point lastConnected;
+    std::chrono::system_clock::time_point lastAttempted;
+    unsigned failedAttempts = 0;
+    DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last.
+};
+
+using Peers = std::vector<Peer>;

 class HostNodeTableHandler: public NodeTableEventHandler

@@ -116,18 +108,18 @@ class HostNodeTableHandler: public NodeTableEventHandler
 /**
  * @brief The Host class
  * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
- * @todo gracefully disconnect peer if peer already connected
+ *
+ * @todo onNodeTableEvent: move peer-connection logic into ensurePeers
+ * @todo handshake: gracefully disconnect peer if peer already connected
  * @todo determinePublic: ipv6, udp
 * @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port
- * @todo write host identifier to disk along w/nodes
+ * @todo write host identifier to disk w/nodes
+ * @todo move Session::addRating into Host and implement via sender-tagged events
 */
 class Host: public Worker
 {
     friend class HostNodeTableHandler;
     friend class Session;
     friend class HostCapabilityFace;
-    friend struct PeerInfo;

 public:
     /// Start server, listening for connections on the given port.

@@ -163,6 +155,9 @@
     /// Get number of peers connected; equivalent to, but faster than, peers().size().
     size_t peerCount() const { RecursiveGuard l(x_sessions); return m_peers.size(); }

+    /// Get the address we're listening on currently.
+    std::string listenAddress() const { return m_tcpPublic.address().to_string(); }
+
     /// Get the port we're listening on currently.
     unsigned short listenPort() const { return m_tcpPublic.port(); }

@@ -173,7 +168,7 @@
     void restoreNodes(bytesConstRef _b);

     // TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
-    Nodes nodes() const { RecursiveGuard l(x_sessions); Nodes ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
+    Peers nodes() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }

     void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); }

@@ -187,12 +182,10 @@
     /// @returns if network is running.
     bool isStarted() const { return m_run; }

-    NodeId id() const { return m_key.pub(); }
+    NodeId id() const { return m_alias.pub(); }

     void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps);

-    // std::shared_ptr<PeerInfo> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<PeerInfo>(); }
-
 protected:
     void onNodeTableEvent(NodeId _n, NodeTableEventType _e);

@@ -200,7 +193,7 @@
     /// Populate m_peerAddresses with available public addresses.
     void determinePublic(std::string const& _publicAddress, bool _upnp);

-    void connect(std::shared_ptr<PeerInfo> const& _n);
+    void connect(std::shared_ptr<Peer> const& _n);

     /// Ping the peers to update the latency information and disconnect peers which have timed out.
     void keepAlivePeers();

@@ -225,7 +218,7 @@
     virtual void doneWorking();

     /// Add node
-    void addNode(Node const& _node) { m_nodeTable->addNode(_node); }
+    void addNode(Node const& _node) { if (m_nodeTable) m_nodeTable->addNode(_node); }

     /// Get or create host identifier (KeyPair).
     KeyPair getHostIdentifier();

@@ -248,16 +241,17 @@
     std::unique_ptr<boost::asio::deadline_timer> m_timer;   ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
     static const unsigned c_timerInterval = 100;            ///< Interval which m_timer is run when network is connected.

-    std::set<PeerInfo*> m_pendingNodeConns; /// Used only by connect(PeerInfo&) to limit concurrently connecting to same node. See connect(shared_ptr<PeerInfo>const&).
+    std::set<Peer*> m_pendingNodeConns;     /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&).
     Mutex x_pendingNodeConns;

     bi::tcp::endpoint m_tcpPublic;          ///< Our public listening endpoint.
-    KeyPair m_key;                          ///< Our unique ID.
+    KeyPair m_alias;                        ///< Alias for network communication. Network address is k*G. k is key material. TODO: Replace KeyPair.
     std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery).

-    std::map<NodeId, std::shared_ptr<PeerInfo>> m_peers;
+    /// Shared storage of Peer objects. Peers are created or destroyed on demand by the Host. Active sessions maintain a shared_ptr to a Peer;
+    std::map<NodeId, std::shared_ptr<Peer>> m_peers;

-    /// The nodes to which we are currently connected.
+    /// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
     /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
     mutable std::map<NodeId, std::weak_ptr<Session>> m_sessions;
     mutable RecursiveMutex x_sessions;
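
One behavioural change worth noting: Peer::isOffline() is now answered from the weak_ptr to the live Session rather than by comparing lastAttempted against lastConnected as the old PeerInfo did. A self-contained sketch of that weak_ptr liveness idiom, using stand-in types rather than the real Session/Peer classes (the registration step is implied by the friend declaration, not shown in the diff):

    #include <cassert>
    #include <memory>

    // Stand-ins illustrating the weak_ptr liveness idiom behind Peer::isOffline().
    struct DemoSession {};

    struct DemoPeer
    {
        std::weak_ptr<DemoSession> session;                  // plays the role of Peer::m_session
        bool isOffline() const { return !session.lock(); }   // same test as Peer::isOffline()
    };

    int main()
    {
        DemoPeer peer;
        assert(peer.isOffline());                  // no session has ever been attached

        auto s = std::make_shared<DemoSession>();
        peer.session = s;                          // a connected Session registers itself
        assert(!peer.isOffline());                 // peer counts as online

        s.reset();                                 // the Session is destroyed on disconnect
        assert(peer.isOffline());                  // expired weak_ptr => offline again
        return 0;
    }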

libp2p/NodeTable.h (40)

@@ -24,51 +24,14 @@
 #include <algorithm>
 #include <deque>
 #include <boost/integer/static_log2.hpp>
-#include <libdevcrypto/Common.h>
 #include <libp2p/UDP.h>
+#include "Common.h"

 namespace dev
 {
 namespace p2p
 {

-/**
- * @brief IPv4,UDP/TCP endpoints.
- */
-struct NodeIPEndpoint
-{
-    NodeIPEndpoint(): udp(bi::udp::endpoint()), tcp(bi::tcp::endpoint()) {}
-    NodeIPEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
-    NodeIPEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {}
-    NodeIPEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {}
-
-    bi::udp::endpoint udp;
-    bi::tcp::endpoint tcp;
-
-    operator bool() const { return udp.address().is_unspecified() && tcp.address().is_unspecified(); }
-};
-
-struct Node
-{
-    Node(): endpoint(NodeIPEndpoint()) {};
-    Node(Public _pubk, NodeIPEndpoint _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
-    Node(Public _pubk, bi::udp::endpoint _udp, bool _required = false): Node(_pubk, NodeIPEndpoint(_udp), _required) {}
-
-    virtual NodeId const& address() const { return id; }
-    virtual Public const& publicKey() const { return id; }
-
-    NodeId id;
-
-    /// Endpoints by which we expect to reach node.
-    NodeIPEndpoint endpoint;
-
-    /// If true, node will not be removed from Node list.
-    bool required = false;
-
-    operator bool() const { return (bool)id; }
-};
-
 /**
  * NodeEntry
  * @brief Entry in Node Table

@@ -151,7 +114,6 @@
  */
 class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
 {
-    friend struct Neighbours;
     using NodeSocket = UDPSocket<NodeTable, 1280>;
     using TimePoint = std::chrono::steady_clock::time_point;
     using EvictionTimeout = std::pair<std::pair<NodeId,TimePoint>,NodeId>;  ///< First NodeId may be evicted and replaced with second NodeId.

libp2p/Session.cpp (17)

@@ -36,7 +36,7 @@ using namespace dev::p2p;
 #endif
 #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "

-Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<PeerInfo> const& _n):
+Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Peer> const& _n):
     m_server(_s),
     m_socket(std::move(_socket)),
     m_peer(_n),

@@ -114,9 +114,9 @@ void Session::serviceNodesRequest()
     if (!m_theyRequestedNodes)
         return;

-    // TODO: P2P
+    // TODO: P2P reimplement, as per TCP "close nodes" gossip specifications (WiP)
     // auto peers = m_server->potentialPeers(m_knownNodes);
-    Nodes peers;
+    Peers peers;
     if (peers.empty())
     {
         addNote("peers", "requested");

@@ -129,11 +129,11 @@ void Session::serviceNodesRequest()
     auto rs = randomSelection(peers, 10);
     for (auto const& i: rs)
     {
-        clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.address;
-        if (i.address.address().is_v4())
-            s.appendList(3) << bytesConstRef(i.address.address().to_v4().to_bytes().data(), 4) << i.address.port() << i.id;
+        clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.peerEndpoint();
+        if (i.peerEndpoint().address().is_v4())
+            s.appendList(3) << bytesConstRef(i.peerEndpoint().address().to_v4().to_bytes().data(), 4) << i.peerEndpoint().port() << i.id;
         else// if (i.second.address().is_v6()) - assumed
-            s.appendList(3) << bytesConstRef(i.address.address().to_v6().to_bytes().data(), 16) << i.address.port() << i.id;
+            s.appendList(3) << bytesConstRef(i.peerEndpoint().address().to_v6().to_bytes().data(), 16) << i.peerEndpoint().port() << i.id;
     }
     sealAndSend(s);
     m_theyRequestedNodes = false;

@@ -188,7 +188,7 @@ bool Session::interpret(RLP const& _r)
     else if (!m_peer->id)
     {
         m_peer->id = id;
-        m_peer->address.port(listenPort);
+        m_peer->endpoint.tcp.port(listenPort);
     }
     else if (m_peer->id != id)
     {

@@ -386,7 +386,6 @@ void Session::send(bytes&& _msg)
     if (!checkPacket(bytesConstRef(&_msg)))
         clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";

-    // cerr << (void*)this << " writeImpl" << endl;
     if (!m_socket.is_open())
         return;

libp2p/Session.h (6)

@@ -39,7 +39,7 @@ namespace dev
 namespace p2p
 {

-struct PeerInfo;
+class Peer;

 /**
  * @brief The Session class

@@ -51,7 +51,7 @@ class Session: public std::enable_shared_from_this<Session>
     friend class HostCapabilityFace;

 public:
-    Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<PeerInfo> const& _n);
+    Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<Peer> const& _n);
     virtual ~Session();

     void start();

@@ -108,7 +108,7 @@
     bytes m_incoming;                   ///< Read buffer for ingress bytes.
     unsigned m_protocolVersion = 0;     ///< The protocol version of the peer.

-    std::shared_ptr<PeerInfo> m_peer;   ///< The PeerInfo object.
+    std::shared_ptr<Peer> m_peer;       ///< The Peer object.
     bool m_dropped = false;             ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.

     PeerSessionInfo m_info;             ///< Dynamic information about this peer.

libwebthree/WebThree.h (4)

@@ -78,7 +78,7 @@
     virtual p2p::NodeId id() const = 0;

     /// Gets the nodes.
-    virtual p2p::Nodes nodes() const = 0;
+    virtual p2p::Peers nodes() const = 0;

     /// Start the network subsystem.
     virtual void startNetwork() = 0;

@@ -148,7 +148,7 @@
     p2p::NodeId id() const override { return m_net.id(); }

     /// Gets the nodes.
-    p2p::Nodes nodes() const override { return m_net.nodes(); }
+    p2p::Peers nodes() const override { return m_net.nodes(); }

     /// Start the network subsystem.
     void startNetwork() override { m_net.start(); }
