From ba455270c09cdf4202c8d23bf5d3eedce9a05f45 Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 24 Dec 2014 14:48:47 +0100 Subject: [PATCH] memory --- libp2p/NodeTable.cpp | 44 +++++++++++++++++++++++++++++--------------- libp2p/NodeTable.h | 36 +++++++++++++++++++++--------------- libp2p/UDP.h | 1 - test/net.cpp | 43 ++++++++++++++++++++++++++++++++++++------- 4 files changed, 86 insertions(+), 38 deletions(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 3799a4eef..b5b261321 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -35,8 +35,8 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort): { for (unsigned i = 0; i < s_bins; i++) m_state[i].distance = i, m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1); - doRefreshBuckets(boost::system::error_code()); m_socketPtr->connect(); + doRefreshBuckets(boost::system::error_code()); } NodeTable::~NodeTable() @@ -87,7 +87,15 @@ void NodeTable::doFindNode(Address _node, unsigned _round, std::shared_ptrisOpen() || _round == s_maxSteps) return; - + + if (_round == s_maxSteps) + { + clog(NodeTableWarn) << "Terminating doFindNode after " << _round << " rounds."; + return; + } + else + _tried.reset(new std::set>()); + auto nearest = findNearest(_node); std::list> tried; for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++) @@ -137,7 +145,7 @@ std::vector> NodeTable::findNearest(Addres while (head != tail && head < s_bins && count < s_bucketSize) { Guard l(x_state); - for (auto& n: m_state[head].nodes) + for (auto n: m_state[head].nodes) if (auto p = n.lock()) { if (count < s_bucketSize) @@ -147,7 +155,7 @@ std::vector> NodeTable::findNearest(Addres } if (count < s_bucketSize && tail) - for (auto& n: m_state[tail].nodes) + for (auto n: m_state[tail].nodes) if (auto p = n.lock()) { if (count < s_bucketSize) @@ -164,7 +172,7 @@ std::vector> NodeTable::findNearest(Addres while (head < s_bins && count < s_bucketSize) { Guard l(x_state); - for (auto& n: m_state[head].nodes) + for (auto n: m_state[head].nodes) if (auto p = n.lock()) { if (count < s_bucketSize) @@ -178,7 +186,7 @@ std::vector> NodeTable::findNearest(Addres while (tail > 0 && count < s_bucketSize) { Guard l(x_state); - for (auto& n: m_state[tail].nodes) + for (auto n: m_state[tail].nodes) if (auto p = n.lock()) { if (count < s_bucketSize) @@ -191,7 +199,7 @@ std::vector> NodeTable::findNearest(Addres std::vector> ret; for (auto& nodes: found) - for (auto& n: nodes.second) + for (auto n: nodes.second) ret.push_back(n); return std::move(ret); } @@ -223,9 +231,14 @@ void NodeTable::evict(std::shared_ptr _leastSeen, std::shared_ptr node; { Guard l(x_nodes); @@ -325,6 +338,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes case 2: if (rlp[0].isList()) { + // todo: chunk neighbors packet clog(NodeTableMessageSummary) << "Received Neighbors from " << _from.address().to_string() << ":" << _from.port(); Neighbors in = Neighbors::fromBytesConstRef(_from, rlpBytes); for (auto n: in.nodes) @@ -371,7 +385,7 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) return; auto self(shared_from_this()); - m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_evictionCheckInterval)); + m_evictionCheckTimer.expires_from_now(c_evictionCheckInterval); m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec) { if (_ec) @@ -380,17 +394,17 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) bool evictionsRemain = false; std::list> drop; { - Guard l(x_evictions); + Guard le(x_evictions); + Guard ln(x_nodes); for (auto& e: m_evictions) if (chrono::steady_clock::now() - e.first.second > c_reqTimeout) - { - Guard l(x_nodes); - drop.push_back(m_nodes[e.second]); - } + if (auto n = m_nodes[e.second]) + drop.push_back(n); evictionsRemain = m_evictions.size() - drop.size() > 0; } - for (auto& n: drop) + drop.unique(); + for (auto n: drop) dropNode(n); if (evictionsRemain) diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 89456b4fd..fe925fea8 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -120,9 +120,9 @@ public: /// Intervals - static constexpr unsigned s_evictionCheckInterval = 75; ///< Interval at which eviction timeouts are checked. - std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations). - std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia] + boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked. + std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations). + std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia] static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } @@ -151,7 +151,7 @@ protected: void evict(std::shared_ptr _leastSeen, std::shared_ptr _new); - void noteNode(Public _pubk, bi::udp::endpoint _endpoint); + void noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); void noteNode(std::shared_ptr _n); @@ -234,11 +234,11 @@ struct PingNode: RLPXDatagram PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram(_ep), ipAddress(_src), port(_srcPort), expiration(futureFromEpoch(_expiration)) {} std::string ipAddress; // rlp encoded bytes min: 16 - uint16_t port; // rlp encoded bytes min: 1 max: 3 - uint64_t expiration; // rlp encoded bytes min: 1 max: 9 + unsigned port; // rlp encoded bytes min: 1 max: 3 + unsigned expiration; // rlp encoded bytes min: 1 max: 9 void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; } - void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = (std::string)r[0]; port = (uint16_t)r[1]; expiration = (uint64_t)r[2]; } + void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = r[1].toInt(); expiration = r[2].toInt(); } }; /** @@ -280,10 +280,10 @@ struct FindNode: RLPXDatagram FindNode(bi::udp::endpoint _ep, Address _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram(_ep), target(_target), expiration(futureFromEpoch(_expiration)) {} h160 target; - uint64_t expiration; + unsigned expiration; void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; } - void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = (h160)r[0]; expiration = (uint64_t)r[1]; } + void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); target = r[0].toHash(); expiration = r[1].toInt(); } }; /** @@ -299,12 +299,16 @@ struct Neighbors: RLPXDatagram struct Node { Node() = default; - Node(bytesConstRef _bytes) { interpretRLP(_bytes); } + Node(RLP const& _r) { interpretRLP(_r); } std::string ipAddress; - uint16_t port; + unsigned port; NodeId node; - void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; } - void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); ipAddress = r[0].toString(); port = (uint16_t)r[1]; node = (h512)r[2]; } + void streamRLP(RLPStream& _s) const { + _s.appendList(3); _s << ipAddress << port << node; + } + void interpretRLP(RLP const& _r) { + ipAddress = _r[0].toString(); port = _r[1].toInt(); node = h512(_r[2].toBytes()); + } }; using RLPXDatagram::RLPXDatagram; @@ -323,8 +327,10 @@ struct Neighbors: RLPXDatagram std::list nodes; h256 nonce; - void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << nonce; } - void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n.toBytesConstRef())); nonce = (h256)r[1]; } + void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << 1; } + void interpretRLP(bytesConstRef _bytes) { + RLP r(_bytes); for (auto n: r[0]) nodes.push_back(Node(n)); nonce = (h256)r[1]; + } }; struct NodeTableWarn: public LogChannel { static const char* name() { return "!P!"; } static const int verbosity = 0; }; diff --git a/libp2p/UDP.h b/libp2p/UDP.h index 498d37f0c..82b89b9c8 100644 --- a/libp2p/UDP.h +++ b/libp2p/UDP.h @@ -221,7 +221,6 @@ void UDPSocket::doWrite() if (sendQ.empty()) return; } - clog(NoteChannel) << "sent datagram"; doWrite(); }); } diff --git a/test/net.cpp b/test/net.cpp index 274f729c6..d5113b8b4 100644 --- a/test/net.cpp +++ b/test/net.cpp @@ -57,14 +57,23 @@ struct TestNodeTable: public NodeTable { bi::address ourIp = bi::address::from_string("127.0.0.1"); for (auto& n: _testNodes) + { ping(bi::udp::endpoint(ourIp, n.second)); + this_thread::sleep_for(chrono::milliseconds(5)); + } } - void populate(std::vector> const& _testNodes) + void populate(std::vector> const& _testNodes, size_t _count = 0) { + if (!_count) + _count = _testNodes.size(); + bi::address ourIp = bi::address::from_string("127.0.0.1"); for (auto& n: _testNodes) - noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second)); + if (_count--) + noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second)); + else + break; } void reset() @@ -82,7 +91,7 @@ struct TestNodeTableHost: public TestHost TestNodeTableHost(): m_alias(KeyPair::create()), nodeTable(new TestNodeTable(m_io, m_alias)) {}; ~TestNodeTableHost() { m_io.stop(); stopWorking(); } - void generateTestNodes(int _count = 30) + void generateTestNodes(int _count = 16) { asserts(_count < 1000); static uint16_t s_basePort = 30500; @@ -105,11 +114,13 @@ struct TestNodeTableHost: public TestHost void pingAll() { nodeTable->pingAll(m_testNodes); +// for (auto& n: testNodes) +// n->pingAll(m_testNodes); } - void populate() + void populate(size_t _count = 0) { - nodeTable->populate(m_testNodes); + nodeTable->populate(m_testNodes, _count); } KeyPair m_alias; @@ -130,6 +141,13 @@ public: bool success = false; }; +BOOST_AUTO_TEST_CASE(test_findnode_neighbors) +{ + // Executing findNode should result in a list which is serialized + // into Neighbors packet. Neighbors packet should then be deserialized + // into the same list of nearest nodes. +} + BOOST_AUTO_TEST_CASE(kademlia) { TestNodeTableHost node; @@ -137,12 +155,23 @@ BOOST_AUTO_TEST_CASE(kademlia) node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for node.setup(); node.pingAll(); + clog << "NodeTable:\n" << *node.nodeTable.get() << endl; + this_thread::sleep_for(chrono::milliseconds(10000)); + + node.nodeTable->reset(); + clog << "NodeTable:\n" << *node.nodeTable.get() << endl; + + node.populate(2); + clog << "NodeTable:\n" << *node.nodeTable.get() << endl; this_thread::sleep_for(chrono::milliseconds(500)); - cout << "NodeTable:\n" << *node.nodeTable.get() << endl; +// node.nodeTable->join(); +// this_thread::sleep_for(chrono::milliseconds(2000)); +// +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; } -BOOST_AUTO_TEST_CASE(test_txrx_one) +BOOST_AUTO_TEST_CASE(test_udp_once) { UDPDatagram d(bi::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300), bytes({65,65,65,65})); TestUDPSocket a; a.m_socket->connect(); a.start();