Browse Source

Persist host identifier to disk. Replace noteNode with addNode. Add udp node port to connect. Add addNode to node table which pings node, potentially adding node to table if node respons. Rename NodeEndpoint to NodeIPEndpoint.

cl-refactor
subtly 10 years ago
parent
commit
fa11fc0149
  1. 180
      libp2p/Host.cpp
  2. 34
      libp2p/Host.h
  3. 19
      libp2p/NodeTable.cpp
  4. 36
      libp2p/NodeTable.h
  5. 28
      libp2p/Session.cpp

180
libp2p/Host.cpp

@ -28,6 +28,7 @@
#include <libdevcore/Common.h> #include <libdevcore/Common.h>
#include <libdevcore/CommonIO.h> #include <libdevcore/CommonIO.h>
#include <libethcore/Exceptions.h> #include <libethcore/Exceptions.h>
#include <devcrypto/FileSystem.h>
#include "Session.h" #include "Session.h"
#include "Common.h" #include "Common.h"
#include "Capability.h" #include "Capability.h"
@ -44,14 +45,14 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
m_ifAddresses(Network::getInterfaceAddresses()), m_ifAddresses(Network::getInterfaceAddresses()),
m_ioService(2), m_ioService(2),
m_tcp4Acceptor(m_ioService), m_tcp4Acceptor(m_ioService),
m_key(KeyPair::create()), m_key(move(getHostIdentifier())),
m_nodeTable(new NodeTable(m_ioService, m_key)) m_nodeTable(new NodeTable(m_ioService, m_key))
{ {
for (auto address: m_ifAddresses) for (auto address: m_ifAddresses)
if (address.is_v4()) if (address.is_v4())
clog(NetNote) << "IP Address: " << address << " = " << (isPrivateAddress(address) ? "[LOCAL]" : "[PEER]"); clog(NetNote) << "IP Address: " << address << " = " << (isPrivateAddress(address) ? "[LOCAL]" : "[PEER]");
clog(NetNote) << "Id:" << id().abridged(); clog(NetNote) << "Id:" << id();
if (_start) if (_start)
start(); start();
} }
@ -178,69 +179,78 @@ void Host::seal(bytes& _b)
_b[7] = len & 0xff; _b[7] = len & 0xff;
} }
// TODO P2P: remove oldid. port to NodeTable. (see noteNode calls, Session.cpp#218,337) // TODO: P2P port to NodeTable. (see noteNode calls, Session.cpp)
shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, NodeId _oldId) //shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a)
{ //{
RecursiveGuard l(x_peers); // RecursiveGuard l(x_peers);
if (_a.port() < 30300 || _a.port() > 30305) // if (_a.port() < 30300 || _a.port() > 30305)
cwarn << "Weird port being recorded: " << _a.port(); // cwarn << "Weird port being recorded: " << _a.port();
//
if (_a.port() >= /*49152*/32768) // if (_a.port() >= /*49152*/32768)
{ // {
cwarn << "Private port being recorded - setting to 0"; // cwarn << "Private port being recorded - setting to 0";
_a = bi::tcp::endpoint(_a.address(), 0); // _a = bi::tcp::endpoint(_a.address(), 0);
} // }
//
// unsigned i;
// if (!m_nodes.count(_id))
// {
// i = m_nodesList.size();
// m_nodesList.push_back(_id);
// m_nodes[_id] = make_shared<NodeInfo>();
// m_nodes[_id]->id = _id;
// m_nodes[_id]->index = i;
// }
// else
// i = m_nodes[_id]->index;
// m_nodes[_id]->address = _a;
// m_private.extendAll(i);
// if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking))
// m_private += i;
// else
// m_private -= i;
//
// return m_nodes[_id];
//}
// First check for another node with the same connection credentials, and put it in oldId if found. // TODO: P2P base on target
if (!_oldId) // TODO: P2P store caps in NodeTable/NodeEntry
for (pair<h512, shared_ptr<NodeInfo>> const& n: m_nodes) //Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
if (n.second->address == _a && n.second->id != _id) //{
{ // RecursiveGuard l(x_peers);
_oldId = n.second->id; // Nodes ret;
break; //
} // // todo: if localnetworking is enabled it should only share peers if remote
// // is within the same network as our interfaces.
// // this requires flagging nodes when we receive them as to if they're on private network
// auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted();
// for (auto i: ns)
// ret.push_back(*m_nodes[m_nodesList[i]]);
// return ret;
//}
unsigned i; KeyPair Host::getHostIdentifier()
if (!m_nodes.count(_id)) {
{ static string s_file(getDataDir() + "/host");
if (m_nodes.count(_oldId)) static mutex s_x;
{ lock_guard<mutex> l(s_x);
i = m_nodes[_oldId]->index;
m_nodes.erase(_oldId); h256 secret;
m_nodesList[i] = _id; bytes b = contents(s_file);
} if (b.size() == 32)
memcpy(secret.data(), b.data(), 32);
else else
{ {
i = m_nodesList.size(); // todo: replace w/user entropy; abstract to devcrypto
m_nodesList.push_back(_id); std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count());
} std::uniform_int_distribution<uint16_t> d(0, 255);
m_nodes[_id] = make_shared<NodeInfo>(); for (unsigned i = 0; i < 32; ++i)
m_nodes[_id]->id = _id; secret[i] = (byte)d(s_eng);
m_nodes[_id]->index = i;
} }
else
i = m_nodes[_id]->index;
m_nodes[_id]->address = _a;
m_private.extendAll(i);
if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking))
m_private += i;
else
m_private -= i;
return m_nodes[_id];
}
// TODO P2P: should be based on target
// TODO P2P: store caps in NodeTable/NodeEntry
Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
{
RecursiveGuard l(x_peers);
Nodes ret;
auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted(); if (!secret)
for (auto i: ns) BOOST_THROW_EXCEPTION(crypto::InvalidState());
ret.push_back(*m_nodes[m_nodesList[i]]); return move(KeyPair(move(secret)));
return ret;
} }
void Host::determinePublic(string const& _publicAddress, bool _upnp) void Host::determinePublic(string const& _publicAddress, bool _upnp)
@ -366,9 +376,25 @@ string Host::pocHost()
return "poc-" + strs[1] + ".ethdev.com"; return "poc-" + strs[1] + ".ethdev.com";
} }
// TODO P2P: support for TCP+UDP when manually connecting void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPeerPort, unsigned short _udpNodePort)
// TODO P2P: remove in favor of addNode(NodeId, string, uint16_t, bool _required = false) {
void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short _port) noexcept boost::system::error_code ec;
bi::address addr = bi::address::from_string(_addr, ec);
if (ec)
{
bi::tcp::resolver r(m_ioService);
r.async_resolve({_addr, toString(_tcpPeerPort)}, [=](boost::system::error_code const& _ec, bi::tcp::resolver::iterator _epIt) {
if (_ec)
return;
bi::tcp::endpoint tcp = *_epIt;
addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(tcp.address(), _udpNodePort), tcp)));
});
}
else
addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort))));
}
void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short _peerPort, unsigned short _nodePort) noexcept
{ {
if (!m_run) if (!m_run)
return; return;
@ -388,10 +414,10 @@ void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short
if (first) if (first)
{ {
bi::tcp::resolver r(m_ioService); bi::tcp::resolver r(m_ioService);
ep = r.resolve({_addr, toString(_port)})->endpoint(); ep = r.resolve({_addr, toString(_peerPort)})->endpoint();
} }
else else
ep = bi::tcp::endpoint(bi::address::from_string(_addr), _port); ep = bi::tcp::endpoint(bi::address::from_string(_addr), _peerPort);
if (!n) if (!n)
m_nodes[_node] = make_shared<NodeInfo>(); m_nodes[_node] = make_shared<NodeInfo>();
@ -494,8 +520,8 @@ unsigned NodeInfo::fallbackSeconds() const
} }
} }
// TODO P2P: rebuild noetable when localNetworking is enabled/disabled // TODO: P2P rebuild noetable when localNetworking is enabled/disabled
// TODO P2P: migrate grow/prunePeers into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer. // TODO: P2P migrate grow/prunePeers into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer.
//void Host::growPeers() //void Host::growPeers()
//{ //{
// RecursiveGuard l(x_peers); // RecursiveGuard l(x_peers);
@ -663,7 +689,6 @@ void Host::pingAll()
m_lastPing = chrono::steady_clock::now(); m_lastPing = chrono::steady_clock::now();
} }
// TODO P2P: integration: todo save/restoreNodes
bytes Host::saveNodes() const bytes Host::saveNodes() const
{ {
RLPStream nodes; RLPStream nodes;
@ -704,9 +729,8 @@ void Host::restoreNodes(bytesConstRef _b)
{ {
case 0: case 0:
{ {
auto oldId = id();
m_key = KeyPair(r[1].toHash<Secret>()); m_key = KeyPair(r[1].toHash<Secret>());
noteNode(id(), m_tcpPublic, oldId); // noteNode(id(), m_tcpPublic);
for (auto i: r[2]) for (auto i: r[2])
{ {
@ -718,14 +742,14 @@ void Host::restoreNodes(bytesConstRef _b)
auto id = (NodeId)i[2]; auto id = (NodeId)i[2];
if (!m_nodes.count(id)) if (!m_nodes.count(id))
{ {
// auto o = (Origin)i[3].toInt<int>(); //// auto o = (Origin)i[3].toInt<int>();
auto n = noteNode(id, ep); // auto n = noteNode(id, ep);
n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>())); // n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>())); // n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
n->failedAttempts = i[6].toInt<unsigned>(); // n->failedAttempts = i[6].toInt<unsigned>();
n->lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>(); // n->lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
n->score = (int)i[8].toInt<unsigned>(); // n->score = (int)i[8].toInt<unsigned>();
n->rating = (int)i[9].toInt<unsigned>(); // n->rating = (int)i[9].toInt<unsigned>();
} }
} }
} }
@ -742,7 +766,7 @@ void Host::restoreNodes(bytesConstRef _b)
ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>()); ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
else else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>()); ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto n = noteNode(id, ep); // auto n = noteNode(id, ep);
} }
} }
} }

34
libp2p/Host.h

@ -56,7 +56,7 @@ struct NodeInfo
NodeId id; ///< Their id/public key. NodeId id; ///< Their id/public key.
unsigned index; ///< Index into m_nodesList unsigned index; ///< Index into m_nodesList
// p2p: move to NodeEndpoint // p2p: move to NodeIPEndpoint
bi::tcp::endpoint address; ///< As reported from the node itself. bi::tcp::endpoint address; ///< As reported from the node itself.
// p2p: This information is relevant to the network-stack, ex: firewall, rather than node itself // p2p: This information is relevant to the network-stack, ex: firewall, rather than node itself
@ -73,14 +73,14 @@ struct NodeInfo
int score = 0; ///< All time cumulative. int score = 0; ///< All time cumulative.
int rating = 0; ///< Trending. int rating = 0; ///< Trending.
// p2p: move to NodeEndpoint // p2p: move to NodeIPEndpoint
int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); } int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); }
// p2p: move to NodeEndpoint // p2p: move to NodeIPEndpoint
int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastAttempted).count(); } int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastAttempted).count(); }
// p2p: move to NodeEndpoint // p2p: move to NodeIPEndpoint
unsigned fallbackSeconds() const; unsigned fallbackSeconds() const;
// p2p: move to NodeEndpoint // p2p: move to NodeIPEndpoint
bool shouldReconnect() const { return std::chrono::system_clock::now() > lastAttempted + std::chrono::seconds(fallbackSeconds()); } bool shouldReconnect() const { return std::chrono::system_clock::now() > lastAttempted + std::chrono::seconds(fallbackSeconds()); }
// p2p: This has two meanings now. It's possible UDP works but TPC is down (unable to punch hole). // p2p: This has two meanings now. It's possible UDP works but TPC is down (unable to punch hole).
@ -114,6 +114,7 @@ using Nodes = std::vector<NodeInfo>;
* @brief The Host class * @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
* @todo determinePublic: ipv6, udp * @todo determinePublic: ipv6, udp
* @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port
*/ */
class Host: public Worker class Host: public Worker
{ {
@ -128,6 +129,9 @@ public:
/// Will block on network process events. /// Will block on network process events.
virtual ~Host(); virtual ~Host();
/// Default host for current version of client.
static std::string pocHost();
/// Basic peer network protocol version. /// Basic peer network protocol version.
unsigned protocolVersion() const; unsigned protocolVersion() const;
@ -138,9 +142,11 @@ public:
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
/// Manually add node.
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303);
/// Connect to a peer explicitly. /// Connect to a peer explicitly.
static std::string pocHost(); void connect(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303) noexcept;
void connect(NodeId const& _node, std::string const& _addr, unsigned short _port = 30303) noexcept;
void connect(NodeId const& _node, bi::tcp::endpoint const& _ep); void connect(NodeId const& _node, bi::tcp::endpoint const& _ep);
void connect(std::shared_ptr<NodeInfo> const& _n); void connect(std::shared_ptr<NodeInfo> const& _n);
@ -192,6 +198,8 @@ public:
std::shared_ptr<NodeInfo> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<NodeInfo>(); } std::shared_ptr<NodeInfo> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<NodeInfo>(); }
private: private:
KeyPair getHostIdentifier();
/// Populate m_peerAddresses with available public addresses. /// Populate m_peerAddresses with available public addresses.
void determinePublic(std::string const& _publicAddress, bool _upnp); void determinePublic(std::string const& _publicAddress, bool _upnp);
@ -214,8 +222,10 @@ private:
/// Shutdown network. Not thread-safe; to be called only by worker. /// Shutdown network. Not thread-safe; to be called only by worker.
virtual void doneWorking(); virtual void doneWorking();
std::shared_ptr<NodeInfo> noteNode(NodeId _id, bi::tcp::endpoint _a, NodeId _oldId = NodeId()); /// Add node
Nodes potentialPeers(RangeMask<unsigned> const& _known); void addNode(Node const& _nodeInfo) { m_nodeTable->addNode(_nodeInfo); }
// Nodes potentialPeers(RangeMask<unsigned> const& _known);
bool m_run = false; ///< Whether network is running. bool m_run = false; ///< Whether network is running.
std::mutex x_runTimer; ///< Start/stop mutex. std::mutex x_runTimer; ///< Start/stop mutex.
@ -254,10 +264,10 @@ private:
/// TODO: mutex; replace with nodeTable /// TODO: mutex; replace with nodeTable
std::map<NodeId, std::shared_ptr<NodeInfo> > m_nodes; std::map<NodeId, std::shared_ptr<NodeInfo> > m_nodes;
/// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same. // /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same.
std::vector<NodeId> m_nodesList; // std::vector<NodeId> m_nodesList;
RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private. // RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private.
unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.

19
libp2p/NodeTable.cpp

@ -24,8 +24,8 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
NodeEntry::NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {} NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {}
NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {} NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {}
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep): NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep):
m_node(Node(_alias.pub(), bi::udp::endpoint())), m_node(Node(_alias.pub(), bi::udp::endpoint())),
@ -240,15 +240,24 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
} }
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp) shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
{
auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp));
return move(addNode(node));
}
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{ {
shared_ptr<NodeEntry> ret; shared_ptr<NodeEntry> ret;
Guard l(x_nodes); Guard l(x_nodes);
if (auto n = m_nodes[_pubk]) if (auto n = m_nodes[_node.id])
ret = n; ret = n;
else else
{ {
ret.reset(new NodeEntry(m_node, _pubk, _udp)); ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
m_nodes[_pubk] = ret; m_nodes[_node.id] = ret;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPtr->send(p);
} }
return move(ret); return move(ret);
} }

36
libp2p/NodeTable.h

@ -31,11 +31,14 @@ namespace dev
namespace p2p namespace p2p
{ {
struct NodeDefaultEndpoint /**
* @brief IPv4,UDP/TCP endpoints.
*/
struct NodeIPEndpoint
{ {
NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {} NodeIPEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
NodeDefaultEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {} NodeIPEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {}
NodeDefaultEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {} NodeIPEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {}
bi::udp::endpoint udp; bi::udp::endpoint udp;
bi::tcp::endpoint tcp; bi::tcp::endpoint tcp;
@ -43,14 +46,19 @@ struct NodeDefaultEndpoint
struct Node struct Node
{ {
Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {} Node(Public _pubk, NodeIPEndpoint _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {} Node(Public _pubk, bi::udp::endpoint _udp, bool _required = false): Node(_pubk, NodeIPEndpoint(_udp), _required) {}
virtual NodeId const& address() const { return id; } virtual NodeId const& address() const { return id; }
virtual Public const& publicKey() const { return id; } virtual Public const& publicKey() const { return id; }
NodeId id; NodeId id;
NodeDefaultEndpoint endpoint;
/// Endpoints by which we expect to reach node.
NodeIPEndpoint endpoint;
/// If true, node will not be removed from Node list.
bool required = false;
}; };
@ -60,8 +68,8 @@ struct Node
*/ */
struct NodeEntry: public Node struct NodeEntry: public Node
{ {
NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw); //: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {} NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw); //: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {}
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); //: Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {} NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); //: Node(_pubk, NodeIPEndpoint(_udp)), distance(dist(_src.id,_pubk)) {}
const unsigned distance; ///< Node's distance from _src (see constructor). const unsigned distance; ///< Node's distance from _src (see constructor).
}; };
@ -131,9 +139,12 @@ public:
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Add node details and attempt adding to node table if node responds to ping. NodeEntry will immediately be returned and may be used for required connectivity. /// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint()); std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint());
/// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Node const& _node);
void join(); void join();
NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); } NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
@ -142,7 +153,6 @@ public:
NodeEntry operator[](NodeId _id); NodeEntry operator[](NodeId _id);
protected: protected:
struct NodeBucket struct NodeBucket
{ {
@ -195,10 +205,10 @@ protected:
Secret m_secret; ///< This nodes secret key. Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const. mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const.
std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< NodeId -> Node table std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< Nodes
mutable Mutex x_state; mutable Mutex x_state;
std::array<NodeBucket, s_bins> m_state; ///< State table of binned nodes. std::array<NodeBucket, s_bins> m_state; ///< State of p2p node network.
Mutex x_evictions; Mutex x_evictions;
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts. std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.

28
libp2p/Session.cpp

@ -48,7 +48,7 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> co
Session::~Session() Session::~Session()
{ {
// TODO P2P: revisit (refactored from previous logic) // TODO: P2P revisit (refactored from previous logic)
if (m_node && !(id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead)) if (m_node && !(id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead))
m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1); m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1);
@ -87,7 +87,7 @@ int Session::rating() const
return m_node->rating; return m_node->rating;
} }
// TODO P2P: integration: session->? should be unavailable when socket isn't open // TODO: P2P integration: session->? should be unavailable when socket isn't open
bi::tcp::endpoint Session::endpoint() const bi::tcp::endpoint Session::endpoint() const
{ {
if (m_socket.is_open() && m_node) if (m_socket.is_open() && m_node)
@ -117,7 +117,7 @@ template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
return ret; return ret;
} }
// TODO P2P: integration: replace w/asio post -> serviceNodesRequest() // TODO: P2P integration: replace w/asio post -> serviceNodesRequest()
void Session::ensureNodesRequested() void Session::ensureNodesRequested()
{ {
if (isOpen() && !m_weRequestedNodes) if (isOpen() && !m_weRequestedNodes)
@ -133,7 +133,9 @@ void Session::serviceNodesRequest()
if (!m_theyRequestedNodes) if (!m_theyRequestedNodes)
return; return;
auto peers = m_server->potentialPeers(m_knownNodes); // TODO: P2P
// auto peers = m_server->potentialPeers(m_knownNodes);
Nodes peers;
if (peers.empty()) if (peers.empty())
{ {
addNote("peers", "requested"); addNote("peers", "requested");
@ -210,14 +212,16 @@ bool Session::interpret(RLP const& _r)
return true; return true;
} }
// TODO P2P: first pass, implement signatures. if signature fails, drop connection. if egress, flag node's endpoint as stale. // TODO: P2P first pass, implement signatures. if signature fails, drop connection. if egress, flag node's endpoint as stale.
// TODO P2P: remove oldid // Discussion: Most this to Host so we consolidate authentication logic and eschew peer deduplication logic.
// TODO P2P: with encrypted transport the handshake will fail and we won't get here // TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle.
m_node = m_server->noteNode(id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort), m_node->id); // TODO: P2P remove oldid
// TODO: P2P with encrypted transport the handshake will fail and we won't get here
// m_node = m_server->noteNode(m_node->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort));
if (m_node->isOffline()) if (m_node->isOffline())
m_node->lastConnected = chrono::system_clock::now(); m_node->lastConnected = chrono::system_clock::now();
//
// TODO P2P: introduce map of nodes we've given to this node (if GetPeers/Peers stays in TCP) // // TODO: P2P introduce map of nodes we've given to this node (if GetPeers/Peers stays in TCP)
m_knownNodes.extendAll(m_node->index); m_knownNodes.extendAll(m_node->index);
m_knownNodes.unionWith(m_node->index); m_knownNodes.unionWith(m_node->index);
@ -334,7 +338,9 @@ bool Session::interpret(RLP const& _r)
// OK passed all our checks. Assume it's good. // OK passed all our checks. Assume it's good.
addRating(1000); addRating(1000);
m_server->noteNode(id, ep);
// TODO: P2P change to addNode()
// m_server->noteNode(id, ep);
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")"; clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:; CONTINUE:;
} }

Loading…
Cancel
Save