Browse Source

Merge pull request #656 from ethereum/p2p

p2p: peer discovery integration
cl-refactor
Gav Wood 10 years ago
parent
commit
90a23d7ba9
  1. 47
      alethzero/MainWin.cpp
  2. 2
      alethzero/MainWin.h
  3. 7
      eth/main.cpp
  4. 25
      libethereum/EthereumHost.cpp
  5. 55
      libp2p/Common.h
  6. 733
      libp2p/Host.cpp
  7. 192
      libp2p/Host.h
  8. 14
      libp2p/HostCapability.cpp
  9. 3
      libp2p/HostCapability.h
  10. 339
      libp2p/NodeTable.cpp
  11. 302
      libp2p/NodeTable.h
  12. 83
      libp2p/Peer.cpp
  13. 97
      libp2p/Peer.h
  14. 216
      libp2p/Session.cpp
  15. 20
      libp2p/Session.h
  16. 35
      libp2p/UDP.cpp
  17. 14
      libp2p/UDP.h
  18. 2
      libp2p/UPnP.cpp
  19. 19
      libwebthree/WebThree.cpp
  20. 23
      libwebthree/WebThree.h
  21. 16
      libwhisper/WhisperHost.cpp
  22. 8
      neth/main.cpp
  23. 19
      test/net.cpp
  24. 78
      test/peer.cpp
  25. 159
      test/whisperTopic.cpp
  26. 19
      third/MainWin.cpp
  27. 2
      third/MainWin.h

47
alethzero/MainWin.cpp

@ -161,8 +161,11 @@ Main::Main(QWidget *parent) :
statusBar()->addPermanentWidget(ui->blockCount);
connect(ui->ourAccounts->model(), SIGNAL(rowsMoved(const QModelIndex &, int, int, const QModelIndex &, int)), SLOT(ourAccountsRowsMoved()));
m_webThree.reset(new WebThreeDirect(string("AlethZero/v") + dev::Version + "/" DEV_QUOTED(ETH_BUILD_TYPE) "/" DEV_QUOTED(ETH_BUILD_PLATFORM), getDataDir() + "/AlethZero", false, {"eth", "shh"}));
QSettings s("ethereum", "alethzero");
m_networkConfig = s.value("peers").toByteArray();
bytesConstRef network((byte*)m_networkConfig.data(), m_networkConfig.size());
m_webThree.reset(new WebThreeDirect(string("AlethZero/v") + dev::Version + "/" DEV_QUOTED(ETH_BUILD_TYPE) "/" DEV_QUOTED(ETH_BUILD_PLATFORM), getDataDir() + "/AlethZero", false, {"eth", "shh"}, p2p::NetworkPreferences(), network));
m_qwebConnector.reset(new QWebThreeConnector());
m_server.reset(new OurWebThreeStubServer(*m_qwebConnector, *web3(), keysAsVector(m_myKeys), this));
@ -210,11 +213,11 @@ Main::Main(QWidget *parent) :
Main::~Main()
{
writeSettings();
// Must do this here since otherwise m_ethereum'll be deleted (and therefore clearWatches() called by the destructor)
// *after* the client is dead.
m_qweb->clientDieing();
g_logPost = simpleDebugOut;
writeSettings();
}
void Main::on_newIdentity_triggered()
@ -678,10 +681,10 @@ void Main::writeSettings()
s.setValue("verbosity", ui->verbosity->value());
s.setValue("jitvm", ui->jitvm->isChecked());
bytes d = m_webThree->saveNodes();
bytes d = m_webThree->saveNetwork();
if (d.size())
m_peers = QByteArray((char*)d.data(), (int)d.size());
s.setValue("peers", m_peers);
m_networkConfig = QByteArray((char*)d.data(), (int)d.size());
s.setValue("peers", m_networkConfig);
s.setValue("nameReg", ui->nameReg->text());
s.setValue("geometry", saveGeometry());
@ -730,7 +733,6 @@ void Main::readSettings(bool _skipGeometry)
}
}
m_peers = s.value("peers").toByteArray();
ui->upnp->setChecked(s.value("upnp", true).toBool());
ui->forceAddress->setText(s.value("forceAddress", "").toString());
ui->usePast->setChecked(s.value("usePast", true).toBool());
@ -962,31 +964,27 @@ void Main::refreshNetwork()
if (web3()->haveNetwork())
{
map<h512, QString> clients;
for (PeerInfo const& i: ps)
map<h512, QString> sessions;
for (PeerSessionInfo const& i: ps)
ui->peers->addItem(QString("[%8 %7] %3 ms - %1:%2 - %4 %5 %6")
.arg(QString::fromStdString(i.host))
.arg(i.port)
.arg(chrono::duration_cast<chrono::milliseconds>(i.lastPing).count())
.arg(clients[i.id] = QString::fromStdString(i.clientVersion))
.arg(sessions[i.id] = QString::fromStdString(i.clientVersion))
.arg(QString::fromStdString(toString(i.caps)))
.arg(QString::fromStdString(toString(i.notes)))
.arg(i.socket)
.arg(QString::fromStdString(i.id.abridged())));
auto ns = web3()->nodes();
for (p2p::Node const& i: ns)
if (!i.dead)
ui->nodes->insertItem(clients.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8")
.arg(QString::fromStdString(i.id.abridged()))
.arg(QString::fromStdString(toString(i.address)))
.arg(i.id == web3()->id() ? "self" : clients.count(i.id) ? clients[i.id] : i.secondsSinceLastAttempted() == -1 ? "session-fail" : i.secondsSinceLastAttempted() >= (int)i.fallbackSeconds() ? "retrying..." : "retry-" + QString::number(i.fallbackSeconds() - i.secondsSinceLastAttempted()) + "s")
.arg(i.secondsSinceLastAttempted())
.arg(i.secondsSinceLastConnected())
.arg(i.isOffline() ? " | " + QString::fromStdString(reasonOf(i.lastDisconnect)) + " | " + QString::number(i.failedAttempts) + "x" : "")
.arg(i.rating)
.arg((int)i.idOrigin)
);
for (p2p::Peer const& i: ns)
ui->nodes->insertItem(sessions.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8")
.arg(QString::fromStdString(i.id.abridged()))
.arg(QString::fromStdString(i.peerEndpoint().address().to_string()))
.arg(i.id == web3()->id() ? "self" : sessions.count(i.id) ? sessions[i.id] : "disconnected")
.arg(i.isOffline() ? " | " + QString::fromStdString(reasonOf(i.lastDisconnect())) + " | " + QString::number(i.failedAttempts()) + "x" : "")
.arg(i.rating())
);
}
}
@ -1899,8 +1897,9 @@ void Main::on_net_triggered()
web3()->setIdealPeerCount(ui->idealPeers->value());
web3()->setNetworkPreferences(netPrefs());
ethereum()->setNetworkId(m_privateChain.size() ? sha3(m_privateChain.toStdString()) : 0);
if (m_peers.size()/* && ui->usePast->isChecked()*/)
web3()->restoreNodes(bytesConstRef((byte*)m_peers.data(), m_peers.size()));
// TODO: p2p
// if (m_networkConfig.size()/* && ui->usePast->isChecked()*/)
// web3()->restoreNetwork(bytesConstRef((byte*)m_networkConfig.data(), m_networkConfig.size()));
web3()->startNetwork();
ui->downloadView->setDownloadMan(ethereum()->downloadMan());
}

2
alethzero/MainWin.h

@ -251,7 +251,7 @@ private:
unsigned m_currenciesFilter = (unsigned)-1;
unsigned m_balancesFilter = (unsigned)-1;
QByteArray m_peers;
QByteArray m_networkConfig;
QStringList m_servers;
QList<dev::KeyPair> m_myKeys;
QList<dev::KeyPair> m_myIdentities;

7
eth/main.cpp

@ -352,12 +352,14 @@ int main(int argc, char** argv)
VMFactory::setKind(jit ? VMKind::JIT : VMKind::Interpreter);
NetworkPreferences netPrefs(listenPort, publicIP, upnp, useLocal);
auto nodesState = contents((dbPath.size() ? dbPath : getDataDir()) + "/network.rlp");
dev::WebThreeDirect web3(
"Ethereum(++)/" + clientName + "v" + dev::Version + "/" DEV_QUOTED(ETH_BUILD_TYPE) "/" DEV_QUOTED(ETH_BUILD_PLATFORM) + (jit ? "/JIT" : ""),
dbPath,
false,
mode == NodeMode::Full ? set<string>{"eth", "shh"} : set<string>(),
netPrefs,
&nodesState,
miners
);
web3.setIdealPeerCount(peers);
@ -369,9 +371,6 @@ int main(int argc, char** argv)
c->setAddress(coinbase);
}
auto nodesState = contents((dbPath.size() ? dbPath : getDataDir()) + "/nodeState.rlp");
web3.restoreNodes(&nodesState);
cout << "Address: " << endl << toHex(us.address().asArray()) << endl;
web3.startNetwork();
@ -899,7 +898,7 @@ int main(int argc, char** argv)
while (!g_exit)
this_thread::sleep_for(chrono::milliseconds(1000));
writeFile((dbPath.size() ? dbPath : getDataDir()) + "/nodeState.rlp", web3.saveNodes());
writeFile((dbPath.size() ? dbPath : getDataDir()) + "/network.rlp", web3.saveNetwork());
return 0;
}

25
libethereum/EthereumHost.cpp

@ -51,8 +51,8 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
EthereumHost::~EthereumHost()
{
for (auto const& i: peers())
i->cap<EthereumPeer>()->abortSync();
for (auto i: peerSessions())
i.first->cap<EthereumPeer>().get()->abortSync();
}
bool EthereumHost::ensureInitialised()
@ -95,16 +95,19 @@ void EthereumHost::changeSyncer(EthereumPeer* _syncer)
if (isSyncing())
{
if (_syncer->m_asking == Asking::Blocks)
for (auto j: peers())
if (j->cap<EthereumPeer>().get() != _syncer && j->cap<EthereumPeer>()->m_asking == Asking::Nothing)
j->cap<EthereumPeer>()->transition(Asking::Blocks);
for (auto j: peerSessions())
{
auto e = j.first->cap<EthereumPeer>().get();
if (e != _syncer && e->m_asking == Asking::Nothing)
e->transition(Asking::Blocks);
}
}
else
{
// start grabbing next hash chain if there is one.
for (auto j: peers())
for (auto j: peerSessions())
{
j->cap<EthereumPeer>()->attemptSync();
j.first->cap<EthereumPeer>()->attemptSync();
if (isSyncing())
return;
}
@ -167,8 +170,8 @@ void EthereumHost::doWork()
void EthereumHost::maintainTransactions()
{
// Send any new transactions.
for (auto const& p: peers())
if (auto ep = p->cap<EthereumPeer>())
for (auto p: peerSessions())
if (auto ep = p.first->cap<EthereumPeer>().get())
{
bytes b;
unsigned n = 0;
@ -198,9 +201,9 @@ void EthereumHost::maintainBlocks(h256 _currentHash)
{
clog(NetMessageSummary) << "Sending a new block (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
for (auto j: peers())
for (auto j: peerSessions())
{
auto p = j->cap<EthereumPeer>();
auto p = j.first->cap<EthereumPeer>().get();
RLPStream ts;
p->prep(ts, NewBlockPacket, 2).appendRaw(m_chain.block(), 1).append(m_chain.details().totalDifficulty);

55
libp2p/Common.h

@ -16,9 +16,10 @@
*/
/** @file Common.h
* @author Gav Wood <i@gavwood.com>
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
*
* Miscellanea required for the Host/Session classes.
* Miscellanea required for the Host/Session/NodeTable classes.
*/
#pragma once
@ -29,9 +30,9 @@
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <chrono>
#include <libdevcore/Common.h>
#include <libdevcrypto/Common.h>
#include <libdevcore/Log.h>
#include <libdevcore/FixedHash.h>
#include <libdevcore/Exceptions.h>
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
@ -54,6 +55,8 @@ class Capability;
class Host;
class Session;
struct NetworkStartRequired: virtual dev::Exception {};
struct NetWarn: public LogChannel { static const char* name() { return "!N!"; } static const int verbosity = 0; };
struct NetNote: public LogChannel { static const char* name() { return "*N*"; } static const int verbosity = 1; };
struct NetMessageSummary: public LogChannel { static const char* name() { return "-N-"; } static const int verbosity = 2; };
@ -116,7 +119,12 @@ using CapDesc = std::pair<std::string, u256>;
using CapDescSet = std::set<CapDesc>;
using CapDescs = std::vector<CapDesc>;
struct PeerInfo
/*
* Used by Host to pass negotiated information about a connection to a
* new Peer Session; PeerSessionInfo is then maintained by Session and can
* be queried for point-in-time status information via Host.
*/
struct PeerSessionInfo
{
NodeId id;
std::string clientVersion;
@ -128,7 +136,44 @@ struct PeerInfo
std::map<std::string, std::string> notes;
};
using PeerInfos = std::vector<PeerInfo>;
using PeerSessionInfos = std::vector<PeerSessionInfo>;
/**
* @brief IPv4,UDP/TCP endpoints.
*/
struct NodeIPEndpoint
{
NodeIPEndpoint(): udp(bi::udp::endpoint()), tcp(bi::tcp::endpoint()) {}
NodeIPEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
NodeIPEndpoint(bi::tcp::endpoint _tcp): tcp(_tcp) {}
NodeIPEndpoint(bi::udp::endpoint _udp, bi::tcp::endpoint _tcp): udp(_udp), tcp(_tcp) {}
bi::udp::endpoint udp;
bi::tcp::endpoint tcp;
operator bool() const { return !udp.address().is_unspecified() || !tcp.address().is_unspecified(); }
};
struct Node
{
Node(): endpoint(NodeIPEndpoint()) {};
Node(Public _pubk, NodeIPEndpoint _ip, bool _required = false): id(_pubk), endpoint(_ip), required(_required) {}
Node(Public _pubk, bi::udp::endpoint _udp, bool _required = false): Node(_pubk, NodeIPEndpoint(_udp), _required) {}
virtual NodeId const& address() const { return id; }
virtual Public const& publicKey() const { return id; }
NodeId id;
/// Endpoints by which we expect to reach node.
NodeIPEndpoint endpoint;
/// If true, node will not be removed from Node list.
// TODO: p2p implement
bool required = false;
virtual operator bool() const { return (bool)id; }
};
}
}

733
libp2p/Host.cpp

@ -28,6 +28,7 @@
#include <libdevcore/Common.h>
#include <libdevcore/CommonIO.h>
#include <libethcore/Exceptions.h>
#include <libdevcrypto/FileSystem.h>
#include "Session.h"
#include "Common.h"
#include "Capability.h"
@ -37,22 +38,29 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool _start):
HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {}
void HostNodeTableHandler::processEvent(NodeId const& _n, NodeTableEventType const& _e)
{
m_host.onNodeTableEvent(_n, _e);
}
Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bytesConstRef _restoreNetwork):
Worker("p2p", 0),
m_restoreNetwork(_restoreNetwork.toBytes()),
m_clientVersion(_clientVersion),
m_netPrefs(_n),
m_ifAddresses(Network::getInterfaceAddresses()),
m_ioService(2),
m_tcp4Acceptor(m_ioService),
m_key(KeyPair::create())
m_alias(networkAlias(_restoreNetwork)),
m_lastPing(chrono::time_point<chrono::steady_clock>::min())
{
for (auto address: m_ifAddresses)
if (address.is_v4())
clog(NetNote) << "IP Address: " << address << " = " << (isPrivateAddress(address) ? "[LOCAL]" : "[PEER]");
clog(NetNote) << "Id:" << id().abridged();
if (_start)
start();
clog(NetNote) << "Id:" << id();
}
Host::~Host()
@ -114,10 +122,10 @@ void Host::doneWorking()
for (unsigned n = 0;; n = 0)
{
{
RecursiveGuard l(x_peers);
for (auto i: m_peers)
RecursiveGuard l(x_sessions);
for (auto i: m_sessions)
if (auto p = i.second.lock())
if (p->isOpen())
if (p->isConnected())
{
p->disconnect(ClientQuit);
n++;
@ -137,27 +145,27 @@ void Host::doneWorking()
m_ioService.reset();
// finally, clear out peers (in case they're lingering)
RecursiveGuard l(x_peers);
m_peers.clear();
RecursiveGuard l(x_sessions);
m_sessions.clear();
}
unsigned Host::protocolVersion() const
{
return 2;
return 3;
}
void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
{
if (!_s->m_node || !_s->m_node->id)
{
cwarn << "Attempting to register a peer without node information!";
return;
}
{
RecursiveGuard l(x_peers);
m_peers[_s->m_node->id] = _s;
clog(NetNote) << "p2p.host.peer.register" << _s->m_peer->id.abridged();
RecursiveGuard l(x_sessions);
// TODO: temporary loose-coupling; if m_peers already has peer,
// it is same as _s->m_peer. (fixing next PR)
if (!m_peers.count(_s->m_peer->id))
m_peers[_s->m_peer->id] = _s->m_peer;
m_sessions[_s->m_peer->id] = _s;
}
unsigned o = (unsigned)UserPacket;
for (auto const& i: _caps)
if (haveCapability(i))
@ -167,94 +175,67 @@ void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
}
}
void Host::seal(bytes& _b)
void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
{
_b[0] = 0x22;
_b[1] = 0x40;
_b[2] = 0x08;
_b[3] = 0x91;
uint32_t len = (uint32_t)_b.size() - 8;
_b[4] = (len >> 24) & 0xff;
_b[5] = (len >> 16) & 0xff;
_b[6] = (len >> 8) & 0xff;
_b[7] = len & 0xff;
}
shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId)
{
RecursiveGuard l(x_peers);
if (_a.port() < 30300 || _a.port() > 30305)
cwarn << "Non-standard port being recorded: " << _a.port();
if (_a.port() >= /*49152*/32768)
if (_e == NodeEntryAdded)
{
cwarn << "Private port being recorded - setting to 0";
_a = bi::tcp::endpoint(_a.address(), 0);
}
// cnote << "Node:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]");
// First check for another node with the same connection credentials, and put it in oldId if found.
if (!_oldId)
for (pair<h512, shared_ptr<Node>> const& n: m_nodes)
if (n.second->address == _a && n.second->id != _id)
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
auto n = m_nodeTable->node(_n);
if (n)
{
shared_ptr<Peer> p;
{
_oldId = n.second->id;
break;
RecursiveGuard l(x_sessions);
if (m_peers.count(_n))
p = m_peers[_n];
else
{
// TODO p2p: construct peer from node
p.reset(new Peer());
p->id = _n;
p->endpoint = NodeIPEndpoint(n.endpoint.udp, n.endpoint.tcp);
p->required = n.required;
m_peers[_n] = p;
clog(NetNote) << "p2p.host.peers.events.peersAdded " << _n << p->endpoint.tcp.address() << p->endpoint.udp.address();
}
p->endpoint.tcp = n.endpoint.tcp;
}
unsigned i;
if (!m_nodes.count(_id))
{
if (m_nodes.count(_oldId))
{
i = m_nodes[_oldId]->index;
m_nodes.erase(_oldId);
m_nodesList[i] = _id;
}
else
{
i = m_nodesList.size();
m_nodesList.push_back(_id);
// TODO: Implement similar to discover. Attempt connecting to nodes
// until ideal peer count is reached; if all nodes are tried,
// repeat. Notably, this is an integrated process such that
// when onNodeTableEvent occurs we should also update +/-
// the list of nodes to be tried. Thus:
// 1) externalize connection attempts
// 2) attempt copies potentialPeer list
// 3) replace this logic w/maintenance of potentialPeers
if (peerCount() < m_idealPeerCount)
connect(p);
}
m_nodes[_id] = make_shared<Node>();
m_nodes[_id]->id = _id;
m_nodes[_id]->index = i;
m_nodes[_id]->idOrigin = _o;
}
else
else if (_e == NodeEntryRemoved)
{
i = m_nodes[_id]->index;
m_nodes[_id]->idOrigin = max(m_nodes[_id]->idOrigin, _o);
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n;
RecursiveGuard l(x_sessions);
m_peers.erase(_n);
}
m_nodes[_id]->address = _a;
m_ready.extendAll(i);
m_private.extendAll(i);
if (_ready)
m_ready += i;
else
m_ready -= i;
if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking))
m_private += i;
else
m_private -= i;
// cnote << m_nodes[_id]->index << ":" << m_ready;
m_hadNewNodes = true;
return m_nodes[_id];
}
Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
void Host::seal(bytes& _b)
{
RecursiveGuard l(x_peers);
Nodes ret;
auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted();
for (auto i: ns)
ret.push_back(*m_nodes[m_nodesList[i]]);
return ret;
_b[0] = 0x22;
_b[1] = 0x40;
_b[2] = 0x08;
_b[3] = 0x91;
uint32_t len = (uint32_t)_b.size() - 8;
_b[4] = (len >> 24) & 0xff;
_b[5] = (len >> 16) & 0xff;
_b[6] = (len >> 8) & 0xff;
_b[7] = len & 0xff;
}
void Host::determinePublic(string const& _publicAddress, bool _upnp)
@ -328,21 +309,35 @@ void Host::runAcceptor()
{
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
m_accepting = true;
m_socket.reset(new bi::tcp::socket(m_ioService));
m_tcp4Acceptor.async_accept(*m_socket, [=](boost::system::error_code ec)
// socket is created outside of acceptor-callback
// An allocated socket is necessary as asio can use the socket
// until the callback succeeds or fails.
//
// Until callback succeeds or fails, we can't dealloc it.
//
// Callback is guaranteed to be called via asio or when
// m_tcp4Acceptor->stop() is called by Host.
//
// All exceptions are caught so they don't halt asio and so the
// socket is deleted.
//
// It's possible for an accepted connection to return an error in which
// case the socket may be open and must be closed to prevent asio from
// processing socket events after socket is deallocated.
bi::tcp::socket *s = new bi::tcp::socket(m_ioService);
m_tcp4Acceptor.async_accept(*s, [=](boost::system::error_code ec)
{
// if no error code, doHandshake takes ownership
bool success = false;
if (!ec)
{
try
{
try {
clog(NetConnect) << "Accepted connection from " << m_socket->remote_endpoint();
} catch (...){}
bi::address remoteAddress = m_socket->remote_endpoint().address();
// Port defaults to 0 - we let the hello tell us which port the peer listens to
auto p = std::make_shared<Session>(this, std::move(*m_socket.release()), bi::tcp::endpoint(remoteAddress, 0));
p->start();
// doHandshake takes ownersihp of *s via std::move
// incoming connection; we don't yet know nodeid
doHandshake(s, NodeId());
success = true;
}
catch (Exception const& _e)
@ -355,20 +350,41 @@ void Host::runAcceptor()
}
}
if (!success && m_socket->is_open())
// asio doesn't close socket on error
if (!success && s->is_open())
{
boost::system::error_code ec;
m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket->close();
s->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
s->close();
}
m_accepting = false;
delete s;
if (ec.value() < 1)
runAcceptor();
});
}
}
void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId)
{
try {
clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint();
} catch (...){}
shared_ptr<Peer> p;
if (_nodeId)
p = m_peers[_nodeId];
if (!p)
p.reset(new Peer());
p->endpoint.tcp.address(_socket->remote_endpoint().address());
auto ps = std::make_shared<Session>(this, std::move(*_socket), p);
ps->start();
}
string Host::pocHost()
{
vector<string> strs;
@ -376,237 +392,130 @@ string Host::pocHost()
return "poc-" + strs[1] + ".ethdev.com";
}
void Host::connect(std::string const& _addr, unsigned short _port) noexcept
void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPeerPort, unsigned short _udpNodePort)
{
// TODO: p2p clean this up (bring tested acceptor code over from network branch)
while (isWorking() && !m_run)
this_thread::sleep_for(chrono::milliseconds(50));
if (!m_run)
return;
for (auto first: {true, false})
if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305)
cwarn << "Non-standard port being recorded: " << _tcpPeerPort;
if (_tcpPeerPort >= /*49152*/32768)
{
cwarn << "Private port being recorded - setting to 0";
_tcpPeerPort = 0;
}
boost::system::error_code ec;
bi::address addr = bi::address::from_string(_addr, ec);
if (ec)
{
try
bi::tcp::resolver *r = new bi::tcp::resolver(m_ioService);
r->async_resolve({_addr, toString(_tcpPeerPort)}, [=](boost::system::error_code const& _ec, bi::tcp::resolver::iterator _epIt)
{
if (first)
if (!_ec)
{
bi::tcp::resolver r(m_ioService);
connect(r.resolve({_addr, toString(_port)})->endpoint());
bi::tcp::endpoint tcp = *_epIt;
if (m_nodeTable) m_nodeTable->addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(tcp.address(), _udpNodePort), tcp)));
}
else
connect(bi::tcp::endpoint(bi::address::from_string(_addr), _port));
break;
}
catch (Exception const& _e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << "\n" << diagnostic_information(_e);
}
catch (exception const& e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")";
}
delete r;
});
}
else
if (m_nodeTable) m_nodeTable->addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort))));
}
void Host::connect(bi::tcp::endpoint const& _ep)
void Host::connect(std::shared_ptr<Peer> const& _p)
{
while (isWorking() && !m_run)
this_thread::sleep_for(chrono::milliseconds(50));
for (unsigned i = 0; i < 200; i++)
if (isWorking() && !m_run)
this_thread::sleep_for(chrono::milliseconds(50));
if (!m_run)
return;
clog(NetConnect) << "Attempting single-shot connection to " << _ep;
bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
s->async_connect(_ep, [=](boost::system::error_code const& ec)
if (havePeerSession(_p->id))
{
if (ec)
clog(NetConnect) << "Connection refused to " << _ep << " (" << ec.message() << ")";
else
{
auto p = make_shared<Session>(this, std::move(*s), _ep);
clog(NetConnect) << "Connected to " << _ep;
p->start();
}
delete s;
});
}
void Host::connect(std::shared_ptr<Node> const& _n)
{
while (isWorking() && !m_run)
this_thread::sleep_for(chrono::milliseconds(50));
if (!m_run)
clog(NetWarn) << "Aborted connect. Node already connected.";
return;
}
if (!m_nodeTable->haveNode(_p->id))
{
clog(NetWarn) << "Aborted connect. Node not in node table.";
m_nodeTable->addNode(*_p.get());
return;
}
// prevent concurrently connecting to a node; todo: better abstraction
Node *nptr = _n.get();
// prevent concurrently connecting to a node
Peer *nptr = _p.get();
{
Guard l(x_pendingNodeConns);
if (m_pendingNodeConns.count(nptr))
if (m_pendingPeerConns.count(nptr))
return;
m_pendingNodeConns.insert(nptr);
m_pendingPeerConns.insert(nptr);
}
clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged();
_n->lastAttempted = std::chrono::system_clock::now();
_n->failedAttempts++;
m_ready -= _n->index;
clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged();
bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
auto n = node(_n->id);
if (n)
s->async_connect(_n->address, [=](boost::system::error_code const& ec)
{
if (ec)
{
clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")";
_n->lastDisconnect = TCPError;
_n->lastAttempted = std::chrono::system_clock::now();
m_ready += _n->index;
}
else
{
clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address;
_n->lastConnected = std::chrono::system_clock::now();
auto p = make_shared<Session>(this, std::move(*s), n, true); // true because we don't care about ids matched for now. Once we have permenant IDs this will matter a lot more and we can institute a safer mechanism.
p->start();
}
delete s;
Guard l(x_pendingNodeConns);
m_pendingNodeConns.erase(nptr);
});
else
clog(NetWarn) << "Trying to connect to node not in node table.";
}
bool Host::havePeer(NodeId _id) const
{
RecursiveGuard l(x_peers);
// Remove dead peers from list.
for (auto i = m_peers.begin(); i != m_peers.end();)
if (i->second.lock().get())
++i;
else
i = m_peers.erase(i);
return !!m_peers.count(_id);
}
unsigned Node::fallbackSeconds() const
{
switch (lastDisconnect)
{
case BadProtocol:
return 30 * (failedAttempts + 1);
case UselessPeer:
case TooManyPeers:
case ClientQuit:
return 15 * (failedAttempts + 1);
case NoDisconnect:
return 0;
default:
if (failedAttempts < 5)
return failedAttempts * 5;
else if (failedAttempts < 15)
return 25 + (failedAttempts - 5) * 10;
else
return 25 + 100 + (failedAttempts - 15) * 20;
}
}
bool Node::shouldReconnect() const
{
return chrono::system_clock::now() > lastAttempted + chrono::seconds(fallbackSeconds());
}
void Host::growPeers()
{
RecursiveGuard l(x_peers);
int morePeers = (int)m_idealPeerCount - m_peers.size();
if (morePeers > 0)
s->async_connect(_p->peerEndpoint(), [=](boost::system::error_code const& ec)
{
auto toTry = m_ready;
if (!m_netPrefs.localNetworking)
toTry -= m_private;
set<Node> ns;
for (auto i: toTry)
if (m_nodes[m_nodesList[i]]->shouldReconnect())
ns.insert(*m_nodes[m_nodesList[i]]);
if (ns.size())
for (Node const& i: ns)
{
connect(m_nodes[i.id]);
if (!--morePeers)
return;
}
else
for (auto const& i: m_peers)
if (auto p = i.second.lock())
p->ensureNodesRequested();
}
}
void Host::prunePeers()
{
RecursiveGuard l(x_peers);
// We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
set<NodeId> dc;
for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
if (m_peers.size() - dc.size() > m_idealPeerCount)
if (ec)
{
// look for worst peer to kick off
// first work out how many are old enough to kick off.
shared_ptr<Session> worst;
unsigned agedPeers = 0;
for (auto i: m_peers)
if (!dc.count(i.first))
if (auto p = i.second.lock())
if (chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers.
{
++agedPeers;
if ((!worst || p->rating() < worst->rating() || (p->rating() == worst->rating() && p->m_connect > worst->m_connect))) // kill older ones
worst = p;
}
if (!worst || agedPeers <= m_idealPeerCount)
break;
dc.insert(worst->id());
worst->disconnect(TooManyPeers);
clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "(" << ec.message() << ")";
_p->m_lastDisconnect = TCPError;
_p->m_lastAttempted = std::chrono::system_clock::now();
}
// Remove dead peers from list.
for (auto i = m_peers.begin(); i != m_peers.end();)
if (i->second.lock().get())
++i;
else
i = m_peers.erase(i);
{
clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint();
_p->m_lastDisconnect = NoDisconnect;
_p->m_lastConnected = std::chrono::system_clock::now();
_p->m_failedAttempts = 0;
auto ps = make_shared<Session>(this, std::move(*s), _p);
ps->start();
}
delete s;
Guard l(x_pendingNodeConns);
m_pendingPeerConns.erase(nptr);
});
}
PeerInfos Host::peers(bool _updatePing) const
PeerSessionInfos Host::peerSessionInfo() const
{
if (!m_run)
return PeerInfos();
return PeerSessionInfos();
RecursiveGuard l(x_peers);
if (_updatePing)
{
const_cast<Host*>(this)->pingAll();
this_thread::sleep_for(chrono::milliseconds(200));
}
std::vector<PeerInfo> ret;
for (auto& i: m_peers)
std::vector<PeerSessionInfo> ret;
RecursiveGuard l(x_sessions);
for (auto& i: m_sessions)
if (auto j = i.second.lock())
if (j->m_socket.is_open())
if (j->isConnected())
ret.push_back(j->m_info);
return ret;
}
size_t Host::peerCount() const
{
unsigned retCount = 0;
RecursiveGuard l(x_sessions);
for (auto& i: m_sessions)
if (std::shared_ptr<Session> j = i.second.lock())
if (j->isConnected())
retCount++;
return retCount;
}
void Host::run(boost::system::error_code const&)
{
if (!m_run)
{
// reset NodeTable
m_nodeTable.reset();
// stopping io service allows running manual network operations for shutdown
// and also stops blocking worker thread, allowing worker thread to exit
m_ioService.stop();
@ -615,34 +524,31 @@ void Host::run(boost::system::error_code const&)
m_timer.reset();
return;
}
m_lastTick += c_timerInterval;
if (m_lastTick >= c_timerInterval * 10)
{
growPeers();
prunePeers();
m_lastTick = 0;
}
if (m_hadNewNodes)
{
for (auto p: m_peers)
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
m_hadNewNodes = false;
}
m_nodeTable->processEvents();
if (chrono::steady_clock::now() - m_lastPing > chrono::seconds(30)) // ping every 30s.
{
for (auto p: m_sessions)
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
keepAlivePeers();
disconnectLatePeers();
auto c = peerCount();
if (m_idealPeerCount && !c)
for (auto p: m_peers)
if (auto pp = p.second.lock())
if (chrono::steady_clock::now() - pp->m_lastReceived > chrono::seconds(60))
pp->disconnect(PingTimeout);
pingAll();
}
if (p.second->shouldReconnect())
{
// TODO p2p: fixme
p.second->m_lastAttempted = std::chrono::system_clock::now();
connect(p.second);
break;
}
auto runcb = [this](boost::system::error_code const& error) -> void { run(error); };
if (c < m_idealPeerCount)
m_nodeTable->discover();
auto runcb = [this](boost::system::error_code const& error) { run(error); };
m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
m_timer->async_wait(runcb);
}
@ -677,13 +583,15 @@ void Host::startedWorking()
if (m_listenPort > 0)
runAcceptor();
}
else
clog(NetNote) << "p2p.start.notice id:" << id().abridged() << "Listen port is invalid or unavailable. Node Table using default port (30303).";
// if m_public address is valid then add us to node list
// todo: abstract empty() and emplace logic
if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
noteNode(id(), m_tcpPublic, Origin::Perfect, false);
// TODO: add m_tcpPublic endpoint; sort out endpoint stuff for nodetable
m_nodeTable.reset(new NodeTable(m_ioService, m_alias, m_listenPort > 0 ? m_listenPort : 30303));
m_nodeTable->setEventHandler(new HostNodeTableHandler(*this));
restoreNetwork(&m_restoreNetwork);
clog(NetNote) << "Id:" << id().abridged();
clog(NetNote) << "p2p.started id:" << id().abridged();
run(boost::system::error_code());
}
@ -694,94 +602,143 @@ void Host::doWork()
m_ioService.run();
}
void Host::pingAll()
void Host::keepAlivePeers()
{
RecursiveGuard l(x_peers);
for (auto& i: m_peers)
if (auto j = i.second.lock())
j->ping();
if (chrono::steady_clock::now() - c_keepAliveInterval < m_lastPing)
return;
RecursiveGuard l(x_sessions);
for (auto p: m_sessions)
if (auto pp = p.second.lock())
pp->ping();
m_lastPing = chrono::steady_clock::now();
}
bytes Host::saveNodes() const
void Host::disconnectLatePeers()
{
RLPStream nodes;
auto now = chrono::steady_clock::now();
if (now - c_keepAliveTimeOut < m_lastPing)
return;
RecursiveGuard l(x_sessions);
for (auto p: m_sessions)
if (auto pp = p.second.lock())
if (now - c_keepAliveTimeOut > m_lastPing && pp->m_lastReceived < m_lastPing)
pp->disconnect(PingTimeout);
}
bytes Host::saveNetwork() const
{
std::list<Peer> peers;
{
RecursiveGuard l(x_sessions);
for (auto p: m_peers)
if (p.second)
peers.push_back(*p.second);
}
peers.sort();
RLPStream network;
int count = 0;
{
RecursiveGuard l(x_peers);
for (auto const& i: m_nodes)
RecursiveGuard l(x_sessions);
for (auto const& p: peers)
{
Node const& n = *(i.second);
// TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 &&
if (!n.dead && chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address()))
// TODO: alpha: Figure out why it ever shares these ports.//p.address.port() >= 30300 && p.address.port() <= 30305 &&
// TODO: alpha: if/how to save private addresses
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
if (chrono::system_clock::now() - p.m_lastConnected < chrono::seconds(3600 * 48) && p.peerEndpoint().port() > 0 && p.peerEndpoint().port() < /*49152*/32768 && p.id != id() && !isPrivateAddress(p.peerEndpoint().address()))
{
nodes.appendList(10);
if (n.address.address().is_v4())
nodes << n.address.address().to_v4().to_bytes();
network.appendList(10);
if (p.peerEndpoint().address().is_v4())
network << p.peerEndpoint().address().to_v4().to_bytes();
else
nodes << n.address.address().to_v6().to_bytes();
nodes << n.address.port() << n.id << (int)n.idOrigin
<< chrono::duration_cast<chrono::seconds>(n.lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(n.lastAttempted.time_since_epoch()).count()
<< n.failedAttempts << (unsigned)n.lastDisconnect << n.score << n.rating;
network << p.peerEndpoint().address().to_v6().to_bytes();
// TODO: alpha: replace 0 with trust-state of node
network << p.peerEndpoint().port() << p.id << 0
<< chrono::duration_cast<chrono::seconds>(p.m_lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(p.m_lastAttempted.time_since_epoch()).count()
<< p.m_failedAttempts << (unsigned)p.m_lastDisconnect << p.m_score << p.m_rating;
count++;
}
}
}
auto state = m_nodeTable->snapshot();
state.sort();
for (auto const& s: state)
{
network.appendList(3);
if (s.endpoint.tcp.address().is_v4())
network << s.endpoint.tcp.address().to_v4().to_bytes();
else
network << s.endpoint.tcp.address().to_v6().to_bytes();
network << s.endpoint.tcp.port() << s.id;
count++;
}
RLPStream ret(3);
ret << 0 << m_key.secret();
ret.appendList(count).appendRaw(nodes.out(), count);
ret << 1 << m_alias.secret();
ret.appendList(count).appendRaw(network.out(), count);
return ret.out();
}
void Host::restoreNodes(bytesConstRef _b)
void Host::restoreNetwork(bytesConstRef _b)
{
RecursiveGuard l(x_peers);
// nodes can only be added if network is added
if (!isStarted())
BOOST_THROW_EXCEPTION(NetworkStartRequired());
RecursiveGuard l(x_sessions);
RLP r(_b);
if (r.itemCount() > 0 && r[0].isInt())
switch (r[0].toInt<int>())
{
case 0:
{
auto oldId = id();
m_key = KeyPair(r[1].toHash<Secret>());
noteNode(id(), m_tcpPublic, Origin::Perfect, false, oldId);
if (r.itemCount() > 0 && r[0].isInt() && r[0].toInt<int>() == 1)
{
// r[0] = version
// r[1] = key
// r[2] = nodes
for (auto i: r[2])
for (auto i: r[2])
{
bi::tcp::endpoint tcp;
bi::udp::endpoint udp;
if (i[0].itemCount() == 4)
{
bi::tcp::endpoint ep;
if (i[0].itemCount() == 4)
ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto id = (NodeId)i[2];
if (!m_nodes.count(id))
{
auto o = (Origin)i[3].toInt<int>();
auto n = noteNode(id, ep, o, true);
n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
n->failedAttempts = i[6].toInt<unsigned>();
n->lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
n->score = (int)i[8].toInt<unsigned>();
n->rating = (int)i[9].toInt<unsigned>();
}
tcp = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
udp = bi::udp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
}
else
{
tcp = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
udp = bi::udp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
}
}
default:;
}
else
for (auto i: r)
{
auto id = (NodeId)i[2];
if (!m_nodes.count(id))
if (i.itemCount() == 3)
m_nodeTable->addNode(id, udp, tcp);
else if (i.itemCount() == 10)
{
bi::tcp::endpoint ep;
if (i[0].itemCount() == 4)
ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto n = noteNode(id, ep, Origin::Self, true);
shared_ptr<Peer> p = make_shared<Peer>();
p->id = id;
p->m_lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
p->m_lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
p->m_failedAttempts = i[6].toInt<unsigned>();
p->m_lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
p->m_score = (int)i[8].toInt<unsigned>();
p->m_rating = (int)i[9].toInt<unsigned>();
p->endpoint.tcp = tcp;
p->endpoint.udp = udp;
m_peers[p->id] = p;
m_nodeTable->addNode(*p.get());
}
}
}
}
KeyPair Host::networkAlias(bytesConstRef _b)
{
RLP r(_b);
if (r.itemCount() == 3 && r[0].isInt() && r[0].toInt<int>() == 1)
return move(KeyPair(move(Secret(r[1].toBytes()))));
else
return move(KeyPair::create());
}

192
libp2p/Host.h

@ -34,8 +34,10 @@
#include <libdevcore/Worker.h>
#include <libdevcore/RangeMask.h>
#include <libdevcrypto/Common.h>
#include "NodeTable.h"
#include "HostCapability.h"
#include "Network.h"
#include "Peer.h"
#include "Common.h"
namespace ba = boost::asio;
namespace bi = ba::ip;
@ -43,83 +45,60 @@ namespace bi = ba::ip;
namespace dev
{
class RLPStream;
namespace p2p
{
class Host;
enum class Origin
class HostNodeTableHandler: public NodeTableEventHandler
{
Unknown,
Self,
SelfThird,
PerfectThird,
Perfect,
};
public:
HostNodeTableHandler(Host& _host);
struct Node
{
NodeId id; ///< Their id/public key.
unsigned index; ///< Index into m_nodesList
bi::tcp::endpoint address; ///< As reported from the node itself.
int score = 0; ///< All time cumulative.
int rating = 0; ///< Trending.
bool dead = false; ///< If true, we believe this node is permanently dead - forget all about it.
std::chrono::system_clock::time_point lastConnected;
std::chrono::system_clock::time_point lastAttempted;
unsigned failedAttempts = 0;
DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last.
Origin idOrigin = Origin::Unknown; ///< How did we get to know this node's id?
int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); }
int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastAttempted).count(); }
unsigned fallbackSeconds() const;
bool shouldReconnect() const;
bool isOffline() const { return lastAttempted > lastConnected; }
bool operator<(Node const& _n) const
{
if (isOffline() != _n.isOffline())
return isOffline();
else if (isOffline())
if (lastAttempted == _n.lastAttempted)
return failedAttempts < _n.failedAttempts;
else
return lastAttempted < _n.lastAttempted;
else
if (score == _n.score)
if (rating == _n.rating)
return failedAttempts < _n.failedAttempts;
else
return rating < _n.rating;
else
return score < _n.score;
}
};
Host const& host() const { return m_host; }
private:
virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e);
using Nodes = std::vector<Node>;
Host& m_host;
};
/**
* @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
*
* @todo exceptions when nodeTable not set (prior to start)
* @todo onNodeTableEvent: move peer-connection logic into ensurePeers
* @todo handshake: gracefully disconnect peer if peer already connected
* @todo abstract socket -> IPConnection
* @todo determinePublic: ipv6, udp
* @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port
* @todo write host identifier to disk w/nodes
* @todo per-session keepalive/ping instead of broadcast; set ping-timeout via median-latency
* @todo configuration-management (NetworkPrefs+Keys+Topology)
*/
class Host: public Worker
{
friend class HostNodeTableHandler;
friend class Session;
friend class HostCapabilityFace;
friend struct Node;
public:
/// Start server, listening for connections on the given port.
Host(std::string const& _clientVersion, NetworkPreferences const& _n = NetworkPreferences(), bool _start = false);
Host(std::string const& _clientVersion, NetworkPreferences const& _n = NetworkPreferences(), bytesConstRef _restoreNetwork = bytesConstRef());
/// Will block on network process events.
virtual ~Host();
/// Interval at which Host::run will call keepAlivePeers to ping peers.
std::chrono::seconds const c_keepAliveInterval = std::chrono::seconds(30);
/// Disconnect timeout after failure to respond to keepAlivePeers ping.
std::chrono::milliseconds const c_keepAliveTimeOut = std::chrono::milliseconds(1000);
/// Default host for current version of client.
static std::string pocHost();
/// Basic peer network protocol version.
unsigned protocolVersion() const;
@ -129,38 +108,31 @@ public:
bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; }
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
/// Connect to a peer explicitly.
static std::string pocHost();
void connect(std::string const& _addr, unsigned short _port = 30303) noexcept;
void connect(bi::tcp::endpoint const& _ep);
void connect(std::shared_ptr<Node> const& _n);
/// @returns true iff we have a peer of the given id.
bool havePeer(NodeId _id) const;
bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? !!m_sessions[_id].lock() : false; }
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort);
/// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
/// Get peer information.
PeerInfos peers(bool _updatePing = false) const;
/// Get number of peers connected; equivalent to, but faster than, peers().size().
size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); }
/// Ping the peers, to update the latency information.
void pingAll();
PeerSessionInfos peerSessionInfo() const;
/// Get number of peers connected.
size_t peerCount() const;
/// Get the address we're listening on currently.
std::string listenAddress() const { return m_tcpPublic.address().to_string(); }
/// Get the port we're listening on currently.
unsigned short listenPort() const { return m_tcpPublic.port(); }
/// Serialise the set of known peers.
bytes saveNodes() const;
bytes saveNetwork() const;
/// Deserialise the data and populate the set of known peers.
void restoreNodes(bytesConstRef _b);
Nodes nodes() const { RecursiveGuard l(x_peers); Nodes ret; for (auto const& i: m_nodes) ret.push_back(*i.second); return ret; }
// TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
Peers getPeers() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); }
@ -174,24 +146,36 @@ public:
/// @returns if network is running.
bool isStarted() const { return m_run; }
NodeId id() const { return m_key.pub(); }
NodeId id() const { return m_alias.pub(); }
void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps);
std::shared_ptr<Node> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<Node>(); }
protected:
void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e);
/// Deserialise the data and populate the set of known peers.
void restoreNetwork(bytesConstRef _b);
private:
/// Populate m_peerAddresses with available public addresses.
void determinePublic(std::string const& _publicAddress, bool _upnp);
void connect(std::shared_ptr<Peer> const& _p);
/// Ping the peers to update the latency information and disconnect peers which have timed out.
void keepAlivePeers();
/// Disconnect peers which didn't respond to keepAlivePeers ping prior to c_keepAliveTimeOut.
void disconnectLatePeers();
/// Called only from startedWorking().
void runAcceptor();
/// Handler for verifying handshake siganture before creating session. _nodeId is passed for outbound connections. If successful, socket is moved to Session via std::move.
void doHandshake(bi::tcp::socket* _socket, NodeId _nodeId = NodeId());
void seal(bytes& _b);
void growPeers();
void prunePeers();
/// Called by Worker. Not thread-safe; to be called only by worker.
virtual void startedWorking();
/// Called by startedWorking. Not thread-safe; to be called only be Worker.
@ -203,64 +187,54 @@ private:
/// Shutdown network. Not thread-safe; to be called only by worker.
virtual void doneWorking();
std::shared_ptr<Node> noteNode(NodeId _id, bi::tcp::endpoint _a, Origin _o, bool _ready, NodeId _oldId = NodeId());
Nodes potentialPeers(RangeMask<unsigned> const& _known);
/// Get or create host identifier (KeyPair).
static KeyPair networkAlias(bytesConstRef _b);
bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes.
bool m_run = false; ///< Whether network is running.
std::mutex x_runTimer; ///< Start/stop mutex.
std::mutex x_runTimer; ///< Start/stop mutex.
std::string m_clientVersion; ///< Our version string.
NetworkPreferences m_netPrefs; ///< Network settings.
/// Interface addresses (private, public)
std::vector<bi::address> m_ifAddresses; ///< Interface addresses.
std::vector<bi::address> m_ifAddresses; ///< Interface addresses.
int m_listenPort = -1; ///< What port are we listening on. -1 means binding failed or acceptor hasn't been initialized.
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor.
std::unique_ptr<bi::tcp::socket> m_socket; ///< Listening socket.
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor.
std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected.
unsigned m_lastTick = 0; ///< Used by run() for scheduling; must not be mutated outside of run().
std::set<Node*> m_pendingNodeConns; /// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptr<Node>const&).
std::set<Peer*> m_pendingPeerConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&).
Mutex x_pendingNodeConns;
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.
bool m_hadNewNodes = false;
KeyPair m_alias; ///< Alias for network communication. Network address is k*G. k is key material. TODO: Replace KeyPair.
std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery).
mutable RecursiveMutex x_peers;
/// The nodes to which we are currently connected.
/// Shared storage of Peer objects. Peers are created or destroyed on demand by the Host. Active sessions maintain a shared_ptr to a Peer;
std::map<NodeId, std::shared_ptr<Peer>> m_peers;
/// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::map<NodeId, std::weak_ptr<Session>> m_peers;
/// Nodes to which we may connect (or to which we have connected).
/// TODO: does this need a lock?
std::map<NodeId, std::shared_ptr<Node> > m_nodes;
/// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same.
std::vector<NodeId> m_nodesList;
RangeMask<unsigned> m_ready; ///< Indices into m_nodesList over to which nodes we are not currently connected, connecting or otherwise ignoring.
RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private.
mutable std::map<NodeId, std::weak_ptr<Session>> m_sessions;
mutable RecursiveMutex x_sessions;
unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.
std::set<bi::address> m_peerAddresses; ///< Public addresses that peers (can) know us by.
// Our capabilities.
std::map<CapDesc, std::shared_ptr<HostCapabilityFace>> m_capabilities; ///< Each of the capabilities we support.
std::chrono::steady_clock::time_point m_lastPing; ///< Time we sent the last ping to all peers.
bool m_accepting = false;
};
}
}

14
libp2p/HostCapability.cpp

@ -32,13 +32,13 @@ void HostCapabilityFace::seal(bytes& _b)
m_host->seal(_b);
}
std::vector<std::shared_ptr<Session> > HostCapabilityFace::peers() const
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> HostCapabilityFace::peerSessions() const
{
RecursiveGuard l(m_host->x_peers);
std::vector<std::shared_ptr<Session> > ret;
for (auto const& i: m_host->m_peers)
if (std::shared_ptr<Session> p = i.second.lock())
if (p->m_capabilities.count(capDesc()))
ret.push_back(p);
RecursiveGuard l(m_host->x_sessions);
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> ret;
for (auto const& i: m_host->m_sessions)
if (std::shared_ptr<Session> s = i.second.lock())
if (s->m_capabilities.count(capDesc()))
ret.push_back(make_pair(s,s->m_peer));
return ret;
}

3
libp2p/HostCapability.h

@ -23,6 +23,7 @@
#pragma once
#include "Peer.h"
#include "Common.h"
namespace dev
@ -44,7 +45,7 @@ public:
Host* host() const { return m_host; }
std::vector<std::shared_ptr<Session> > peers() const;
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> peerSessions() const;
protected:
virtual std::string name() const = 0;

339
libp2p/NodeTable.cpp

@ -24,12 +24,15 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort):
NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::distance(_src.id,_pubk)) {}
NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::distance(_src.id,_pubk)) {}
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp):
m_node(Node(_alias.pub(), bi::udp::endpoint())),
m_secret(_alias.sec()),
m_socket(new NodeSocket(_io, *this, _listenPort)),
m_socketPtr(m_socket.get()),
m_io(_io),
m_socket(new NodeSocket(m_io, *this, _udp)),
m_socketPointer(m_socket.get()),
m_bucketRefreshTimer(m_io),
m_evictionCheckTimer(m_io)
{
@ -39,22 +42,78 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort):
m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1);
}
m_socketPtr->connect();
m_socketPointer->connect();
doRefreshBuckets(boost::system::error_code());
}
NodeTable::~NodeTable()
{
// Cancel scheduled tasks to ensure.
m_evictionCheckTimer.cancel();
m_bucketRefreshTimer.cancel();
m_socketPtr->disconnect();
// Disconnect socket so that deallocation is safe.
m_socketPointer->disconnect();
}
void NodeTable::processEvents()
{
if (m_nodeEventHandler)
m_nodeEventHandler->processEvents();
}
void NodeTable::join()
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
{
doFindNode(m_node.id);
auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp));
return addNode(node);
}
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{
// ping address if nodeid is empty
if (!_node.id)
{
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPointer->send(p);
shared_ptr<NodeEntry> n;
return move(n);
}
Guard l(x_nodes);
if (m_nodes.count(_node.id))
{
// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0)
// // Update address if the node if we now have a public IP for it.
// m_server->m_peers[id]->address = ep;
return m_nodes[_node.id];
}
shared_ptr<NodeEntry> ret(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
m_nodes[_node.id] = ret;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPointer->send(p);
// TODO p2p: rename to p2p.nodes.pending, add p2p.nodes.add event (when pong is received)
clog(NodeTableNote) << "p2p.nodes.add " << _node.id.abridged();
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_node.id, NodeEntryAdded);
return ret;
}
void NodeTable::discover()
{
static chrono::steady_clock::time_point s_lastDiscover = chrono::steady_clock::now() - std::chrono::seconds(30);
if (chrono::steady_clock::now() > s_lastDiscover + std::chrono::seconds(30))
{
s_lastDiscover = chrono::steady_clock::now();
discover(m_node.id);
}
}
list<NodeId> NodeTable::nodes() const
{
list<NodeId> nodes;
@ -64,7 +123,7 @@ list<NodeId> NodeTable::nodes() const
return move(nodes);
}
list<NodeTable::NodeEntry> NodeTable::state() const
list<NodeEntry> NodeTable::snapshot() const
{
list<NodeEntry> ret;
Guard l(x_state);
@ -74,34 +133,40 @@ list<NodeTable::NodeEntry> NodeTable::state() const
return move(ret);
}
NodeTable::NodeEntry NodeTable::operator[](NodeId _id)
Node NodeTable::node(NodeId const& _id)
{
// TODO p2p: eloquent copy operator
Guard l(x_nodes);
return *m_nodes[_id];
if (m_nodes.count(_id))
{
auto entry = m_nodes[_id];
Node n(_id, NodeIPEndpoint(entry->endpoint.udp, entry->endpoint.tcp), entry->required);
return move(n);
}
return move(Node());
}
void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const
shared_ptr<NodeEntry> NodeTable::nodeEntry(NodeId _id)
{
FindNode p(_node.endpoint.udp, _target);
p.sign(m_secret);
m_socketPtr->send(p);
Guard l(x_nodes);
return m_nodes.count(_id) ? move(m_nodes[_id]) : move(shared_ptr<NodeEntry>());
}
void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
void NodeTable::discover(NodeId _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
{
if (!m_socketPtr->isOpen() || _round == s_maxSteps)
if (!m_socketPointer->isOpen() || _round == s_maxSteps)
return;
if (_round == s_maxSteps)
{
clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds.";
clog(NodeTableNote) << "Terminating discover after " << _round << " rounds.";
return;
}
else if(!_round && !_tried)
// initialized _tried on first round
_tried.reset(new set<shared_ptr<NodeEntry>>());
auto nearest = findNearest(_node);
auto nearest = nearestNodeEntries(_node);
list<shared_ptr<NodeEntry>> tried;
for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)
if (!_tried->count(nearest[i]))
@ -110,12 +175,12 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_
tried.push_back(r);
FindNode p(r->endpoint.udp, _node);
p.sign(m_secret);
m_socketPtr->send(p);
m_socketPointer->send(p);
}
if (tried.empty())
{
clog(NodeTableNote) << "Terminating doFindNode after " << _round << " rounds.";
clog(NodeTableNote) << "Terminating discover after " << _round << " rounds.";
return;
}
@ -131,15 +196,15 @@ void NodeTable::doFindNode(NodeId _node, unsigned _round, shared_ptr<set<shared_
{
if (_ec)
return;
doFindNode(_node, _round + 1, _tried);
discover(_node, _round + 1, _tried);
});
}
vector<shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeId _target)
{
// send s_alpha FindNode packets to nodes we know, closest to target
static unsigned lastBin = s_bins - 1;
unsigned head = dist(m_node.id, _target);
unsigned head = distance(m_node.id, _target);
unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins;
map<unsigned, list<shared_ptr<NodeEntry>>> found;
@ -154,7 +219,7 @@ vector<shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
found[distance(_target, p->id)].push_back(p);
else
break;
}
@ -164,7 +229,7 @@ vector<shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
found[distance(_target, p->id)].push_back(p);
else
break;
}
@ -181,7 +246,7 @@ vector<shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
found[distance(_target, p->id)].push_back(p);
else
break;
}
@ -195,7 +260,7 @@ vector<shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(NodeId _target)
if (auto p = n.lock())
{
if (count < s_bucketSize)
found[dist(_target, p->id)].push_back(p);
found[distance(_target, p->id)].push_back(p);
else
break;
}
@ -213,7 +278,7 @@ void NodeTable::ping(bi::udp::endpoint _to) const
{
PingNode p(_to, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPtr->send(p);
m_socketPointer->send(p);
}
void NodeTable::ping(NodeEntry* _n) const
@ -224,173 +289,186 @@ void NodeTable::ping(NodeEntry* _n) const
void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _new)
{
if (!m_socketPtr->isOpen())
if (!m_socketPointer->isOpen())
return;
Guard l(x_evictions);
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
if (m_evictions.size() == 1)
doCheckEvictions(boost::system::error_code());
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
{
Guard l(x_evictions);
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
if (m_evictions.size() == 1)
doCheckEvictions(boost::system::error_code());
m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id));
}
ping(_leastSeen.get());
}
void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{
// Don't add ourself
if (_pubk == m_node.address())
return;
shared_ptr<NodeEntry> node;
{
Guard l(x_nodes);
auto n = m_nodes.find(_pubk);
if (n == m_nodes.end())
{
node.reset(new NodeEntry(m_node, _pubk, _endpoint));
m_nodes[_pubk] = node;
// clog(NodeTableMessageSummary) << "Adding node to cache: " << _pubk;
}
else
{
node = n->second;
// clog(NodeTableMessageSummary) << "Found node in cache: " << _pubk;
}
}
// todo: why is this necessary?
if (!!node)
noteNode(node);
}
clog(NodeTableNote) << "Noting active node:" << _pubk.abridged() << _endpoint.address().to_string() << ":" << _endpoint.port();
void NodeTable::noteNode(shared_ptr<NodeEntry> _n)
{
shared_ptr<NodeEntry> contested;
{
NodeBucket& s = bucket(_n.get());
Guard l(x_state);
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n)
{
if (n.lock() == _n)
return true;
return false;
});
shared_ptr<NodeEntry> node(addNode(_pubk, _endpoint, bi::tcp::endpoint(_endpoint.address(), _endpoint.port())));
if (s.nodes.size() >= s_bucketSize)
// TODO p2p: old bug (maybe gone now) sometimes node is nullptr here
if (!!node)
{
shared_ptr<NodeEntry> contested;
{
contested = s.nodes.front().lock();
if (!contested)
Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(node.get());
s.nodes.remove_if([&node](weak_ptr<NodeEntry> n)
{
if (n.lock() == node)
return true;
return false;
});
if (s.nodes.size() >= s_bucketSize)
{
// It's only contested iff nodeentry exists
contested = s.nodes.front().lock();
if (!contested)
{
s.nodes.pop_front();
s.nodes.push_back(node);
s.touch();
}
}
else
{
s.nodes.pop_front();
s.nodes.push_back(_n);
s.nodes.push_back(node);
s.touch();
}
}
else
s.nodes.push_back(_n);
if (contested)
evict(contested, node);
}
if (contested)
evict(contested, _n);
}
void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
{
NodeBucket &s = bucket(_n.get());
{
Guard l(x_state);
NodeBucket& s = bucket_UNSAFE(_n.get());
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
}
Guard l(x_nodes);
m_nodes.erase(_n->id);
{
Guard l(x_nodes);
m_nodes.erase(_n->id);
}
clog(NodeTableNote) << "p2p.nodes.drop " << _n->id.abridged();
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved);
}
NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n)
NodeTable::NodeBucket& NodeTable::bucket_UNSAFE(NodeEntry const* _n)
{
return m_state[_n->distance - 1];
}
void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
{
// h256 + Signature + RLP (smallest possible packet is empty neighbours packet which is 3 bytes)
if (_packet.size() < h256::size + Signature::size + 3)
// h256 + Signature + type + RLP (smallest possible packet is empty neighbours packet which is 3 bytes)
if (_packet.size() < h256::size + Signature::size + 1 + 3)
{
clog(NodeTableMessageSummary) << "Invalid Message size from " << _from.address().to_string() << ":" << _from.port();
return;
}
bytesConstRef signedBytes(_packet.cropped(h256::size, _packet.size() - h256::size));
h256 hashSigned(sha3(signedBytes));
bytesConstRef hashedBytes(_packet.cropped(h256::size, _packet.size() - h256::size));
h256 hashSigned(sha3(hashedBytes));
if (!_packet.cropped(0, h256::size).contentsEqual(hashSigned.asBytes()))
{
clog(NodeTableMessageSummary) << "Invalid Message hash from " << _from.address().to_string() << ":" << _from.port();
return;
}
bytesConstRef signedBytes(hashedBytes.cropped(Signature::size, hashedBytes.size() - Signature::size));
bytesConstRef rlpBytes(signedBytes.cropped(Signature::size, signedBytes.size() - Signature::size));
RLP rlp(rlpBytes);
unsigned itemCount = rlp.itemCount();
// todo: verify sig via known-nodeid and MDC, or, do ping/pong auth if node/endpoint is unknown/untrusted
bytesConstRef sigBytes(_packet.cropped(h256::size, Signature::size));
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(rlpBytes)));
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(signedBytes)));
if (!nodeid)
{
clog(NodeTableMessageSummary) << "Invalid Message signature from " << _from.address().to_string() << ":" << _from.port();
return;
}
noteNode(nodeid, _from);
unsigned packetType = signedBytes[0];
if (packetType && packetType < 4)
noteActiveNode(nodeid, _from);
bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1));
RLP rlp(rlpBytes);
try {
switch (itemCount)
switch (packetType)
{
case 1:
case Pong::type:
{
// clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port();
Pong in = Pong::fromBytesConstRef(_from, rlpBytes);
// whenever a pong is received, first check if it's in m_evictions
// whenever a pong is received, check if it's in m_evictions
Guard le(x_evictions);
for (auto it = m_evictions.begin(); it != m_evictions.end(); it++)
if (it->first.first == nodeid && it->first.second > std::chrono::steady_clock::now())
{
if (auto n = nodeEntry(it->second))
dropNode(n);
if (auto n = node(it->first.first))
addNode(n);
it = m_evictions.erase(it);
}
break;
}
case 2:
if (rlp[0].isList())
{
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port();
for (auto n: in.nodes)
noteNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port));
}
else
case Neighbours::type:
{
Neighbours in = Neighbours::fromBytesConstRef(_from, rlpBytes);
// clog(NodeTableMessageSummary) << "Received " << in.nodes.size() << " Neighbours from " << _from.address().to_string() << ":" << _from.port();
for (auto n: in.nodes)
noteActiveNode(n.node, bi::udp::endpoint(bi::address::from_string(n.ipAddress), n.port));
break;
}
case FindNode::type:
{
// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
static unsigned const nlimit = (m_socketPointer->maxDatagramSize - 11) / 86;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{
// clog(NodeTableMessageSummary) << "Received FindNode from " << _from.address().to_string() << ":" << _from.port();
FindNode in = FindNode::fromBytesConstRef(_from, rlpBytes);
vector<shared_ptr<NodeTable::NodeEntry>> nearest = findNearest(in.target);
static unsigned const nlimit = (m_socketPtr->maxDatagramSize - 11) / 86;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{
Neighbours out(_from, nearest, offset, nlimit);
out.sign(m_secret);
m_socketPtr->send(out);
}
Neighbours out(_from, nearest, offset, nlimit);
out.sign(m_secret);
m_socketPointer->send(out);
}
break;
case 3:
}
case PingNode::type:
{
// clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port();
PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
Pong p(_from);
p.replyTo = sha3(rlpBytes);
p.echo = sha3(rlpBytes);
p.sign(m_secret);
m_socketPtr->send(p);
m_socketPointer->send(p);
break;
}
default:
clog(NodeTableMessageSummary) << "Invalid Message received from " << _from.address().to_string() << ":" << _from.port();
clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port();
return;
}
}
@ -399,10 +477,10 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableWarn) << "Exception processing message from " << _from.address().to_string() << ":" << _from.port();
}
}
void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{
if (_ec || !m_socketPtr->isOpen())
if (_ec || !m_socketPointer->isOpen())
return;
auto self(shared_from_this());
@ -415,12 +493,12 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
bool evictionsRemain = false;
list<shared_ptr<NodeEntry>> drop;
{
Guard le(x_evictions);
Guard ln(x_nodes);
Guard le(x_evictions);
for (auto& e: m_evictions)
if (chrono::steady_clock::now() - e.first.second > c_reqTimeout)
if (auto n = m_nodes[e.second])
drop.push_back(n);
if (m_nodes.count(e.second))
drop.push_back(m_nodes[e.second]);
evictionsRemain = m_evictions.size() - drop.size() > 0;
}
@ -439,13 +517,15 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
return;
clog(NodeTableNote) << "refreshing buckets";
bool connected = m_socketPtr->isOpen();
bool connected = m_socketPointer->isOpen();
bool refreshed = false;
if (connected)
{
Guard l(x_state);
for (auto& d: m_state)
if (chrono::steady_clock::now() - d.modified > c_bucketRefresh)
{
d.touch();
while (!d.nodes.empty())
{
auto n = d.nodes.front();
@ -457,10 +537,11 @@ void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
}
d.nodes.pop_front();
}
}
}
unsigned nextRefresh = connected ? (refreshed ? 200 : c_bucketRefresh.count()*1000) : 10000;
auto runcb = [this](boost::system::error_code const& error) -> void { doRefreshBuckets(error); };
auto runcb = [this](boost::system::error_code const& error) { doRefreshBuckets(error); };
m_bucketRefreshTimer.expires_from_now(boost::posix_time::milliseconds(nextRefresh));
m_bucketRefreshTimer.async_wait(runcb);
}

302
libp2p/NodeTable.h

@ -22,9 +22,10 @@
#pragma once
#include <algorithm>
#include <deque>
#include <boost/integer/static_log2.hpp>
#include <libdevcrypto/Common.h>
#include <libp2p/UDP.h>
#include "Common.h"
namespace dev
{
@ -32,189 +33,263 @@ namespace p2p
{
/**
* NodeTable using S/Kademlia system for node discovery and preference.
* untouched buckets are refreshed if they have not been touched within an hour
* NodeEntry
* @brief Entry in Node Table
*/
struct NodeEntry: public Node
{
NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw);
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);
unsigned const distance; ///< Node's distance (xor of _src as integer).
};
enum NodeTableEventType {
NodeEntryAdded,
NodeEntryRemoved
};
class NodeTable;
class NodeTableEventHandler
{
friend class NodeTable;
public:
virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e) = 0;
protected:
/// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable.
void processEvents()
{
std::list<std::pair<NodeId, NodeTableEventType>> events;
{
Guard l(x_events);
if (!m_nodeEventHandler.size())
return;
m_nodeEventHandler.unique();
for (auto const& n: m_nodeEventHandler)
events.push_back(std::make_pair(n,m_events[n]));
m_nodeEventHandler.clear();
m_events.clear();
}
for (auto const& e: events)
processEvent(e.first, e.second);
}
/// Called by NodeTable to append event.
virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEventHandler.push_back(_n); m_events[_n] = _e; }
Mutex x_events;
std::list<NodeId> m_nodeEventHandler;
std::map<NodeId, NodeTableEventType> m_events;
};
class NodeTable;
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable);
/**
* NodeTable using modified kademlia for node discovery and preference.
* Node table requires an IO service, creates a socket for incoming
* UDP messages and implements a kademlia-like protocol. Node requests and
* responses are used to build a node table which can be queried to
* obtain a list of potential nodes to connect to, and, passes events to
* Host whenever a node is added or removed to/from the table.
*
* Thread-safety is ensured by modifying NodeEntry details via
* shared_ptr replacement instead of mutating values.
*
* NodeTable accepts a port for UDP and will listen to the port on all available
* interfaces.
*
* [Integration]
* @todo deadline-timer which maintains tcp/peer connections
* @todo restore nodes: affects refreshbuckets
* @todo TCP endpoints
* @todo makeRequired: don't try to evict node if node isRequired.
* @todo makeRequired: exclude bucket from refresh if we have node as peer.
*
* [Optimization]
* @todo encapsulate doFindNode into NetworkAlgorithm (task)
* @todo serialize evictions per-bucket
* @todo store evictions in map, unit-test eviction logic
* @todo store root node in table
* @todo encapsulate discover into NetworkAlgorithm (task)
* @todo Pong to include ip:port where ping was received
* @todo expiration and sha3(id) 'to' for messages which are replies (prevents replay)
* @todo std::shared_ptr<PingNode> m_cachedPingPacket;
* @todo std::shared_ptr<FindNeighbours> m_cachedFindSelfPacket;
* @todo store root node in table?
* @todo cache Ping and FindSelf
*
* [Networking]
* @todo node-endpoint updates
* @todo TCP endpoints
* @todo eth/upnp/natpmp/stun/ice/etc for public-discovery
* @todo firewall
*
* [Protocol]
* @todo post-eviction pong
* @todo optimize knowledge at opposite edges; eg, s_bitsPerStep lookups. (Can be done via pointers to NodeBucket)
* @todo ^ s_bitsPerStep = 5; // Denoted by b in [Kademlia]. Bits by which address space is divided.
* @todo optimize (use tree for state and/or custom compare for cache)
* @todo reputation (aka universal siblings lists)
* @todo ^ s_bitsPerStep = 8; // Denoted by b in [Kademlia]. Bits by which address space is divided.
*/
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{
friend struct Neighbours;
friend std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable);
using NodeSocket = UDPSocket<NodeTable, 1280>;
using TimePoint = std::chrono::steady_clock::time_point;
using EvictionTimeout = std::pair<std::pair<NodeId,TimePoint>,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId.
struct NodeDefaultEndpoint
{
NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {}
bi::udp::endpoint udp;
};
using EvictionTimeout = std::pair<std::pair<NodeId, TimePoint>, NodeId>; ///< First NodeId may be evicted and replaced with second NodeId.
struct Node
{
Node(Public _pubk, NodeDefaultEndpoint _udp): id(_pubk), endpoint(_udp) {}
Node(Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)) {}
virtual NodeId const& address() const { return id; }
virtual Public const& publicKey() const { return id; }
NodeId id;
NodeDefaultEndpoint endpoint;
};
public:
NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303);
~NodeTable();
/**
* NodeEntry
* @todo Type of id will become template parameter.
*/
struct NodeEntry: public Node
{
NodeEntry(Node _src, Public _pubk, NodeDefaultEndpoint _gw): Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {}
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_pubk)) {}
/// Returns distance based on xor metric two node ids. Used by NodeEntry and NodeTable.
static unsigned distance(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Set event handler for NodeEntryAdded and NodeEntryRemoved events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored.
void processEvents();
/// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp);
/// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Node const& _node);
const unsigned distance; ///< Node's distance from _src (see constructor).
};
/// To be called when node table is empty. Runs node discovery with m_node.id as the target in order to populate node-table.
void discover();
struct NodeBucket
{
unsigned distance;
TimePoint modified;
std::list<std::weak_ptr<NodeEntry>> nodes;
};
/// Returns list of node ids active in node table.
std::list<NodeId> nodes() const;
public:
NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _port = 30300);
~NodeTable();
/// Returns node count.
unsigned count() const { return m_nodes.size(); }
/// Returns snapshot of table.
std::list<NodeEntry> snapshot() const;
/// Returns true if node id is in node table.
bool haveNode(NodeId const& _id) { Guard l(x_nodes); return m_nodes.count(_id); }
/// Returns the Node to the corresponding node id or the empty Node if that id is not found.
Node node(NodeId const& _id);
#ifndef BOOST_AUTO_TEST_SUITE
private:
#else
protected:
#endif
/// Constants for Kademlia, mostly derived from address space.
/// Constants for Kademlia, derived from address space.
static unsigned const s_addressByteSize = sizeof(NodeId); ///< Size of address type in bytes.
static unsigned const s_bits = 8 * s_addressByteSize; ///< Denoted by n in [Kademlia].
static unsigned const s_bins = s_bits - 1; ///< Size of m_state (excludes root, which is us).
static unsigned const s_maxSteps = boost::static_log2<s_bits>::value; ///< Max iterations of discovery. (doFindNode)
static unsigned const s_maxSteps = boost::static_log2<s_bits>::value; ///< Max iterations of discovery. (discover)
/// Chosen constants
static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
static unsigned const s_bucketSize = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
static unsigned const s_alpha = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
/// Intervals
/* todo: replace boost::posix_time; change constants to upper camelcase */
boost::posix_time::milliseconds const c_evictionCheckInterval = boost::posix_time::milliseconds(75); ///< Interval at which eviction timeouts are checked.
std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300); ///< How long to wait for requests (evict, find iterations).
std::chrono::seconds const c_bucketRefresh = std::chrono::seconds(3600); ///< Refresh interval prevents bucket from becoming stale. [Kademlia]
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
struct NodeBucket
{
unsigned distance;
TimePoint modified;
std::list<std::weak_ptr<NodeEntry>> nodes;
void touch() { modified = std::chrono::steady_clock::now(); }
};
void join();
/// Used to ping endpoint.
void ping(bi::udp::endpoint _to) const;
NodeEntry root() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
std::list<NodeId> nodes() const;
std::list<NodeEntry> state() const;
/// Used ping known node. Used by node table when refreshing buckets and as part of eviction process (see evict).
void ping(NodeEntry* _n) const;
NodeEntry operator[](NodeId _id);
/// Returns center node entry which describes this node and used with dist() to calculate xor metric for node table nodes.
NodeEntry center() const { return NodeEntry(m_node, m_node.publicKey(), m_node.endpoint.udp); }
/// Used by asynchronous operations to return NodeEntry which is active and managed by node table.
std::shared_ptr<NodeEntry> nodeEntry(NodeId _id);
protected:
/// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds.
void doFindNode(NodeId _node, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Used to discovery nodes on network which are close to the given target.
/// Sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds.
void discover(NodeId _target, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
/// Returns nodes nearest to target.
std::vector<std::shared_ptr<NodeEntry>> findNearest(NodeId _target);
void ping(bi::udp::endpoint _to) const;
void ping(NodeEntry* _n) const;
/// Returns nodes from node table which are closest to target.
std::vector<std::shared_ptr<NodeEntry>> nearestNodeEntries(NodeId _target);
/// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away.
void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);
void noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
void noteNode(std::shared_ptr<NodeEntry> _n);
/// Called whenever activity is received from an unknown node in order to maintain node table.
void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);
/// Used to drop node when timeout occurs or when evict() result is to keep previous node.
void dropNode(std::shared_ptr<NodeEntry> _n);
NodeBucket& bucket(NodeEntry const* _n);
/// Returns references to bucket which corresponds to distance of node id.
/// @warning Only use the return reference locked x_state mutex.
// TODO p2p: Remove this method after removing offset-by-one functionality.
NodeBucket& bucket_UNSAFE(NodeEntry const* _n);
/// General Network Events
/// Called by m_socket when packet is received.
void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet);
void onDisconnected(UDPSocketFace*) {};
/// Called by m_socket when socket is disconnected.
void onDisconnected(UDPSocketFace*) {}
/// Tasks
/// Called by evict() to ensure eviction check is scheduled to run and terminates when no evictions remain. Asynchronous.
void doCheckEvictions(boost::system::error_code const& _ec);
void doRefreshBuckets(boost::system::error_code const& _ec);
#ifndef BOOST_AUTO_TEST_SUITE
private:
#else
protected:
#endif
/// Sends FindNeighbor packet. See doFindNode.
void requestNeighbours(NodeEntry const& _node, NodeId _target) const;
/// Purges and pings nodes for any buckets which haven't been touched for c_bucketRefresh seconds.
void doRefreshBuckets(boost::system::error_code const& _ec);
std::unique_ptr<NodeTableEventHandler> m_nodeEventHandler; ///< Event handler for node events.
Node m_node; ///< This node.
Secret m_secret; ///< This nodes secret key.
mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const.
std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< NodeId -> Node table (most common lookup path)
mutable Mutex x_nodes; ///< LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const.
std::map<NodeId, std::shared_ptr<NodeEntry>> m_nodes; ///< Nodes
mutable Mutex x_state;
std::array<NodeBucket, s_bins> m_state; ///< State table of binned nodes.
mutable Mutex x_state; ///< LOCK x_state first if both x_nodes and x_state locks are required.
std::array<NodeBucket, s_bins> m_state; ///< State of p2p node network.
Mutex x_evictions;
Mutex x_evictions; ///< LOCK x_nodes first if both x_nodes and x_evictions locks are required.
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
NodeSocket* m_socketPtr; ///< Set to m_socket.get().
ba::io_service& m_io; ///< Used by bucket refresh timer.
std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
NodeSocket* m_socketPointer; ///< Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe.
boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh.
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.
};
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
{
_out << _nodeTable.root().address() << "\t" << "0\t" << _nodeTable.root().endpoint.udp.address() << ":" << _nodeTable.root().endpoint.udp.port() << std::endl;
auto s = _nodeTable.state();
_out << _nodeTable.center().address() << "\t" << "0\t" << _nodeTable.center().endpoint.udp.address() << ":" << _nodeTable.center().endpoint.udp.port() << std::endl;
auto s = _nodeTable.snapshot();
for (auto n: s)
_out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl;
return _out;
}
/**
* Ping packet: Check if node is alive.
* Ping packet: Sent to check if node is alive.
* PingNode is cached and regenerated after expiration - t, where t is timeout.
*
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond, then it is removed and the new
* node is inserted.
*
* RLP Encoded Items: 3
* Minimum Encoded Size: 18 bytes
* Maximum Encoded Size: bytes // todo after u128 addresses
@ -224,11 +299,6 @@ inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
* port: Our port.
* expiration: Triggers regeneration of packet. May also provide control over synchronization.
*
* Ping is used to implement evict. When a new node is seen for
* a given bucket which is full, the least-responsive node is pinged.
* If the pinged node doesn't respond then it is removed and the new
* node is inserted.
*
* @todo uint128_t for ip address (<->integer ipv4/6, asio-address, asio-endpoint)
*
*/
@ -237,6 +307,9 @@ struct PingNode: RLPXDatagram<PingNode>
PingNode(bi::udp::endpoint _ep): RLPXDatagram<PingNode>(_ep) {}
PingNode(bi::udp::endpoint _ep, std::string _src, uint16_t _srcPort, std::chrono::seconds _expiration = std::chrono::seconds(60)): RLPXDatagram<PingNode>(_ep), ipAddress(_src), port(_srcPort), expiration(futureFromEpoch(_expiration)) {}
static const uint8_t type = 1;
unsigned version = 1;
std::string ipAddress;
unsigned port;
unsigned expiration;
@ -246,25 +319,23 @@ struct PingNode: RLPXDatagram<PingNode>
};
/**
* Pong packet: response to ping
* Pong packet: Sent in response to ping
*
* RLP Encoded Items: 1
* RLP Encoded Items: 2
* Minimum Encoded Size: 33 bytes
* Maximum Encoded Size: 33 bytes
*
* @todo expiration
* @todo value of replyTo
* @todo create from PingNode (reqs RLPXDatagram verify flag)
*/
struct Pong: RLPXDatagram<Pong>
{
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep) {}
Pong(bi::udp::endpoint _ep): RLPXDatagram<Pong>(_ep), expiration(futureFromEpoch(std::chrono::seconds(60))) {}
h256 replyTo; // hash of rlp of PingNode
static const uint8_t type = 2;
h256 echo; ///< MCD of PingNode
unsigned expiration;
void streamRLP(RLPStream& _s) const { _s.appendList(1); _s << replyTo; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); replyTo = (h256)r[0]; }
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << echo << expiration; }
void interpretRLP(bytesConstRef _bytes) { RLP r(_bytes); echo = (h256)r[0]; expiration = r[1].toInt<unsigned>(); }
};
/**
@ -284,6 +355,8 @@ struct FindNode: RLPXDatagram<FindNode>
{
FindNode(bi::udp::endpoint _ep): RLPXDatagram<FindNode>(_ep) {}
FindNode(bi::udp::endpoint _ep, NodeId _target, std::chrono::seconds _expiration = std::chrono::seconds(30)): RLPXDatagram<FindNode>(_ep), target(_target), expiration(futureFromEpoch(_expiration)) {}
static const uint8_t type = 3;
h512 target;
unsigned expiration;
@ -297,8 +370,6 @@ struct FindNode: RLPXDatagram<FindNode>
*
* RLP Encoded Items: 2 (first item is list)
* Minimum Encoded Size: 10 bytes
*
* @todo nonce: Should be replaced with expiration.
*/
struct Neighbours: RLPXDatagram<Neighbours>
{
@ -313,8 +384,8 @@ struct Neighbours: RLPXDatagram<Neighbours>
void interpretRLP(RLP const& _r) { ipAddress = _r[0].toString(); port = _r[1].toInt<unsigned>(); node = h512(_r[2].toBytes()); }
};
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeTable::NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to)
Neighbours(bi::udp::endpoint _ep): RLPXDatagram<Neighbours>(_ep), expiration(futureFromEpoch(std::chrono::seconds(30))) {}
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): RLPXDatagram<Neighbours>(_to), expiration(futureFromEpoch(std::chrono::seconds(30)))
{
auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
for (auto i = _offset; i < limit; i++)
@ -327,7 +398,8 @@ struct Neighbours: RLPXDatagram<Neighbours>
}
}
std::list<Node> nodes;
static const uint8_t type = 4;
std::vector<Node> nodes;
unsigned expiration = 1;
void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << expiration; }

83
libp2p/Peer.cpp

@ -0,0 +1,83 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Peer.cpp
* @author Alex Leverington <nessence@gmail.com>
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "Peer.h"
using namespace std;
using namespace dev;
using namespace dev::p2p;
namespace dev
{
namespace p2p
{
bool Peer::shouldReconnect() const
{
return chrono::system_clock::now() > m_lastAttempted + chrono::seconds(fallbackSeconds());
}
unsigned Peer::fallbackSeconds() const
{
switch (m_lastDisconnect)
{
case BadProtocol:
return 30 * (m_failedAttempts + 1);
case UselessPeer:
case TooManyPeers:
case ClientQuit:
return 15 * (m_failedAttempts + 1);
case NoDisconnect:
default:
if (m_failedAttempts < 5)
return m_failedAttempts ? m_failedAttempts * 5 : 5;
else if (m_failedAttempts < 15)
return 25 + (m_failedAttempts - 5) * 10;
else
return 25 + 100 + (m_failedAttempts - 15) * 20;
}
}
bool Peer::operator<(Peer const& _p) const
{
if (isOffline() != _p.isOffline())
return isOffline();
else if (isOffline())
if (m_lastAttempted == _p.m_lastAttempted)
return m_failedAttempts < _p.m_failedAttempts;
else
return m_lastAttempted < _p.m_lastAttempted;
else
if (m_score == _p.m_score)
if (m_rating == _p.m_rating)
if (m_failedAttempts == _p.m_failedAttempts)
return id < _p.id;
else
return m_failedAttempts < _p.m_failedAttempts;
else
return m_rating < _p.m_rating;
else
return m_score < _p.m_score;
}
}
}

97
libp2p/Peer.h

@ -0,0 +1,97 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Peer.h
* @author Alex Leverington <nessence@gmail.com>
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include "Common.h"
namespace dev
{
namespace p2p
{
/**
* @brief Representation of connectivity state and all other pertinent Peer metadata.
* A Peer represents connectivity between two nodes, which in this case, are the host
* and remote nodes.
*
* State information necessary for loading network topology is maintained by NodeTable.
*
* @todo Implement 'bool required'
* @todo reputation: Move score, rating to capability-specific map (&& remove friend class)
* @todo reputation: implement via origin-tagged events
* @todo Populate metadata upon construction; save when destroyed.
* @todo Metadata for peers needs to be handled via a storage backend.
* Specifically, peers can be utilized in a variety of
* many-to-many relationships while also needing to modify shared instances of
* those peers. Modifying these properties via a storage backend alleviates
* Host of the responsibility. (&& remove save/restoreNetwork)
* @todo reimplement recording of historical session information on per-transport basis
* @todo rebuild nodetable when localNetworking is enabled/disabled
* @todo move attributes into protected
*/
class Peer: public Node
{
friend class Session; /// Allows Session to update score and rating.
friend class Host; /// For Host: saveNetwork(), restoreNetwork()
public:
bool isOffline() const { return !m_session.lock(); }
bi::tcp::endpoint const& peerEndpoint() const { return endpoint.tcp; }
virtual bool operator<(Peer const& _p) const;
/// WIP: Returns current peer rating.
int rating() const { return m_rating; }
/// Return true if connection attempt should be made to this peer or false if
bool shouldReconnect() const;
/// Number of times connection has been attempted to peer.
int failedAttempts() const { return m_failedAttempts; }
/// Reason peer was previously disconnected.
DisconnectReason lastDisconnect() const { return m_lastDisconnect; }
protected:
/// Returns number of seconds to wait until attempting connection, based on attempted connection history.
unsigned fallbackSeconds() const;
int m_score = 0; ///< All time cumulative.
int m_rating = 0; ///< Trending.
/// Network Availability
std::chrono::system_clock::time_point m_lastConnected;
std::chrono::system_clock::time_point m_lastAttempted;
unsigned m_failedAttempts = 0;
DisconnectReason m_lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last.
/// Used by isOffline() and (todo) for peer to emit session information.
std::weak_ptr<Session> m_session;
};
using Peers = std::vector<Peer>;
}
}

216
libp2p/Session.cpp

@ -36,37 +36,19 @@ using namespace dev::p2p;
#endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual):
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Peer> const& _n):
m_server(_s),
m_socket(std::move(_socket)),
m_node(nullptr),
m_manualEndpoint(_manual) // NOTE: the port on this shouldn't be used if it's zero.
m_peer(_n),
m_info({NodeId(), "?", m_socket.remote_endpoint().address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()}),
m_ping(chrono::time_point<chrono::steady_clock>::max())
{
m_lastReceived = m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({NodeId(), "?", m_manualEndpoint.address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
}
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Node> const& _n, bool _force):
m_server(_s),
m_socket(std::move(_socket)),
m_node(_n),
m_manualEndpoint(_n->address),
m_force(_force)
{
m_lastReceived = m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
}
Session::~Session()
{
if (m_node)
{
if (id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead)
m_server->m_ready += m_node->index;
else
m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1);
}
m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1);
// Read-chain finished for one reason or another.
for (auto& i: m_capabilities)
@ -86,36 +68,21 @@ Session::~Session()
NodeId Session::id() const
{
return m_node ? m_node->id : NodeId();
return m_peer ? m_peer->id : NodeId();
}
void Session::addRating(unsigned _r)
{
if (m_node)
if (m_peer)
{
m_node->rating += _r;
m_node->score += _r;
m_peer->m_rating += _r;
m_peer->m_score += _r;
}
}
int Session::rating() const
{
return m_node->rating;
}
bi::tcp::endpoint Session::endpoint() const
{
if (m_socket.is_open() && m_node)
try
{
return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_node->address.port());
}
catch (...) {}
if (m_node)
return m_node->address;
return m_manualEndpoint;
return m_peer->m_rating;
}
template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
@ -132,9 +99,10 @@ template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
return ret;
}
// TODO: P2P integration: replace w/asio post -> serviceNodesRequest()
void Session::ensureNodesRequested()
{
if (isOpen() && !m_weRequestedNodes)
if (isConnected() && !m_weRequestedNodes)
{
m_weRequestedNodes = true;
RLPStream s;
@ -147,7 +115,9 @@ void Session::serviceNodesRequest()
if (!m_theyRequestedNodes)
return;
auto peers = m_server->potentialPeers(m_knownNodes);
// TODO: P2P reimplement, as per TCP "close nodes" gossip specifications (WiP)
// auto peers = m_server->potentialPeers(m_knownNodes);
Peers peers;
if (peers.empty())
{
addNote("peers", "requested");
@ -160,13 +130,11 @@ void Session::serviceNodesRequest()
auto rs = randomSelection(peers, 10);
for (auto const& i: rs)
{
clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.address;
if (i.address.address().is_v4())
s.appendList(3) << bytesConstRef(i.address.address().to_v4().to_bytes().data(), 4) << i.address.port() << i.id;
clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.peerEndpoint();
if (i.peerEndpoint().address().is_v4())
s.appendList(3) << bytesConstRef(i.peerEndpoint().address().to_v4().to_bytes().data(), 4) << i.peerEndpoint().port() << i.id;
else// if (i.second.address().is_v6()) - assumed
s.appendList(3) << bytesConstRef(i.address.address().to_v6().to_bytes().data(), 16) << i.address.port() << i.id;
m_knownNodes.extendAll(i.index);
m_knownNodes.unionWith(i.index);
s.appendList(3) << bytesConstRef(i.peerEndpoint().address().to_v6().to_bytes().data(), 16) << i.peerEndpoint().port() << i.id;
}
sealAndSend(s);
m_theyRequestedNodes = false;
@ -185,6 +153,11 @@ bool Session::interpret(RLP const& _r)
{
case HelloPacket:
{
// TODO: P2P first pass, implement signatures. if signature fails, drop connection. if egress, flag node's endpoint as stale.
// Move auth to Host so we consolidate authentication logic and eschew peer deduplication logic.
// Move all node-lifecycle information into Host.
// Finalize peer-lifecycle properties vs node lifecycle.
m_protocolVersion = _r[1].toInt<unsigned>();
auto clientVersion = _r[2].toString();
auto caps = _r[3].toVector<CapDesc>();
@ -202,57 +175,58 @@ bool Session::interpret(RLP const& _r)
if (m_server->id() == id)
{
// Already connected.
clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << m_info.id.abridged();
clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << id.abridged();
disconnect(LocalIdentity);
return true;
}
if (m_node && m_node->id != id)
// if peer and connection have id, check for UnexpectedIdentity
if (!id)
{
if (m_force || m_node->idOrigin <= Origin::SelfThird)
// SECURITY: We're forcing through the new ID, despite having been told
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Allowing anyway...";
else
{
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Disconnecting.";
disconnect(UnexpectedIdentity);
return true;
}
if (m_server->havePeer(id))
{
m_node->dead = true;
disconnect(DuplicatePeer);
return true;
}
disconnect(NullIdentity);
return true;
}
else if (!m_peer->id)
{
m_peer->id = id;
m_peer->endpoint.tcp.port(listenPort);
}
else if (m_peer->id != id)
{
// TODO p2p: FIXME. Host should catch this and reattempt adding node to table.
m_peer->id = id;
m_peer->m_score = 0;
m_peer->m_rating = 0;
// disconnect(UnexpectedIdentity);
// return true;
}
if (m_server->havePeer(id))
if (m_server->havePeerSession(id))
{
// Already connected.
clogS(NetWarn) << "Already connected to a peer with id" << id.abridged();
// Possible that two nodes continually connect to each other with exact same timing.
this_thread::sleep_for(chrono::milliseconds(rand() % 100));
disconnect(DuplicatePeer);
return true;
}
if (!id)
{
disconnect(NullIdentity);
return true;
}
m_node = m_server->noteNode(id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort), Origin::Self, false, !m_node || m_node->id == id ? NodeId() : m_node->id);
if (m_node->isOffline())
m_node->lastConnected = chrono::system_clock::now();
m_knownNodes.extendAll(m_node->index);
m_knownNodes.unionWith(m_node->index);
if (m_peer->isOffline())
m_peer->m_lastConnected = chrono::system_clock::now();
if (m_protocolVersion != m_server->protocolVersion())
{
disconnect(IncompatibleProtocol);
return true;
}
m_info = PeerInfo({id, clientVersion, m_socket.remote_endpoint().address().to_string(), listenPort, std::chrono::steady_clock::duration(), _r[3].toSet<CapDesc>(), (unsigned)m_socket.native_handle(), map<string, string>() });
m_info.clientVersion = clientVersion;
m_info.host = m_socket.remote_endpoint().address().to_string();
m_info.port = listenPort;
m_info.lastPing = std::chrono::steady_clock::duration();
m_info.caps = _r[3].toSet<CapDesc>();
m_info.socket = (unsigned)m_socket.native_handle();
m_info.notes = map<string, string>();
m_server->registerPeer(shared_from_this(), caps);
break;
@ -284,12 +258,20 @@ bool Session::interpret(RLP const& _r)
break;
case GetPeersPacket:
{
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clogS(NetTriviaSummary) << "GetPeers";
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
}
case PeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clogS(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 1; i < _r.itemCount(); ++i)
@ -307,62 +289,37 @@ bool Session::interpret(RLP const& _r)
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_nodes.count(id) << (m_server->m_nodes.count(id) ? isPrivateAddress(m_server->m_nodes.at(id)->address.address()) : -1);
if (isPrivateAddress(peerAddress) && !m_server->m_netPrefs.localNetworking)
clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")";
// clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_peers.count(id) << (m_server->m_peers.count(id) ? isPrivateAddress(m_server->m_peers.at(id)->address.address()) : -1);
// todo: draft spec: ignore if dist(us,item) - dist(us,them) > 1
// TODO: isPrivate
if (!m_server->m_netPrefs.localNetworking && isPrivateAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto CONTINUE; // Null identity. Ignore.
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto CONTINUE; // Just our info - we already have that.
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto CONTINUE; // Just their info - we already have that.
// check that it's not us or one we already know:
if (m_server->m_nodes.count(id))
{
/* MEH. Far from an ideal solution. Leave alone for now.
// Already got this node.
// See if it's any better that ours or not...
// This could be the public address of a known node.
// SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
if (m_server->m_nodes.count(id) && isPrivateAddress(m_server->m_nodes.at(id)->address.address()) && ep.port() != 0)
// Update address if the node if we now have a public IP for it.
m_server->m_nodes[id]->address = ep;
*/
goto CONTINUE;
}
goto LAMEPEER; // Just their info - we already have that.
if (!ep.port())
goto CONTINUE; // Zero port? Don't think so.
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto CONTINUE; // Private port according to IANA.
// TODO: PoC-7:
// Technically fine, but ignore for now to avoid peers passing on incoming ports until we can be sure that doesn't happen any more.
// if (ep.port() < 30300 || ep.port() > 30305)
// goto CONTINUE; // Wierd port.
// Avoid our random other addresses that they might end up giving us.
for (auto i: m_server->m_peerAddresses)
if (ep.address() == i && ep.port() == m_server->listenPort())
goto CONTINUE;
// Check that we don't already know about this addr:port combination. If we are, assume the original is best.
// SECURITY: Not a valid assumption in general. Should compare ID origins and pick the best or note uncertainty and weight each equally.
for (auto const& i: m_server->m_nodes)
if (i.second->address == ep)
goto CONTINUE; // Same address but a different node.
goto LAMEPEER; // Private port according to IANA.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->noteNode(id, ep, m_node->idOrigin == Origin::Perfect ? Origin::PerfectThird : Origin::SelfThird, true);
m_server->addNode(id, ep.address().to_string(), ep.port(), ep.port());
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
@ -441,7 +398,6 @@ void Session::send(bytes&& _msg)
if (!checkPacket(bytesConstRef(&_msg)))
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
// cerr << (void*)this << " writeImpl" << endl;
if (!m_socket.is_open())
return;
@ -494,15 +450,15 @@ void Session::drop(DisconnectReason _reason)
}
catch (...) {}
if (m_node)
if (m_peer)
{
if (_reason != m_node->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested)
m_node->failedAttempts = 0;
m_node->lastDisconnect = _reason;
if (_reason != m_peer->m_lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested)
m_peer->m_failedAttempts = 0;
m_peer->m_lastDisconnect = _reason;
if (_reason == BadProtocol)
{
m_node->rating /= 2;
m_node->score /= 2;
m_peer->m_rating /= 2;
m_peer->m_score /= 2;
}
}
m_dropped = true;

20
libp2p/Session.h

@ -39,7 +39,7 @@ namespace dev
namespace p2p
{
struct Node;
class Peer;
/**
* @brief The Session class
@ -51,8 +51,7 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace;
public:
Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<Node> const& _n, bool _force = false);
Session(Host* _server, bi::tcp::socket _socket, bi::tcp::endpoint const& _manual);
Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<Peer> const& _n);
virtual ~Session();
void start();
@ -60,13 +59,11 @@ public:
void ping();
bool isOpen() const { return m_socket.is_open(); }
bool isConnected() const { return m_socket.is_open(); }
NodeId id() const;
unsigned socketId() const { return m_socket.native_handle(); }
bi::tcp::endpoint endpoint() const; ///< for other peers to connect to.
template <class PeerCap>
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
@ -81,7 +78,7 @@ public:
void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; }
PeerInfo const& info() const { return m_info; }
PeerSessionInfo const& info() const { return m_info; }
void ensureNodesRequested();
void serviceNodesRequest();
@ -110,14 +107,12 @@ private:
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data.
bytes m_incoming; ///< Read buffer for ingress bytes.
PeerInfo m_info; ///< Dynamic information about this peer.
unsigned m_protocolVersion = 0; ///< The protocol version of the peer.
std::shared_ptr<Node> m_node; ///< The Node object. Might be null if we constructed using a bare address/port.
bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor.
bool m_force = false; ///< If true, ignore IDs being different. This could open you up to MitM attacks.
std::shared_ptr<Peer> m_peer; ///< The Peer object.
bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.
PeerSessionInfo m_info; ///< Dynamic information about this peer.
bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us?
bool m_weRequestedNodes = false; ///< Have we requested nodes from the peer and not received an answer yet?
@ -126,7 +121,6 @@ private:
std::chrono::steady_clock::time_point m_lastReceived; ///< Time point of last message.
std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities; ///< The peer's capability set.
RangeMask<unsigned> m_knownNodes; ///< Nodes we already know about as indices into Host's nodesList. These shouldn't be resent to peer.
};
}

35
libp2p/UDP.cpp

@ -25,25 +25,30 @@ using namespace dev::p2p;
h256 RLPXDatagramFace::sign(Secret const& _k)
{
RLPStream rlpstream;
streamRLP(rlpstream);
bytes rlpBytes(rlpstream.out());
assert(packetType());
bytesConstRef rlp(&rlpBytes);
h256 hash(dev::sha3(rlp));
Signature sig = dev::sign(_k, hash);
RLPStream rlpxstream;
// rlpxstream.appendRaw(toPublic(_k).asBytes()); // for mdc-based signature
rlpxstream.appendRaw(bytes(1, packetType())); // prefix by 1 byte for type
streamRLP(rlpxstream);
bytes rlpxBytes(rlpxstream.out());
data.resize(h256::size + Signature::size + rlp.size());
bytesConstRef packetHash(&data[0], h256::size);
bytesConstRef signedPayload(&data[h256::size], Signature::size + rlp.size());
bytesConstRef payloadSig(&data[h256::size], Signature::size);
bytesConstRef payload(&data[h256::size + Signature::size], rlp.size());
bytesConstRef rlpx(&rlpxBytes);
h256 sighash(dev::sha3(rlpx)); // H(type||data)
Signature sig = dev::sign(_k, sighash); // S(H(type||data))
sig.ref().copyTo(payloadSig);
rlp.copyTo(payload);
dev::sha3(signedPayload).ref().copyTo(packetHash);
data.resize(h256::size + Signature::size + rlpx.size());
bytesConstRef rlpxHash(&data[0], h256::size);
bytesConstRef rlpxSig(&data[h256::size], Signature::size);
bytesConstRef rlpxPayload(&data[h256::size + Signature::size], rlpx.size());
sig.ref().copyTo(rlpxSig);
rlpx.copyTo(rlpxPayload);
bytesConstRef signedRLPx(&data[h256::size], data.size() - h256::size);
dev::sha3(signedRLPx).ref().copyTo(rlpxHash);
return std::move(hash);
return std::move(sighash);
};
Public RLPXDatagramFace::authenticate(bytesConstRef _sig, bytesConstRef _rlp)

14
libp2p/UDP.h

@ -57,20 +57,19 @@ protected:
/**
* @brief RLPX Datagram which can be signed.
* @todo compact templates
* @todo make data private/functional (see UDPDatagram)
*/
struct RLPXDatagramFace: public UDPDatagram
{
static uint64_t futureFromEpoch(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static uint64_t futureFromEpoch(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);
virtual uint8_t packetType() = 0;
RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
virtual h256 sign(Secret const& _from);
virtual void streamRLP(RLPStream&) const =0;
virtual void interpretRLP(bytesConstRef _bytes) =0;
virtual void streamRLP(RLPStream&) const = 0;
virtual void interpretRLP(bytesConstRef _bytes) = 0;
};
template <class T>
@ -78,6 +77,7 @@ struct RLPXDatagram: public RLPXDatagramFace
{
RLPXDatagram(bi::udp::endpoint const& _ep): RLPXDatagramFace(_ep) {}
static T fromBytesConstRef(bi::udp::endpoint const& _ep, bytesConstRef _bytes) { T t(_ep); t.interpretRLP(_bytes); return std::move(t); }
uint8_t packetType() { return T::type; }
};
/**

2
libp2p/UPnP.cpp

@ -132,7 +132,7 @@ int UPnP::addRedirect(char const* _addr, int _port)
srand(time(NULL));
for (unsigned i = 0; i < 10; ++i)
{
_port = rand() % (65535 - 1024) + 1024;
_port = rand() % (32768 - 1024) + 1024;
sprintf(ext_port_str, "%d", _port);
if (!UPNP_AddPortMapping(m_urls->controlURL, m_data->first.servicetype, ext_port_str, port_str, _addr, "ethereum", "TCP", NULL, NULL))
return _port;

19
libwebthree/WebThree.cpp

@ -35,9 +35,9 @@ using namespace dev::p2p;
using namespace dev::eth;
using namespace dev::shh;
WebThreeDirect::WebThreeDirect(std::string const& _clientVersion, std::string const& _dbPath, bool _forceClean, std::set<std::string> const& _interfaces, NetworkPreferences const& _n, int miners):
WebThreeDirect::WebThreeDirect(std::string const& _clientVersion, std::string const& _dbPath, bool _forceClean, std::set<std::string> const& _interfaces, NetworkPreferences const& _n, bytesConstRef _network, int miners):
m_clientVersion(_clientVersion),
m_net(_clientVersion, _n)
m_net(_clientVersion, _n, _network)
{
if (_dbPath.size())
Defaults::setDBPath(_dbPath);
@ -75,9 +75,9 @@ void WebThreeDirect::setNetworkPreferences(p2p::NetworkPreferences const& _n)
startNetwork();
}
std::vector<PeerInfo> WebThreeDirect::peers()
std::vector<PeerSessionInfo> WebThreeDirect::peers()
{
return m_net.peers();
return m_net.peerSessionInfo();
}
size_t WebThreeDirect::peerCount() const
@ -90,17 +90,12 @@ void WebThreeDirect::setIdealPeerCount(size_t _n)
return m_net.setIdealPeerCount(_n);
}
bytes WebThreeDirect::saveNodes()
bytes WebThreeDirect::saveNetwork()
{
return m_net.saveNodes();
}
void WebThreeDirect::restoreNodes(bytesConstRef _saved)
{
return m_net.restoreNodes(_saved);
return m_net.saveNetwork();
}
void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port)
{
m_net.connect(_seedHost, _port);
m_net.addNode(NodeId(), _seedHost, _port, _port);
}

23
libwebthree/WebThree.h

@ -54,7 +54,7 @@ class WebThreeNetworkFace
{
public:
/// Get information on the current peer set.
virtual std::vector<p2p::PeerInfo> peers() = 0;
virtual std::vector<p2p::PeerSessionInfo> peers() = 0;
/// Same as peers().size(), but more efficient.
virtual size_t peerCount() const = 0;
@ -63,10 +63,7 @@ public:
virtual void connect(std::string const& _seedHost, unsigned short _port) = 0;
/// Save peers
virtual dev::bytes saveNodes() = 0;
/// Restore peers
virtual void restoreNodes(bytesConstRef _saved) = 0;
virtual dev::bytes saveNetwork() = 0;
/// Sets the ideal number of peers.
virtual void setIdealPeerCount(size_t _n) = 0;
@ -78,7 +75,7 @@ public:
virtual p2p::NodeId id() const = 0;
/// Gets the nodes.
virtual p2p::Nodes nodes() const = 0;
virtual p2p::Peers nodes() const = 0;
/// Start the network subsystem.
virtual void startNetwork() = 0;
@ -106,7 +103,7 @@ class WebThreeDirect : public WebThreeNetworkFace
public:
/// Constructor for private instance. If there is already another process on the machine using @a _dbPath, then this will throw an exception.
/// ethereum() may be safely static_cast()ed to a eth::Client*.
WebThreeDirect(std::string const& _clientVersion, std::string const& _dbPath, bool _forceClean = false, std::set<std::string> const& _interfaces = {"eth", "shh"}, p2p::NetworkPreferences const& _n = p2p::NetworkPreferences(), int miners = -1);
WebThreeDirect(std::string const& _clientVersion, std::string const& _dbPath, bool _forceClean = false, std::set<std::string> const& _interfaces = {"eth", "shh"}, p2p::NetworkPreferences const& _n = p2p::NetworkPreferences(), bytesConstRef _network = bytesConstRef(), int miners = -1);
/// Destructor.
~WebThreeDirect();
@ -124,7 +121,7 @@ public:
// Network stuff:
/// Get information on the current peer set.
std::vector<p2p::PeerInfo> peers() override;
std::vector<p2p::PeerSessionInfo> peers() override;
/// Same as peers().size(), but more efficient.
size_t peerCount() const override;
@ -133,10 +130,10 @@ public:
void connect(std::string const& _seedHost, unsigned short _port = 30303) override;
/// Save peers
dev::bytes saveNodes() override;
dev::bytes saveNetwork() override;
/// Restore peers
void restoreNodes(bytesConstRef _saved) override;
// /// Restore peers
// void restoreNetwork(bytesConstRef _saved) override;
/// Sets the ideal number of peers.
void setIdealPeerCount(size_t _n) override;
@ -148,7 +145,7 @@ public:
p2p::NodeId id() const override { return m_net.id(); }
/// Gets the nodes.
p2p::Nodes nodes() const override { return m_net.nodes(); }
p2p::Peers nodes() const override { return m_net.getPeers(); }
/// Start the network subsystem.
void startNetwork() override { m_net.start(); }
@ -235,7 +232,7 @@ public:
// Peer network stuff - forward through RPCSlave, probably with P2PNetworkSlave/Master classes like Whisper & Ethereum.
/// Get information on the current peer set.
std::vector<p2p::PeerInfo> peers();
std::vector<p2p::PeerSessionInfo> peers();
/// Same as peers().size(), but more efficient.
size_t peerCount() const;

16
libwhisper/WhisperHost.cpp

@ -79,11 +79,15 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
noteChanged(h, f.first);
}
for (auto& i: peers())
if (i->cap<WhisperPeer>().get() == _p)
i->addRating(1);
// TODO p2p: capability-based rating
for (auto i: peerSessions())
{
auto w = i.first->cap<WhisperPeer>().get();
if (w == _p)
w->addRating(1);
else
i->cap<WhisperPeer>()->noteNewMessage(h, _m);
w->noteNewMessage(h, _m);
}
}
void WhisperHost::noteChanged(h256 _messageHash, h256 _filter)
@ -158,8 +162,8 @@ void WhisperHost::uninstallWatch(unsigned _i)
void WhisperHost::doWork()
{
for (auto& i: peers())
i->cap<WhisperPeer>()->sendMessages();
for (auto& i: peerSessions())
i.first->cap<WhisperPeer>().get()->sendMessages();
cleanup();
}

8
neth/main.cpp

@ -435,12 +435,14 @@ int main(int argc, char** argv)
cout << credits();
NetworkPreferences netPrefs(listenPort, publicIP, upnp, useLocal);
auto nodesState = contents((dbPath.size() ? dbPath : getDataDir()) + "/network.rlp");
dev::WebThreeDirect web3(
"NEthereum(++)/" + clientName + "v" + dev::Version + "/" DEV_QUOTED(ETH_BUILD_TYPE) "/" DEV_QUOTED(ETH_BUILD_PLATFORM),
dbPath,
false,
mode == NodeMode::Full ? set<string>{"eth", "shh"} : set<string>(),
netPrefs,
&nodesState,
miners
);
web3.setIdealPeerCount(peers);
@ -452,9 +454,7 @@ int main(int argc, char** argv)
c->setAddress(coinbase);
}
auto nodesState = contents((dbPath.size() ? dbPath : getDataDir()) + "/nodeState.rlp");
web3.restoreNodes(&nodesState);
cout << "Address: " << endl << toHex(us.address().asArray()) << endl;
web3.startNetwork();
if (bootstrap)
@ -1023,7 +1023,7 @@ int main(int argc, char** argv)
// Peers
y = 1;
for (PeerInfo const& i: web3.peers())
for (PeerSessionInfo const& i: web3.peers())
{
auto s = boost::format("%1% ms - %2%:%3% - %4%") %
toString(chrono::duration_cast<chrono::milliseconds>(i.lastPing).count()) %

19
test/net.cpp

@ -30,7 +30,7 @@ using namespace dev::p2p;
namespace ba = boost::asio;
namespace bi = ba::ip;
BOOST_AUTO_TEST_SUITE(p2p)
BOOST_AUTO_TEST_SUITE(net)
/**
* Only used for testing. Not useful beyond tests.
@ -87,7 +87,7 @@ struct TestNodeTable: public NodeTable
bi::address ourIp = bi::address::from_string("127.0.0.1");
for (auto& n: _testNodes)
if (_count--)
noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
noteActiveNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
else
break;
}
@ -152,7 +152,7 @@ BOOST_AUTO_TEST_CASE(test_neighbours_packet)
out.sign(k.sec());
bytesConstRef packet(out.data.data(), out.data.size());
bytesConstRef rlpBytes(packet.cropped(97, packet.size() - 97));
bytesConstRef rlpBytes(packet.cropped(h256::size + Signature::size + 1));
Neighbours in = Neighbours::fromBytesConstRef(to, rlpBytes);
int count = 0;
for (auto n: in.nodes)
@ -182,7 +182,7 @@ BOOST_AUTO_TEST_CASE(kademlia)
// Not yet a 'real' test.
TestNodeTableHost node(8);
node.start();
node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for
node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for
node.setup();
node.populate();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
@ -190,15 +190,24 @@ BOOST_AUTO_TEST_CASE(kademlia)
node.populateAll();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
auto nodes = node.nodeTable->nodes();
nodes.sort();
node.nodeTable->reset();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(1);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->join();
node.nodeTable->discover();
this_thread::sleep_for(chrono::milliseconds(2000));
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);
auto netNodes = node.nodeTable->nodes();
netNodes.sort();
}
BOOST_AUTO_TEST_CASE(test_udp_once)

78
test/peer.cpp

@ -20,6 +20,7 @@
* Peer Network test functions.
*/
#include <boost/test/unit_test.hpp>
#include <chrono>
#include <thread>
#include <libp2p/Host.h>
@ -27,12 +28,74 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
BOOST_AUTO_TEST_SUITE(p2p)
BOOST_AUTO_TEST_CASE(host)
{
NetworkPreferences host1prefs(30301, "127.0.0.1", true, true);
NetworkPreferences host2prefs(30302, "127.0.0.1", true, true);
Host host1("Test", host1prefs);
host1.start();
Host host2("Test", host2prefs);
auto node2 = host2.id();
host2.start();
host1.addNode(node2, "127.0.0.1", host2prefs.listenPort, host2prefs.listenPort);
this_thread::sleep_for(chrono::seconds(1));
BOOST_REQUIRE_EQUAL(host1.peerCount(), 1);
BOOST_REQUIRE_EQUAL(host2.peerCount(), host1.peerCount());
}
BOOST_AUTO_TEST_CASE(save_nodes)
{
std::list<Host*> hosts;
for (auto i:{0,1,2,3,4,5})
{
Host* h = new Host("Test", NetworkPreferences(30300 + i, "127.0.0.1", true, true));
// starting host is required so listenport is available
h->start();
while (!h->isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
hosts.push_back(h);
}
Host& host = *hosts.front();
for (auto const& h: hosts)
host.addNode(h->id(), "127.0.0.1", h->listenPort(), h->listenPort());
Host& host2 = *hosts.back();
for (auto const& h: hosts)
host2.addNode(h->id(), "127.0.0.1", h->listenPort(), h->listenPort());
this_thread::sleep_for(chrono::milliseconds(1000));
bytes firstHostNetwork(host.saveNetwork());
bytes secondHostNetwork(host.saveNetwork());
BOOST_REQUIRE_EQUAL(sha3(firstHostNetwork), sha3(secondHostNetwork));
BOOST_CHECK_EQUAL(host.peerCount(), 5);
BOOST_CHECK_EQUAL(host2.peerCount(), 5);
RLP r(firstHostNetwork);
BOOST_REQUIRE(r.itemCount() == 3);
BOOST_REQUIRE(r[0].toInt<int>() == 1);
BOOST_REQUIRE_EQUAL(r[1].toBytes().size(), 32); // secret
BOOST_REQUIRE_EQUAL(r[2].itemCount(), 5);
}
BOOST_AUTO_TEST_SUITE_END()
int peerTest(int argc, char** argv)
{
Public remoteAlias;
short listenPort = 30303;
string remoteHost;
short remotePort = 30303;
for (int i = 1; i < argc; ++i)
{
string arg = argv[i];
@ -42,21 +105,18 @@ int peerTest(int argc, char** argv)
remoteHost = argv[++i];
else if (arg == "-p" && i + 1 < argc)
remotePort = (short)atoi(argv[++i]);
else if (arg == "-ra" && i + 1 < argc)
remoteAlias = Public(dev::fromHex(argv[++i]));
else
remoteHost = argv[i];
}
Host ph("Test", NetworkPreferences(listenPort));
if (!remoteHost.empty())
ph.connect(remoteHost, remotePort);
if (!remoteHost.empty() && !remoteAlias)
ph.addNode(remoteAlias, remoteHost, remotePort, remotePort);
for (int i = 0; ; ++i)
{
this_thread::sleep_for(chrono::milliseconds(100));
if (!(i % 10))
ph.pingAll();
}
this_thread::sleep_for(chrono::milliseconds(200));
return 0;
}

159
test/whisperTopic.cpp

@ -36,27 +36,31 @@ BOOST_AUTO_TEST_CASE(topic)
auto oldLogVerbosity = g_logVerbosity;
g_logVerbosity = 0;
Host host1("Test", NetworkPreferences(30303, "127.0.0.1", false, true));
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
while (!host1.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
bool started = false;
unsigned result = 0;
std::thread listener([&]()
{
setThreadName("other");
Host ph("Test", NetworkPreferences(50303, "", false, true));
auto wh = ph.registerCapability(new WhisperHost());
ph.start();
started = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("odd"));
auto w = whost1->installWatch(BuildTopicMask("odd"));
started = true;
set<unsigned> received;
for (int iterout = 0, last = 0; iterout < 200 && last < 81; ++iterout)
{
for (auto i: wh->checkWatch(w))
for (auto i: whost1->checkWatch(w))
{
Message msg = wh->envelope(i).open(wh->fullTopic(w));
Message msg = whost1->envelope(i).open(whost1->fullTopic(w));
last = RLP(msg.payload()).toInt<unsigned>();
if (received.count(last))
continue;
@ -66,22 +70,28 @@ BOOST_AUTO_TEST_CASE(topic)
}
this_thread::sleep_for(chrono::milliseconds(50));
}
});
Host host2("Test", NetworkPreferences(30300, "127.0.0.1", false, true));
auto whost2 = host2.registerCapability(new WhisperHost());
host2.start();
while (!host2.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
this_thread::sleep_for(chrono::milliseconds(100));
host2.addNode(host1.id(), "127.0.0.1", 30303, 30303);
this_thread::sleep_for(chrono::milliseconds(500));
while (!started)
this_thread::sleep_for(chrono::milliseconds(50));
Host ph("Test", NetworkPreferences(50300, "", false, true));
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50303);
this_thread::sleep_for(chrono::milliseconds(2));
KeyPair us = KeyPair::create();
for (int i = 0; i < 10; ++i)
{
wh->post(us.sec(), RLPStream().append(i * i).out(), BuildTopic(i)(i % 2 ? "odd" : "even"));
whost2->post(us.sec(), RLPStream().append(i * i).out(), BuildTopic(i)(i % 2 ? "odd" : "even"));
this_thread::sleep_for(chrono::milliseconds(250));
}
@ -96,7 +106,15 @@ BOOST_AUTO_TEST_CASE(forwarding)
cnote << "Testing Whisper forwarding...";
auto oldLogVerbosity = g_logVerbosity;
g_logVerbosity = 0;
// Host must be configured not to share peers.
Host host1("Listner", NetworkPreferences(30303, "", false, true));
host1.setIdealPeerCount(0);
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
while (!host1.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
unsigned result = 0;
bool done = false;
@ -105,22 +123,16 @@ BOOST_AUTO_TEST_CASE(forwarding)
{
setThreadName("listener");
// Host must be configured not to share peers.
Host ph("Listner", NetworkPreferences(50303, "", false, true));
ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost());
ph.start();
startedListener = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
auto w = whost1->installWatch(BuildTopicMask("test"));
for (int i = 0; i < 200 && !result; ++i)
{
for (auto i: wh->checkWatch(w))
for (auto i: whost1->checkWatch(w))
{
Message msg = wh->envelope(i).open(wh->fullTopic(w));
Message msg = whost1->envelope(i).open(whost1->fullTopic(w));
unsigned last = RLP(msg.payload()).toInt<unsigned>();
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
result = last;
@ -129,6 +141,16 @@ BOOST_AUTO_TEST_CASE(forwarding)
}
});
// Host must be configured not to share peers.
Host host2("Forwarder", NetworkPreferences(30305, "", false, true));
host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost());
host2.start();
while (!host2.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
Public fwderid;
bool startedForwarder = false;
std::thread forwarder([&]()
{
@ -137,26 +159,19 @@ BOOST_AUTO_TEST_CASE(forwarding)
while (!startedListener)
this_thread::sleep_for(chrono::milliseconds(50));
// Host must be configured not to share peers.
Host ph("Forwarder", NetworkPreferences(50305, "", false, true));
ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50303);
host2.addNode(host1.id(), "127.0.0.1", 30303, 30303);
startedForwarder = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
auto w = whost2->installWatch(BuildTopicMask("test"));
while (!done)
{
for (auto i: wh->checkWatch(w))
for (auto i: whost2->checkWatch(w))
{
Message msg = wh->envelope(i).open(wh->fullTopic(w));
Message msg = whost2->envelope(i).open(whost2->fullTopic(w));
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
}
this_thread::sleep_for(chrono::milliseconds(50));
@ -166,13 +181,13 @@ BOOST_AUTO_TEST_CASE(forwarding)
while (!startedForwarder)
this_thread::sleep_for(chrono::milliseconds(50));
Host ph("Sender", NetworkPreferences(50300, "", false, true));
ph.setIdealPeerCount(0);
Host ph("Sender", NetworkPreferences(30300, "", false, true));
ph.setIdealPeerCount(1);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50305);
ph.addNode(host2.id(), "127.0.0.1", 30305, 30305);
while (!ph.isStarted())
this_thread::sleep_for(chrono::milliseconds(10));
KeyPair us = KeyPair::create();
wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
@ -194,32 +209,33 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
unsigned result = 0;
bool done = false;
// Host must be configured not to share peers.
Host host1("Forwarder", NetworkPreferences(30305, "", false, true));
host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
while (!host1.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
bool startedForwarder = false;
std::thread forwarder([&]()
{
setThreadName("forwarder");
// Host must be configured not to share peers.
Host ph("Forwarder", NetworkPreferences(50305, "", false, true));
ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50303);
// ph.addNode("127.0.0.1", 30303, 30303);
startedForwarder = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
auto w = whost1->installWatch(BuildTopicMask("test"));
while (!done)
{
for (auto i: wh->checkWatch(w))
for (auto i: whost1->checkWatch(w))
{
Message msg = wh->envelope(i).open(wh->fullTopic(w));
Message msg = whost1->envelope(i).open(whost1->fullTopic(w));
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
}
this_thread::sleep_for(chrono::milliseconds(50));
@ -227,30 +243,33 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
});
while (!startedForwarder)
this_thread::sleep_for(chrono::milliseconds(50));
this_thread::sleep_for(chrono::milliseconds(2));
{
Host ph("Sender", NetworkPreferences(50300, "", false, true));
ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50305);
Host host2("Sender", NetworkPreferences(30300, "", false, true));
host2.setIdealPeerCount(1);
shared_ptr<WhisperHost> whost2 = host2.registerCapability(new WhisperHost());
host2.start();
while (!host2.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
host2.addNode(host1.id(), "127.0.0.1", 30305, 30305);
while (!host2.peerCount())
this_thread::sleep_for(chrono::milliseconds(5));
KeyPair us = KeyPair::create();
wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
whost2->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
this_thread::sleep_for(chrono::milliseconds(250));
}
{
Host ph("Listener", NetworkPreferences(50300, "", false, true));
ph.setIdealPeerCount(0);
Host ph("Listener", NetworkPreferences(30300, "", false, true));
ph.setIdealPeerCount(1);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50305);
while (!ph.isStarted())
this_thread::sleep_for(chrono::milliseconds(2));
ph.addNode(host1.id(), "127.0.0.1", 30305, 30305);
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));

19
third/MainWin.cpp

@ -114,7 +114,8 @@ Main::Main(QWidget *parent) :
connect(ui->ourAccounts->model(), SIGNAL(rowsMoved(const QModelIndex &, int, int, const QModelIndex &, int)), SLOT(ourAccountsRowsMoved()));
m_web3.reset(new WebThreeDirect("Third", getDataDir() + "/Third", false, {"eth", "shh"}));
bytesConstRef networkConfig((byte*)m_networkConfig.data(), m_networkConfig.size());
m_web3.reset(new WebThreeDirect("Third", getDataDir() + "/Third", false, {"eth", "shh"}, NetworkPreferences(), networkConfig));
m_web3->connect(Host::pocHost());
m_server = unique_ptr<WebThreeStubServer>(new WebThreeStubServer(m_qwebConnector, *web3(), keysAsVector(m_myKeys)));
@ -377,10 +378,10 @@ void Main::writeSettings()
s.setValue("address", b);
s.setValue("url", ui->urlEdit->text());
bytes d = m_web3->saveNodes();
bytes d = m_web3->saveNetwork();
if (d.size())
m_nodes = QByteArray((char*)d.data(), (int)d.size());
s.setValue("peers", m_nodes);
m_networkConfig = QByteArray((char*)d.data(), (int)d.size());
s.setValue("peers", m_networkConfig);
s.setValue("geometry", saveGeometry());
s.setValue("windowState", saveState());
@ -409,7 +410,7 @@ void Main::readSettings(bool _skipGeometry)
}
}
ethereum()->setAddress(m_myKeys.back().address());
m_nodes = s.value("peers").toByteArray();
m_networkConfig = s.value("peers").toByteArray();
ui->urlEdit->setText(s.value("url", "about:blank").toString()); //http://gavwood.com/gavcoin.html
on_urlEdit_returnPressed();
}
@ -581,11 +582,9 @@ void Main::ensureNetwork()
web3()->startNetwork();
web3()->connect(defPeer);
}
else
if (!m_web3->peerCount())
m_web3->connect(defPeer);
if (m_nodes.size())
m_web3->restoreNodes(bytesConstRef((byte*)m_nodes.data(), m_nodes.size()));
// else
// if (!m_web3->peerCount())
// m_web3->connect(defPeer);
}
void Main::on_connect_triggered()

2
third/MainWin.h

@ -131,7 +131,7 @@ private:
unsigned m_currenciesFilter = (unsigned)-1;
unsigned m_balancesFilter = (unsigned)-1;
QByteArray m_nodes;
QByteArray m_networkConfig;
QStringList m_servers;
QNetworkAccessManager m_webCtrl;

Loading…
Cancel
Save