Browse Source

compiles (v4 endpoint changes)

cl-refactor
subtly 10 years ago
parent
commit
e9dfa8642e
  1. 2
      libp2p/Common.cpp
  2. 11
      libp2p/Common.h
  3. 44
      libp2p/Host.cpp
  4. 45
      libp2p/NodeTable.cpp
  5. 62
      libp2p/NodeTable.h

2
libp2p/Common.cpp

@ -24,7 +24,7 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
const unsigned dev::p2p::c_protocolVersion = 3;
const unsigned dev::p2p::c_protocolVersion = 4;
const unsigned dev::p2p::c_defaultIPPort = 30303;
const dev::p2p::NodeIPEndpoint dev::p2p::UnspecifiedNodeIPEndpoint = NodeIPEndpoint(bi::address(), 0, 0);

11
libp2p/Common.h

@ -36,6 +36,7 @@
#include <libdevcrypto/Common.h>
#include <libdevcore/Log.h>
#include <libdevcore/Exceptions.h>
#include <libdevcore/RLP.h>
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
@ -162,10 +163,17 @@ using PeerSessionInfos = std::vector<PeerSessionInfo>;
*/
struct NodeIPEndpoint
{
enum InlineRLP
{
CreateList,
InlineList
};
/// Setting true causes isAllowed to return true for all addresses. (Used by test fixtures)
static bool test_allowLocal;
NodeIPEndpoint(bi::address _addr, uint16_t _udp, uint16_t _tcp): address(_addr), udpPort(_udp), tcpPort(_tcp) {}
NodeIPEndpoint(RLP const& _r) { interpretRLP(_r); }
bi::address address;
uint16_t udpPort;
@ -177,6 +185,9 @@ struct NodeIPEndpoint
operator bool() const { return !address.is_unspecified() && udpPort > 0 && tcpPort > 0; }
bool isAllowed() const { return NodeIPEndpoint::test_allowLocal ? !address.is_unspecified() : isPublicAddress(address); }
void streamRLP(RLPStream& _s, bool _inline = CreateList) const { if (_inline == CreateList) _s.appendList(3); if (address.is_v4()) _s << address.to_v4().to_bytes(); else _s << address.to_v6().to_bytes(); _s << udpPort << tcpPort; }
void interpretRLP(RLP const& _r) { if (_r[0].size() == 4) address = bi::address_v4(_r[0].toArray<byte, 4>()); else address = bi::address_v6(_r[0].toArray<byte, 16>()); udpPort = _r[1].toInt<uint16_t>(); tcpPort = _r[2].toInt<uint16_t>(); }
};
struct Node

44
libp2p/Host.cpp

@ -678,15 +678,16 @@ bytes Host::saveNetwork() const
int count = 0;
for (auto const& p: peers)
{
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
// todo: e2e ipv6 support
// todo: ipv6
if (!p.endpoint.address.is_v4())
continue;
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && p.endpoint.tcpPort > 0 && p.id != id() && (p.required || p.endpoint.isAllowed()))
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && !!p.endpoint && p.id != id() && (p.required || p.endpoint.isAllowed()))
{
network.appendList(10);
network << p.endpoint.address.to_v4().to_bytes() << p.endpoint.tcpPort << p.id << p.required
network.appendList(11);
p.endpoint.streamRLP(network, NodeIPEndpoint::InlineList);
network << p.id << p.required
<< chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count()
<< p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
@ -700,12 +701,9 @@ bytes Host::saveNetwork() const
state.sort();
for (auto const& entry: state)
{
network.appendList(3);
if (entry.endpoint.address.is_v4())
network << entry.endpoint.address.to_v4().to_bytes();
else
network << entry.endpoint.address.to_v6().to_bytes();
network << entry.endpoint.tcpPort << entry.id;
network.appendList(4);
entry.endpoint.streamRLP(network, NodeIPEndpoint::InlineList);
network << entry.id;
count++;
}
}
@ -738,25 +736,25 @@ void Host::restoreNetwork(bytesConstRef _b)
for (auto i: r[2])
{
// todo: ipv6
if (i[0].itemCount() != 4)
continue;
// todo: ipv6, bi::address_v6(i[0].toArray<byte, 16>()
Node n((NodeId)i[2], NodeIPEndpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<uint16_t>(), i[1].toInt<uint16_t>()));
if (i.itemCount() == 3 && n.endpoint.isAllowed())
Node n((NodeId)i[3], NodeIPEndpoint(i));
if (i.itemCount() == 4 && n.endpoint.isAllowed())
m_nodeTable->addNode(n);
else if (i.itemCount() == 10)
else if (i.itemCount() == 11)
{
n.required = i[3].toInt<bool>();
n.required = i[4].toInt<bool>();
if (!n.endpoint.isAllowed() && !n.required)
continue;
shared_ptr<Peer> p = make_shared<Peer>(n);
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_failedAttempts = i[6].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
p->m_score = (int)i[8].toInt<unsigned>();
p->m_rating = (int)i[9].toInt<unsigned>();
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[6].toInt<unsigned>()));
p->m_failedAttempts = i[7].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[8].toInt<unsigned>();
p->m_score = (int)i[9].toInt<unsigned>();
p->m_rating = (int)i[10].toInt<unsigned>();
m_peers[p->id] = p;
if (p->required)
requirePeer(p->id, n.endpoint);

45
libp2p/NodeTable.cpp

@ -99,9 +99,7 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
Guard l(x_pubkDiscoverPings);
m_pubkDiscoverPings[_node.endpoint.address] = std::chrono::steady_clock::now();
}
PingNode p(_node.endpoint, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
ping(_node.endpoint);
return move(shared_ptr<NodeEntry>());
}
@ -114,9 +112,7 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
m_nodes[_node.id] = ret;
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint;
PingNode p(_node.endpoint, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
p.sign(m_secret);
m_socketPointer->send(p);
ping(_node.endpoint);
return ret;
}
@ -291,9 +287,9 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
return move(ret);
}
void NodeTable::ping(bi::udp::endpoint _to) const
void NodeTable::ping(NodeIPEndpoint _to) const
{
PingNode p(_to, m_node.endpoint.address.to_string(), m_node.endpoint.udpPort);
PingNode p(m_node.endpoint, _to);
p.sign(m_secret);
m_socketPointer->send(p);
}
@ -488,8 +484,8 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
}
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.nodes)
addNode(n.node, NodeIPEndpoint(bi::address::from_string(n.ipAddress), n.udpPort, n.udpPort));
for (auto n: in.neighbours)
addNode(n.node, n.endpoint);
break;
}
@ -520,10 +516,9 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
return;
}
// TODO: Feedback if _from.address() != in.ipAddress
addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), in.tcpPort));
Pong p(_from);
// TODO: Feedback if _from.address() != in.ipAddress, or, _from.por() != in.source.udpPort
auto node = addNode(nodeid, NodeIPEndpoint(_from.address(), _from.port(), in.source.tcpPort));
Pong p(node->endpoint);
p.echo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPointer->send(p);
@ -599,26 +594,22 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
void PingNode::streamRLP(RLPStream& _s) const
{
_s.appendList(4);
_s << dev::p2p::c_protocolVersion << ipAddress << tcpPort << ts;
_s << dev::p2p::c_protocolVersion;
source.streamRLP(_s);
destination.streamRLP(_s);
_s << ts;
}
void PingNode::interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes);
if (r.itemCountStrict() == 3)
{
version = 2;
ipAddress = r[0].toString();
tcpPort = r[1].toInt<unsigned>(RLP::Strict);
ts = r[2].toInt<unsigned>(RLP::Strict);
}
else if (r.itemCountStrict() == 4)
if (r.itemCountStrict() == 4 && r[0].isInt() && r[0].toInt<unsigned>(RLP::Strict) == dev::p2p::c_protocolVersion)
{
version = r[0].toInt<unsigned>(RLP::Strict);
ipAddress = r[1].toString();
tcpPort = r[2].toInt<unsigned>(RLP::Strict);
version = dev::p2p::c_protocolVersion;
source.interpretRLP(r[1]);
destination.interpretRLP(r[2]);
ts = r[3].toInt<unsigned>(RLP::Strict);
}
else
BOOST_THROW_EXCEPTION(InvalidRLP());
version = 0;
}

62
libp2p/NodeTable.h

@ -204,7 +204,7 @@ private:
};
/// Used to ping endpoint.
void ping(bi::udp::endpoint _to) const;
void ping(NodeIPEndpoint _to) const;
/// Used ping known node. Used by node table when refreshing buckets and as part of eviction process (see evict).
void ping(NodeEntry* _n) const;
@ -299,29 +299,17 @@ struct InvalidRLP: public Exception {};
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond, then it is removed and the new
* node is inserted.
*
* RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses
*
* signature: Signature of message.
* ipAddress: Our IP address.
* port: Our port.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
struct PingNode: RLPXDatagram<PingNode>
{
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep) {}
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _ts = std::chrono::seconds(60)): RLPXDatagram<PingNode>(_ep), ipAddress(_src), tcpPort(_srcPort), ts(futureFromEpoch(_ts)) {}
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep), source(UnspecifiedNodeIPEndpoint), destination(UnspecifiedNodeIPEndpoint) {}
PingNode(NodeIPEndpoint _src, NodeIPEndpoint _dest): RLPXDatagram<PingNode>(_dest), source(_src), destination(_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 1;
unsigned version = 0;
std::string ipAddress;
// uint16_t udpPort;
uint16_t tcpPort;
NodeIPEndpoint source;
NodeIPEndpoint destination;
unsigned ts;
void streamRLP(RLPStream& _s) const override;
@ -330,17 +318,15 @@ struct PingNode: RLPXDatagram<PingNode>
/**
* Pong packet: Sent in response to ping
*
* RLP Encoded Items: 2
* Minimum Encoded Size: 33 bytes
* Maximum Encoded Size: 33 bytes
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), ts(futureFromEpoch(std::chrono::seconds(60))) {}
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), destination(UnspecifiedNodeIPEndpoint) {}
Pong(NodeIPEndpoint _dest): RLPXDatagram<Pong>((bi::udp::endpoint)_dest), destination(_dest), ts(futureFromEpoch(std::chrono::seconds(60))) {}
static const uint8_t type = 2;
NodeIPEndpoint destination;
h256 echo; ///< MCD of PingNode
unsigned ts;
@ -375,23 +361,17 @@ struct FindNode: RLPXDatagram<FindNode>
};
/**
* Node Packet: Multiple node packets are sent in response to FindNode.
*
* RLP Encoded Items: 2 (first item is list)
* Minimum Encoded Size: 10 bytes
* Node Packet: One or more node packets are sent in response to FindNode.
*/
struct Neighbours: RLPXDatagram<Neighbours>
{
struct Node
struct Neighbour
{
Node() = default;
Node(RLP const& _r) { interpretRLP(_r); }
std::string ipAddress;
uint16_t udpPort;
// uint16_t tcpPort;
Neighbour(Node const& _node): endpoint(_node.endpoint), node(_node.id) {}
Neighbour(RLP const& _r): endpoint(_r) { node = h512(_r[3].toBytes()); }
NodeIPEndpoint endpoint;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << udpPort << node; }
void interpretRLP(RLP const& _r) { ipAddress = _r[0].toString(); udpPort = _r[1].toInt<uint16_t>(); node = h512(_r[2].toBytes()); }
void streamRLP(RLPStream& _s) const { _s.appendList(4); endpoint.streamRLP(_s, NodeIPEndpoint::InlineList); _s << node; }
};
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep), ts(futureFromEpoch(std::chrono::seconds(30))) {}
@ -399,21 +379,15 @@ struct Neighbours: RLPXDatagram<Neighbours>
{
auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
for (auto i = _offset; i < limit; i++)
{
Node node;
node.ipAddress = _nearest[i]->endpoint.address.to_string();
node.udpPort = _nearest[i]->endpoint.udpPort;
node.node = _nearest[i]->publicKey();
nodes.push_back(node);
}
neighbours.push_back(Neighbour(*_nearest[i]));
}
static const uint8_t type = 4;
std::vector<Node> nodes;
std::vector<Neighbour> neighbours;
unsigned ts = 1;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); ts = r[1].toInt<unsigned>(); }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(neighbours.size()); for (auto& n: neighbours) n.streamRLP(_s); _s << ts; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) neighbours.push_back(Neighbour(n)); ts = r[1].toInt<unsigned>(); }
};
struct NodeTableWarn: public LogChannel { static const char* name(); static const int verbosity = 0; };

Loading…
Cancel
Save