Browse Source

memory

cl-refactor
subtly 10 years ago
parent
commit
ba455270c0
  1. 44
      libp2p/NodeTable.cpp
  2. 36
      libp2p/NodeTable.h
  3. 1
      libp2p/UDP.h
  4. 43
      test/net.cpp

44
libp2p/NodeTable.cpp

@ -35,8 +35,8 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort):
{
for (unsigned i = 0; i < s_bins; i++)
m_state[i].distance = i, m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
doRefreshBuckets(boost::system::error_code());
m_socketPtr->connect();
doRefreshBuckets(boost::system::error_code());
}
NodeTable::~NodeTable()
@ -87,7 +87,15 @@ void NodeTable::doFindNode(Address _node, unsigned _round, std::shared_ptr<std::
{
if (!m_socketPtr->isOpen() || _round == s_maxSteps)
return;
if (_round == s_maxSteps)
{
clog(NodeTableWarn) << "Terminating doFindNode after " << _round << " rounds.";
return;
}
else
_tried.reset(new std::set<std::shared_ptr<NodeEntry>>());
auto nearest = findNearest(_node);
std::list<std::shared_ptr<NodeEntry>> tried;
for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)
@ -137,7 +145,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
while (head != tail && head < s_bins && count < s_bucketSize)
{
Guard l(x_state);
for (auto& n: m_state[head].nodes)
for (auto n: m_state[head].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
@ -147,7 +155,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
}
if (count < s_bucketSize && tail)
for (auto& n: m_state[tail].nodes)
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
@ -164,7 +172,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
while (head < s_bins && count < s_bucketSize)
{
Guard l(x_state);
for (auto& n: m_state[head].nodes)
for (auto n: m_state[head].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
@ -178,7 +186,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
while (tail > 0 && count < s_bucketSize)
{
Guard l(x_state);
for (auto& n: m_state[tail].nodes)
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
@ -191,7 +199,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
std::vector<std::shared_ptr<NodeEntry>> ret;
for (auto& nodes: found)
for (auto& n: nodes.second)
for (auto n: nodes.second)
ret.push_back(n);
return std::move(ret);
}
@ -223,9 +231,14 @@ void NodeTable::evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<Nod
ping(_leastSeen.get());
}
void NodeTable::noteNode(Public _pubk, bi::udp::endpoint _endpoint)
void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{
Address id = right160(sha3(_pubk));
// Don't add ourself (would result in -1 bucket lookup)
if (id == m_node.address())
return;
std::shared_ptr<NodeEntry> node;
{
Guard l(x_nodes);
@ -325,6 +338,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
case 2:
if (rlp[0].isList())
{
// todo: chunk neighbors packet
clog(NodeTableMessageSummary) << "Received Neighbors from " << _from.address().to_string() << ":" << _from.port();
Neighbors in = Neighbors::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.nodes)
@ -371,7 +385,7 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
return;
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_evictionCheckInterval));
m_evictionCheckTimer.expires_from_now(c_evictionCheckInterval);
m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec)
{
if (_ec)
@ -380,17 +394,17 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
bool evictionsRemain = false;
std::list<shared_ptr<NodeEntry>> drop;
{
Guard l(x_evictions);
Guard le(x_evictions);
Guard ln(x_nodes);
for (auto& e: m_evictions)
if (chrono::steady_clock::now() - e.first.second > c_reqTimeout)
{
Guard l(x_nodes);
drop.push_back(m_nodes[e.second]);
}
if (auto n = m_nodes[e.second])
drop.push_back(n);
evictionsRemain = m_evictions.size() - drop.size() > 0;
}
for (auto& n: drop)
drop.unique();
for (auto n: drop)
dropNode(n);
if (evictionsRemain)

36
libp2p/NodeTable.h

@ -120,9 +120,9 @@ public:
/// Intervals
static constexpr unsigned s_evictionCheckInterval = 75; ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
@ -151,7 +151,7 @@ protected:
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);
void noteNode(Public _pubk, bi::udp::endpoint _endpoint);
void noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
void noteNode(std::shared_ptr<NodeEntry> _n);
@ -234,11 +234,11 @@ struct PingNode: RLPXDatagram<PingNode>
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram(_ep), ipAddress(_src), port(_srcPort), expiration(futureFromEpoch(_expiration)) {}
std::string ipAddress; // rlp encoded bytes min: 16
uint16_t port; // rlp encoded bytes min: 1 max: 3
uint64_t expiration; // rlp encoded bytes min: 1 max: 9
unsigned port; // rlp encoded bytes min: 1 max: 3
unsigned expiration; // rlp encoded bytes min: 1 max: 9
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = (std::string)r[0]; port = (uint16_t)r[1]; expiration = (uint64_t)r[2]; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = r[1].toInt<unsigned>(); expiration = r[2].toInt<unsigned>(); }
};
/**
@ -280,10 +280,10 @@ struct FindNode: RLPXDatagram<FindNode>
FindNode(bi::udp::endpoint _ep, Address _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram(_ep), target(_target), expiration(futureFromEpoch(_expiration)) {}
h160 target;
uint64_t expiration;
unsigned expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = (h160)r[0]; expiration = (uint64_t)r[1]; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash<h160>(); expiration = r[1].toInt<unsigned>(); }
};
/**
@ -299,12 +299,16 @@ struct Neighbors: RLPXDatagram<Neighbors>
struct Node
{
Node() = default;
Node(bytesConstRef _bytes) { interpretRLP(_bytes); }
Node(RLP const& _r) { interpretRLP(_r); }
std::string ipAddress;
uint16_t port;
unsigned port;
NodeId node;
void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = (uint16_t)r[1]; node = (h512)r[2]; }
void streamRLP(RLPStream& _s) const {
_s.appendList(3); _s << ipAddress << port << node;
}
void interpretRLP(RLP const& _r) {
ipAddress = _r[0].toString(); port = _r[1].toInt<unsigned>(); node = h512(_r[2].toBytes());
}
};
using RLPXDatagram::RLPXDatagram;
@ -323,8 +327,10 @@ struct Neighbors: RLPXDatagram<Neighbors>
std::list<Node> nodes;
h256 nonce;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << nonce; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n.toBytesConstRef())); nonce = (h256)r[1]; }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << 1; }
void interpretRLP(bytesConstRef _bytes) {
RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); nonce = (h256)r[1];
}
};
struct NodeTableWarn: public LogChannel { static const char* name() { return "!P!"; } static const int verbosity = 0; };

1
libp2p/UDP.h

@ -221,7 +221,6 @@ void UDPSocket<Handler,MaxDatagramSize>::doWrite()
if (sendQ.empty())
return;
}
clog(NoteChannel) << "sent datagram";
doWrite();
});
}

43
test/net.cpp

@ -57,14 +57,23 @@ struct TestNodeTable: public NodeTable
{
bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes)
{
ping(bi::udp::endpoint(ourIp, n.second));
this_thread::sleep_for(chrono::milliseconds(5));
}
}
void populate(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
void populate(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes, size_t _count = 0)
{
if (!_count)
_count = _testNodes.size();
bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes)
noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
if (_count--)
noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
else
break;
}
void reset()
@ -82,7 +91,7 @@ struct TestNodeTableHost: public TestHost
TestNodeTableHost(): m_alias(KeyPair::create()), nodeTable(new TestNodeTable(m_io, m_alias)) {};
~TestNodeTableHost() { m_io.stop(); stopWorking(); }
void generateTestNodes(int _count = 30)
void generateTestNodes(int _count = 16)
{
asserts(_count < 1000);
static uint16_t s_basePort = 30500;
@ -105,11 +114,13 @@ struct TestNodeTableHost: public TestHost
void pingAll()
{
nodeTable->pingAll(m_testNodes);
// for (auto& n: testNodes)
// n->pingAll(m_testNodes);
}
void populate()
void populate(size_t _count = 0)
{
nodeTable->populate(m_testNodes);
nodeTable->populate(m_testNodes, _count);
}
KeyPair m_alias;
@ -130,6 +141,13 @@ public:
bool success = false;
};
BOOST_AUTO_TEST_CASE(test_findnode_neighbors)
{
// Executing findNode should result in a list which is serialized
// into Neighbors packet. Neighbors packet should then be deserialized
// into the same list of nearest nodes.
}
BOOST_AUTO_TEST_CASE(kademlia)
{
TestNodeTableHost node;
@ -137,12 +155,23 @@ BOOST_AUTO_TEST_CASE(kademlia)
node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for
node.setup();
node.pingAll();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
this_thread::sleep_for(chrono::milliseconds(10000));
node.nodeTable->reset();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(2);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
this_thread::sleep_for(chrono::milliseconds(500));
cout << "NodeTable:\n" << *node.nodeTable.get() << endl;
// node.nodeTable->join();
// this_thread::sleep_for(chrono::milliseconds(2000));
//
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
}
BOOST_AUTO_TEST_CASE(test_txrx_one)
BOOST_AUTO_TEST_CASE(test_udp_once)
{
UDPDatagram d(bi::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300), bytes({65,65,65,65}));
TestUDPSocket a; a.m_socket->connect(); a.start();

Loading…
Cancel
Save