From 614b61b67afaf1b291de9e8ccc6ee98ac95aaf65 Mon Sep 17 00:00:00 2001 From: subtly Date: Fri, 10 Jul 2015 16:50:05 -0700 Subject: [PATCH] Fix deadlineop/ops mutexes. Rerun discovery regardless of whether timer is cancelled, due to asio prematurely cancelling timer. --- libdevcore/Worker.cpp | 2 +- libp2p/Common.h | 35 +++++++++++++++++++---------------- libp2p/NodeTable.cpp | 11 ++++++++--- test/libp2p/net.cpp | 17 ++++++++++++++--- 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp index c058ef7d4..c4ea4996b 100644 --- a/libdevcore/Worker.cpp +++ b/libdevcore/Worker.cpp @@ -58,7 +58,7 @@ void Worker::startWorking() } catch (std::exception const& _e) { - clog(WarnChannel) << "Exception thrown in Worker thread: " _e.what(); + clog(WarnChannel) << "Exception thrown in Worker thread: " << _e.what(); } // ex = WorkerState::Stopping; diff --git a/libp2p/Common.h b/libp2p/Common.h index 7c9950b39..4718787a6 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -215,32 +215,35 @@ struct Node virtual operator bool() const { return (bool)id; } }; -class DeadlineOp -{ -public: - DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); } - DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {} - DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; } - - bool expired() { return m_timer->expires_from_now().total_milliseconds() <= 0; } - void cancel() { m_timer->cancel(); } - -private: - std::unique_ptr m_timer; -}; - class DeadlineOps { + class DeadlineOp + { + public: + DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); } + ~DeadlineOp() {} + + DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {} + DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; } + + bool expired() { Guard l(x_timer); return m_timer->expires_from_now().total_nanoseconds() <= 0; } + void wait() { Guard l(x_timer); m_timer->wait(); } + + private: + std::unique_ptr m_timer; + Mutex x_timer; + }; + public: DeadlineOps(ba::io_service& _io, unsigned _reapIntervalMs = 100): m_io(_io), m_reapIntervalMs(_reapIntervalMs), m_stopped({false}) { reap(); } - ~DeadlineOps() { m_stopped = true; } + ~DeadlineOps() { stop(); } void schedule(unsigned _msInFuture, std::function const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); } void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); } protected: - void reap() { DEV_GUARDED(x_timers) { auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) m_timers.erase(t); else t++; } schedule(m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); } + void reap() { Guard l(x_timers); auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) { t->wait(); m_timers.erase(t); } else t++; m_timers.emplace_back(m_io, m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); } private: ba::io_service& m_io; diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 389546605..44ab0aa99 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -156,12 +156,13 @@ void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptrisOpen() || _round == s_maxSteps) + if (!m_socketPointer->isOpen()) return; if (_round == s_maxSteps) { clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds."; + doDiscovery(); return; } else if (!_round && !_tried) @@ -197,8 +198,9 @@ void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptr= s_bucketSize) { + if (removed) + clog(NodeTableWarn) << "DANGER: Bucket overflow when swapping node position."; + // It's only contested iff nodeentry exists contested = s.nodes.front().lock(); if (!contested) diff --git a/test/libp2p/net.cpp b/test/libp2p/net.cpp index 7603a6d41..7e3719c34 100644 --- a/test/libp2p/net.cpp +++ b/test/libp2p/net.cpp @@ -341,11 +341,22 @@ BOOST_AUTO_TEST_CASE(deadlineTimer) { ba::io_service io; ba::deadline_timer t(io); - t.expires_from_now(boost::posix_time::milliseconds(1)); - this_thread::sleep_for(chrono::milliseconds(2)); + bool start = false; + boost::system::error_code ec; + std::atomic fired(0); + + thread thread([&](){ while(!start) this_thread::sleep_for(chrono::milliseconds(10)); io.run(); }); + t.expires_from_now(boost::posix_time::milliseconds(200)); + start = true; + t.async_wait([&](boost::system::error_code const& _ec){ ec = _ec; fired++; }); + BOOST_REQUIRE_NO_THROW(t.wait()); + this_thread::sleep_for(chrono::milliseconds(250)); auto expire = t.expires_from_now().total_milliseconds(); BOOST_REQUIRE(expire <= 0); - BOOST_REQUIRE_NO_THROW(t.wait()); + BOOST_REQUIRE(fired == 1); + BOOST_REQUIRE(!ec); + io.stop(); + thread.join(); } BOOST_AUTO_TEST_CASE(unspecifiedNode)