Browse Source

Vast P2P networking improvements, mainly for peer discovery, management

and handling.
cl-refactor
Gav Wood 11 years ago
parent
commit
07c804a885
  1. 4
      alethzero/MainWin.cpp
  2. 25
      libdevcore/RangeMask.h
  3. 2
      libdevcrypto/TrieDB.h
  4. 17
      libethcore/Exceptions.cpp
  5. 2
      libethcore/Exceptions.h
  6. 4
      libethereum/EthereumHost.h
  7. 10
      libethereum/State.cpp
  8. 1
      libp2p/CMakeLists.txt
  9. 6
      libp2p/Common.h
  10. 265
      libp2p/Host.cpp
  11. 104
      libp2p/Host.h
  12. 2
      libp2p/HostCapability.cpp
  13. 191
      libp2p/Session.cpp
  14. 22
      libp2p/Session.h
  15. 8
      libwebthree/WebThree.cpp
  16. 4
      libwebthree/WebThree.h
  17. 1
      libwhisper/Common.h
  18. 12
      third/MainWin.cpp
  19. 2
      third/MainWin.h

4
alethzero/MainWin.cpp

@ -500,7 +500,7 @@ void Main::writeSettings()
s.setValue("privateChain", m_privateChain); s.setValue("privateChain", m_privateChain);
s.setValue("verbosity", ui->verbosity->value()); s.setValue("verbosity", ui->verbosity->value());
bytes d = m_webThree->savePeers(); bytes d = m_webThree->saveNodes();
if (d.size()) if (d.size())
m_peers = QByteArray((char*)d.data(), (int)d.size()); m_peers = QByteArray((char*)d.data(), (int)d.size());
s.setValue("peers", m_peers); s.setValue("peers", m_peers);
@ -1542,7 +1542,7 @@ void Main::on_net_triggered()
web3()->startNetwork(); web3()->startNetwork();
ui->downloadView->setDownloadMan(ethereum()->downloadMan()); ui->downloadView->setDownloadMan(ethereum()->downloadMan());
if (m_peers.size() && ui->usePast->isChecked()) if (m_peers.size() && ui->usePast->isChecked())
web3()->restorePeers(bytesConstRef((byte*)m_peers.data(), m_peers.size())); web3()->restoreNodes(bytesConstRef((byte*)m_peers.data(), m_peers.size()));
} }
else else
{ {

25
libdevcore/RangeMask.h

@ -47,6 +47,7 @@ public:
RangeMask(T _begin, T _end): m_all(_begin, _end) {} RangeMask(T _begin, T _end): m_all(_begin, _end) {}
RangeMask(Range const& _c): m_all(_c) {} RangeMask(Range const& _c): m_all(_c) {}
RangeMask unionedWith(RangeMask const& _m) const { return operator+(_m); }
RangeMask operator+(RangeMask const& _m) const { return RangeMask(*this) += _m; } RangeMask operator+(RangeMask const& _m) const { return RangeMask(*this) += _m; }
RangeMask lowest(T _items) const RangeMask lowest(T _items) const
@ -57,7 +58,9 @@ public:
return ret; return ret;
} }
RangeMask operator~() const RangeMask operator~() const { return inverted(); }
RangeMask inverted() const
{ {
RangeMask ret(m_all); RangeMask ret(m_all);
T last = m_all.first; T last = m_all.first;
@ -72,13 +75,23 @@ public:
return ret; return ret;
} }
RangeMask& operator+=(RangeMask const& _m) RangeMask& invert() { return *this = inverted(); }
template <class S> RangeMask operator-(S const& _m) const { auto ret = *this; return ret -= _m; }
template <class S> RangeMask& operator-=(S const& _m) { return invert().unionWith(_m).invert(); }
RangeMask& operator+=(RangeMask const& _m) { return unionWith(_m); }
RangeMask& unionWith(RangeMask const& _m)
{ {
m_all.first = std::min(_m.m_all.first, m_all.first);
m_all.second = std::max(_m.m_all.second, m_all.second);
for (auto const& i: _m.m_ranges) for (auto const& i: _m.m_ranges)
operator+=(i); unionWith(i);
return *this; return *this;
} }
RangeMask& operator+=(UnsignedRange const& _m) RangeMask& operator+=(Range const& _m) { return unionWith(_m); }
RangeMask& unionWith(Range const& _m)
{ {
for (auto i = _m.first; i < _m.second;) for (auto i = _m.first; i < _m.second;)
{ {
@ -130,7 +143,8 @@ public:
return *this; return *this;
} }
RangeMask& operator+=(T _i) RangeMask& operator+=(T _m) { return unionWith(_m); }
RangeMask& unionWith(T _i)
{ {
return operator+=(Range(_i, _i + 1)); return operator+=(Range(_i, _i + 1));
} }
@ -165,6 +179,7 @@ public:
} }
std::pair<T, T> const& all() const { return m_all; } std::pair<T, T> const& all() const { return m_all; }
void extendAll(T _max) { m_all.second = _max; }
class const_iterator class const_iterator
{ {

2
libdevcrypto/TrieDB.h

@ -473,7 +473,7 @@ template <class DB> void GenericTrieDB<DB>::insert(bytesConstRef _key, bytesCons
assert(rv.size()); assert(rv.size());
bytes b = mergeAt(RLP(rv), NibbleSlice(_key), _value); bytes b = mergeAt(RLP(rv), NibbleSlice(_key), _value);
// mergeAt won't attempt to delete the node is it's less than 32 bytes // mergeAt won't attempt to delete the node if it's less than 32 bytes
// However, we know it's the root node and thus always hashed. // However, we know it's the root node and thus always hashed.
// So, if it's less than 32 (and thus should have been deleted but wasn't) then we delete it here. // So, if it's less than 32 (and thus should have been deleted but wasn't) then we delete it here.
if (rv.size() < 32) if (rv.size() < 32)

17
libethcore/Exceptions.cpp

@ -22,14 +22,17 @@
#include "Exceptions.h" #include "Exceptions.h"
#include <libdevcore/CommonIO.h> #include <libdevcore/CommonIO.h>
using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;
const char* InvalidBlockFormat::what() const noexcept { return ("Invalid block format: Bad field " + toString(m_f) + " (" + toHex(m_d) + ")").c_str(); } #define ETH_RETURN_STRING(S) static string s_what; s_what = S; return s_what.c_str();
const char* UncleInChain::what() const noexcept { return ("Uncle in block already mentioned: Uncles " + toString(m_uncles) + " (" + m_block.abridged() + ")").c_str(); }
const char* InvalidTransactionsHash::what() const noexcept { return ("Invalid transactions hash: header says: " + toHex(m_head.ref()) + " block is:" + toHex(m_real.ref())).c_str(); } const char* InvalidBlockFormat::what() const noexcept { ETH_RETURN_STRING("Invalid block format: Bad field " + toString(m_f) + " (" + toHex(m_d) + ")"); }
const char* InvalidGasLimit::what() const noexcept { return ("Invalid gas limit (provided: " + toString(provided) + " valid:" + toString(valid) + ")").c_str(); } const char* UncleInChain::what() const noexcept { ETH_RETURN_STRING("Uncle in block already mentioned: Uncles " + toString(m_uncles) + " (" + m_block.abridged() + ")"); }
const char* InvalidMinGasPrice::what() const noexcept { return ("Invalid minimum gas price (provided: " + toString(provided) + " limit:" + toString(limit) + ")").c_str(); } const char* InvalidTransactionsHash::what() const noexcept { ETH_RETURN_STRING("Invalid transactions hash: header says: " + toHex(m_head.ref()) + " block is:" + toHex(m_real.ref())); }
const char* InvalidNonce::what() const noexcept { return ("Invalid nonce (r: " + toString(required) + " c:" + toString(candidate) + ")").c_str(); } const char* InvalidGasLimit::what() const noexcept { ETH_RETURN_STRING("Invalid gas limit (provided: " + toString(provided) + " valid:" + toString(valid) + ")"); }
const char* InvalidBlockNonce::what() const noexcept { return ("Invalid nonce (h: " + toString(h) + " n:" + toString(n) + " d:" + toString(d) + ")").c_str(); } const char* InvalidMinGasPrice::what() const noexcept { ETH_RETURN_STRING("Invalid minimum gas price (provided: " + toString(provided) + " limit:" + toString(limit) + ")"); }
const char* InvalidNonce::what() const noexcept { ETH_RETURN_STRING("Invalid nonce (r: " + toString(required) + " c:" + toString(candidate) + ")"); }
const char* InvalidBlockNonce::what() const noexcept { ETH_RETURN_STRING("Invalid nonce (h: " + toString(h) + " n:" + toString(n) + " d:" + toString(d) + ")"); }

2
libethcore/Exceptions.h

@ -48,7 +48,7 @@ class InvalidBlockFormat: public dev::Exception { public: InvalidBlockFormat(int
struct InvalidUnclesHash: virtual dev::Exception {}; struct InvalidUnclesHash: virtual dev::Exception {};
struct InvalidUncle: virtual dev::Exception {}; struct InvalidUncle: virtual dev::Exception {};
struct UncleTooOld: virtual dev::Exception {}; struct UncleTooOld: virtual dev::Exception {};
class UncleInChain: public dev::Exception { public: UncleInChain(h256Set _uncles, h256 _block): m_uncles(_uncles), m_block(_block) {} h256Set m_uncles; h256 m_block;virtual const char* what() const noexcept; }; class UncleInChain: public dev::Exception { public: UncleInChain(h256Set _uncles, h256 _block): m_uncles(_uncles), m_block(_block) {} h256Set m_uncles; h256 m_block; virtual const char* what() const noexcept; };
struct DuplicateUncleNonce: virtual dev::Exception {}; struct DuplicateUncleNonce: virtual dev::Exception {};
struct InvalidStateRoot: virtual dev::Exception {}; struct InvalidStateRoot: virtual dev::Exception {};
class InvalidTransactionsHash: public dev::Exception { public: InvalidTransactionsHash(h256 _head, h256 _real): m_head(_head), m_real(_real) {} h256 m_head; h256 m_real; virtual const char* what() const noexcept; }; class InvalidTransactionsHash: public dev::Exception { public: InvalidTransactionsHash(h256 _head, h256 _real): m_head(_head), m_real(_real) {} h256 m_head; h256 m_real; virtual const char* what() const noexcept; };

4
libethereum/EthereumHost.h

@ -72,7 +72,7 @@ public:
DownloadMan const& downloadMan() const { return m_man; } DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { return !!m_syncer; } bool isSyncing() const { return !!m_syncer; }
bool isBanned(h512 _id) const { return !!m_banned.count(_id); } bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); }
private: private:
/// Session is tell us that we may need (re-)syncing with the peer. /// Session is tell us that we may need (re-)syncing with the peer.
@ -116,7 +116,7 @@ private:
h256 m_latestBlockSent; h256 m_latestBlockSent;
h256Set m_transactionsSent; h256Set m_transactionsSent;
std::set<h512> m_banned; std::set<p2p::NodeId> m_banned;
}; };
} }

10
libethereum/State.cpp

@ -75,7 +75,9 @@ void ripemd160Code(bytesConstRef _in, bytesRef _out)
{ {
h256 ret; h256 ret;
ripemd160(_in, bytesRef(ret.data(), 32)); ripemd160(_in, bytesRef(ret.data(), 32));
memcpy(_out.data(), &ret, min(_out.size(), sizeof(ret))); memset(_out.data(), 0, std::min<int>(12, _out.size()));
if (_out.size() > 12)
memcpy(_out.data() + 12, &ret, min(_out.size() - 12, sizeof(ret)));
} }
const std::map<unsigned, PrecompiledAddress> State::c_precompiled = const std::map<unsigned, PrecompiledAddress> State::c_precompiled =
@ -1284,8 +1286,8 @@ std::ostream& dev::eth::operator<<(std::ostream& _out, State const& _s)
{ {
auto it = _s.m_cache.find(i); auto it = _s.m_cache.find(i);
AddressState* cache = it != _s.m_cache.end() ? &it->second : nullptr; AddressState* cache = it != _s.m_cache.end() ? &it->second : nullptr;
auto rlpString = trie.at(i); string rlpString = dtr.count(i) ? trie.at(i) : "";
RLP r(dtr.count(i) ? rlpString : ""); RLP r(rlpString);
assert(cache || r); assert(cache || r);
if (cache && !cache->isAlive()) if (cache && !cache->isAlive())
@ -1298,7 +1300,7 @@ std::ostream& dev::eth::operator<<(std::ostream& _out, State const& _s)
stringstream contout; stringstream contout;
if ((!cache || cache->codeBearing()) && (!r || r[3].toHash<h256>() != EmptySHA3)) if ((cache && cache->codeBearing()) || (!cache && r && !r[3].isEmpty()))
{ {
std::map<u256, u256> mem; std::map<u256, u256> mem;
std::set<u256> back; std::set<u256> back;

1
libp2p/CMakeLists.txt

@ -17,6 +17,7 @@ file(GLOB HEADERS "*.h")
include_directories(..) include_directories(..)
target_link_libraries(${EXECUTABLE} devcrypto)
target_link_libraries(${EXECUTABLE} devcore) target_link_libraries(${EXECUTABLE} devcore)
target_link_libraries(${EXECUTABLE} secp256k1) target_link_libraries(${EXECUTABLE} secp256k1)
if(MINIUPNPC_LS) if(MINIUPNPC_LS)

6
libp2p/Common.h

@ -44,6 +44,8 @@ class RLPStream;
namespace p2p namespace p2p
{ {
using NodeId = h512;
bool isPrivateAddress(bi::address const& _addressToCheck); bool isPrivateAddress(bi::address const& _addressToCheck);
class UPnP; class UPnP;
@ -82,8 +84,9 @@ enum DisconnectReason
TooManyPeers, TooManyPeers,
DuplicatePeer, DuplicatePeer,
IncompatibleProtocol, IncompatibleProtocol,
InvalidIdentity, NullIdentity,
ClientQuit, ClientQuit,
UnexpectedIdentity,
UserReason = 0x10 UserReason = 0x10
}; };
@ -96,6 +99,7 @@ typedef std::vector<CapDesc> CapDescs;
struct PeerInfo struct PeerInfo
{ {
NodeId id;
std::string clientVersion; std::string clientVersion;
std::string host; std::string host;
unsigned short port; unsigned short port;

265
libp2p/Host.cpp

@ -63,11 +63,11 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
m_netPrefs(_n), m_netPrefs(_n),
m_acceptor(m_ioService), m_acceptor(m_ioService),
m_socket(m_ioService), m_socket(m_ioService),
m_id(h512::random()) m_key(KeyPair::create())
{ {
populateAddresses(); populateAddresses();
m_lastPeersRequest = chrono::steady_clock::time_point::min(); m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << m_id.abridged(); clog(NetNote) << "Id:" << id().abridged();
if (_start) if (_start)
start(); start();
} }
@ -109,11 +109,16 @@ void Host::start()
determinePublic(m_netPrefs.publicIP, m_netPrefs.upnp); determinePublic(m_netPrefs.publicIP, m_netPrefs.upnp);
ensureAccepting(); ensureAccepting();
m_incomingPeers.clear(); m_nodes.clear();
m_freePeers.clear(); m_freePeers.clear();
m_nodesList.clear();
m_ready.reset();
if (!m_public.address().is_unspecified())
noteNode(id(), m_public, Origin::Perfect, false);
m_lastPeersRequest = chrono::steady_clock::time_point::min(); m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << m_id.abridged(); clog(NetNote) << "Id:" << id().abridged();
for (auto const& h: m_capabilities) for (auto const& h: m_capabilities)
h.second->onStarting(); h.second->onStarting();
@ -149,9 +154,15 @@ unsigned Host::protocolVersion() const
void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps) void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
{ {
if (!_s->m_node || !_s->m_node->id)
{
cwarn << "Attempting to register a peer without node information!";
return;
}
{ {
Guard l(x_peers); RecursiveGuard l(x_peers);
m_peers[_s->m_id] = _s; m_peers[_s->m_node->id] = _s;
} }
unsigned o = (unsigned)UserPacket; unsigned o = (unsigned)UserPacket;
for (auto const& i: _caps) for (auto const& i: _caps)
@ -167,7 +178,7 @@ void Host::disconnectPeers()
for (unsigned n = 0;; n = 0) for (unsigned n = 0;; n = 0)
{ {
{ {
Guard l(x_peers); RecursiveGuard l(x_peers);
for (auto i: m_peers) for (auto i: m_peers)
if (auto p = i.second.lock()) if (auto p = i.second.lock())
{ {
@ -230,7 +241,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
else else
{ {
m_public = bi::tcp::endpoint(bi::address::from_string(_publicAddress.empty() ? eip : _publicAddress), (unsigned short)p); m_public = bi::tcp::endpoint(bi::address::from_string(_publicAddress.empty() ? eip : _publicAddress), (unsigned short)p);
m_addresses.push_back(m_public.address().to_v4()); m_addresses.push_back(m_public.address());
} }
} }
else else
@ -239,7 +250,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
m_public = bi::tcp::endpoint(_publicAddress.size() ? bi::address::from_string(_publicAddress) m_public = bi::tcp::endpoint(_publicAddress.size() ? bi::address::from_string(_publicAddress)
: m_peerAddresses.size() ? m_peerAddresses[0] : m_peerAddresses.size() ? m_peerAddresses[0]
: bi::address(), m_listenPort); : bi::address(), m_listenPort);
m_addresses.push_back(m_public.address().to_v4()); m_addresses.push_back(m_public.address());
} }
} }
@ -312,33 +323,83 @@ void Host::populateAddresses()
clog(NetNote) << "Couldn't resolve: " << host; clog(NetNote) << "Couldn't resolve: " << host;
} }
} }
else if (ifa->ifa_addr->sa_family == AF_INET6)
{
char host[NI_MAXHOST];
if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in6), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST))
continue;
try
{
auto it = r.resolve({host, "30303"});
bi::tcp::endpoint ep = it->endpoint();
bi::address ad = ep.address();
m_addresses.push_back(ad.to_v6());
bool isLocal = std::find(c_rejectAddresses.begin(), c_rejectAddresses.end(), ad) != c_rejectAddresses.end();
if (!isLocal)
m_peerAddresses.push_back(ad);
clog(NetNote) << "Address: " << host << " = " << m_addresses.back() << (isLocal ? " [LOCAL]" : " [PEER]");
}
catch (...)
{
clog(NetNote) << "Couldn't resolve: " << host;
}
}
} }
freeifaddrs(ifaddr); freeifaddrs(ifaddr);
#endif #endif
} }
std::map<h512, bi::tcp::endpoint> Host::potentialPeers() shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint const& _a, Origin _o, bool _ready, NodeId _oldId)
{ {
std::map<h512, bi::tcp::endpoint> ret; RecursiveGuard l(x_peers);
if (!m_public.address().is_unspecified()) unsigned i;
ret.insert(make_pair(m_id, m_public)); if (!m_nodes.count(_id))
Guard l(x_peers); {
for (auto i: m_peers) shared_ptr<Node> old;
if (auto j = i.second.lock()) if (m_nodes.count(_oldId))
{ {
auto ep = j->endpoint(); old = m_nodes[_oldId];
// cnote << "Checking potential peer" << j->m_listenPort << j->endpoint() << isPrivateAddress(ep.address()) << ep.port() << j->m_id.abridged(); i = old->index;
// Skip peers with a listen port of zero or are on a private network m_nodes.erase(_oldId);
bool peerOnNet = (j->m_listenPort != 0 && (!isPrivateAddress(ep.address()) || m_netPrefs.localNetworking)); m_nodesList[i] = _id;
if (!peerOnNet && m_incomingPeers.count(j->m_id)) m_nodes[id()] = make_shared<Node>();
{ }
ep = m_incomingPeers.at(j->m_id).first; else
peerOnNet = (j->m_listenPort != 0 && (!isPrivateAddress(ep.address()) || m_netPrefs.localNetworking)); {
} i = m_nodesList.size();
if (peerOnNet && ep.port() && j->m_id) m_nodesList.push_back(_id);
ret.insert(make_pair(i.first, ep)); m_nodes[_id] = make_shared<Node>();
} }
m_nodes[_id]->address = m_public;
m_nodes[_id]->index = i;
m_nodes[_id]->id = _id;
m_nodes[_id]->idOrigin = _o;
}
else
{
i = m_nodes[_id]->index;
m_nodes[_id]->idOrigin = max(m_nodes[_id]->idOrigin, _o);
}
m_ready.extendAll(i);
m_private.extendAll(i);
if (_ready)
m_ready += i;
else
m_ready -= i;
if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking))
m_private += i;
else
m_private -= i;
return m_nodes[_id];
}
Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
{
RecursiveGuard l(x_peers);
Nodes ret;
for (auto i: m_ready - (m_private + _known))
ret.push_back(*m_nodes[m_nodesList[i]]);
return ret; return ret;
} }
@ -359,7 +420,7 @@ void Host::ensureAccepting()
} catch (...){} } catch (...){}
bi::address remoteAddress = m_socket.remote_endpoint().address(); bi::address remoteAddress = m_socket.remote_endpoint().address();
// Port defaults to 0 - we let the hello tell us which port the peer listens to // Port defaults to 0 - we let the hello tell us which port the peer listens to
auto p = std::make_shared<Session>(this, std::move(m_socket), remoteAddress); auto p = std::make_shared<Session>(this, std::move(m_socket), bi::tcp::endpoint(remoteAddress, 0));
p->start(); p->start();
} }
catch (Exception const& _e) catch (Exception const& _e)
@ -415,26 +476,15 @@ void Host::connect(std::string const& _addr, unsigned short _port) noexcept
void Host::connect(bi::tcp::endpoint const& _ep) void Host::connect(bi::tcp::endpoint const& _ep)
{ {
clog(NetConnect) << "Attempting connection to " << _ep; clog(NetConnect) << "Attempting single-shot connection to " << _ep;
bi::tcp::socket* s = new bi::tcp::socket(m_ioService); bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
s->async_connect(_ep, [=](boost::system::error_code const& ec) s->async_connect(_ep, [=](boost::system::error_code const& ec)
{ {
if (ec) if (ec)
{
clog(NetConnect) << "Connection refused to " << _ep << " (" << ec.message() << ")"; clog(NetConnect) << "Connection refused to " << _ep << " (" << ec.message() << ")";
for (auto i = m_incomingPeers.begin(); i != m_incomingPeers.end(); ++i)
if (i->second.first == _ep && i->second.second < 3)
{
m_freePeers.push_back(i->first);
goto OK;
}
// for-else
clog(NetConnect) << "Giving up.";
OK:;
}
else else
{ {
auto p = make_shared<Session>(this, std::move(*s), _ep.address(), _ep.port()); auto p = make_shared<Session>(this, std::move(*s), _ep);
clog(NetConnect) << "Connected to " << _ep; clog(NetConnect) << "Connected to " << _ep;
p->start(); p->start();
} }
@ -442,9 +492,35 @@ void Host::connect(bi::tcp::endpoint const& _ep)
}); });
} }
bool Host::havePeer(h512 _id) const void Node::connect(Host* _h)
{ {
Guard l(x_peers); clog(NetConnect) << "Attempting connection to node" << id.abridged() << "@" << address;
_h->m_ready -= index;
bi::tcp::socket* s = new bi::tcp::socket(_h->m_ioService);
s->async_connect(address, [=](boost::system::error_code const& ec)
{
if (ec)
{
clog(NetConnect) << "Connection refused to node" << id.abridged() << "@" << address << "(" << ec.message() << ")";
failedAttempts++;
lastAttempted = std::chrono::system_clock::now();
_h->m_ready += index;
}
else
{
clog(NetConnect) << "Connected to" << id.abridged() << "@" << address;
failedAttempts = 0;
lastConnected = std::chrono::system_clock::now();
auto p = make_shared<Session>(_h, std::move(*s), _h->node(id), true); // true because we don't care about ids matched for now. Once we have permenant IDs this will matter a lot more and we can institute a safer mechanism.
p->start();
}
delete s;
});
}
bool Host::havePeer(NodeId _id) const
{
RecursiveGuard l(x_peers);
// Remove dead peers from list. // Remove dead peers from list.
for (auto i = m_peers.begin(); i != m_peers.end();) for (auto i = m_peers.begin(); i != m_peers.end();)
@ -458,7 +534,7 @@ bool Host::havePeer(h512 _id) const
void Host::growPeers() void Host::growPeers()
{ {
Guard l(x_peers); RecursiveGuard l(x_peers);
while (m_peers.size() < m_idealPeerCount) while (m_peers.size() < m_idealPeerCount)
{ {
if (m_freePeers.empty()) if (m_freePeers.empty())
@ -483,16 +559,15 @@ void Host::growPeers()
} }
auto x = time(0) % m_freePeers.size(); auto x = time(0) % m_freePeers.size();
m_incomingPeers[m_freePeers[x]].second++;
if (!m_peers.count(m_freePeers[x])) if (!m_peers.count(m_freePeers[x]))
connect(m_incomingPeers[m_freePeers[x]].first); connect(m_nodes[m_freePeers[x]]->address);
m_freePeers.erase(m_freePeers.begin() + x); m_freePeers.erase(m_freePeers.begin() + x);
} }
} }
void Host::prunePeers() void Host::prunePeers()
{ {
Guard l(x_peers); RecursiveGuard l(x_peers);
// We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
for (unsigned old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2) for (unsigned old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
while (m_peers.size() > m_idealPeerCount) while (m_peers.size() > m_idealPeerCount)
@ -506,7 +581,7 @@ void Host::prunePeers()
if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers.
{ {
++agedPeers; ++agedPeers;
if ((!worst || p->m_rating < worst->m_rating || (p->m_rating == worst->m_rating && p->m_connect > worst->m_connect))) // kill older ones if ((!worst || p->rating() < worst->rating() || (p->rating() == worst->rating() && p->m_connect > worst->m_connect))) // kill older ones
worst = p; worst = p;
} }
if (!worst || agedPeers <= m_idealPeerCount) if (!worst || agedPeers <= m_idealPeerCount)
@ -524,10 +599,12 @@ void Host::prunePeers()
std::vector<PeerInfo> Host::peers(bool _updatePing) const std::vector<PeerInfo> Host::peers(bool _updatePing) const
{ {
Guard l(x_peers); RecursiveGuard l(x_peers);
if (_updatePing) if (_updatePing)
{
const_cast<Host*>(this)->pingAll(); const_cast<Host*>(this)->pingAll();
this_thread::sleep_for(chrono::milliseconds(200)); this_thread::sleep_for(chrono::milliseconds(200));
}
std::vector<PeerInfo> ret; std::vector<PeerInfo> ret;
for (auto& i: m_peers) for (auto& i: m_peers)
if (auto j = i.second.lock()) if (auto j = i.second.lock())
@ -545,36 +622,82 @@ void Host::doWork()
void Host::pingAll() void Host::pingAll()
{ {
Guard l(x_peers); RecursiveGuard l(x_peers);
for (auto& i: m_peers) for (auto& i: m_peers)
if (auto j = i.second.lock()) if (auto j = i.second.lock())
j->ping(); j->ping();
} }
bytes Host::savePeers() const bytes Host::saveNodes() const
{ {
Guard l(x_peers); RLPStream nodes;
RLPStream ret; int count = 0;
int n = 0; {
for (auto& i: m_peers) RecursiveGuard l(x_peers);
if (auto p = i.second.lock()) for (auto const& i: m_nodes)
if (p->m_socket.is_open() && p->endpoint().port()) {
{ Node const& n = *(i.second);
ret.appendList(3) << p->endpoint().address().to_v4().to_bytes() << p->endpoint().port() << p->m_id; nodes.appendList(4);
n++; if (n.address.address().is_v4())
} nodes << n.address.address().to_v4().to_bytes();
return RLPStream(n).appendRaw(ret.out(), n).out(); else
nodes << n.address.address().to_v6().to_bytes();
nodes << n.address.port() << n.id << (int)n.idOrigin
<< std::chrono::duration_cast<std::chrono::seconds>(n.lastConnected.time_since_epoch()).count()
<< std::chrono::duration_cast<std::chrono::seconds>(n.lastAttempted.time_since_epoch()).count()
<< n.failedAttempts << n.lastDisconnect << n.score << n.rating;
count++;
}
}
RLPStream ret(3);
ret << 0 << m_key.secret();
ret.appendList(count).appendRaw(nodes.out(), count);
return ret.out();
} }
void Host::restorePeers(bytesConstRef _b) void Host::restoreNodes(bytesConstRef _b)
{ {
for (auto i: RLP(_b)) RecursiveGuard l(x_peers);
{ RLP r(_b);
auto k = (h512)i[2]; if (r.itemCount() == 2 && r[0].isInt())
if (!m_incomingPeers.count(k)) switch (r[0].toInt<int>())
{ {
m_incomingPeers.insert(make_pair(k, make_pair(bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>()), 0))); case 0:
m_freePeers.push_back(k); m_key = KeyPair(r[1].toHash<Secret>());
for (auto i: r[2])
{
auto id = (NodeId)i[2];
auto o = (Origin)i[3].toInt<int>();
if (!m_nodes.count(id))
{
bi::tcp::endpoint ep;
if (i[0].size() == 4)
ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto n = noteNode(id, ep, o, true);
n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
n->failedAttempts = i[6].toInt<unsigned>();
n->lastDisconnect = (int)i[7].toInt<unsigned>();
n->score = (int)i[8].toInt<unsigned>();
n->rating = (int)i[9].toInt<unsigned>();
}
}
default:;
}
else
for (auto i: r)
{
auto id = (NodeId)i[2];
if (!m_nodes.count(id))
{
bi::tcp::endpoint ep;
if (i[0].size() == 4)
ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto n = noteNode(id, ep, Origin::Self, true);
}
} }
}
} }

104
libp2p/Host.h

@ -28,8 +28,11 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <thread> #include <thread>
#include <chrono>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include <libdevcore/Worker.h> #include <libdevcore/Worker.h>
#include <libdevcore/RangeMask.h>
#include <libdevcrypto/Common.h>
#include "HostCapability.h" #include "HostCapability.h"
#include "Common.h" #include "Common.h"
namespace ba = boost::asio; namespace ba = boost::asio;
@ -43,6 +46,36 @@ class RLPStream;
namespace p2p namespace p2p
{ {
class Host;
enum class Origin
{
Unknown,
Self,
SelfThird,
PerfectThird,
Perfect,
};
struct Node
{
NodeId id; ///< Their id/public key.
unsigned index; ///< Index into m_nodesList
bi::tcp::endpoint address; ///< As reported from the node itself.
int score = 0; ///< All time cumulative.
int rating = 0; ///< Trending.
std::chrono::system_clock::time_point lastConnected;
std::chrono::system_clock::time_point lastAttempted;
unsigned failedAttempts = 0;
int lastDisconnect = -1; ///< Reason for disconnect that happened last.
Origin idOrigin = Origin::Unknown; ///< Thirdparty
void connect(Host* _h);
};
using Nodes = std::vector<Node>;
struct NetworkPreferences struct NetworkPreferences
{ {
NetworkPreferences(unsigned short p = 30303, std::string i = std::string(), bool u = true, bool l = false): listenPort(p), publicIP(i), upnp(u), localNetworking(l) {} NetworkPreferences(unsigned short p = 30303, std::string i = std::string(), bool u = true, bool l = false): listenPort(p), publicIP(i), upnp(u), localNetworking(l) {}
@ -61,6 +94,7 @@ class Host: public Worker
{ {
friend class Session; friend class Session;
friend class HostCapabilityFace; friend class HostCapabilityFace;
friend class Node;
public: public:
/// Start server, listening for connections on the given port. /// Start server, listening for connections on the given port.
@ -88,7 +122,7 @@ public:
void connect(bi::tcp::endpoint const& _ep); void connect(bi::tcp::endpoint const& _ep);
/// @returns true iff we have the a peer of the given id. /// @returns true iff we have the a peer of the given id.
bool havePeer(h512 _id) const; bool havePeer(NodeId _id) const;
/// Set ideal number of peers. /// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
@ -97,7 +131,7 @@ public:
std::vector<PeerInfo> peers(bool _updatePing = false) const; std::vector<PeerInfo> peers(bool _updatePing = false) const;
/// Get number of peers connected; equivalent to, but faster than, peers().size(). /// Get number of peers connected; equivalent to, but faster than, peers().size().
size_t peerCount() const { Guard l(x_peers); return m_peers.size(); } size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); }
/// Ping the peers, to update the latency information. /// Ping the peers, to update the latency information.
void pingAll(); void pingAll();
@ -106,10 +140,10 @@ public:
unsigned short listenPort() const { return m_public.port(); } unsigned short listenPort() const { return m_public.port(); }
/// Serialise the set of known peers. /// Serialise the set of known peers.
bytes savePeers() const; bytes saveNodes() const;
/// Deserialise the data and populate the set of known peers. /// Deserialise the data and populate the set of known peers.
void restorePeers(bytesConstRef _b); void restoreNodes(bytesConstRef _b);
void setNetworkPreferences(NetworkPreferences const& _p) { stop(); m_netPrefs = _p; start(); } void setNetworkPreferences(NetworkPreferences const& _p) { stop(); m_netPrefs = _p; start(); }
@ -117,14 +151,13 @@ public:
void stop(); void stop();
bool isStarted() const { return isWorking(); } bool isStarted() const { return isWorking(); }
h512 id() const { return m_id; } NodeId id() const { return m_key.pub(); }
void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps); void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps);
private: std::shared_ptr<Node> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<Node>(); }
/// Called when the session has provided us with a new peer we can connect to.
void noteNewPeers() {}
private:
void seal(bytes& _b); void seal(bytes& _b);
void populateAddresses(); void populateAddresses();
void determinePublic(std::string const& _publicAddress, bool _upnp); void determinePublic(std::string const& _publicAddress, bool _upnp);
@ -138,36 +171,51 @@ private:
/// This won't touch alter the blockchain. /// This won't touch alter the blockchain.
virtual void doWork(); virtual void doWork();
std::map<h512, bi::tcp::endpoint> potentialPeers(); std::shared_ptr<Node> noteNode(NodeId _id, bi::tcp::endpoint const& _a, Origin _o, bool _ready, NodeId _oldId = h256());
Nodes potentialPeers(RangeMask<unsigned> const& _known);
std::string m_clientVersion; ///< Our version string.
NetworkPreferences m_netPrefs; ///< Network settings.
static const int NetworkStopped = -1; ///< The value meaning we're not actually listening.
int m_listenPort = NetworkStopped; ///< What port are we listening on?
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_acceptor; ///< Listening acceptor.
bi::tcp::socket m_socket; ///< Listening socket.
std::string m_clientVersion; UPnP* m_upnp = nullptr; ///< UPnP helper.
bi::tcp::endpoint m_public; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.
NetworkPreferences m_netPrefs; mutable RecursiveMutex x_peers;
static const int NetworkStopped = -1; /// The nodes to which we are currently connected.
int m_listenPort = NetworkStopped; /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::map<NodeId, std::weak_ptr<Session>> m_peers;
ba::io_service m_ioService; /// Nodes to which we may connect (or to which we have connected).
bi::tcp::acceptor m_acceptor; /// TODO: does this need a lock?
bi::tcp::socket m_socket; std::map<NodeId, std::shared_ptr<Node> > m_nodes;
UPnP* m_upnp = nullptr; /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same.
bi::tcp::endpoint m_public; std::vector<NodeId> m_nodesList;
h512 m_id;
mutable std::mutex x_peers; RangeMask<unsigned> m_ready; ///< Indices into m_nodesList over to which nodes we are not currently connected, connecting or otherwise ignoring.
mutable std::map<h512, std::weak_ptr<Session>> m_peers; // mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private.
std::map<h512, std::pair<bi::tcp::endpoint, unsigned>> m_incomingPeers; // TODO: does this need a lock? std::vector<NodeId> m_freePeers;// TODO: Kill
std::vector<h512> m_freePeers; std::chrono::steady_clock::time_point m_lastPeersRequest;// TODO: Kill
std::chrono::steady_clock::time_point m_lastPeersRequest; unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.
unsigned m_idealPeerCount = 5;
std::vector<bi::address_v4> m_addresses; // Our addresses.
std::vector<bi::address_v4> m_peerAddresses; std::vector<bi::address> m_addresses; ///< Addresses for us.
std::vector<bi::address> m_peerAddresses; ///< Addresses that peers (can) know us by.
std::map<CapDesc, std::shared_ptr<HostCapabilityFace>> m_capabilities; // Our capabilities.
std::map<CapDesc, std::shared_ptr<HostCapabilityFace>> m_capabilities; ///< Each of the capabilities we support.
bool m_accepting = false; bool m_accepting = false;
}; };

2
libp2p/HostCapability.cpp

@ -34,7 +34,7 @@ void HostCapabilityFace::seal(bytes& _b)
std::vector<std::shared_ptr<Session> > HostCapabilityFace::peers() const std::vector<std::shared_ptr<Session> > HostCapabilityFace::peers() const
{ {
Guard l(m_host->x_peers); RecursiveGuard l(m_host->x_peers);
std::vector<std::shared_ptr<Session> > ret; std::vector<std::shared_ptr<Session> > ret;
for (auto const& i: m_host->m_peers) for (auto const& i: m_host->m_peers)
if (std::shared_ptr<Session> p = i.second.lock()) if (std::shared_ptr<Session> p = i.second.lock())

191
libp2p/Session.cpp

@ -36,19 +36,35 @@ using namespace dev::p2p;
#endif #endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
Session::Session(Host* _s, bi::tcp::socket _socket, bi::address _peerAddress, unsigned short _peerPort): Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual):
m_server(_s), m_server(_s),
m_socket(std::move(_socket)), m_socket(std::move(_socket)),
m_listenPort(_peerPort), m_node(nullptr),
m_rating(0) m_manualEndpoint(_manual)
{ {
m_disconnect = std::chrono::steady_clock::time_point::max(); m_disconnect = std::chrono::steady_clock::time_point::max();
m_connect = std::chrono::steady_clock::now(); m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({"?", _peerAddress.to_string(), m_listenPort, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
m_info = PeerInfo({NodeId(), "?", m_manualEndpoint.address().to_string(), m_manualEndpoint.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
}
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Node> const& _n, bool _force):
m_server(_s),
m_socket(std::move(_socket)),
m_node(_n),
m_manualEndpoint(_n->address),
m_force(_force)
{
m_disconnect = std::chrono::steady_clock::time_point::max();
m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
} }
Session::~Session() Session::~Session()
{ {
if (id())
m_server->noteNode(id(), m_manualEndpoint, Origin::Unknown, true);
// Read-chain finished for one reason or another. // Read-chain finished for one reason or another.
for (auto& i: m_capabilities) for (auto& i: m_capabilities)
i.second.reset(); i.second.reset();
@ -61,30 +77,72 @@ Session::~Session()
catch (...){} catch (...){}
} }
NodeId Session::id() const
{
return m_node ? m_node->id : NodeId();
}
void Session::addRating(unsigned _r)
{
if (m_node)
{
m_node->rating += _r;
m_node->score += _r;
}
}
int Session::rating() const
{
return m_node->rating;
}
bi::tcp::endpoint Session::endpoint() const bi::tcp::endpoint Session::endpoint() const
{ {
if (m_socket.is_open()) if (m_socket.is_open() && m_node)
try try
{ {
return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_listenPort); return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_node->address.port());
} }
catch (...){} catch (...) {}
return bi::tcp::endpoint(); if (m_node)
return m_node->address;
return m_manualEndpoint;
}
template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
{
if (_t.size() <= _n)
return _t;
vector<T> ret = _t;
while (ret.size() > _n)
{
auto i = ret.begin();
advance(i, rand() % ret.size());
ret.erase(i);
}
return ret;
} }
bool Session::interpret(RLP const& _r) bool Session::interpret(RLP const& _r)
{ {
clogS(NetRight) << _r; clogS(NetRight) << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{
switch ((PacketType)_r[0].toInt<unsigned>()) switch ((PacketType)_r[0].toInt<unsigned>())
{ {
case HelloPacket: case HelloPacket:
{ {
if (m_node)
m_node->lastDisconnect = -1;
m_protocolVersion = _r[1].toInt<unsigned>(); m_protocolVersion = _r[1].toInt<unsigned>();
auto clientVersion = _r[2].toString(); auto clientVersion = _r[2].toString();
auto caps = _r[3].toVector<CapDesc>(); auto caps = _r[3].toVector<CapDesc>();
m_listenPort = _r[4].toInt<unsigned short>(); auto listenPort = _r[4].toInt<unsigned short>();
m_id = _r[5].toHash<h512>(); auto id = _r[5].toHash<NodeId>();
// clang error (previously: ... << hex << caps ...) // clang error (previously: ... << hex << caps ...)
// "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments" // "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
@ -92,32 +150,44 @@ bool Session::interpret(RLP const& _r)
for (auto cap: caps) for (auto cap: caps)
capslog << "(" << hex << cap.first << "," << hex << cap.second << ")"; capslog << "(" << hex << cap.first << "," << hex << cap.second << ")";
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << m_id.abridged() << showbase << capslog.str() << dec << m_listenPort; clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << id.abridged() << showbase << capslog.str() << dec << listenPort;
if (m_server->havePeer(m_id)) if (m_server->havePeer(id))
{ {
// Already connected. // Already connected.
clogS(NetWarn) << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint(); clogS(NetWarn) << "Already connected to a peer with id" << id.abridged();
disconnect(DuplicatePeer); disconnect(DuplicatePeer);
return false; return false;
} }
if (!m_id)
if (m_node && m_node->id != id)
{ {
disconnect(InvalidIdentity); if (m_force || m_node->idOrigin <= Origin::SelfThird)
return false; // SECURITY: We're forcing through the new ID, despite having been told
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Allowing anyway...";
else
{
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Disconnecting.";
disconnect(UnexpectedIdentity);
return false;
}
} }
if (m_protocolVersion != m_server->protocolVersion())
if (!id)
{ {
disconnect(IncompatibleProtocol); disconnect(NullIdentity);
return false; return false;
} }
try
{ m_info = PeerInfo({clientVersion, m_socket.remote_endpoint().address().to_string(), m_listenPort, std::chrono::steady_clock::duration(), _r[3].toSet<CapDesc>(), (unsigned)m_socket.native_handle(), map<string, string>() }); } m_node = m_server->noteNode(id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort), Origin::Self, false, m_node->id == id ? NodeId() : m_node->id);
catch (...) m_knownNodes.unionWith(m_node->index);
if (m_protocolVersion != m_server->protocolVersion())
{ {
disconnect(BadProtocol); disconnect(IncompatibleProtocol);
return false; return false;
} }
m_info = PeerInfo({id, clientVersion, m_socket.remote_endpoint().address().to_string(), listenPort, std::chrono::steady_clock::duration(), _r[3].toSet<CapDesc>(), (unsigned)m_socket.native_handle(), map<string, string>() });
m_server->registerPeer(shared_from_this(), caps); m_server->registerPeer(shared_from_this(), caps);
break; break;
@ -150,16 +220,20 @@ bool Session::interpret(RLP const& _r)
case GetPeersPacket: case GetPeersPacket:
{ {
clogS(NetTriviaSummary) << "GetPeers"; clogS(NetTriviaSummary) << "GetPeers";
auto peers = m_server->potentialPeers(); auto peers = m_server->potentialPeers(m_knownNodes);
if (peers.empty())
break;
RLPStream s; RLPStream s;
prep(s, PeersPacket, peers.size()); prep(s, PeersPacket, min<unsigned>(10, peers.size()));
for (auto i: peers) auto rs = randomSelection(peers, 10);
for (auto const& i: rs)
{ {
clogS(NetTriviaDetail) << "Sending peer " << i.first.abridged() << i.second; clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.address;
if (i.second.address().is_v4()) if (i.address.address().is_v4())
s.appendList(3) << bytesConstRef(i.second.address().to_v4().to_bytes().data(), 4) << i.second.port() << i.first; s.appendList(3) << bytesConstRef(i.address.address().to_v4().to_bytes().data(), 4) << i.address.port() << i.id;
else// if (i.second.address().is_v6()) - assumed else// if (i.second.address().is_v6()) - assumed
s.appendList(3) << bytesConstRef(i.second.address().to_v6().to_bytes().data(), 16) << i.second.port() << i.first; s.appendList(3) << bytesConstRef(i.address.address().to_v6().to_bytes().data(), 16) << i.address.port() << i.id;
m_knownNodes.unionWith(i.index);
} }
sealAndSend(s); sealAndSend(s);
break; break;
@ -179,28 +253,51 @@ bool Session::interpret(RLP const& _r)
return false; return false;
} }
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>()); auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
h512 id = _r[i][2].toHash<h512>(); NodeId id = _r[i][2].toHash<NodeId>();
clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << m_id.abridged() << isPrivateAddress(endpoint().address()) << m_server->m_incomingPeers.count(id) << (m_server->m_incomingPeers.count(id) ? isPrivateAddress(m_server->m_incomingPeers.at(id).first.address()) : -1); clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_nodes.count(id) << (m_server->m_nodes.count(id) ? isPrivateAddress(m_server->m_nodes.at(id)->address.address()) : -1);
if (isPrivateAddress(peerAddress) && !m_server->m_netPrefs.localNetworking) if (isPrivateAddress(peerAddress) && !m_server->m_netPrefs.localNetworking)
goto CONTINUE; goto CONTINUE; // Private address. Ignore.
if (!id)
goto CONTINUE; // Null identity. Ignore.
if (m_server->id() == id)
goto CONTINUE; // Just our info - we already have that.
if (id == this->id())
goto CONTINUE; // Just their info - we already have that.
// check that it's not us or one we already know: // check that it's not us or one we already know:
if (!(m_id == id && isPrivateAddress(endpoint().address()) && (!m_server->m_incomingPeers.count(id) || isPrivateAddress(m_server->m_incomingPeers.at(id).first.address()))) && (!id || m_server->m_id == id || m_server->m_incomingPeers.count(id))) if (m_server->m_nodes.count(id))
{
// Already got this node.
// See if it's any better that ours or not...
// This could be the public address of a known node.
// SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
if (m_server->m_nodes.count(id) && isPrivateAddress(m_server->m_nodes.at(id)->address.address()))
// Update address if the node if we now have a public IP for it.
m_server->m_nodes[id]->address = ep;
goto CONTINUE; goto CONTINUE;
}
// check that we're not already connected to addr:
if (!ep.port()) if (!ep.port())
goto CONTINUE; goto CONTINUE; // Zero port? Don't think so.
// Avoid our random other addresses that they might end up giving us.
for (auto i: m_server->m_addresses) for (auto i: m_server->m_addresses)
if (ep.address() == i && ep.port() == m_server->listenPort()) if (ep.address() == i && ep.port() == m_server->listenPort())
goto CONTINUE; goto CONTINUE;
for (auto i: m_server->m_incomingPeers)
if (i.second.first == ep) // Check that we don't already know about this addr:port combination. If we are, assume the original is best.
goto CONTINUE; // SECURITY: Not a valid assumption in general. Should compare ID origins and pick the best or note uncertainty and weight each equally.
m_server->m_incomingPeers[id] = make_pair(ep, 0); for (auto const& i: m_server->m_nodes)
m_server->m_freePeers.push_back(id); if (i.second->address == ep)
m_server->noteNewPeers(); goto CONTINUE; // Same address but a different node.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->noteNode(id, ep, m_node->idOrigin == Origin::Perfect ? Origin::PerfectThird : Origin::SelfThird, true);
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")"; clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:; CONTINUE:;
} }
@ -214,6 +311,12 @@ bool Session::interpret(RLP const& _r)
return false; return false;
} }
} }
}
catch (...)
{
disconnect(BadProtocol);
return false;
}
return true; return true;
} }
@ -347,6 +450,10 @@ void Session::dropped()
void Session::disconnect(int _reason) void Session::disconnect(int _reason)
{ {
clogS(NetConnect) << "Disconnecting (reason:" << reasonOf((DisconnectReason)_reason) << ")"; clogS(NetConnect) << "Disconnecting (reason:" << reasonOf((DisconnectReason)_reason) << ")";
if (m_node)
m_node->lastDisconnect = _reason;
if (m_socket.is_open()) if (m_socket.is_open())
{ {
if (m_disconnect == chrono::steady_clock::time_point::max()) if (m_disconnect == chrono::steady_clock::time_point::max())
@ -369,7 +476,7 @@ void Session::start()
<< m_server->m_clientVersion << m_server->m_clientVersion
<< m_server->caps() << m_server->caps()
<< m_server->m_public.port() << m_server->m_public.port()
<< m_server->m_id; << m_server->id();
sealAndSend(s); sealAndSend(s);
ping(); ping();
getPeers(); getPeers();

22
libp2p/Session.h

@ -29,6 +29,7 @@
#include <utility> #include <utility>
#include <libdevcore/Common.h> #include <libdevcore/Common.h>
#include <libdevcore/RLP.h> #include <libdevcore/RLP.h>
#include <libdevcore/RangeMask.h>
#include "Common.h" #include "Common.h"
namespace dev namespace dev
@ -37,6 +38,8 @@ namespace dev
namespace p2p namespace p2p
{ {
class Node;
/** /**
* @brief The Session class * @brief The Session class
* @todo Document fully. * @todo Document fully.
@ -47,7 +50,8 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace; friend class HostCapabilityFace;
public: public:
Session(Host* _server, bi::tcp::socket _socket, bi::address _peerAddress, unsigned short _peerPort = 0); Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<Node> const& _n, bool _force = false);
Session(Host* _server, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual);
virtual ~Session(); virtual ~Session();
void start(); void start();
@ -57,7 +61,7 @@ public:
bool isOpen() const { return m_socket.is_open(); } bool isOpen() const { return m_socket.is_open(); }
h512 id() const { return m_id; } NodeId id() const;
unsigned socketId() const { return m_socket.native_handle(); } unsigned socketId() const { return m_socket.native_handle(); }
bi::tcp::endpoint endpoint() const; ///< for other peers to connect to. bi::tcp::endpoint endpoint() const; ///< for other peers to connect to.
@ -71,7 +75,8 @@ public:
void sendDestroy(bytes& _msg); void sendDestroy(bytes& _msg);
void send(bytesConstRef _msg); void send(bytesConstRef _msg);
void addRating(unsigned _r) { m_rating += _r; } int rating() const;
void addRating(unsigned _r);
void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; } void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; }
@ -95,23 +100,22 @@ private:
std::mutex m_writeLock; std::mutex m_writeLock;
std::deque<bytes> m_writeQueue; std::deque<bytes> m_writeQueue;
mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle(). mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle().
std::array<byte, 65536> m_data; std::array<byte, 65536> m_data;
PeerInfo m_info; PeerInfo m_info;
h512 m_id;
bytes m_incoming; bytes m_incoming;
unsigned m_protocolVersion; unsigned m_protocolVersion;
unsigned short m_listenPort; ///< Port that the remote client is listening on for connections. Useful for giving to peers. std::shared_ptr<Node> m_node;
bi::tcp::endpoint m_manualEndpoint;
bool m_force = false; /// If true, ignore IDs being different. This could open you up to MitM attacks.
std::chrono::steady_clock::time_point m_ping; std::chrono::steady_clock::time_point m_ping;
std::chrono::steady_clock::time_point m_connect; std::chrono::steady_clock::time_point m_connect;
std::chrono::steady_clock::time_point m_disconnect; std::chrono::steady_clock::time_point m_disconnect;
unsigned m_rating;
std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities; std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities;
std::set<h512> m_knownPeers; RangeMask<unsigned> m_knownNodes; ///< Nodes we already know about as indices into Host's nodesList. These shouldn't be resent to peer.
bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand. bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand.
}; };

8
libwebthree/WebThree.cpp

@ -68,14 +68,14 @@ void WebThreeDirect::setIdealPeerCount(size_t _n)
return m_net.setIdealPeerCount(_n); return m_net.setIdealPeerCount(_n);
} }
bytes WebThreeDirect::savePeers() bytes WebThreeDirect::saveNodes()
{ {
return m_net.savePeers(); return m_net.saveNodes();
} }
void WebThreeDirect::restorePeers(bytesConstRef _saved) void WebThreeDirect::restoreNodes(bytesConstRef _saved)
{ {
return m_net.restorePeers(_saved); return m_net.restoreNodes(_saved);
} }
void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port) void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port)

4
libwebthree/WebThree.h

@ -96,10 +96,10 @@ public:
bool haveNetwork() { return peerCount() != 0; } bool haveNetwork() { return peerCount() != 0; }
/// Save peers /// Save peers
dev::bytes savePeers(); dev::bytes saveNodes();
/// Restore peers /// Restore peers
void restorePeers(bytesConstRef _saved); void restoreNodes(bytesConstRef _saved);
/// Sets the ideal number of peers. /// Sets the ideal number of peers.
void setIdealPeerCount(size_t _n); void setIdealPeerCount(size_t _n);

1
libwhisper/Common.h

@ -35,7 +35,6 @@ namespace shh
/* this makes these symbols ambiguous on VS2013 /* this makes these symbols ambiguous on VS2013
using h256 = dev::h256; using h256 = dev::h256;
using h512 = dev::h512;
using h256s = dev::h256s; using h256s = dev::h256s;
using bytes = dev::bytes; using bytes = dev::bytes;
using RLPStream = dev::RLPStream; using RLPStream = dev::RLPStream;

12
third/MainWin.cpp

@ -365,10 +365,10 @@ void Main::writeSettings()
s.setValue("address", b); s.setValue("address", b);
s.setValue("url", ui->urlEdit->text()); s.setValue("url", ui->urlEdit->text());
bytes d = m_web3->savePeers(); bytes d = m_web3->saveNodes();
if (d.size()) if (d.size())
m_peers = QByteArray((char*)d.data(), (int)d.size()); m_nodes = QByteArray((char*)d.data(), (int)d.size());
s.setValue("peers", m_peers); s.setValue("peers", m_nodes);
s.setValue("geometry", saveGeometry()); s.setValue("geometry", saveGeometry());
s.setValue("windowState", saveState()); s.setValue("windowState", saveState());
@ -397,7 +397,7 @@ void Main::readSettings(bool _skipGeometry)
} }
} }
ethereum()->setAddress(m_myKeys.back().address()); ethereum()->setAddress(m_myKeys.back().address());
m_peers = s.value("peers").toByteArray(); m_nodes = s.value("peers").toByteArray();
ui->urlEdit->setText(s.value("url", "about:blank").toString()); //http://gavwood.com/gavcoin.html ui->urlEdit->setText(s.value("url", "about:blank").toString()); //http://gavwood.com/gavcoin.html
on_urlEdit_returnPressed(); on_urlEdit_returnPressed();
} }
@ -571,8 +571,8 @@ void Main::ensureNetwork()
else else
if (!m_web3->peerCount()) if (!m_web3->peerCount())
m_web3->connect(defPeer); m_web3->connect(defPeer);
if (m_peers.size()) if (m_nodes.size())
m_web3->restorePeers(bytesConstRef((byte*)m_peers.data(), m_peers.size())); m_web3->restoreNodes(bytesConstRef((byte*)m_nodes.data(), m_nodes.size()));
} }
void Main::on_connect_triggered() void Main::on_connect_triggered()

2
third/MainWin.h

@ -127,7 +127,7 @@ private:
unsigned m_currenciesFilter = (unsigned)-1; unsigned m_currenciesFilter = (unsigned)-1;
unsigned m_balancesFilter = (unsigned)-1; unsigned m_balancesFilter = (unsigned)-1;
QByteArray m_peers; QByteArray m_nodes;
QStringList m_servers; QStringList m_servers;
QNetworkAccessManager m_webCtrl; QNetworkAccessManager m_webCtrl;

Loading…
Cancel
Save