Browse Source

Merge pull request #2438 from subtly/netFix

Fixes for #2337, #2349, #2288, #2333, #1398.
cl-refactor
Gav Wood 10 years ago
parent
commit
0a293b10df
  1. 15
      libdevcore/Worker.cpp
  2. 41
      libp2p/Common.h
  3. 72
      libp2p/Host.cpp
  4. 103
      libp2p/NodeTable.cpp
  5. 26
      libp2p/NodeTable.h
  6. 40
      test/libp2p/net.cpp

15
libdevcore/Worker.cpp

@ -50,11 +50,16 @@ void Worker::startWorking()
// cnote << "Trying to set Started: Thread was" << (unsigned)ex << "; " << ok;
(void)ok;
startedWorking();
// cnote << "Entering work loop...";
workLoop();
// cnote << "Finishing up worker thread...";
doneWorking();
try
{
startedWorking();
workLoop();
doneWorking();
}
catch (std::exception const& _e)
{
clog(WarnChannel) << "Exception thrown in Worker thread: " << _e.what();
}
// ex = WorkerState::Stopping;
// m_state.compare_exchange_strong(ex, WorkerState::Stopped);

41
libp2p/Common.h

@ -37,6 +37,7 @@
#include <libdevcore/Log.h>
#include <libdevcore/Exceptions.h>
#include <libdevcore/RLP.h>
#include <libdevcore/Guards.h>
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
@ -214,6 +215,46 @@ struct Node
virtual operator bool() const { return (bool)id; }
};
class DeadlineOps
{
class DeadlineOp
{
public:
DeadlineOp(ba::io_service& _io, unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f): m_timer(new ba::deadline_timer(_io)) { m_timer->expires_from_now(boost::posix_time::milliseconds(_msInFuture)); m_timer->async_wait(_f); }
~DeadlineOp() {}
DeadlineOp(DeadlineOp&& _s): m_timer(_s.m_timer.release()) {}
DeadlineOp& operator=(DeadlineOp&& _s) { m_timer.reset(_s.m_timer.release()); return *this; }
bool expired() { Guard l(x_timer); return m_timer->expires_from_now().total_nanoseconds() <= 0; }
void wait() { Guard l(x_timer); m_timer->wait(); }
private:
std::unique_ptr<ba::deadline_timer> m_timer;
Mutex x_timer;
};
public:
DeadlineOps(ba::io_service& _io, unsigned _reapIntervalMs = 100): m_io(_io), m_reapIntervalMs(_reapIntervalMs), m_stopped({false}) { reap(); }
~DeadlineOps() { stop(); }
void schedule(unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); }
void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); }
protected:
void reap() { Guard l(x_timers); auto t = m_timers.begin(); while (t != m_timers.end()) if (t->expired()) { t->wait(); m_timers.erase(t); } else t++; m_timers.emplace_back(m_io, m_reapIntervalMs, [this](boost::system::error_code const& ec){ if (!ec) reap(); }); }
private:
ba::io_service& m_io;
unsigned m_reapIntervalMs;
std::vector<DeadlineOp> m_timers;
Mutex x_timers;
std::atomic<bool> m_stopped;
};
}
/// Simple stream output for a NodeIPEndpoint.

72
libp2p/Host.cpp

@ -120,6 +120,15 @@ Host::~Host()
void Host::start()
{
startWorking();
while (isWorking() && !haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
// network start failed!
if (isWorking())
return;
clog(NetWarn) << "Network start failed!";
doneWorking();
}
void Host::stop()
@ -130,11 +139,12 @@ void Host::stop()
{
// Although m_run is set by stop() or start(), it effects m_runTimer so x_runTimer is used instead of a mutex for m_run.
// when m_run == false, run() will cause this::run() to stop() ioservice
Guard l(x_runTimer);
// ignore if already stopped/stopping
if (!m_run)
return;
// signal run() to prepare for shutdown and reset m_timer
m_run = false;
}
@ -143,14 +153,18 @@ void Host::stop()
this_thread::sleep_for(chrono::milliseconds(50));
// stop worker thread
stopWorking();
if (isWorking())
stopWorking();
}
void Host::doneWorking()
{
// reset ioservice (allows manually polling network, below)
// reset ioservice (cancels all timers and allows manually polling network, below)
m_ioService.reset();
DEV_GUARDED(x_timers)
m_timers.clear();
// shutdown acceptor
m_tcp4Acceptor.cancel();
if (m_tcp4Acceptor.is_open())
@ -170,15 +184,13 @@ void Host::doneWorking()
// disconnect pending handshake, before peers, as a handshake may create a peer
for (unsigned n = 0;; n = 0)
{
{
Guard l(x_connecting);
for (auto i: m_connecting)
DEV_GUARDED(x_connecting)
for (auto const& i: m_connecting)
if (auto h = i.lock())
{
h->cancel();
n++;
}
}
if (!n)
break;
m_ioService.poll();
@ -187,8 +199,7 @@ void Host::doneWorking()
// disconnect peers
for (unsigned n = 0;; n = 0)
{
{
RecursiveGuard l(x_sessions);
DEV_RECURSIVE_GUARDED(x_sessions)
for (auto i: m_sessions)
if (auto p = i.second.lock())
if (p->isConnected())
@ -196,7 +207,6 @@ void Host::doneWorking()
p->disconnect(ClientQuit);
n++;
}
}
if (!n)
break;
@ -465,6 +475,9 @@ void Host::addNode(NodeId const& _node, NodeIPEndpoint const& _endpoint)
void Host::requirePeer(NodeId const& _n, NodeIPEndpoint const& _endpoint)
{
if (!m_run)
return;
Node node(_n, _endpoint, true);
if (_n)
{
@ -482,22 +495,21 @@ void Host::requirePeer(NodeId const& _n, NodeIPEndpoint const& _endpoint)
p.reset(new Peer(node));
m_peers[_n] = p;
}
connect(p);
}
else if (m_nodeTable)
{
shared_ptr<boost::asio::deadline_timer> t(new boost::asio::deadline_timer(m_ioService));
m_timers.push_back(t);
m_nodeTable->addNode(node);
shared_ptr<boost::asio::deadline_timer> t(new boost::asio::deadline_timer(m_ioService));
t->expires_from_now(boost::posix_time::milliseconds(600));
t->async_wait([this, _n](boost::system::error_code const& _ec)
{
if (!_ec && m_nodeTable)
// FIXME RACE CONDITION (use weak_ptr or mutex).
if (auto n = m_nodeTable->node(_n))
requirePeer(n.id, n.endpoint);
if (!_ec)
if (m_nodeTable)
if (auto n = m_nodeTable->node(_n))
requirePeer(n.id, n.endpoint);
});
DEV_GUARDED(x_timers)
m_timers.push_back(t);
}
}
@ -512,8 +524,6 @@ void Host::connect(std::shared_ptr<Peer> const& _p)
{
if (!m_run)
return;
_p->m_lastAttempted = std::chrono::system_clock::now();
if (havePeerSession(_p->id))
{
@ -539,6 +549,8 @@ void Host::connect(std::shared_ptr<Peer> const& _p)
m_pendingPeerConns.insert(nptr);
}
_p->m_lastAttempted = std::chrono::system_clock::now();
bi::tcp::endpoint ep(_p->endpoint);
clog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id();
auto socket = make_shared<RLPXSocket>(new bi::tcp::socket(m_ioService));
@ -615,21 +627,13 @@ void Host::run(boost::system::error_code const&)
m_nodeTable->processEvents();
// cleanup zombies
{
Guard l(x_connecting);
m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.lock(); });
}
{
Guard l(x_timers);
DEV_GUARDED(x_connecting);
m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.expired(); });
DEV_GUARDED(x_timers)
m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
{
return t->expires_from_now().total_milliseconds() > 0;
return t->expires_from_now().total_milliseconds() < 0;
});
}
for (auto p: m_sessions)
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
keepAlivePeers();
@ -666,13 +670,9 @@ void Host::run(boost::system::error_code const&)
pendingCount = m_pendingPeerConns.size();
int openSlots = m_idealPeerCount - peerCount() - pendingCount + reqConn;
if (openSlots > 0)
{
for (auto p: toConnect)
if (!p->required && openSlots--)
connect(p);
m_nodeTable->discover();
}
}
auto runcb = [this](boost::system::error_code const& error) { run(error); };

103
libp2p/NodeTable.cpp

@ -43,33 +43,31 @@ NodeEntry::NodeEntry(NodeId const& _src, Public const& _pubk, NodeIPEndpoint con
NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled):
m_node(Node(_alias.pub(), _endpoint)),
m_secret(_alias.sec()),
m_io(_io),
m_socket(new NodeSocket(m_io, *this, (bi::udp::endpoint)m_node.endpoint)),
m_socket(new NodeSocket(_io, *this, (bi::udp::endpoint)m_node.endpoint)),
m_socketPointer(m_socket.get()),
m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io),
m_disabled(!_enabled)
m_timers(_io)
{
for (unsigned i = 0; i < s_bins; i++)
{
m_state[i].distance = i;
m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
}
if (!m_disabled)
if (!_enabled)
return;
try
{
m_socketPointer->connect();
doRefreshBuckets(boost::system::error_code());
doDiscovery();
}
catch (std::exception const& _e)
{
clog(NetWarn) << "Exception connecting NodeTable socket: " << _e.what();
clog(NetWarn) << "Discovery disabled.";
}
}
NodeTable::~NodeTable()
{
// Cancel scheduled tasks to ensure.
m_evictionCheckTimer.cancel();
m_bucketRefreshTimer.cancel();
// Disconnect socket so that deallocation is safe.
m_timers.stop();
m_socketPointer->disconnect();
}
@ -117,16 +115,6 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
return ret;
}
void NodeTable::discover()
{
static chrono::steady_clock::time_point s_lastDiscover = chrono::steady_clock::now() - std::chrono::seconds(30);
if (chrono::steady_clock::now() > s_lastDiscover + std::chrono::seconds(30))
{
s_lastDiscover = chrono::steady_clock::now();
discover(m_node.id);
}
}
list<NodeId> NodeTable::nodes() const
{
list<NodeId> nodes;
@ -164,14 +152,17 @@ shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeId _id)
return m_nodes.count(_id) ? m_nodes[_id] : shared_ptr<NodeEntry>();
}
void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
void NodeTable::doDiscover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
{
if (!m_socketPointer->isOpen() || _round == s_maxSteps)
// NOTE: ONLY called by doDiscovery!
if (!m_socketPointer->isOpen())
return;
if (_round == s_maxSteps)
{
clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds.";
doDiscovery();
return;
}
else if (!_round && !_tried)
@ -195,6 +186,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
if (tried.empty())
{
clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds.";
doDiscovery();
return;
}
@ -203,14 +195,12 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
_tried->insert(tried.front());
tried.pop_front();
}
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count() * 2));
m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec)
m_timers.schedule(c_reqTimeout.count() * 2, [this, _node, _round, _tried](boost::system::error_code const& _ec)
{
if (_ec)
return;
discover(_node, _round + 1, _tried);
clog(NodeTableWarn) << "Discovery timer canceled!";
doDiscover(_node, _round + 1, _tried);
});
}
@ -310,15 +300,15 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
if (!m_socketPointer->isOpen())
return;
unsigned ec;
unsigned evicts;
DEV_GUARDED(x_evictions)
{
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
ec = m_evictions.size();
evicts = m_evictions.size();
}
if (ec == 1)
doCheckEvictions(boost::system::error_code());
if (evicts == 1)
doCheckEvictions();
ping(_leastSeen.get());
}
@ -348,14 +338,15 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
if (s.nodes.size() >= s_bucketSize)
{
if (removed)
clog(NodeTableWarn) << "DANGER: Bucket overflow when swapping node position.";
// It's only contested iff nodeentry exists
contested = s.nodes.front().lock();
if (!contested)
{
s.nodes.pop_front();
s.nodes.push_back(node);
s.touch();
if (!removed && m_nodeEventHandler)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
}
@ -363,8 +354,6 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
else
{
s.nodes.push_back(node);
s.touch();
if (!removed && m_nodeEventHandler)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
}
@ -576,14 +565,9 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
}
}
void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
void NodeTable::doCheckEvictions()
{
if (_ec || !m_socketPointer->isOpen())
return;
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(c_evictionCheckInterval);
m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec)
m_timers.schedule(c_evictionCheckInterval.count(), [this](boost::system::error_code const& _ec)
{
if (_ec)
return;
@ -605,28 +589,23 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
dropNode(n);
if (evictionsRemain)
doCheckEvictions(boost::system::error_code());
doCheckEvictions();
});
}
void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
void NodeTable::doDiscovery()
{
if (_ec)
return;
clog(NodeTableEvent) << "refreshing buckets";
bool connected = m_socketPointer->isOpen();
if (connected)
m_timers.schedule(c_bucketRefresh.count(), [this](boost::system::error_code const& ec)
{
if (ec)
return;
clog(NodeTableEvent) << "performing random discovery";
NodeId randNodeId;
crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size));
crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size));
discover(randNodeId);
}
auto runcb = [this](boost::system::error_code const& error) { doRefreshBuckets(error); };
m_bucketRefreshTimer.expires_from_now(boost::posix_time::milliseconds(c_bucketRefresh.count()));
m_bucketRefreshTimer.async_wait(runcb);
doDiscover(randNodeId);
});
}
void PingNode::streamRLP(RLPStream& _s) const

26
libp2p/NodeTable.h

@ -128,6 +128,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
public:
enum NodeRelation { Unknown = 0, Known };
enum DiscoverType { Random = 0 };
/// Constructor requiring host for I/O, credentials, and IP Address and port to listen on.
NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled = true);
@ -145,9 +146,6 @@ public:
/// Add node. Node will be pinged and empty shared_ptr is returned if node has never been seen or NodeId is empty.
std::shared_ptr<NodeEntry> addNode(Node const& _node, NodeRelation _relation = NodeRelation::Unknown);
/// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table.
void discover();
/// Returns list of node ids active in node table.
std::list<NodeId> nodes() const;
@ -184,16 +182,14 @@ private:
/// Intervals
/* todo: replace boost::posix_time; change constants to upper camelcase */
boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_evictionCheckInterval = std::chrono::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::milliseconds const c_bucketRefresh = std::chrono::milliseconds(7200); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
struct NodeBucket
{
unsigned distance;
TimePoint modified;
std::list<std::weak_ptr<NodeEntry>> nodes;
void touch() { modified = std::chrono::steady_clock::now(); }
};
/// Used to ping endpoint.
@ -210,7 +206,7 @@ private:
/// Used to discovery nodes on network which are close to the given target.
/// Sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds.
void discover(NodeId _target, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
void doDiscover(NodeId _target, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Returns nodes from node table which are closest to target.
std::vector<std::shared_ptr<NodeEntry>> nearestNodeEntries(NodeId _target);
@ -240,10 +236,10 @@ private:
/// Tasks
/// Called by evict() to ensure eviction check is scheduled to run and terminates when no evictions remain. Asynchronous.
void doCheckEvictions(boost::system::error_code const& _ec);
void doCheckEvictions();
/// Purges and pings nodes for any buckets which haven't been touched for c_bucketRefresh seconds.
void doRefreshBuckets(boost::system::error_code const& _ec);
/// Looks up a random node at @c_bucketRefresh interval.
void doDiscovery();
std::unique_ptr<NodeTableEventHandler> m_nodeEventHandler; ///< Event handler for node events.
@ -251,7 +247,7 @@ private:
Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const.
std::unordered_map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< Nodes
std::unordered_map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< Known Node Endpoints
mutable Mutex x_state; ///< LOCK x_state first if both x_nodes and x_state locks are required.
std::array<NodeBucket, s_bins> m_state; ///< State of p2p node network.
@ -264,15 +260,11 @@ private:
Mutex x_findNodeTimeout;
std::list<NodeIdTimePoint> m_findNodeTimeout; ///< Timeouts for pending Ping and FindNode requests.
ba::io_service& m_io; ///< Used by bucket refresh timer.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe.
boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh.
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.
bool m_disabled; ///< Disable discovery.
DeadlineOps m_timers;
};
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)

40
test/libp2p/net.cpp

@ -308,35 +308,17 @@ BOOST_AUTO_TEST_CASE(kademlia)
if (test::Options::get().nonetwork)
return;
// Not yet a 'real' test.
TestNodeTableHost node(8);
node.start();
node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for
node.setup();
node.populate();
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populateAll();
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
auto nodes = node.nodeTable->nodes();
nodes.sort();
node.nodeTable->reset();
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(1);
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->discover();
this_thread::sleep_for(chrono::milliseconds(2000));
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);
auto netNodes = node.nodeTable->nodes();
netNodes.sort();
}
BOOST_AUTO_TEST_CASE(udpOnce)
@ -355,6 +337,28 @@ BOOST_AUTO_TEST_SUITE_END()
BOOST_AUTO_TEST_SUITE(netTypes)
BOOST_AUTO_TEST_CASE(deadlineTimer)
{
ba::io_service io;
ba::deadline_timer t(io);
bool start = false;
boost::system::error_code ec;
std::atomic<unsigned> fired(0);
thread thread([&](){ while(!start) this_thread::sleep_for(chrono::milliseconds(10)); io.run(); });
t.expires_from_now(boost::posix_time::milliseconds(200));
start = true;
t.async_wait([&](boost::system::error_code const& _ec){ ec = _ec; fired++; });
BOOST_REQUIRE_NO_THROW(t.wait());
this_thread::sleep_for(chrono::milliseconds(250));
auto expire = t.expires_from_now().total_milliseconds();
BOOST_REQUIRE(expire <= 0);
BOOST_REQUIRE(fired == 1);
BOOST_REQUIRE(!ec);
io.stop();
thread.join();
}
BOOST_AUTO_TEST_CASE(unspecifiedNode)
{
if (test::Options::get().nonetwork)

Loading…
Cancel
Save