Browse Source

cleanup/prep endpoint semantics for #1558 and #1557

cl-refactor
subtly 10 years ago
parent
commit
49c4bba993
  1. 2
      alethzero/MainWin.cpp
  2. 13
      libp2p/Common.cpp
  3. 27
      libp2p/Common.h
  4. 83
      libp2p/Host.cpp
  5. 7
      libp2p/Host.h
  6. 59
      libp2p/NodeTable.cpp
  7. 63
      libp2p/NodeTable.h
  8. 2
      libp2p/Peer.h
  9. 12
      libp2p/Session.cpp
  10. 4
      libwebthree/WebThree.cpp
  11. 16
      test/net.cpp
  12. 16
      test/peer.cpp

2
alethzero/MainWin.cpp

@ -1034,7 +1034,7 @@ void Main::refreshNetwork()
for (p2p::Peer const& i: ns)
ui->nodes->insertItem(sessions.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8")
.arg(QString::fromStdString(i.id.abridged()))
.arg(QString::fromStdString(i.peerEndpoint().address().to_string()))
.arg(QString::fromStdString(i.endpoint.address.to_string()))
.arg(i.id == web3()->id() ? "self" : sessions.count(i.id) ? sessions[i.id] : "disconnected")
.arg(i.isOffline() ? " | " + QString::fromStdString(reasonOf(i.lastDisconnect())) + " | " + QString::number(i.failedAttempts()) + "x" : "")
.arg(i.rating())

13
libp2p/Common.cpp

@ -27,6 +27,8 @@ using namespace dev::p2p;
const unsigned dev::p2p::c_protocolVersion = 3;
const unsigned dev::p2p::c_defaultIPPort = 30303;
bool dev::p2p::NodeIPEndpoint::test_allowLocal = false;
bool p2p::isPublicAddress(std::string const& _addressToCheck)
{
return _addressToCheck.empty() ? false : isPublicAddress(bi::address::from_string(_addressToCheck));
@ -111,8 +113,13 @@ std::string p2p::reasonOf(DisconnectReason _r)
}
}
void Node::cullEndpoint()
namespace dev {
std::ostream& operator<<(std::ostream& _out, dev::p2p::NodeIPEndpoint const& _ep)
{
if (!isPublicAddress(endpoint.tcp.address()) && isPublicAddress(endpoint.udp.address()))
endpoint.tcp.address(endpoint.udp.address());
_out << _ep.address << _ep.udpPort << _ep.tcpPort;
return _out;
}
}

27
libp2p/Common.h

@ -156,29 +156,31 @@ using PeerSessionInfos = std::vector<PeerSessionInfo>;
*/
struct NodeIPEndpoint
{
NodeIPEndpoint(): udp(bi::udp::endpoint()), tcp(bi::tcp::endpoint()) {}
NodeIPEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
NodeIPEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {}
NodeIPEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {}
static bool test_allowLocal;
bi::udp::endpoint udp;
bi::tcp::endpoint tcp;
NodeIPEndpoint(): address() {}
NodeIPEndpoint(bi::address _addr, uint16_t _udp, uint16_t _tcp): address(_addr), udpPort(_udp), tcpPort(_tcp) {}
operator bool() const { return !udp.address().is_unspecified() || !tcp.address().is_unspecified(); }
bi::address address;
uint16_t udpPort = 0;
uint16_t tcpPort = 0;
operator bi::udp::endpoint() const { return std::move(bi::udp::endpoint(address, udpPort)); }
operator bi::tcp::endpoint() const { return std::move(bi::tcp::endpoint(address, tcpPort)); }
operator bool() const { return !address.is_unspecified() && udpPort > 0 && tcpPort > 0; }
bool isValid() const { return NodeIPEndpoint::test_allowLocal ? true : isPublicAddress(address); }
};
struct Node
{
Node(): endpoint(NodeIPEndpoint()) {};
Node(Public _pubk, NodeIPEndpoint _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
Node(Public _pubk, bi::udp::endpoint _udp, bool _required = false): Node(_pubk, NodeIPEndpoint(_udp), _required) {}
virtual NodeId const& address() const { return id; }
virtual Public const& publicKey() const { return id; }
/// Adopt UDP address for TCP if TCP isn't public and UDP is. (to be removed when protocol is updated for nat)
void cullEndpoint();
NodeId id;
/// Endpoints by which we expect to reach node.
@ -192,4 +194,7 @@ struct Node
};
}
/// Simple stream output for a NodeIPEndpoint.
std::ostream& operator<<(std::ostream& _out, dev::p2p::NodeIPEndpoint const& _ep);
}

83
libp2p/Host.cpp

@ -184,7 +184,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
p = m_peers[_id];
if (p->isOffline())
p->m_lastConnected = std::chrono::system_clock::now();
p->endpoint.tcp.address(_endpoint.address());
p->endpoint.address = _endpoint.address();
auto protocolVersion = _rlp[0].toInt<unsigned>();
auto clientVersion = _rlp[1].toString();
@ -231,7 +231,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
}
clog(NetNote) << "p2p.host.peer.register" << _id.abridged();
StructuredLogger::p2pConnected(_id.abridged(), ps->m_peer->peerEndpoint(), ps->m_peer->m_lastConnected, clientVersion, peerCount());
StructuredLogger::p2pConnected(_id.abridged(), ps->m_peer->endpoint, ps->m_peer->m_lastConnected, clientVersion, peerCount());
}
void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
@ -251,16 +251,14 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
p = m_peers[_n];
else
{
// TODO p2p: construct peer from node
p.reset(new Peer());
p->id = _n;
p->endpoint = NodeIPEndpoint(n.endpoint.udp, n.endpoint.tcp);
p->required = n.required;
m_peers[_n] = p;
clog(NetNote) << "p2p.host.peers.events.peersAdded " << _n << "udp:" << p->endpoint.udp.address() << "tcp:" << p->endpoint.tcp.address();
clog(NetNote) << "p2p.host.peers.events.peersAdded " << _n << p->endpoint;
}
p->endpoint.tcp = n.endpoint.tcp;
p->endpoint = n.endpoint;
}
// TODO: Implement similar to discover. Attempt connecting to nodes
@ -381,7 +379,7 @@ string Host::pocHost()
return "poc-" + strs[1] + ".ethdev.com";
}
void Host::addNode(NodeId const& _node, bi::address const& _addr, unsigned short _udpNodePort, unsigned short _tcpPeerPort)
void Host::addNode(NodeId const& _node, NodeIPEndpoint const& _endpoint)
{
// return if network is stopped while waiting on Host::run() or nodeTable to start
while (!haveNetwork())
@ -390,26 +388,16 @@ void Host::addNode(NodeId const& _node, bi::address const& _addr, unsigned short
else
return;
if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305)
cwarn << "Non-standard port being recorded: " << _tcpPeerPort;
if (_tcpPeerPort >= /*49152*/32768)
{
cwarn << "Private port being recorded - setting to 0";
_tcpPeerPort = 0;
}
if (_endpoint.tcpPort < 30300 || _endpoint.tcpPort > 30305)
clog(NetConnect) << "Non-standard port being recorded: " << _endpoint.tcpPort;
if (m_nodeTable)
m_nodeTable->addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(_addr, _udpNodePort), bi::tcp::endpoint(_addr, _tcpPeerPort))));
m_nodeTable->addNode(Node(_node, _endpoint));
}
void Host::requirePeer(NodeId const& _n, bi::address const& _udpAddr, unsigned short _udpPort, bi::address const& _tcpAddr, unsigned short _tcpPort)
void Host::requirePeer(NodeId const& _n, NodeIPEndpoint const& _endpoint)
{
auto naddr = _udpAddr;
auto paddr = _tcpAddr.is_unspecified() ? naddr : _tcpAddr;
auto udp = bi::udp::endpoint(naddr, _udpPort);
auto tcp = bi::tcp::endpoint(paddr, _tcpPort ? _tcpPort : _udpPort);
Node node(_n, NodeIPEndpoint(udp, tcp));
Node node(_n, _endpoint, true);
if (_n)
{
// add or replace peer
@ -425,8 +413,7 @@ void Host::requirePeer(NodeId const& _n, bi::address const& _udpAddr, unsigned s
p->required = true;
m_peers[_n] = p;
}
p->endpoint.udp = node.endpoint.udp;
p->endpoint.tcp = node.endpoint.tcp;
p->endpoint = node.endpoint;
}
connect(p);
}
@ -441,7 +428,7 @@ void Host::requirePeer(NodeId const& _n, bi::address const& _udpAddr, unsigned s
{
if (!_ec && m_nodeTable)
if (auto n = m_nodeTable->node(_n))
requirePeer(n.id, n.endpoint.udp.address(), n.endpoint.udp.port(), n.endpoint.tcp.address(), n.endpoint.tcp.port());
requirePeer(n.id, n.endpoint);
});
}
}
@ -482,22 +469,23 @@ void Host::connect(std::shared_ptr<Peer> const& _p)
m_pendingPeerConns.insert(nptr);
}
clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged();
bi::tcp::endpoint ep(_p->endpoint);
clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << ep << "from" << id().abridged();
auto socket = make_shared<RLPXSocket>(new bi::tcp::socket(m_ioService));
socket->ref().async_connect(_p->peerEndpoint(), [=](boost::system::error_code const& ec)
socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
{
_p->m_lastAttempted = std::chrono::system_clock::now();
_p->m_failedAttempts++;
if (ec)
{
clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "(" << ec.message() << ")";
clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << ep << "(" << ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
}
else
{
clog(NetConnect) << "Connecting to" << _p->id.abridged() << "@" << _p->peerEndpoint();
clog(NetConnect) << "Connecting to" << _p->id.abridged() << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
{
Guard l(x_connecting);
@ -637,7 +625,7 @@ void Host::startedWorking()
else
clog(NetNote) << "p2p.start.notice id:" << id().abridged() << "TCP Listen port is invalid or unavailable.";
shared_ptr<NodeTable> nodeTable(new NodeTable(m_ioService, m_alias, bi::address::from_string(listenAddress()), listenPort()));
shared_ptr<NodeTable> nodeTable(new NodeTable(m_ioService, m_alias, NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort())));
nodeTable->setEventHandler(new HostNodeTableHandler(*this));
m_nodeTable = nodeTable;
restoreNetwork(&m_restoreNetwork);
@ -696,11 +684,11 @@ bytes Host::saveNetwork() const
{
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
// todo: e2e ipv6 support
bi::tcp::endpoint endpoint(p.peerEndpoint());
bi::tcp::endpoint endpoint(p.endpoint);
if (!endpoint.address().is_v4())
continue;
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && endpoint.port() > 0 && endpoint.port() < /*49152*/32768 && p.id != id() && !isPrivateAddress(p.endpoint.udp.address()) && !isPrivateAddress(endpoint.address()))
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && endpoint.port() > 0 && p.id != id() && (p.required || p.endpoint.isValid()))
{
network.appendList(10);
network << endpoint.port() << p.id << p.required
@ -715,14 +703,14 @@ bytes Host::saveNetwork() const
{
auto state = m_nodeTable->snapshot();
state.sort();
for (auto const& s: state)
for (auto const& entry: state)
{
network.appendList(3);
if (s.endpoint.tcp.address().is_v4())
network << s.endpoint.tcp.address().to_v4().to_bytes();
if (entry.endpoint.address.is_v4())
network << entry.endpoint.address.to_v4().to_bytes();
else
network << s.endpoint.tcp.address().to_v6().to_bytes();
network << s.endpoint.tcp.port() << s.id;
network << entry.endpoint.address.to_v6().to_bytes();
network << entry.endpoint.tcpPort << entry.id;
count++;
}
}
@ -755,36 +743,33 @@ void Host::restoreNetwork(bytesConstRef _b)
for (auto i: r[2])
{
// todo: e2e ipv6 support
// bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
if (i[0].itemCount() != 4)
continue;
bi::tcp::endpoint tcp;
bi::udp::endpoint udp;
tcp = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
udp = bi::udp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
if (isPrivateAddress(tcp.address()) || isPrivateAddress(udp.address()))
// todo: ipv6, bi::address_v6(i[0].toArray<byte, 16>()
NodeIPEndpoint ep(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>(), i[1].toInt<short>());
bool required = i[3].toInt<bool>();
if (!ep.isValid() && !required)
continue;
auto id = (NodeId)i[2];
if (i.itemCount() == 3)
m_nodeTable->addNode(id, udp, tcp);
m_nodeTable->addNode(id, ep);
else if (i.itemCount() == 10)
{
shared_ptr<Peer> p = make_shared<Peer>();
p->id = id;
p->required = i[3].toInt<bool>();
p->required = required;
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_failedAttempts = i[6].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
p->m_score = (int)i[8].toInt<unsigned>();
p->m_rating = (int)i[9].toInt<unsigned>();
p->endpoint.tcp = tcp;
p->endpoint.udp = udp;
p->endpoint = ep;
m_peers[p->id] = p;
if (p->required)
requirePeer(p->id, p->endpoint.udp.address(), p->endpoint.udp.port());
requirePeer(p->id, ep);
else
m_nodeTable->addNode(*p.get());
}

7
libp2p/Host.h

@ -103,10 +103,13 @@ public:
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
/// Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity.
void addNode(NodeId const& _node, bi::address const& _addr, unsigned short _udpPort, unsigned short _tcpPort);
void addNode(NodeId const& _node, NodeIPEndpoint const& _endpoint);
/// Create Peer and attempt keeping peer connected.
void requirePeer(NodeId const& _node, bi::address const& _udpAddr, unsigned short _udpPort, bi::address const& _tcpAddr = bi::address(), unsigned short _tcpPort = 0);
void requirePeer(NodeId const& _node, NodeIPEndpoint const& _endpoint);
/// Create Peer and attempt keeping peer connected.
void requirePeer(NodeId const& _node, bi::address const& _addr, unsigned short _udpPort, unsigned short _tcpPort) { requirePeer(_node, NodeIPEndpoint(_addr, _udpPort, _tcpPort)); }
/// Note peer as no longer being required.
void relinquishPeer(NodeId const& _node);

59
libp2p/NodeTable.cpp

@ -25,13 +25,12 @@ using namespace dev;
using namespace dev::p2p;
NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::distance(_src.id,_pubk)) {}
NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::distance(_src.id,_pubk)) {}
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, bi::address const& _udpAddress, uint16_t _udp):
m_node(Node(_alias.pub(), bi::udp::endpoint(_udpAddress, _udp))),
NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint):
m_node(Node(_alias.pub(), _endpoint)),
m_secret(_alias.sec()),
m_io(_io),
m_socket(new NodeSocket(m_io, *this, m_node.endpoint.udp)),
m_socket(new NodeSocket(m_io, *this, (bi::udp::endpoint)m_node.endpoint)),
m_socketPointer(m_socket.get()),
m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io)
@ -62,9 +61,9 @@ void NodeTable::processEvents()
m_nodeEventHandler->processEvents();
}
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, NodeIPEndpoint const& _ep)
{
auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp));
auto node = Node(_pubk, _ep);
return addNode(node);
}
@ -72,7 +71,7 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{
// re-enable tcp checks when NAT hosts are handled by discover
// we handle when tcp endpoint is 0 below
if (_node.endpoint.udp.address().to_string() == "0.0.0.0")
if (_node.endpoint.address.to_string() == "0.0.0.0")
{
clog(NodeTableWarn) << "addNode Failed. Invalid UDP address 0.0.0.0 for" << _node.id.abridged();
return move(shared_ptr<NodeEntry>());
@ -81,12 +80,12 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
// ping address to recover nodeid if nodeid is empty
if (!_node.id)
{
clog(NodeTableConnect) << "Sending public key discovery Ping to" << _node.endpoint.udp << "(Advertising:" << m_node.endpoint.udp << ")";
clog(NodeTableConnect) << "Sending public key discovery Ping to" << (bi::udp::endpoint)_node.endpoint << "(Advertising:" << (bi::udp::endpoint)m_node.endpoint << ")";
{
Guard l(x_pubkDiscoverPings);
m_pubkDiscoverPings[_node.endpoint.udp.address()] = std::chrono::steady_clock::now();
m_pubkDiscoverPings[_node.endpoint.address] = std::chrono::steady_clock::now();
}
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
PingNode p((bi::udp::endpoint)_node.endpoint, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
return move(shared_ptr<NodeEntry>());
@ -98,11 +97,10 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
return m_nodes[_node.id];
}
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
m_nodes[_node.id] = ret;
ret->cullEndpoint();
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint.udp << _node.endpoint.tcp;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint;
PingNode p(_node.endpoint, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
return ret;
@ -143,7 +141,7 @@ Node NodeTable::node(NodeId const& _id)
if (m_nodes.count(_id))
{
auto entry = m_nodes[_id];
Node n(_id, NodeIPEndpoint(entry->endpoint.udp, entry->endpoint.tcp), entry->required);
Node n(_id, entry->endpoint, entry->required);
return move(n);
}
return move(Node());
@ -176,7 +174,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
{
auto r = nearest[i];
tried.push_back(r);
FindNode p(r->endpoint.udp, _node);
FindNode p(r->endpoint, _node);
p.sign(m_secret);
m_socketPointer->send(p);
}
@ -273,13 +271,14 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
vector<shared_ptr<NodeEntry>> ret;
for (auto& nodes: found)
for (auto n: nodes.second)
if (n->endpoint.isValid())
ret.push_back(n);
return move(ret);
}
void NodeTable::ping(bi::udp::endpoint _to) const
{
PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
PingNode p(_to, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
}
@ -287,7 +286,7 @@ void NodeTable::ping(bi::udp::endpoint _to) const
void NodeTable::ping(NodeEntry* _n) const
{
if (_n)
ping(_n->endpoint.udp);
ping(_n->endpoint);
}
void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _new)
@ -306,16 +305,15 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{
if (_pubk == m_node.address())
if (_pubk == m_node.address() || !NodeIPEndpoint(_endpoint.address(), _endpoint.port(), _endpoint.port()).isValid())
return;
shared_ptr<NodeEntry> node = nodeEntry(_pubk);
if (!!node && !node->pending)
{
clog(NodeTableConnect) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port();
node->endpoint.udp.address(_endpoint.address());
node->endpoint.udp.port(_endpoint.port());
node->cullEndpoint();
node->endpoint.address = _endpoint.address();
node->endpoint.udpPort = _endpoint.port();
shared_ptr<NodeEntry> contested;
{
@ -445,7 +443,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
m_pubkDiscoverPings.erase(_from.address());
}
if (!haveNode(nodeid))
addNode(nodeid, _from, bi::tcp::endpoint(_from.address(), _from.port()));
addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port()));
}
else
return; // unsolicited pong; don't note node as active
@ -459,7 +457,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.nodes)
addNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port), bi::tcp::endpoint(bi::address::from_string(n.ipAddress), n.port));
addNode(n.node, NodeIPEndpoint(bi::address::from_string(n.ipAddress), n.udpPort, n.udpPort));
break;
}
@ -490,7 +488,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
return;
}
addNode(nodeid, _from, bi::tcp::endpoint(bi::address::from_string(in.ipAddress), in.port));
// TODO: Feedback if _from.address() != in.ipAddress
addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), in.tcpPort));
Pong p(_from);
p.echo = sha3(rlpBytes);
@ -568,7 +567,7 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
void PingNode::streamRLP(RLPStream& _s) const
{
_s.appendList(4);
_s << dev::p2p::c_protocolVersion << ipAddress << port << expiration;
_s << dev::p2p::c_protocolVersion << ipAddress << tcpPort << ts;
}
void PingNode::interpretRLP(bytesConstRef _bytes)
@ -578,15 +577,15 @@ void PingNode::interpretRLP(bytesConstRef _bytes)
{
version = 2;
ipAddress = r[0].toString();
port = r[1].toInt<unsigned>(RLP::Strict);
expiration = r[2].toInt<unsigned>(RLP::Strict);
tcpPort = r[1].toInt<unsigned>(RLP::Strict);
ts = r[2].toInt<unsigned>(RLP::Strict);
}
else if (r.itemCountStrict() == 4)
{
version = r[0].toInt<unsigned>(RLP::Strict);
ipAddress = r[1].toString();
port = r[2].toInt<unsigned>(RLP::Strict);
expiration = r[3].toInt<unsigned>(RLP::Strict);
tcpPort = r[2].toInt<unsigned>(RLP::Strict);
ts = r[3].toInt<unsigned>(RLP::Strict);
}
else
BOOST_THROW_EXCEPTION(InvalidRLP());

63
libp2p/NodeTable.h

@ -41,10 +41,7 @@ namespace p2p
struct NodeEntry: public Node
{
NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw);
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);
unsigned const distance; ///< Node's distance (xor of _src as integer).
bool pending = true; ///< Node will be ignored until Pong is received
};
@ -136,7 +133,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
public:
/// Constructor requiring host for I/O, credentials, and IP Address and port to listen on.
NodeTable(ba::io_service& _io, KeyPair _alias, bi::address const& _udpAddress, uint16_t _udpPort = 30303);
NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint);
~NodeTable();
/// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable.
@ -149,7 +146,7 @@ public:
void processEvents();
/// Add node. Node will be pinged and empty shared_ptr is returned if NodeId is uknown.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp);
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, NodeIPEndpoint const& _ep);
/// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen.
std::shared_ptr<NodeEntry> addNode(Node const& _node);
@ -212,7 +209,7 @@ private:
void ping(NodeEntry* _n) const;
/// Returns center node entry which describes this node and used with dist() to calculate xor metric for node table nodes.
NodeEntry center() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
NodeEntry center() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint); }
/// Used by asynchronous operations to return NodeEntry which is active and managed by node table.
std::shared_ptr<NodeEntry> nodeEntry(NodeId _id);
@ -281,10 +278,10 @@ private:
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
{
_out << _nodeTable.center().address() << "\t" << "0\t" << _nodeTable.center().endpoint.udp.address() << ":" << _nodeTable.center().endpoint.udp.port() << std::endl;
_out << _nodeTable.center().address() << "\t" << "0\t" << _nodeTable.center().endpoint.address << ":" << _nodeTable.center().endpoint.udpPort << std::endl;
auto s = _nodeTable.snapshot();
for (auto n: s)
_out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl;
_out << n.address() << "\t" << n.distance << "\t" << n.endpoint.address << ":" << n.endpoint.udpPort << std::endl;
return _out;
}
@ -292,7 +289,7 @@ struct InvalidRLP: public Exception {};
/**
* Ping packet: Sent to check if node is alive.
* PingNode is cached and regenerated after expiration - t, where t is timeout.
* PingNode is cached and regenerated after ts + t, where t is timeout.
*
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
@ -306,7 +303,6 @@ struct InvalidRLP: public Exception {};
* signature: Signature of message.
* ipAddress: Our IP address.
* port: Our port.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
@ -314,14 +310,15 @@ struct InvalidRLP: public Exception {};
struct PingNode: RLPXDatagram<PingNode>
{
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep) {}
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram<PingNode>(_ep), ipAddress(_src), port(_srcPort), expiration(futureFromEpoch(_expiration)) {}
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _ts = std::chrono::seconds(60)): RLPXDatagram<PingNode>(_ep), ipAddress(_src), tcpPort(_srcPort), ts(futureFromEpoch(_ts)) {}
static const uint8_t type = 1;
unsigned version = 0;
std::string ipAddress;
unsigned port;
unsigned expiration;
// unsigned udpPort;
unsigned tcpPort;
unsigned ts;
void streamRLP(RLPStream& _s) const override;
void interpretRLP(bytesConstRef _bytes) override;
@ -336,20 +333,20 @@ struct PingNode: RLPXDatagram<PingNode>
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), expiration(futureFromEpoch(std::chrono::seconds(60))) {}
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 2;
h256 echo; ///< MCD of PingNode
unsigned expiration;
unsigned ts;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << echo << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); echo = (h256)r[0]; expiration = r[1].toInt<unsigned>(); }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << echo << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); echo = (h256)r[0]; ts = r[1].toInt<unsigned>(); }
};
/**
* FindNode Packet: Request k-nodes, closest to the target.
* FindNode is cached and regenerated after expiration - t, where t is timeout.
* FindNode is cached and regenerated after ts + t, where t is timeout.
* FindNode implicitly results in finding neighbours of a given node.
*
* RLP Encoded Items: 2
@ -357,21 +354,20 @@ struct Pong: RLPXDatagram<Pong>
* Maximum Encoded Size: 30 bytes
*
* target: NodeId of node. The responding node will send back nodes closest to the target.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
*/
struct FindNode: RLPXDatagram<FindNode>
{
FindNode(bi::udp::endpoint _ep): RLPXDatagram<FindNode>(_ep) {}
FindNode(bi::udp::endpoint _ep, NodeId _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram<FindNode>(_ep), target(_target), expiration(futureFromEpoch(_expiration)) {}
FindNode(bi::udp::endpoint _ep, NodeId _target, std::chrono::seconds _ts = std::chrono::seconds(60)): RLPXDatagram<FindNode>(_ep), target(_target), ts(futureFromEpoch(_ts)) {}
static const uint8_t type = 3;
h512 target;
unsigned expiration;
unsigned ts;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash<h512>(); expiration = r[1].toInt<unsigned>(); }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash<h512>(); ts = r[1].toInt<unsigned>(); }
};
/**
@ -387,21 +383,22 @@ struct Neighbours: RLPXDatagram<Neighbours>
Node() = default;
Node(RLP const& _r) { interpretRLP(_r); }
std::string ipAddress;
unsigned port;
unsigned udpPort;
// unsigned tcpPort;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; }
void interpretRLP(RLP const& _r) { ipAddress = _r[0].toString(); port = _r[1].toInt<unsigned>(); node = h512(_r[2].toBytes()); }
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << udpPort << node; }
void interpretRLP(RLP const& _r) { ipAddress = _r[0].toString(); udpPort = _r[1].toInt<unsigned>(); node = h512(_r[2].toBytes()); }
};
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep), expiration(futureFromEpoch(std::chrono::seconds(30))) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to), expiration(futureFromEpoch(std::chrono::seconds(30)))
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep), ts(futureFromEpoch(std::chrono::seconds(30))) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to), ts(futureFromEpoch(std::chrono::seconds(30)))
{
auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
for (auto i = _offset; i < limit; i++)
{
Node node;
node.ipAddress = _nearest[i]->endpoint.udp.address().to_string();
node.port = _nearest[i]->endpoint.udp.port();
node.ipAddress = _nearest[i]->endpoint.address.to_string();
node.udpPort = _nearest[i]->endpoint.udpPort;
node.node = _nearest[i]->publicKey();
nodes.push_back(node);
}
@ -409,10 +406,10 @@ struct Neighbours: RLPXDatagram<Neighbours>
static const uint8_t type = 4;
std::vector<Node> nodes;
unsigned expiration = 1;
unsigned ts = 1;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); expiration = r[1].toInt<unsigned>(); }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); ts = r[1].toInt<unsigned>(); }
};
struct NodeTableWarn: public LogChannel { static const char* name() { return "!P!"; } static const int verbosity = 0; };

2
libp2p/Peer.h

@ -59,8 +59,6 @@ class Peer: public Node
public:
bool isOffline() const { return !m_session.lock(); }
bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; }
virtual bool operator<(Peer const& _p) const;
/// WIP: Returns current peer rating.

12
libp2p/Session.cpp

@ -138,11 +138,11 @@ void Session::serviceNodesRequest()
auto rs = randomSelection(peers, 10);
for (auto const& i: rs)
{
clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.peerEndpoint();
if (i.peerEndpoint().address().is_v4())
s.appendList(3) << bytesConstRef(i.peerEndpoint().address().to_v4().to_bytes().data(), 4) << i.peerEndpoint().port() << i.id;
clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.endpoint;
if (i.endpoint.address.is_v4())
s.appendList(3) << bytesConstRef(i.endpoint.address.to_v4().to_bytes().data(), 4) << i.endpoint.tcpPort << i.id;
else// if (i.second.address().is_v6()) - assumed
s.appendList(3) << bytesConstRef(i.peerEndpoint().address().to_v6().to_bytes().data(), 16) << i.peerEndpoint().port() << i.id;
s.appendList(3) << bytesConstRef(i.endpoint.address.to_v6().to_bytes().data(), 16) << i.endpoint.tcpPort << i.id;
}
sealAndSend(s);
m_theyRequestedNodes = false;
@ -237,7 +237,7 @@ bool Session::interpret(PacketType _t, RLP const& _r)
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(id, ep.address(), ep.port(), ep.port());
m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port()));
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:;
LAMEPEER:;
@ -370,7 +370,7 @@ void Session::disconnect(DisconnectReason _reason)
clogS(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
StructuredLogger::p2pDisconnected(
m_info.id.abridged(),
m_peer->peerEndpoint(),
m_peer->endpoint, // TODO: may not be 100% accurate
m_server->peerCount()
);
if (m_socket.is_open())

4
libwebthree/WebThree.cpp

@ -104,12 +104,12 @@ bytes WebThreeDirect::saveNetwork()
void WebThreeDirect::addNode(NodeId const& _node, bi::tcp::endpoint const& _host)
{
m_net.addNode(_node, _host.address(), _host.port(), _host.port());
m_net.addNode(_node, NodeIPEndpoint(_host.address(), _host.port(), _host.port()));
}
void WebThreeDirect::requirePeer(NodeId const& _node, bi::tcp::endpoint const& _host)
{
m_net.requirePeer(_node, _host.address(), _host.port());
m_net.requirePeer(_node, NodeIPEndpoint(_host.address(), _host.port(), _host.port()));
}

16
test/net.cpp

@ -32,7 +32,13 @@ using namespace dev::p2p;
namespace ba = boost::asio;
namespace bi = ba::ip;
BOOST_AUTO_TEST_SUITE(net)
struct NetFixture
{
NetFixture() { dev::p2p::NodeIPEndpoint::test_allowLocal = true;; }
~NetFixture() { dev::p2p::NodeIPEndpoint::test_allowLocal = false; }
};
BOOST_FIXTURE_TEST_SUITE(net, NetFixture)
/**
* Only used for testing. Not useful beyond tests.
@ -53,7 +59,7 @@ protected:
struct TestNodeTable: public NodeTable
{
/// Constructor
TestNodeTable(ba::io_service& _io, KeyPair _alias, bi::address const& _addr, uint16_t _port = 30300): NodeTable(_io, _alias, _addr, _port) {}
TestNodeTable(ba::io_service& _io, KeyPair _alias, bi::address const& _addr, uint16_t _port = 30300): NodeTable(_io, _alias, NodeIPEndpoint(_addr, _port, _port)) {}
static std::vector<std::pair<KeyPair,unsigned>> createTestNodes(unsigned _count)
{
@ -93,7 +99,7 @@ struct TestNodeTable: public NodeTable
// manually add node for test
{
Guard ln(x_nodes);
shared_ptr<NodeEntry> node(new NodeEntry(m_node, n.first.pub(), NodeIPEndpoint(bi::udp::endpoint(ourIp, n.second), bi::tcp::endpoint(ourIp, n.second))));
shared_ptr<NodeEntry> node(new NodeEntry(m_node, n.first.pub(), NodeIPEndpoint(ourIp, n.second, n.second)));
node->pending = false;
m_nodes[node->id] = node;
}
@ -201,7 +207,7 @@ BOOST_AUTO_TEST_CASE(test_neighbours_packet)
{
Neighbours::Node node;
node.ipAddress = boost::asio::ip::address::from_string("127.0.0.1").to_string();
node.port = n.second;
node.udpPort = n.second;
node.node = n.first.pub();
out.nodes.push_back(node);
}
@ -213,7 +219,7 @@ BOOST_AUTO_TEST_CASE(test_neighbours_packet)
int count = 0;
for (auto n: in.nodes)
{
BOOST_REQUIRE_EQUAL(testNodes[count].second, n.port);
BOOST_REQUIRE_EQUAL(testNodes[count].second, n.udpPort);
BOOST_REQUIRE_EQUAL(testNodes[count].first.pub(), n.node);
BOOST_REQUIRE_EQUAL(sha3(testNodes[count].first.pub()), sha3(n.node));
count++;

16
test/peer.cpp

@ -28,7 +28,13 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
BOOST_AUTO_TEST_SUITE(p2p)
struct P2PFixture
{
P2PFixture() { dev::p2p::NodeIPEndpoint::test_allowLocal = true;; }
~P2PFixture() { dev::p2p::NodeIPEndpoint::test_allowLocal = false; }
};
BOOST_FIXTURE_TEST_SUITE(p2p, P2PFixture)
BOOST_AUTO_TEST_CASE(host)
{
@ -45,7 +51,7 @@ BOOST_AUTO_TEST_CASE(host)
auto node2 = host2.id();
host2.start();
host1.addNode(node2, bi::address::from_string("127.0.0.1"), host2prefs.listenPort, host2prefs.listenPort);
host1.addNode(node2, NodeIPEndpoint(bi::address::from_string("127.0.0.1"), host2prefs.listenPort, host2prefs.listenPort));
this_thread::sleep_for(chrono::seconds(3));
@ -82,11 +88,11 @@ BOOST_AUTO_TEST_CASE(save_nodes)
Host& host = *hosts.front();
for (auto const& h: hosts)
host.addNode(h->id(), bi::address::from_string("127.0.0.1"), h->listenPort(), h->listenPort());
host.addNode(h->id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), h->listenPort(), h->listenPort()));
Host& host2 = *hosts.back();
for (auto const& h: hosts)
host2.addNode(h->id(), bi::address::from_string("127.0.0.1"), h->listenPort(), h->listenPort());
host2.addNode(h->id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), h->listenPort(), h->listenPort()));
this_thread::sleep_for(chrono::milliseconds(2000));
bytes firstHostNetwork(host.saveNetwork());
@ -131,7 +137,7 @@ int peerTest(int argc, char** argv)
Host ph("Test", NetworkPreferences(listenPort));
if (!remoteHost.empty() && !remoteAlias)
ph.addNode(remoteAlias, bi::address::from_string(remoteHost), remotePort, remotePort);
ph.addNode(remoteAlias, NodeIPEndpoint(bi::address::from_string(remoteHost), remotePort, remotePort));
this_thread::sleep_for(chrono::milliseconds(200));

Loading…
Cancel
Save