From f0c925086d4cd90fef890e19224b3f4b07369f73 Mon Sep 17 00:00:00 2001 From: subtly Date: Thu, 9 Jul 2015 19:27:05 -0700 Subject: [PATCH 1/4] Fixes for #2337, #2349, #2288, #2333, #1398. --- libdevcore/Worker.cpp | 12 +++--- libp2p/Common.h | 38 +++++++++++++++++ libp2p/Host.cpp | 70 +++++++++++++++---------------- libp2p/NodeTable.cpp | 98 ++++++++++++++++--------------------------- libp2p/NodeTable.h | 26 ++++-------- test/libp2p/net.cpp | 29 +++++-------- 6 files changed, 136 insertions(+), 137 deletions(-) diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp index d47955753..4c64eaa31 100644 --- a/libdevcore/Worker.cpp +++ b/libdevcore/Worker.cpp @@ -50,11 +50,13 @@ void Worker::startWorking() // cnote << "Trying to set Started: Thread was" << (unsigned)ex << "; " << ok; (void)ok; - startedWorking(); -// cnote << "Entering work loop..."; - workLoop(); -// cnote << "Finishing up worker thread..."; - doneWorking(); + try + { + startedWorking(); + workLoop(); + doneWorking(); + } + catch (...) {} // ex = WorkerState::Stopping; // m_state.compare_exchange_strong(ex, WorkerState::Stopped); diff --git a/libp2p/Common.h b/libp2p/Common.h index c124115f9..7c9950b39 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -37,6 +37,7 @@ #include #include #include +#include namespace ba = boost::asio; namespace bi = boost::asio::ip; @@ -214,6 +215,43 @@ struct Node virtual operator bool() const { return (bool)id; } }; +class DeadlineOp +{ +public: + DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); } + DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {} + DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; } + + bool expired() { return m_timer->expires_from_now().total_milliseconds() <= 0; } + void cancel() { m_timer->cancel(); } + +private: + std::unique_ptr m_timer; +}; + +class DeadlineOps +{ +public: + DeadlineOps(ba::io_service& _io, unsigned _reapIntervalMs = 100): m_io(_io), m_reapIntervalMs(_reapIntervalMs), m_stopped({false}) { reap(); } + ~DeadlineOps() { m_stopped = true; } + + void schedule(unsigned _msInFuture, std::function const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); } + + void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); } + +protected: + void reap() { DEV_GUARDED(x_timers) { auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) m_timers.erase(t); else t++; } schedule(m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); } + +private: + ba::io_service& m_io; + unsigned m_reapIntervalMs; + + std::vector m_timers; + Mutex x_timers; + + std::atomic m_stopped; +}; + } /// Simple stream output for a NodeIPEndpoint. diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index f033298ef..8482f254a 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -120,6 +120,15 @@ Host::~Host() void Host::start() { startWorking(); + while (isWorking() && !haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + // network start failed! + if (isWorking()) + return; + + clog(NetWarn) << "Network start failed!"; + doneWorking(); } void Host::stop() @@ -130,11 +139,12 @@ void Host::stop() { // Although m_run is set by stop() or start(), it effects m_runTimer so x_runTimer is used instead of a mutex for m_run. - // when m_run == false, run() will cause this::run() to stop() ioservice Guard l(x_runTimer); // ignore if already stopped/stopping if (!m_run) return; + + // signal run() to prepare for shutdown and reset m_timer m_run = false; } @@ -143,14 +153,18 @@ void Host::stop() this_thread::sleep_for(chrono::milliseconds(50)); // stop worker thread - stopWorking(); + if (isWorking()) + stopWorking(); } void Host::doneWorking() { - // reset ioservice (allows manually polling network, below) + // reset ioservice (cancels all timers and allows manually polling network, below) m_ioService.reset(); + DEV_GUARDED(x_timers) + m_timers.clear(); + // shutdown acceptor m_tcp4Acceptor.cancel(); if (m_tcp4Acceptor.is_open()) @@ -170,15 +184,13 @@ void Host::doneWorking() // disconnect pending handshake, before peers, as a handshake may create a peer for (unsigned n = 0;; n = 0) { - { - Guard l(x_connecting); + DEV_GUARDED(x_connecting) for (auto i: m_connecting) if (auto h = i.lock()) { h->cancel(); n++; } - } if (!n) break; m_ioService.poll(); @@ -187,8 +199,7 @@ void Host::doneWorking() // disconnect peers for (unsigned n = 0;; n = 0) { - { - RecursiveGuard l(x_sessions); + DEV_RECURSIVE_GUARDED(x_sessions) for (auto i: m_sessions) if (auto p = i.second.lock()) if (p->isConnected()) @@ -196,7 +207,6 @@ void Host::doneWorking() p->disconnect(ClientQuit); n++; } - } if (!n) break; @@ -465,6 +475,9 @@ void Host::addNode(NodeId const& _node, NodeIPEndpoint const& _endpoint) void Host::requirePeer(NodeId const& _n, NodeIPEndpoint const& _endpoint) { + if (!m_run) + return; + Node node(_n, _endpoint, true); if (_n) { @@ -482,22 +495,21 @@ void Host::requirePeer(NodeId const& _n, NodeIPEndpoint const& _endpoint) p.reset(new Peer(node)); m_peers[_n] = p; } - connect(p); } else if (m_nodeTable) { - shared_ptr t(new boost::asio::deadline_timer(m_ioService)); - m_timers.push_back(t); - m_nodeTable->addNode(node); + shared_ptr t(new boost::asio::deadline_timer(m_ioService)); t->expires_from_now(boost::posix_time::milliseconds(600)); t->async_wait([this, _n](boost::system::error_code const& _ec) { - if (!_ec && m_nodeTable) - // FIXME RACE CONDITION (use weak_ptr or mutex). - if (auto n = m_nodeTable->node(_n)) - requirePeer(n.id, n.endpoint); + if (!_ec) + if (m_nodeTable) + if (auto n = m_nodeTable->node(_n)) + requirePeer(n.id, n.endpoint); }); + DEV_GUARDED(x_timers) + m_timers.push_back(t); } } @@ -512,8 +524,6 @@ void Host::connect(std::shared_ptr const& _p) { if (!m_run) return; - - _p->m_lastAttempted = std::chrono::system_clock::now(); if (havePeerSession(_p->id)) { @@ -539,6 +549,8 @@ void Host::connect(std::shared_ptr const& _p) m_pendingPeerConns.insert(nptr); } + _p->m_lastAttempted = std::chrono::system_clock::now(); + bi::tcp::endpoint ep(_p->endpoint); clog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id(); auto socket = make_shared(new bi::tcp::socket(m_ioService)); @@ -615,21 +627,13 @@ void Host::run(boost::system::error_code const&) m_nodeTable->processEvents(); // cleanup zombies - { - Guard l(x_connecting); - m_connecting.remove_if([](std::weak_ptr h){ return h.lock(); }); - } - { - Guard l(x_timers); + DEV_GUARDED(x_connecting); + m_connecting.remove_if([](std::weak_ptr h){ return h.expired(); }); + DEV_GUARDED(x_timers) m_timers.remove_if([](std::shared_ptr t) { - return t->expires_from_now().total_milliseconds() > 0; + return t->expires_from_now().total_milliseconds() < 0; }); - } - - for (auto p: m_sessions) - if (auto pp = p.second.lock()) - pp->serviceNodesRequest(); keepAlivePeers(); @@ -666,13 +670,9 @@ void Host::run(boost::system::error_code const&) pendingCount = m_pendingPeerConns.size(); int openSlots = m_idealPeerCount - peerCount() - pendingCount + reqConn; if (openSlots > 0) - { for (auto p: toConnect) if (!p->required && openSlots--) connect(p); - - m_nodeTable->discover(); - } } auto runcb = [this](boost::system::error_code const& error) { run(error); }; diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 1a0b11734..389546605 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -43,33 +43,31 @@ NodeEntry::NodeEntry(NodeId const& _src, Public const& _pubk, NodeIPEndpoint con NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled): m_node(Node(_alias.pub(), _endpoint)), m_secret(_alias.sec()), - m_io(_io), - m_socket(new NodeSocket(m_io, *this, (bi::udp::endpoint)m_node.endpoint)), + m_socket(new NodeSocket(_io, *this, (bi::udp::endpoint)m_node.endpoint)), m_socketPointer(m_socket.get()), - m_bucketRefreshTimer(m_io), - m_evictionCheckTimer(m_io), - m_disabled(!_enabled) + m_timers(_io) { for (unsigned i = 0; i < s_bins; i++) - { m_state[i].distance = i; - m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1); - } - if (!m_disabled) + if (!_enabled) + return; + + try { m_socketPointer->connect(); - doRefreshBuckets(boost::system::error_code()); + doDiscovery(); + } + catch (std::exception const& _e) + { + clog(NetWarn) << "Exception connecting NodeTable socket: " << _e.what(); + clog(NetWarn) << "Discovery disabled."; } } NodeTable::~NodeTable() { - // Cancel scheduled tasks to ensure. - m_evictionCheckTimer.cancel(); - m_bucketRefreshTimer.cancel(); - - // Disconnect socket so that deallocation is safe. + m_timers.stop(); m_socketPointer->disconnect(); } @@ -117,16 +115,6 @@ shared_ptr NodeTable::addNode(Node const& _node, NodeRelation _relati return ret; } -void NodeTable::discover() -{ - static chrono::steady_clock::time_point s_lastDiscover = chrono::steady_clock::now() - std::chrono::seconds(30); - if (chrono::steady_clock::now() > s_lastDiscover + std::chrono::seconds(30)) - { - s_lastDiscover = chrono::steady_clock::now(); - discover(m_node.id); - } -} - list NodeTable::nodes() const { list nodes; @@ -164,8 +152,10 @@ shared_ptr NodeTable::nodeEntry(NodeId _id) return m_nodes.count(_id) ? m_nodes[_id] : shared_ptr(); } -void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr>> _tried) +void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptr>> _tried) { + // NOTE: ONLY called by doDiscovery! + if (!m_socketPointer->isOpen() || _round == s_maxSteps) return; @@ -195,6 +185,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptrinsert(tried.front()); tried.pop_front(); } - - auto self(shared_from_this()); - m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count() * 2)); - m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec) + + m_timers.schedule(c_reqTimeout.count() * 2, [this, _node, _round, _tried](boost::system::error_code const& _ec) { - if (_ec) - return; - discover(_node, _round + 1, _tried); + if (!_ec) + doDiscover(_node, _round + 1, _tried); }); } @@ -310,15 +298,15 @@ void NodeTable::evict(shared_ptr _leastSeen, shared_ptr _n if (!m_socketPointer->isOpen()) return; - unsigned ec; + unsigned evicts; DEV_GUARDED(x_evictions) { m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); - ec = m_evictions.size(); + evicts = m_evictions.size(); } - if (ec == 1) - doCheckEvictions(boost::system::error_code()); + if (evicts == 1) + doCheckEvictions(); ping(_leastSeen.get()); } @@ -354,8 +342,6 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en { s.nodes.pop_front(); s.nodes.push_back(node); - s.touch(); - if (!removed && m_nodeEventHandler) m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded); } @@ -363,8 +349,6 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en else { s.nodes.push_back(node); - s.touch(); - if (!removed && m_nodeEventHandler) m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded); } @@ -576,14 +560,9 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes } } -void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) +void NodeTable::doCheckEvictions() { - if (_ec || !m_socketPointer->isOpen()) - return; - - auto self(shared_from_this()); - m_evictionCheckTimer.expires_from_now(c_evictionCheckInterval); - m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec) + m_timers.schedule(c_evictionCheckInterval.count(), [this](boost::system::error_code const& _ec) { if (_ec) return; @@ -605,28 +584,23 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec) dropNode(n); if (evictionsRemain) - doCheckEvictions(boost::system::error_code()); + doCheckEvictions(); }); } -void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec) +void NodeTable::doDiscovery() { - if (_ec) - return; - - clog(NodeTableEvent) << "refreshing buckets"; - bool connected = m_socketPointer->isOpen(); - if (connected) + m_timers.schedule(c_bucketRefresh.count(), [this](boost::system::error_code const& ec) { + if (ec) + return; + + clog(NodeTableEvent) << "performing random discovery"; NodeId randNodeId; crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size)); crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size)); - discover(randNodeId); - } - - auto runcb = [this](boost::system::error_code const& error) { doRefreshBuckets(error); }; - m_bucketRefreshTimer.expires_from_now(boost::posix_time::milliseconds(c_bucketRefresh.count())); - m_bucketRefreshTimer.async_wait(runcb); + doDiscover(randNodeId); + }); } void PingNode::streamRLP(RLPStream& _s) const diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index d31a356ef..4d6b02d99 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -128,6 +128,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this public: enum NodeRelation { Unknown = 0, Known }; + enum DiscoverType { Random = 0 }; /// Constructor requiring host for I/O, credentials, and IP Address and port to listen on. NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled = true); @@ -145,9 +146,6 @@ public: /// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen or NodeId is empty. std::shared_ptr addNode(Node const& _node, NodeRelation _relation = NodeRelation::Unknown); - /// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table. - void discover(); - /// Returns list of node ids active in node table. std::list nodes() const; @@ -184,16 +182,14 @@ private: /// Intervals /* todo: replace boost::posix_time; change constants to upper camelcase */ - boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked. + std::chrono::milliseconds const c_evictionCheckInterval = std::chrono::milliseconds(75); ///< Interval at which eviction timeouts are checked. std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations). std::chrono::milliseconds const c_bucketRefresh = std::chrono::milliseconds(7200); ///< Refresh interval prevents bucket from becoming stale. [Kademlia] struct NodeBucket { unsigned distance; - TimePoint modified; std::list> nodes; - void touch() { modified = std::chrono::steady_clock::now(); } }; /// Used to ping endpoint. @@ -210,7 +206,7 @@ private: /// Used to discovery nodes on network which are close to the given target. /// Sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds. - void discover(NodeId _target, unsigned _round = 0, std::shared_ptr>> _tried = std::shared_ptr>>()); + void doDiscover(NodeId _target, unsigned _round = 0, std::shared_ptr>> _tried = std::shared_ptr>>()); /// Returns nodes from node table which are closest to target. std::vector> nearestNodeEntries(NodeId _target); @@ -240,10 +236,10 @@ private: /// Tasks /// Called by evict() to ensure eviction check is scheduled to run and terminates when no evictions remain. Asynchronous. - void doCheckEvictions(boost::system::error_code const& _ec); + void doCheckEvictions(); - /// Purges and pings nodes for any buckets which haven't been touched for c_bucketRefresh seconds. - void doRefreshBuckets(boost::system::error_code const& _ec); + /// Looks up a random node at @c_bucketRefresh interval. + void doDiscovery(); std::unique_ptr m_nodeEventHandler; ///< Event handler for node events. @@ -251,7 +247,7 @@ private: Secret m_secret; ///< This nodes secret key. mutable Mutex x_nodes; ///< LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const. - std::unordered_map> m_nodes; ///< Nodes + std::unordered_map> m_nodes; ///< Known Node Endpoints mutable Mutex x_state; ///< LOCK x_state first if both x_nodes and x_state locks are required. std::array m_state; ///< State of p2p node network. @@ -264,15 +260,11 @@ private: Mutex x_findNodeTimeout; std::list m_findNodeTimeout; ///< Timeouts for pending Ping and FindNode requests. - - ba::io_service& m_io; ///< Used by bucket refresh timer. + std::shared_ptr m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe. - boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh. - boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions. - - bool m_disabled; ///< Disable discovery. + DeadlineOps m_timers; }; inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable) diff --git a/test/libp2p/net.cpp b/test/libp2p/net.cpp index 1e3e2e15c..7603a6d41 100644 --- a/test/libp2p/net.cpp +++ b/test/libp2p/net.cpp @@ -308,35 +308,17 @@ BOOST_AUTO_TEST_CASE(kademlia) if (test::Options::get().nonetwork) return; - // Not yet a 'real' test. TestNodeTableHost node(8); node.start(); - node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for node.setup(); node.populate(); -// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - node.populateAll(); -// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - auto nodes = node.nodeTable->nodes(); nodes.sort(); - node.nodeTable->reset(); -// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - node.populate(1); -// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - - node.nodeTable->discover(); this_thread::sleep_for(chrono::milliseconds(2000)); -// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; - BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8); - - auto netNodes = node.nodeTable->nodes(); - netNodes.sort(); - } BOOST_AUTO_TEST_CASE(udpOnce) @@ -355,6 +337,17 @@ BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE(netTypes) +BOOST_AUTO_TEST_CASE(deadlineTimer) +{ + ba::io_service io; + ba::deadline_timer t(io); + t.expires_from_now(boost::posix_time::milliseconds(1)); + this_thread::sleep_for(chrono::milliseconds(2)); + auto expire = t.expires_from_now().total_milliseconds(); + BOOST_REQUIRE(expire <= 0); + BOOST_REQUIRE_NO_THROW(t.wait()); +} + BOOST_AUTO_TEST_CASE(unspecifiedNode) { if (test::Options::get().nonetwork) From 7cb578cc1f4d605c79e1a6e06f6cb08f645d58aa Mon Sep 17 00:00:00 2001 From: subtly Date: Fri, 10 Jul 2015 11:19:00 -0700 Subject: [PATCH 2/4] log worker exceptions --- libdevcore/Worker.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp index 4c64eaa31..c058ef7d4 100644 --- a/libdevcore/Worker.cpp +++ b/libdevcore/Worker.cpp @@ -56,7 +56,10 @@ void Worker::startWorking() workLoop(); doneWorking(); } - catch (...) {} + catch (std::exception const& _e) + { + clog(WarnChannel) << "Exception thrown in Worker thread: " _e.what(); + } // ex = WorkerState::Stopping; // m_state.compare_exchange_strong(ex, WorkerState::Stopped); From 614b61b67afaf1b291de9e8ccc6ee98ac95aaf65 Mon Sep 17 00:00:00 2001 From: subtly Date: Fri, 10 Jul 2015 16:50:05 -0700 Subject: [PATCH 3/4] Fix deadlineop/ops mutexes. Rerun discovery regardless of whether timer is cancelled, due to asio prematurely cancelling timer. --- libdevcore/Worker.cpp | 2 +- libp2p/Common.h | 35 +++++++++++++++++++---------------- libp2p/NodeTable.cpp | 11 ++++++++--- test/libp2p/net.cpp | 17 ++++++++++++++--- 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp index c058ef7d4..c4ea4996b 100644 --- a/libdevcore/Worker.cpp +++ b/libdevcore/Worker.cpp @@ -58,7 +58,7 @@ void Worker::startWorking() } catch (std::exception const& _e) { - clog(WarnChannel) << "Exception thrown in Worker thread: " _e.what(); + clog(WarnChannel) << "Exception thrown in Worker thread: " << _e.what(); } // ex = WorkerState::Stopping; diff --git a/libp2p/Common.h b/libp2p/Common.h index 7c9950b39..4718787a6 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -215,32 +215,35 @@ struct Node virtual operator bool() const { return (bool)id; } }; -class DeadlineOp -{ -public: - DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); } - DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {} - DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; } - - bool expired() { return m_timer->expires_from_now().total_milliseconds() <= 0; } - void cancel() { m_timer->cancel(); } - -private: - std::unique_ptr m_timer; -}; - class DeadlineOps { + class DeadlineOp + { + public: + DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); } + ~DeadlineOp() {} + + DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {} + DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; } + + bool expired() { Guard l(x_timer); return m_timer->expires_from_now().total_nanoseconds() <= 0; } + void wait() { Guard l(x_timer); m_timer->wait(); } + + private: + std::unique_ptr m_timer; + Mutex x_timer; + }; + public: DeadlineOps(ba::io_service& _io, unsigned _reapIntervalMs = 100): m_io(_io), m_reapIntervalMs(_reapIntervalMs), m_stopped({false}) { reap(); } - ~DeadlineOps() { m_stopped = true; } + ~DeadlineOps() { stop(); } void schedule(unsigned _msInFuture, std::function const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); } void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); } protected: - void reap() { DEV_GUARDED(x_timers) { auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) m_timers.erase(t); else t++; } schedule(m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); } + void reap() { Guard l(x_timers); auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) { t->wait(); m_timers.erase(t); } else t++; m_timers.emplace_back(m_io, m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); } private: ba::io_service& m_io; diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 389546605..44ab0aa99 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -156,12 +156,13 @@ void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptrisOpen() || _round == s_maxSteps) + if (!m_socketPointer->isOpen()) return; if (_round == s_maxSteps) { clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds."; + doDiscovery(); return; } else if (!_round && !_tried) @@ -197,8 +198,9 @@ void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptr= s_bucketSize) { + if (removed) + clog(NodeTableWarn) << "DANGER: Bucket overflow when swapping node position."; + // It's only contested iff nodeentry exists contested = s.nodes.front().lock(); if (!contested) diff --git a/test/libp2p/net.cpp b/test/libp2p/net.cpp index 7603a6d41..7e3719c34 100644 --- a/test/libp2p/net.cpp +++ b/test/libp2p/net.cpp @@ -341,11 +341,22 @@ BOOST_AUTO_TEST_CASE(deadlineTimer) { ba::io_service io; ba::deadline_timer t(io); - t.expires_from_now(boost::posix_time::milliseconds(1)); - this_thread::sleep_for(chrono::milliseconds(2)); + bool start = false; + boost::system::error_code ec; + std::atomic fired(0); + + thread thread([&](){ while(!start) this_thread::sleep_for(chrono::milliseconds(10)); io.run(); }); + t.expires_from_now(boost::posix_time::milliseconds(200)); + start = true; + t.async_wait([&](boost::system::error_code const& _ec){ ec = _ec; fired++; }); + BOOST_REQUIRE_NO_THROW(t.wait()); + this_thread::sleep_for(chrono::milliseconds(250)); auto expire = t.expires_from_now().total_milliseconds(); BOOST_REQUIRE(expire <= 0); - BOOST_REQUIRE_NO_THROW(t.wait()); + BOOST_REQUIRE(fired == 1); + BOOST_REQUIRE(!ec); + io.stop(); + thread.join(); } BOOST_AUTO_TEST_CASE(unspecifiedNode) From 2991bf40976896d50a53964dfbd7bbab147407d3 Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 11 Jul 2015 11:45:20 -0700 Subject: [PATCH 4/4] Use const, nudge buildbot. --- libp2p/Host.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 8482f254a..d57741e61 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -185,7 +185,7 @@ void Host::doneWorking() for (unsigned n = 0;; n = 0) { DEV_GUARDED(x_connecting) - for (auto i: m_connecting) + for (auto const& i: m_connecting) if (auto h = i.lock()) { h->cancel();