Browse Source

Merge branch 'develop' of https://github.com/ethereum/cpp-ethereum into develop

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
287014dc67
  1. 38
      libp2p/NodeTable.cpp
  2. 4
      libp2p/UDP.cpp
  3. 35
      libp2p/UDP.h

38
libp2p/NodeTable.cpp

@ -81,7 +81,8 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
{
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
ret->pending = false;
m_nodes[_node.id] = ret;
DEV_GUARDED(x_nodes)
m_nodes[_node.id] = ret;
noteActiveNode(_node.id, _node.endpoint);
return ret;
}
@ -101,14 +102,13 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relati
return move(shared_ptr<NodeEntry>());
}
{
Guard ln(x_nodes);
DEV_GUARDED(x_nodes)
if (m_nodes.count(_node.id))
return m_nodes[_node.id];
}
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, _node.endpoint));
m_nodes[_node.id] = ret;
DEV_GUARDED(x_nodes)
m_nodes[_node.id] = ret;
clog(NodeTableConnect) << "addNode pending for" << _node.endpoint;
ping(_node.endpoint);
return ret;
@ -186,7 +186,8 @@ void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_pt
tried.push_back(r);
FindNode p(r->endpoint, _node);
p.sign(m_secret);
m_findNodeTimeout.push_back(make_pair(r->id, chrono::steady_clock::now()));
DEV_GUARDED(x_findNodeTimeout)
m_findNodeTimeout.push_back(make_pair(r->id, chrono::steady_clock::now()));
m_socketPointer->send(p);
}
@ -447,17 +448,17 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
if (auto n = nodeEntry(nodeid))
n->pending = false;
else if (m_pubkDiscoverPings.count(_from.address()))
else
{
DEV_GUARDED(x_pubkDiscoverPings)
{
Guard l(x_pubkDiscoverPings);
if (!m_pubkDiscoverPings.count(_from.address()))
return; // unsolicited pong; don't note node as active
m_pubkDiscoverPings.erase(_from.address());
}
if (!haveNode(nodeid))
addNode(Node(nodeid, NodeIPEndpoint(_from.address(), _from.port(), _from.port())));
}
else
return; // unsolicited pong; don't note node as active
}
// update our endpoint address and UDP port
@ -473,14 +474,15 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{
bool expected = false;
auto now = chrono::steady_clock::now();
m_findNodeTimeout.remove_if([&](NodeIdTimePoint const& t)
{
if (t.first == nodeid && now - t.second < c_reqTimeout)
expected = true;
else if (t.first == nodeid)
return true;
return false;
});
DEV_GUARDED(x_findNodeTimeout)
m_findNodeTimeout.remove_if([&](NodeIdTimePoint const& t)
{
if (t.first == nodeid && now - t.second < c_reqTimeout)
expected = true;
else if (t.first == nodeid)
return true;
return false;
});
if (!expected)
{

4
libp2p/UDP.cpp

@ -20,9 +20,13 @@
*/
#include "UDP.h"
using namespace std;
using namespace dev;
using namespace dev::p2p;
const char* RLPXWarn::name() { return "!X!"; }
const char* RLPXNote::name() { return "-X-"; }
h256 RLPXDatagramFace::sign(Secret const& _k)
{
assert(packetType());

35
libp2p/UDP.h

@ -30,6 +30,7 @@
#include <libdevcore/Guards.h>
#include <libdevcrypto/Common.h>
#include <libdevcrypto/SHA3.h>
#include <libdevcore/Log.h>
#include <libdevcore/RLP.h>
#include "Common.h"
namespace ba = boost::asio;
@ -40,6 +41,9 @@ namespace dev
namespace p2p
{
struct RLPXWarn: public LogChannel { static const char* name(); static const int verbosity = 0; };
struct RLPXNote: public LogChannel { static const char* name(); static const int verbosity = 1; };
/**
* UDP Datagram
* @todo make data protected/functional
@ -203,14 +207,14 @@ void UDPSocket<Handler, MaxDatagramSize>::doRead()
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
{
// ASIO Safety: It is possible that ASIO will call lambda w/o an error
// and after the socket has been disconnected. Checking m_closed
// guarantees that m_host will not be called after disconnect().
if (_ec || m_closed)
if (m_closed)
return disconnectWithError(_ec);
if (_ec != boost::system::errc::success)
clog(NetWarn) << "Receiving UDP message failed. " << _ec.value() << ":" << _ec.message();
assert(_len);
m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));
if (_len)
m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));
doRead();
});
}
@ -228,17 +232,14 @@ void UDPSocket<Handler, MaxDatagramSize>::doWrite()
{
if (m_closed)
return disconnectWithError(_ec);
else if (_ec != boost::system::errc::success &&
_ec != boost::system::errc::address_not_available &&
_ec != boost::system::errc::host_unreachable)
return disconnectWithError(_ec);
else
{
Guard l(x_sendQ);
m_sendQ.pop_front();
if (m_sendQ.empty())
return;
}
if (_ec != boost::system::errc::success)
clog(NetWarn) << "Failed delivering UDP message. " << _ec.value() << ":" << _ec.message();
Guard l(x_sendQ);
m_sendQ.pop_front();
if (m_sendQ.empty())
return;
doWrite();
});
}

Loading…
Cancel
Save