Browse Source

reverted changed in NodeTable::nearestNodeEntries

cl-refactor
arkpar 10 years ago
parent
commit
f52f8682c4
  1. 107
      libp2p/NodeTable.cpp

107
libp2p/NodeTable.cpp

@ -1,16 +1,16 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
@ -54,17 +54,17 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
m_state[i].distance = i;
m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
}
m_socketPointer->connect();
doRefreshBuckets(boost::system::error_code());
}
NodeTable::~NodeTable()
{
// Cancel scheduled tasks to ensure.
m_evictionCheckTimer.cancel();
m_bucketRefreshTimer.cancel();
// Disconnect socket so that deallocation is safe.
m_socketPointer->disconnect();
}
@ -85,10 +85,10 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
noteActiveNode(_node.id, _node.endpoint);
return ret;
}
if (!_node.endpoint)
return move(shared_ptr<NodeEntry>());
// ping address to recover nodeid if nodeid is empty
if (!_node.id)
{
@ -100,13 +100,13 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
ping(_node.endpoint);
return move(shared_ptr<NodeEntry>());
}
{
Guard ln(x_nodes);
if (m_nodes.count(_node.id))
return m_nodes[_node.id];
}
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
m_nodes[_node.id] = ret;
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint;
@ -167,7 +167,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
{
if (!m_socketPointer->isOpen() || _round == s_maxSteps)
return;
if (_round == s_maxSteps)
{
clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds.";
@ -176,7 +176,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
else if(!_round && !_tried)
// initialized _tried on first round
_tried.reset(new set<shared_ptr<NodeEntry>>());
auto nearest = nearestNodeEntries(_node);
list<shared_ptr<NodeEntry>> tried;
for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)
@ -189,19 +189,19 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
m_findNodeTimeout.push_back(make_pair(r->id, chrono::steady_clock::now()));
m_socketPointer->send(p);
}
if (tried.empty())
{
clog(NodeTableEvent) << "Terminating discover after " << _round << " rounds.";
return;
}
while (!tried.empty())
{
_tried->insert(tried.front());
tried.pop_front();
}
auto self(shared_from_this());
m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(c_reqTimeout.count() * 2));
m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec)
@ -218,10 +218,10 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
static unsigned lastBin = s_bins - 1;
unsigned head = distance(m_node.id, _target);
unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins;
unordered_multimap<unsigned, shared_ptr<NodeEntry>> found;
map<unsigned, list<shared_ptr<NodeEntry>>> found;
unsigned count = 0;
// if d is 0, then we roll look forward, if last, we reverse, else, spread from d
if (head > 1 && tail != lastBin)
while (head != tail && head < s_bins && count < s_bucketSize)
@ -231,17 +231,17 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found.insert(make_pair(distance(_target, p->id), p));
found[distance(_target, p->id)].push_back(p);
else
break;
}
if (count < s_bucketSize && tail)
for (auto n: m_state[tail].nodes)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found.insert(make_pair(distance(_target, p->id), p));
found[distance(_target, p->id)].push_back(p);
else
break;
}
@ -258,7 +258,7 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found.insert(make_pair(distance(_target, p->id), p));
found[distance(_target, p->id)].push_back(p);
else
break;
}
@ -272,17 +272,18 @@ vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found.insert(make_pair(distance(_target, p->id), p));
found[distance(_target, p->id)].push_back(p);
else
break;
}
tail--;
}
vector<shared_ptr<NodeEntry>> ret;
for (auto n: found)
if (ret.size() < s_bucketSize && !!n.second->endpoint && n.second->endpoint.isAllowed())
ret.push_back(n.second);
for (auto& nodes: found)
for (auto n: nodes.second)
if (ret.size() < s_bucketSize && !!n->endpoint && n->endpoint.isAllowed())
ret.push_back(n);
return move(ret);
}
@ -303,7 +304,7 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
{
if (!m_socketPointer->isOpen())
return;
{
Guard l(x_evictions);
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
@ -324,7 +325,7 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
clog(NodeTableConnect) << "Noting active node:" << _pubk << _endpoint.address().to_string() << ":" << _endpoint.port();
node->endpoint.address = _endpoint.address();
node->endpoint.udpPort = _endpoint.port();
shared_ptr<NodeEntry> contested;
{
Guard l(x_state);
@ -336,7 +337,7 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
removed = true;
return removed;
});
if (s.nodes.size() >= s_bucketSize)
{
// It's only contested iff nodeentry exists
@ -346,7 +347,7 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
s.nodes.pop_front();
s.nodes.push_back(node);
s.touch();
if (!removed && m_nodeEventHandler)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
}
@ -355,12 +356,12 @@ void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _en
{
s.nodes.push_back(node);
s.touch();
if (!removed && m_nodeEventHandler)
m_nodeEventHandler->appendEvent(node->id, NodeEntryAdded);
}
}
if (contested)
evict(contested, node);
}
@ -374,7 +375,7 @@ void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
NodeBucket& s = bucket_UNSAFE(_n.get());
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
}
// notify host
clog(NodeTableUpdate) << "p2p.nodes.drop " << _n->id;
if (m_nodeEventHandler)
@ -394,7 +395,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableTriviaSummary) << "Invalid message size from " << _from.address().to_string() << ":" << _from.port();
return;
}
bytesConstRef hashedBytes(_packet.cropped(h256::size, _packet.size() - h256::size));
h256 hashSigned(sha3(hashedBytes));
if (!_packet.cropped(0, h256::size).contentsEqual(hashSigned.asBytes()))
@ -402,11 +403,11 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableTriviaSummary) << "Invalid message hash from " << _from.address().to_string() << ":" << _from.port();
return;
}
bytesConstRef signedBytes(hashedBytes.cropped(Signature::size, hashedBytes.size() - Signature::size));
// todo: verify sig via known-nodeid and MDC
bytesConstRef sigBytes(_packet.cropped(h256::size, Signature::size));
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(signedBytes)));
if (!nodeid)
@ -414,7 +415,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableTriviaSummary) << "Invalid message signature from " << _from.address().to_string() << ":" << _from.port();
return;
}
unsigned packetType = signedBytes[0];
bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1));
RLP rlp(rlpBytes);
@ -424,7 +425,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
case Pong::type:
{
Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, check if it's in m_evictions
Guard le(x_evictions);
bool evictionEntry = false;
@ -434,13 +435,13 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
evictionEntry = true;
if (auto n = nodeEntry(it->second))
dropNode(n);
if (auto n = nodeEntry(it->first.first))
n->pending = false;
it = m_evictions.erase(it);
}
// if not, check if it's known/pending or a pubk discovery ping
if (!evictionEntry)
{
@ -458,16 +459,16 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
else
return; // unsolicited pong; don't note node as active
}
// update our endpoint address and UDP port
if ((!m_node.endpoint || !m_node.endpoint.isAllowed()) && isPublicAddress(in.destination.address))
m_node.endpoint.address = in.destination.address;
m_node.endpoint.udpPort = in.destination.udpPort;
clog(NodeTableConnect) << "PONG from " << nodeid << _from;
break;
}
case Neighbours::type:
{
bool expected = false;
@ -480,13 +481,13 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
return true;
return false;
});
if (!expected)
{
clog(NetConnect) << "Dropping unsolicited neighbours packet from " << _from.address();
break;
}
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
for (auto n: in.neighbours)
addNode(Node(n.node, n.endpoint));
@ -530,13 +531,13 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
else
return;
}
if (RLPXDatagramFace::secondsSinceEpoch() > in.ts)
{
clog(NodeTableTriviaSummary) << "Received expired PingNode from " << _from.address().to_string() << ":" << _from.port();
return;
}
in.source.address = _from.address();
in.source.udpPort = _from.port();
addNode(Node(nodeid, in.source));
@ -546,7 +547,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
m_socketPointer->send(p);
break;
}
default:
clog(NodeTableWarn) << "Invalid message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port();
return;
@ -571,7 +572,7 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{
if (_ec)
return;
bool evictionsRemain = false;
list<shared_ptr<NodeEntry>> drop;
{
@ -583,11 +584,11 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
drop.push_back(m_nodes[e.second]);
evictionsRemain = m_evictions.size() - drop.size() > 0;
}
drop.unique();
for (auto n: drop)
dropNode(n);
if (evictionsRemain)
doCheckEvictions(boost::system::error_code());
});

Loading…
Cancel
Save