Browse Source

Merge pull request #1851 from subtly/discovery

fix and add guards
cl-refactor
subtly 10 years ago
parent
commit
2c60895d80
  1. 16
      libp2p/NodeTable.cpp
  2. 4
      libp2p/UDP.cpp
  3. 25
      libp2p/UDP.h

16
libp2p/NodeTable.cpp

@ -81,6 +81,7 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
{ {
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint)); shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
ret->pending = false; ret->pending = false;
DEV_GUARDED(x_nodes)
m_nodes[_node.id] = ret; m_nodes[_node.id] = ret;
noteActiveNode(_node.id, _node.endpoint); noteActiveNode(_node.id, _node.endpoint);
return ret; return ret;
@ -101,13 +102,12 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
return move(shared_ptr<NodeEntry>()); return move(shared_ptr<NodeEntry>());
} }
{ DEV_GUARDED(x_nodes)
Guard ln(x_nodes);
if (m_nodes.count(_node.id)) if (m_nodes.count(_node.id))
return m_nodes[_node.id]; return m_nodes[_node.id];
}
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint)); shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
DEV_GUARDED(x_nodes)
m_nodes[_node.id] = ret; m_nodes[_node.id] = ret;
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint; clog(NodeTableConnect) << "addNode pending for" << _node.endpoint;
ping(_node.endpoint); ping(_node.endpoint);
@ -186,6 +186,7 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
tried.push_back(r); tried.push_back(r);
FindNode p(r->endpoint, _node); FindNode p(r->endpoint, _node);
p.sign(m_secret); p.sign(m_secret);
DEV_GUARDED(x_findNodeTimeout)
m_findNodeTimeout.push_back(make_pair(r->id, chrono::steady_clock::now())); m_findNodeTimeout.push_back(make_pair(r->id, chrono::steady_clock::now()));
m_socketPointer->send(p); m_socketPointer->send(p);
} }
@ -447,17 +448,17 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{ {
if (auto n = nodeEntry(nodeid)) if (auto n = nodeEntry(nodeid))
n->pending = false; n->pending = false;
else if (m_pubkDiscoverPings.count(_from.address())) else
{ {
DEV_GUARDED(x_pubkDiscoverPings)
{ {
Guard l(x_pubkDiscoverPings); if (!m_pubkDiscoverPings.count(_from.address()))
return; // unsolicited pong; don't note node as active
m_pubkDiscoverPings.erase(_from.address()); m_pubkDiscoverPings.erase(_from.address());
} }
if (!haveNode(nodeid)) if (!haveNode(nodeid))
addNode(Node(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port()))); addNode(Node(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port())));
} }
else
return; // unsolicited pong; don't note node as active
} }
// update our endpoint address and UDP port // update our endpoint address and UDP port
@ -473,6 +474,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{ {
bool expected = false; bool expected = false;
auto now = chrono::steady_clock::now(); auto now = chrono::steady_clock::now();
DEV_GUARDED(x_findNodeTimeout)
m_findNodeTimeout.remove_if([&](NodeIdTimePoint const& t) m_findNodeTimeout.remove_if([&](NodeIdTimePoint const& t)
{ {
if (t.first == nodeid && now - t.second < c_reqTimeout) if (t.first == nodeid && now - t.second < c_reqTimeout)

4
libp2p/UDP.cpp

@ -20,9 +20,13 @@
*/ */
#include "UDP.h" #include "UDP.h"
using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
const char* RLPXWarn::name() { return "!X!"; }
const char* RLPXNote::name() { return "-X-"; }
h256 RLPXDatagramFace::sign(Secret const& _k) h256 RLPXDatagramFace::sign(Secret const& _k)
{ {
assert(packetType()); assert(packetType());

25
libp2p/UDP.h

@ -30,6 +30,7 @@
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include <libdevcrypto/Common.h> #include <libdevcrypto/Common.h>
#include <libdevcrypto/SHA3.h> #include <libdevcrypto/SHA3.h>
#include <libdevcore/Log.h>
#include <libdevcore/RLP.h> #include <libdevcore/RLP.h>
#include "Common.h" #include "Common.h"
namespace ba = boost::asio; namespace ba = boost::asio;
@ -40,6 +41,9 @@ namespace dev
namespace p2p namespace p2p
{ {
struct RLPXWarn: public LogChannel { static const char* name(); static const int verbosity = 0; };
struct RLPXNote: public LogChannel { static const char* name(); static const int verbosity = 1; };
/** /**
* UDP Datagram * UDP Datagram
* @todo make data protected/functional * @todo make data protected/functional
@ -203,13 +207,13 @@ void UDPSocket<Handler, MaxDatagramSize>::doRead()
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this()); auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len) m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
{ {
// ASIO Safety: It is possible that ASIO will call lambda w/o an error if (m_closed)
// and after the socket has been disconnected. Checking m_closed
// guarantees that m_host will not be called after disconnect().
if (_ec || m_closed)
return disconnectWithError(_ec); return disconnectWithError(_ec);
assert(_len); if (_ec != boost::system::errc::success)
clog(NetWarn) << "Receiving UDP message failed. " << _ec.value() << ":" << _ec.message();
if (_len)
m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len)); m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));
doRead(); doRead();
}); });
@ -228,17 +232,14 @@ void UDPSocket<Handler, MaxDatagramSize>::doWrite()
{ {
if (m_closed) if (m_closed)
return disconnectWithError(_ec); return disconnectWithError(_ec);
else if (_ec != boost::system::errc::success &&
_ec != boost::system::errc::address_not_available && if (_ec != boost::system::errc::success)
_ec != boost::system::errc::host_unreachable) clog(NetWarn) << "Failed delivering UDP message. " << _ec.value() << ":" << _ec.message();
return disconnectWithError(_ec);
else
{
Guard l(x_sendQ); Guard l(x_sendQ);
m_sendQ.pop_front(); m_sendQ.pop_front();
if (m_sendQ.empty()) if (m_sendQ.empty())
return; return;
}
doWrite(); doWrite();
}); });
} }

Loading…
Cancel
Save