Browse Source

Pass 2 integrating node table. Pruning and merging old node lifecycle logic with new.

Begin moving node identification and authentication into Host so session can be directly-constructed with NodeInfo and is not created until after authentication.
Require session to be passed a valid node.
cl-refactor
subtly 10 years ago
parent
commit
da9668c0f5
  1. 191
      libp2p/Host.cpp
  2. 27
      libp2p/Host.h
  3. 34
      libp2p/NodeTable.cpp
  4. 5
      libp2p/NodeTable.h
  5. 58
      libp2p/Session.cpp
  6. 6
      libp2p/Session.h
  7. 2
      libwebthree/WebThree.cpp
  8. 2
      test/peer.cpp
  9. 2
      test/whisperTopic.cpp

191
libp2p/Host.cpp

@ -149,12 +149,8 @@ unsigned Host::protocolVersion() const
void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps) void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
{ {
#warning integration: todo rework so this is an exception assert(!!_s->m_node);
if (!_s->m_node || !_s->m_node->id) assert(!!_s->m_node->id);
{
cwarn << "Attempting to register a peer without node information!";
return;
}
{ {
RecursiveGuard l(x_peers); RecursiveGuard l(x_peers);
@ -182,8 +178,8 @@ void Host::seal(bytes& _b)
_b[7] = len & 0xff; _b[7] = len & 0xff;
} }
#warning integration: todo remove origin, ready, oldid. port to NodeTable. see Session.cpp#244,363 // TODO P2P: remove oldid. port to NodeTable. (see noteNode calls, Session.cpp#218,337)
shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId) shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, NodeId _oldId)
{ {
RecursiveGuard l(x_peers); RecursiveGuard l(x_peers);
if (_a.port() < 30300 || _a.port() > 30305) if (_a.port() < 30300 || _a.port() > 30305)
@ -195,8 +191,6 @@ shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o,
_a = bi::tcp::endpoint(_a.address(), 0); _a = bi::tcp::endpoint(_a.address(), 0);
} }
// cnote << "NodeInfo:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]");
// First check for another node with the same connection credentials, and put it in oldId if found. // First check for another node with the same connection credentials, and put it in oldId if found.
if (!_oldId) if (!_oldId)
for (pair<h512, shared_ptr<NodeInfo>> const& n: m_nodes) for (pair<h512, shared_ptr<NodeInfo>> const& n: m_nodes)
@ -223,33 +217,21 @@ shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o,
m_nodes[_id] = make_shared<NodeInfo>(); m_nodes[_id] = make_shared<NodeInfo>();
m_nodes[_id]->id = _id; m_nodes[_id]->id = _id;
m_nodes[_id]->index = i; m_nodes[_id]->index = i;
m_nodes[_id]->idOrigin = _o;
} }
else else
{
i = m_nodes[_id]->index; i = m_nodes[_id]->index;
m_nodes[_id]->idOrigin = max(m_nodes[_id]->idOrigin, _o);
}
m_nodes[_id]->address = _a; m_nodes[_id]->address = _a;
m_ready.extendAll(i);
m_private.extendAll(i); m_private.extendAll(i);
if (_ready)
m_ready += i;
else
m_ready -= i;
if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking)) if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking))
m_private += i; m_private += i;
else else
m_private -= i; m_private -= i;
// cnote << m_nodes[_id]->index << ":" << m_ready;
m_hadNewNodes = true;
return m_nodes[_id]; return m_nodes[_id];
} }
#warning integration: TBD caps in NodeTable/NodeEntry // TODO P2P: should be based on target
// TODO P2P: store caps in NodeTable/NodeEntry
Nodes Host::potentialPeers(RangeMask<unsigned> const& _known) Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
{ {
RecursiveGuard l(x_peers); RecursiveGuard l(x_peers);
@ -340,13 +322,7 @@ void Host::runAcceptor()
{ {
try try
{ {
try { doHandshake(m_socket.release());
clog(NetConnect) << "Accepted connection from " << m_socket->remote_endpoint();
} catch (...){}
bi::address remoteAddress = m_socket->remote_endpoint().address();
// Port defaults to 0 - we let the hello tell us which port the peer listens to
auto p = std::make_shared<Session>(this, std::move(*m_socket.release()), bi::tcp::endpoint(remoteAddress, 0));
p->start();
success = true; success = true;
} }
catch (Exception const& _e) catch (Exception const& _e)
@ -373,6 +349,16 @@ void Host::runAcceptor()
} }
} }
void Host::doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId)
{
try {
clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint();
} catch (...){}
auto p = std::make_shared<Session>(this, std::move(*_socket), m_nodes[_egressNodeId]);
p->start();
}
string Host::pocHost() string Host::pocHost()
{ {
vector<string> strs; vector<string> strs;
@ -380,56 +366,51 @@ string Host::pocHost()
return "poc-" + strs[1] + ".ethdev.com"; return "poc-" + strs[1] + ".ethdev.com";
} }
#warning integration: todo remove all connect() w/ addNode makeRequired (this requires pubkey) // TODO P2P: support for TCP+UDP when manually connecting
void Host::connect(std::string const& _addr, unsigned short _port) noexcept // TODO P2P: remove in favor of addNode(NodeId, string, uint16_t, bool _required = false)
void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short _port) noexcept
{ {
if (!m_run) if (!m_run)
return; return;
assert(_node);
for (auto first: {true, false}) auto n = m_nodes[_node];
// TODO: refactor into async_resolve
m_ioService.post([=]()
{ {
try for (auto first: {true, false})
{ {
if (first) try
{ {
bi::tcp::resolver r(m_ioService); bi::tcp::endpoint ep;
connect(r.resolve({_addr, toString(_port)})->endpoint()); if (first)
{
bi::tcp::resolver r(m_ioService);
ep = r.resolve({_addr, toString(_port)})->endpoint();
}
else
ep = bi::tcp::endpoint(bi::address::from_string(_addr), _port);
if (!n)
m_nodes[_node] = make_shared<NodeInfo>();
m_nodes[_node]->id = _node;
m_nodes[_node]->address = ep;
connect(m_nodes[_node]);
break;
}
catch (Exception const& _e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << "\n" << diagnostic_information(_e);
}
catch (exception const& e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")";
} }
else
connect(bi::tcp::endpoint(bi::address::from_string(_addr), _port));
break;
}
catch (Exception const& _e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << "\n" << diagnostic_information(_e);
}
catch (exception const& e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")";
}
}
}
void Host::connect(bi::tcp::endpoint const& _ep)
{
if (!m_run)
return;
clog(NetConnect) << "Attempting single-shot connection to " << _ep;
bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
s->async_connect(_ep, [=](boost::system::error_code const& ec)
{
if (ec)
clog(NetConnect) << "Connection refused to " << _ep << " (" << ec.message() << ")";
else
{
auto p = make_shared<Session>(this, std::move(*s), _ep);
clog(NetConnect) << "Connected to " << _ep;
p->start();
} }
delete s;
}); });
} }
@ -438,7 +419,7 @@ void Host::connect(std::shared_ptr<NodeInfo> const& _n)
if (!m_run) if (!m_run)
return; return;
// prevent concurrently connecting to a node; todo: better abstraction // prevent concurrently connecting to a node
NodeInfo *nptr = _n.get(); NodeInfo *nptr = _n.get();
{ {
Guard l(x_pendingNodeConns); Guard l(x_pendingNodeConns);
@ -450,7 +431,6 @@ void Host::connect(std::shared_ptr<NodeInfo> const& _n)
clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged(); clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged();
_n->lastAttempted = std::chrono::system_clock::now(); _n->lastAttempted = std::chrono::system_clock::now();
_n->failedAttempts++; _n->failedAttempts++;
m_ready -= _n->index;
bi::tcp::socket* s = new bi::tcp::socket(m_ioService); bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
auto n = node(_n->id); auto n = node(_n->id);
@ -462,13 +442,12 @@ void Host::connect(std::shared_ptr<NodeInfo> const& _n)
clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")"; clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")";
_n->lastDisconnect = TCPError; _n->lastDisconnect = TCPError;
_n->lastAttempted = std::chrono::system_clock::now(); _n->lastAttempted = std::chrono::system_clock::now();
m_ready += _n->index;
} }
else else
{ {
clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address; clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address;
_n->lastConnected = std::chrono::system_clock::now(); _n->lastConnected = std::chrono::system_clock::now();
auto p = make_shared<Session>(this, std::move(*s), n, true); // true because we don't care about ids matched for now. Once we have permenant IDs this will matter a lot more and we can institute a safer mechanism. auto p = make_shared<Session>(this, std::move(*s), n);
p->start(); p->start();
} }
delete s; delete s;
@ -476,7 +455,7 @@ void Host::connect(std::shared_ptr<NodeInfo> const& _n)
m_pendingNodeConns.erase(nptr); m_pendingNodeConns.erase(nptr);
}); });
else else
clog(NetWarn) << "Trying to connect to node not in node table."; clog(NetWarn) << "Aborted connect. Node not in node table.";
} }
bool Host::havePeer(NodeId _id) const bool Host::havePeer(NodeId _id) const
@ -515,8 +494,8 @@ unsigned NodeInfo::fallbackSeconds() const
} }
} }
#warning integration: ---- grow/prunePeers // TODO P2P: rebuild noetable when localNetworking is enabled/disabled
#warning integration: todo grow/prune into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer. // TODO P2P: migrate grow/prunePeers into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer.
//void Host::growPeers() //void Host::growPeers()
//{ //{
// RecursiveGuard l(x_peers); // RecursiveGuard l(x_peers);
@ -585,14 +564,8 @@ PeerInfos Host::peers() const
if (!m_run) if (!m_run)
return PeerInfos(); return PeerInfos();
#warning integration: ---- pingAll. It is called every 30secs via deadline timer.
RecursiveGuard l(x_peers);
// if (_updatePing)
// {
// const_cast<Host*>(this)->pingAll();
// this_thread::sleep_for(chrono::milliseconds(200));
// }
std::vector<PeerInfo> ret; std::vector<PeerInfo> ret;
RecursiveGuard l(x_peers);
for (auto& i: m_peers) for (auto& i: m_peers)
if (auto j = i.second.lock()) if (auto j = i.second.lock())
if (j->m_socket.is_open()) if (j->m_socket.is_open())
@ -604,6 +577,9 @@ void Host::run(boost::system::error_code const&)
{ {
if (!m_run) if (!m_run)
{ {
// reset NodeTable
m_nodeTable.reset();
// stopping io service allows running manual network operations for shutdown // stopping io service allows running manual network operations for shutdown
// and also stops blocking worker thread, allowing worker thread to exit // and also stops blocking worker thread, allowing worker thread to exit
m_ioService.stop(); m_ioService.stop();
@ -612,24 +588,10 @@ void Host::run(boost::system::error_code const&)
m_timer.reset(); m_timer.reset();
return; return;
} }
#warning integration: ----
// m_lastTick += c_timerInterval;
// if (m_lastTick >= c_timerInterval * 10)
// {
// growPeers();
// prunePeers();
// m_lastTick = 0;
// }
if (m_hadNewNodes) for (auto p: m_peers)
{ if (auto pp = p.second.lock())
for (auto p: m_peers) pp->serviceNodesRequest();
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
m_hadNewNodes = false;
}
if (chrono::steady_clock::now() - m_lastPing > chrono::seconds(30)) // ping every 30s. if (chrono::steady_clock::now() - m_lastPing > chrono::seconds(30)) // ping every 30s.
{ {
@ -675,20 +637,13 @@ void Host::startedWorking()
if (m_listenPort > 0) if (m_listenPort > 0)
runAcceptor(); runAcceptor();
#warning integration: ++++
if (!m_tcpPublic.address().is_unspecified()) if (!m_tcpPublic.address().is_unspecified())
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic)); m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic));
else else
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303)); m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303));
} }
#warning integration: ---- clog(NetNote) << "p2p.started id:" << id().abridged();
// // if m_public address is valid then add us to node list
// // todo: abstract empty() and emplace logic
// if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
// noteNode(id(), m_tcpPublic, Origin::Perfect, false);
clog(NetNote) << "Id:" << id().abridged();
run(boost::system::error_code()); run(boost::system::error_code());
} }
@ -708,7 +663,7 @@ void Host::pingAll()
m_lastPing = chrono::steady_clock::now(); m_lastPing = chrono::steady_clock::now();
} }
#warning integration: todo save/restoreNodes // TODO P2P: integration: todo save/restoreNodes
bytes Host::saveNodes() const bytes Host::saveNodes() const
{ {
RLPStream nodes; RLPStream nodes;
@ -726,7 +681,7 @@ bytes Host::saveNodes() const
nodes << n.address.address().to_v4().to_bytes(); nodes << n.address.address().to_v4().to_bytes();
else else
nodes << n.address.address().to_v6().to_bytes(); nodes << n.address.address().to_v6().to_bytes();
nodes << n.address.port() << n.id << (int)n.idOrigin nodes << n.address.port() << n.id /* << (int)n.idOrigin */ << 0
<< chrono::duration_cast<chrono::seconds>(n.lastConnected.time_since_epoch()).count() << chrono::duration_cast<chrono::seconds>(n.lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(n.lastAttempted.time_since_epoch()).count() << chrono::duration_cast<chrono::seconds>(n.lastAttempted.time_since_epoch()).count()
<< n.failedAttempts << (unsigned)n.lastDisconnect << n.score << n.rating; << n.failedAttempts << (unsigned)n.lastDisconnect << n.score << n.rating;
@ -751,7 +706,7 @@ void Host::restoreNodes(bytesConstRef _b)
{ {
auto oldId = id(); auto oldId = id();
m_key = KeyPair(r[1].toHash<Secret>()); m_key = KeyPair(r[1].toHash<Secret>());
noteNode(id(), m_tcpPublic, Origin::Perfect, false, oldId); noteNode(id(), m_tcpPublic, oldId);
for (auto i: r[2]) for (auto i: r[2])
{ {
@ -763,8 +718,8 @@ void Host::restoreNodes(bytesConstRef _b)
auto id = (NodeId)i[2]; auto id = (NodeId)i[2];
if (!m_nodes.count(id)) if (!m_nodes.count(id))
{ {
auto o = (Origin)i[3].toInt<int>(); // auto o = (Origin)i[3].toInt<int>();
auto n = noteNode(id, ep, o, true); auto n = noteNode(id, ep);
n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>())); n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>())); n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
n->failedAttempts = i[6].toInt<unsigned>(); n->failedAttempts = i[6].toInt<unsigned>();
@ -787,7 +742,7 @@ void Host::restoreNodes(bytesConstRef _b)
ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>()); ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
else else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>()); ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto n = noteNode(id, ep, Origin::Self, true); auto n = noteNode(id, ep);
} }
} }
} }

27
libp2p/Host.h

@ -51,15 +51,6 @@ namespace p2p
class Host; class Host;
enum class Origin
{
Unknown,
Self,
SelfThird,
PerfectThird,
Perfect,
};
struct NodeInfo struct NodeInfo
{ {
NodeId id; ///< Their id/public key. NodeId id; ///< Their id/public key.
@ -81,9 +72,6 @@ struct NodeInfo
// p2p: move to protocol-specific map // p2p: move to protocol-specific map
int score = 0; ///< All time cumulative. int score = 0; ///< All time cumulative.
int rating = 0; ///< Trending. int rating = 0; ///< Trending.
// p2p: remove
Origin idOrigin = Origin::Unknown; ///< How did we get to know this node's id?
// p2p: move to NodeEndpoint // p2p: move to NodeEndpoint
int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); } int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); }
@ -125,6 +113,7 @@ using Nodes = std::vector<NodeInfo>;
/** /**
* @brief The Host class * @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
* @todo determinePublic: ipv6, udp
*/ */
class Host: public Worker class Host: public Worker
{ {
@ -151,8 +140,8 @@ public:
/// Connect to a peer explicitly. /// Connect to a peer explicitly.
static std::string pocHost(); static std::string pocHost();
void connect(std::string const& _addr, unsigned short _port = 30303) noexcept; void connect(NodeId const& _node, std::string const& _addr, unsigned short _port = 30303) noexcept;
void connect(bi::tcp::endpoint const& _ep); void connect(NodeId const& _node, bi::tcp::endpoint const& _ep);
void connect(std::shared_ptr<NodeInfo> const& _n); void connect(std::shared_ptr<NodeInfo> const& _n);
/// @returns true iff we have a peer of the given id. /// @returns true iff we have a peer of the given id.
@ -209,6 +198,9 @@ private:
/// Called only from startedWorking(). /// Called only from startedWorking().
void runAcceptor(); void runAcceptor();
/// Handler for verifying handshake siganture before creating session. _egressNodeId is passed for outbound connections.
void doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId = NodeId());
void seal(bytes& _b); void seal(bytes& _b);
/// Called by Worker. Not thread-safe; to be called only by worker. /// Called by Worker. Not thread-safe; to be called only by worker.
@ -222,7 +214,7 @@ private:
/// Shutdown network. Not thread-safe; to be called only by worker. /// Shutdown network. Not thread-safe; to be called only by worker.
virtual void doneWorking(); virtual void doneWorking();
std::shared_ptr<NodeInfo> noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId = NodeId()); std::shared_ptr<NodeInfo> noteNode(NodeId _id, bi::tcp::endpoint _a, NodeId _oldId = NodeId());
Nodes potentialPeers(RangeMask<unsigned> const& _known); Nodes potentialPeers(RangeMask<unsigned> const& _known);
bool m_run = false; ///< Whether network is running. bool m_run = false; ///< Whether network is running.
@ -252,8 +244,6 @@ private:
std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery). std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery).
std::map<CapDesc, unsigned> m_capIdealPeerCount; ///< Ideal peer count for capability. std::map<CapDesc, unsigned> m_capIdealPeerCount; ///< Ideal peer count for capability.
bool m_hadNewNodes = false;
mutable RecursiveMutex x_peers; mutable RecursiveMutex x_peers;
/// The nodes to which we are currently connected. /// The nodes to which we are currently connected.
@ -261,13 +251,12 @@ private:
mutable std::map<NodeId, std::weak_ptr<Session>> m_peers; mutable std::map<NodeId, std::weak_ptr<Session>> m_peers;
/// Nodes to which we may connect (or to which we have connected). /// Nodes to which we may connect (or to which we have connected).
/// TODO: does this need a lock? /// TODO: mutex; replace with nodeTable
std::map<NodeId, std::shared_ptr<NodeInfo> > m_nodes; std::map<NodeId, std::shared_ptr<NodeInfo> > m_nodes;
/// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same. /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same.
std::vector<NodeId> m_nodesList; std::vector<NodeId> m_nodesList;
RangeMask<unsigned> m_ready; ///< Indices into m_nodesList over to which nodes we are not currently connected, connecting or otherwise ignoring.
RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private. RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private.
unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.

34
libp2p/NodeTable.cpp

@ -239,29 +239,27 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
ping(_leastSeen.get()); ping(_leastSeen.get());
} }
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
{
shared_ptr<NodeEntry> ret;
Guard l(x_nodes);
if (auto n = m_nodes[_pubk])
ret = n;
else
{
ret.reset(new NodeEntry(m_node, _pubk, _udp));
m_nodes[_pubk] = ret;
}
return move(ret);
}
void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{ {
// Don't add ourself
if (_pubk == m_node.address()) if (_pubk == m_node.address())
return; return;
shared_ptr<NodeEntry> node; shared_ptr<NodeEntry> node(addNode(_pubk, _endpoint));
{
Guard l(x_nodes);
auto n = m_nodes.find(_pubk);
if (n == m_nodes.end())
{
node.reset(new NodeEntry(m_node, _pubk, _endpoint));
m_nodes[_pubk] = node;
// clog(NodeTableMessageSummary) << "Adding node to cache: " << _pubk;
}
else
{
node = n->second;
// clog(NodeTableMessageSummary) << "Found node in cache: " << _pubk;
}
}
// todo: sometimes node is nullptr here // todo: sometimes node is nullptr here
if (!!node) if (!!node)
noteNode(node); noteNode(node);

5
libp2p/NodeTable.h

@ -131,6 +131,9 @@ public:
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Add node details and attempt adding to node table if node responds to ping. NodeEntry will immediately be returned and may be used for required connectivity.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint());
void join(); void join();
NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); } NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
@ -192,7 +195,7 @@ protected:
Secret m_secret; ///< This nodes secret key. Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const. mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const.
std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< NodeId -> Node table (most common lookup path) std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< NodeId -> Node table
mutable Mutex x_state; mutable Mutex x_state;
std::array<NodeBucket, s_bins> m_state; ///< State table of binned nodes. std::array<NodeBucket, s_bins> m_state; ///< State table of binned nodes.

58
libp2p/Session.cpp

@ -36,23 +36,11 @@ using namespace dev::p2p;
#endif #endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual): Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n):
m_server(_s),
m_socket(std::move(_socket)),
m_node(nullptr),
m_manualEndpoint(_manual) // NOTE: the port on this shouldn't be used if it's zero.
{
m_lastReceived = m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({NodeId(), "?", m_manualEndpoint.address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
}
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n, bool _force):
m_server(_s), m_server(_s),
m_socket(std::move(_socket)), m_socket(std::move(_socket)),
m_node(_n), m_node(_n),
m_manualEndpoint(_n->address), m_manualEndpoint(_n->address)
m_force(_force)
{ {
m_lastReceived = m_connect = std::chrono::steady_clock::now(); m_lastReceived = m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()}); m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
@ -60,13 +48,9 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> co
Session::~Session() Session::~Session()
{ {
if (m_node) // TODO P2P: revisit (refactored from previous logic)
{ if (m_node && !(id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead))
if (id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead) m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1);
m_server->m_ready += m_node->index;
else
m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1);
}
// Read-chain finished for one reason or another. // Read-chain finished for one reason or another.
for (auto& i: m_capabilities) for (auto& i: m_capabilities)
@ -103,6 +87,7 @@ int Session::rating() const
return m_node->rating; return m_node->rating;
} }
// TODO P2P: integration: session->? should be unavailable when socket isn't open
bi::tcp::endpoint Session::endpoint() const bi::tcp::endpoint Session::endpoint() const
{ {
if (m_socket.is_open() && m_node) if (m_socket.is_open() && m_node)
@ -132,6 +117,7 @@ template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
return ret; return ret;
} }
// TODO P2P: integration: replace w/asio post -> serviceNodesRequest()
void Session::ensureNodesRequested() void Session::ensureNodesRequested()
{ {
if (isOpen() && !m_weRequestedNodes) if (isOpen() && !m_weRequestedNodes)
@ -207,25 +193,8 @@ bool Session::interpret(RLP const& _r)
return true; return true;
} }
if (m_node && m_node->id != id) assert(!!m_node);
{ assert(!!m_node->id);
if (m_force || m_node->idOrigin <= Origin::SelfThird)
// SECURITY: We're forcing through the new ID, despite having been told
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Allowing anyway...";
else
{
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Disconnecting.";
disconnect(UnexpectedIdentity);
return true;
}
if (m_server->havePeer(id))
{
m_node->dead = true;
disconnect(DuplicatePeer);
return true;
}
}
if (m_server->havePeer(id)) if (m_server->havePeer(id))
{ {
@ -241,9 +210,14 @@ bool Session::interpret(RLP const& _r)
return true; return true;
} }
m_node = m_server->noteNode(id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort), Origin::Self, false, !m_node || m_node->id == id ? NodeId() : m_node->id); // TODO P2P: first pass, implement signatures. if signature fails, drop connection. if egress, flag node's endpoint as stale.
// TODO P2P: remove oldid
// TODO P2P: with encrypted transport the handshake will fail and we won't get here
m_node = m_server->noteNode(id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort), m_node->id);
if (m_node->isOffline()) if (m_node->isOffline())
m_node->lastConnected = chrono::system_clock::now(); m_node->lastConnected = chrono::system_clock::now();
// TODO P2P: introduce map of nodes we've given to this node (if GetPeers/Peers stays in TCP)
m_knownNodes.extendAll(m_node->index); m_knownNodes.extendAll(m_node->index);
m_knownNodes.unionWith(m_node->index); m_knownNodes.unionWith(m_node->index);
@ -360,7 +334,7 @@ bool Session::interpret(RLP const& _r)
// OK passed all our checks. Assume it's good. // OK passed all our checks. Assume it's good.
addRating(1000); addRating(1000);
m_server->noteNode(id, ep, m_node->idOrigin == Origin::Perfect ? Origin::PerfectThird : Origin::SelfThird, true); m_server->noteNode(id, ep);
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")"; clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:; CONTINUE:;
} }

6
libp2p/Session.h

@ -51,8 +51,7 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace; friend class HostCapabilityFace;
public: public:
Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n, bool _force = false); Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n);
Session(Host* _server, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual);
virtual ~Session(); virtual ~Session();
void start(); void start();
@ -113,9 +112,8 @@ private:
PeerInfo m_info; ///< Dynamic information about this peer. PeerInfo m_info; ///< Dynamic information about this peer.
unsigned m_protocolVersion = 0; ///< The protocol version of the peer. unsigned m_protocolVersion = 0; ///< The protocol version of the peer.
std::shared_ptr<NodeInfo> m_node; ///< The NodeInfo object. Might be null if we constructed using a bare address/port. std::shared_ptr<NodeInfo> m_node; ///< The NodeInfo object.
bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor. bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor.
bool m_force = false; ///< If true, ignore IDs being different. This could open you up to MitM attacks.
bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in. bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.
bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us? bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us?

2
libwebthree/WebThree.cpp

@ -102,5 +102,5 @@ void WebThreeDirect::restoreNodes(bytesConstRef _saved)
void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port) void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port)
{ {
m_net.connect(_seedHost, _port); m_net.connect(NodeId(), _seedHost, _port);
} }

2
test/peer.cpp

@ -49,7 +49,7 @@ int peerTest(int argc, char** argv)
Host ph("Test", NetworkPreferences(listenPort)); Host ph("Test", NetworkPreferences(listenPort));
if (!remoteHost.empty()) if (!remoteHost.empty())
ph.connect(remoteHost, remotePort); ph.connect(NodeId(), remoteHost, remotePort);
for (int i = 0; ; ++i) for (int i = 0; ; ++i)
{ {

2
test/whisperTopic.cpp

@ -72,7 +72,7 @@ BOOST_AUTO_TEST_CASE(topic)
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.start(); ph.start();
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50303); ph.connect(NodeId(), "127.0.0.1", 50303);
KeyPair us = KeyPair::create(); KeyPair us = KeyPair::create();
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)

Loading…
Cancel
Save