Browse Source

Threading fixes & cleanup.

cl-refactor
Gav Wood 10 years ago
parent
commit
fda1fe5d37
  1. 78
      libp2p/NodeTable.cpp
  2. 4
      libp2p/NodeTable.h

78
libp2p/NodeTable.cpp

@ -127,20 +127,19 @@ void NodeTable::discover()
list<NodeId> NodeTable::nodes() const list<NodeId> NodeTable::nodes() const
{ {
list<NodeId> nodes; list<NodeId> nodes;
Guard l(x_nodes); DEV_GUARDED(x_nodes)
for (auto& i: m_nodes) for (auto& i: m_nodes)
nodes.push_back(i.second->id); nodes.push_back(i.second->id);
return move(nodes); return move(nodes);
} }
list<NodeEntry> NodeTable::snapshot() const list<NodeEntry> NodeTable::snapshot() const
{ {
list<NodeEntry> ret; list<NodeEntry> ret;
Guard l(x_state); DEV_GUARDED(x_state)
for (auto s: m_state) for (auto const& s: m_state)
for (auto np: s.nodes) for (auto const& np: s.nodes)
if (auto n = np.lock()) if (auto n = np.lock())
if (!!n)
ret.push_back(*n); ret.push_back(*n);
return move(ret); return move(ret);
} }
@ -151,8 +150,7 @@ Node NodeTable::node(NodeId const& _id)
if (m_nodes.count(_id)) if (m_nodes.count(_id))
{ {
auto entry = m_nodes[_id]; auto entry = m_nodes[_id];
Node n(_id, entry->endpoint, entry->required); return Node(_id, entry->endpoint, entry->required);
return move(n);
} }
return UnspecifiedNode; return UnspecifiedNode;
} }
@ -173,7 +171,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds."; clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds.";
return; return;
} }
else if(!_round && !_tried) else if (!_round && !_tried)
// initialized _tried on first round // initialized _tried on first round
_tried.reset(new set<shared_ptr<NodeEntry>>()); _tried.reset(new set<shared_ptr<NodeEntry>>());
@ -228,7 +226,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
while (head != tail && head < s_bins && count < s_bucketSize) while (head != tail && head < s_bins && count < s_bucketSize)
{ {
Guard l(x_state); Guard l(x_state);
for (auto n: m_state[head].nodes) for (auto const& n: m_state[head].nodes)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
@ -238,7 +236,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
} }
if (count < s_bucketSize && tail) if (count < s_bucketSize && tail)
for (auto n: m_state[tail].nodes) for (auto const& n: m_state[tail].nodes)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
@ -255,7 +253,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
while (head < s_bins && count < s_bucketSize) while (head < s_bins && count < s_bucketSize)
{ {
Guard l(x_state); Guard l(x_state);
for (auto n: m_state[head].nodes) for (auto const& n: m_state[head].nodes)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
@ -269,7 +267,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
while (tail > 0 && count < s_bucketSize) while (tail > 0 && count < s_bucketSize)
{ {
Guard l(x_state); Guard l(x_state);
for (auto n: m_state[tail].nodes) for (auto const& n: m_state[tail].nodes)
if (auto p = n.lock()) if (auto p = n.lock())
{ {
if (count < s_bucketSize) if (count < s_bucketSize)
@ -282,7 +280,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
vector<shared_ptr<NodeEntry>> ret; vector<shared_ptr<NodeEntry>> ret;
for (auto& nodes: found) for (auto& nodes: found)
for (auto n: nodes.second) for (auto const& n: nodes.second)
if (ret.size() < s_bucketSize && !!n->endpoint && n->endpoint.isAllowed()) if (ret.size() < s_bucketSize && !!n->endpoint && n->endpoint.isAllowed())
ret.push_back(n); ret.push_back(n);
return move(ret); return move(ret);
@ -306,12 +304,15 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
if (!m_socketPointer->isOpen()) if (!m_socketPointer->isOpen())
return; return;
unsigned ec;
DEV_GUARDED(x_evictions)
{ {
Guard l(x_evictions);
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
if (m_evictions.size() == 1) ec = m_evictions.size();
doCheckEvictions(boost::system::error_code());
} }
if (ec == 1)
doCheckEvictions(boost::system::error_code());
ping(_leastSeen.get()); ping(_leastSeen.get());
} }
@ -428,24 +429,27 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
Pong in = Pong::fromBytesConstRef(_from, rlpBytes); Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, check if it's in m_evictions // whenever a pong is received, check if it's in m_evictions
Guard le(x_evictions); bool found = false;
bool evictionEntry = false; EvictionTimeout evictionEntry;
for (auto it = m_evictions.begin(); it != m_evictions.end(); it++) DEV_GUARDED(x_evictions)
if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now()) for (auto it = m_evictions.begin(); it != m_evictions.end();)
{ if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now())
evictionEntry = true; {
if (auto n = nodeEntry(it->second)) found = true;
dropNode(n); evictionEntry = *it;
m_evictions.erase(it);
if (auto n = nodeEntry(it->first.first)) break;
n->pending = false; }
if (found)
it = m_evictions.erase(it); {
} if (auto n = nodeEntry(evictionEntry.second))
dropNode(n);
// if not, check if it's known/pending or a pubk discovery ping if (auto n = nodeEntry(evictionEntry.first.first))
if (!evictionEntry) n->pending = false;
}
else
{ {
// if not, check if it's known/pending or a pubk discovery ping
if (auto n = nodeEntry(nodeid)) if (auto n = nodeEntry(nodeid))
n->pending = false; n->pending = false;
else else
@ -584,7 +588,7 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
if (chrono::steady_clock::now() - e.first.second > c_reqTimeout) if (chrono::steady_clock::now() - e.first.second > c_reqTimeout)
if (m_nodes.count(e.second)) if (m_nodes.count(e.second))
drop.push_back(m_nodes[e.second]); drop.push_back(m_nodes[e.second]);
evictionsRemain = m_evictions.size() - drop.size() > 0; evictionsRemain = (m_evictions.size() - drop.size() > 0);
} }
drop.unique(); drop.unique();

4
libp2p/NodeTable.h

@ -45,10 +45,12 @@ struct NodeEntry: public Node
bool pending = true; ///< Node will be ignored until Pong is received bool pending = true; ///< Node will be ignored until Pong is received
}; };
enum NodeTableEventType { enum NodeTableEventType
{
NodeEntryAdded, NodeEntryAdded,
NodeEntryDropped NodeEntryDropped
}; };
class NodeTable; class NodeTable;
class NodeTableEventHandler class NodeTableEventHandler
{ {

Loading…
Cancel
Save