Browse Source

Fix deadlineop/ops mutexes. Rerun discovery regardless of whether timer is cancelled, due to asio prematurely cancelling timer.

cl-refactor
subtly 10 years ago
parent
commit
614b61b67a
  1. 2
      libdevcore/Worker.cpp
  2. 35
      libp2p/Common.h
  3. 11
      libp2p/NodeTable.cpp
  4. 17
      test/libp2p/net.cpp

2
libdevcore/Worker.cpp

@ -58,7 +58,7 @@ void Worker::startWorking()
}
catch (std::exception const& _e)
{
clog(WarnChannel) << "Exception thrown in Worker thread: " _e.what();
clog(WarnChannel) << "Exception thrown in Worker thread: " << _e.what();
}
// ex = WorkerState::Stopping;

35
libp2p/Common.h

@ -215,32 +215,35 @@ struct Node
virtual operator bool() const { return (bool)id; }
};
class DeadlineOp
{
public:
DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); }
DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {}
DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; }
bool expired() { return m_timer->expires_from_now().total_milliseconds() <= 0; }
void cancel() { m_timer->cancel(); }
private:
std::unique_ptr<ba::deadline_timer> m_timer;
};
class DeadlineOps
{
class DeadlineOp
{
public:
DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); }
~DeadlineOp() {}
DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {}
DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; }
bool expired() { Guard l(x_timer); return m_timer->expires_from_now().total_nanoseconds() <= 0; }
void wait() { Guard l(x_timer); m_timer->wait(); }
private:
std::unique_ptr<ba::deadline_timer> m_timer;
Mutex x_timer;
};
public:
DeadlineOps(ba::io_service& _io, unsigned _reapIntervalMs = 100): m_io(_io), m_reapIntervalMs(_reapIntervalMs), m_stopped({false}) { reap(); }
~DeadlineOps() { m_stopped = true; }
~DeadlineOps() { stop(); }
void schedule(unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); }
void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); }
protected:
void reap() { DEV_GUARDED(x_timers) { auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) m_timers.erase(t); else t++; } schedule(m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); }
void reap() { Guard l(x_timers); auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) { t->wait(); m_timers.erase(t); } else t++; m_timers.emplace_back(m_io, m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); }
private:
ba::io_service& m_io;

11
libp2p/NodeTable.cpp

@ -156,12 +156,13 @@ void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptr<set<shared_
{
// NOTE: ONLY called by doDiscovery!
if (!m_socketPointer->isOpen() || _round == s_maxSteps)
if (!m_socketPointer->isOpen())
return;
if (_round == s_maxSteps)
{
clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds.";
doDiscovery();
return;
}
else if (!_round && !_tried)
@ -197,8 +198,9 @@ void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptr<set<shared_
m_timers.schedule(c_reqTimeout.count() * 2, [this, _node, _round, _tried](boost::system::error_code const& _ec)
{
if (!_ec)
doDiscover(_node, _round + 1, _tried);
if (_ec)
clog(NodeTableWarn) << "Discovery timer canceled!";
doDiscover(_node, _round + 1, _tried);
});
}
@ -336,6 +338,9 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
if (s.nodes.size() >= s_bucketSize)
{
if (removed)
clog(NodeTableWarn) << "DANGER: Bucket overflow when swapping node position.";
// It's only contested iff nodeentry exists
contested = s.nodes.front().lock();
if (!contested)

17
test/libp2p/net.cpp

@ -341,11 +341,22 @@ BOOST_AUTO_TEST_CASE(deadlineTimer)
{
ba::io_service io;
ba::deadline_timer t(io);
t.expires_from_now(boost::posix_time::milliseconds(1));
this_thread::sleep_for(chrono::milliseconds(2));
bool start = false;
boost::system::error_code ec;
std::atomic<unsigned> fired(0);
thread thread([&](){ while(!start) this_thread::sleep_for(chrono::milliseconds(10)); io.run(); });
t.expires_from_now(boost::posix_time::milliseconds(200));
start = true;
t.async_wait([&](boost::system::error_code const& _ec){ ec = _ec; fired++; });
BOOST_REQUIRE_NO_THROW(t.wait());
this_thread::sleep_for(chrono::milliseconds(250));
auto expire = t.expires_from_now().total_milliseconds();
BOOST_REQUIRE(expire <= 0);
BOOST_REQUIRE_NO_THROW(t.wait());
BOOST_REQUIRE(fired == 1);
BOOST_REQUIRE(!ec);
io.stop();
thread.join();
}
BOOST_AUTO_TEST_CASE(unspecifiedNode)

Loading…
Cancel
Save