Browse Source

Merging in new data structure for nodes from node-table. End result will be consolidation into NodeId, Node (id and endpoints), NodeEntry (as in table), and Peer (connected node as in host). Rename PeerInfo to PeerSessionInfo. Rename NodeInfo to PeerInfo. PeerSessionInfo which is information about the Peer connection and will be split/merged into Node and PeerInfo. Add node-table callbacks for Host to perform connect node if there are not enough nodes.

cl-refactor
subtly 10 years ago
parent
commit
94c09508fd
  1. 2
      alethzero/MainWin.cpp
  2. 4
      libp2p/Common.h
  3. 342
      libp2p/Host.cpp
  4. 88
      libp2p/Host.h
  5. 4
      libp2p/HostCapability.cpp
  6. 55
      libp2p/NodeTable.cpp
  7. 44
      libp2p/NodeTable.h
  8. 138
      libp2p/Session.cpp
  9. 10
      libp2p/Session.h
  10. 4
      libwebthree/WebThree.cpp
  11. 4
      libwebthree/WebThree.h
  12. 2
      neth/main.cpp
  13. 16
      test/peer.cpp
  14. 2
      test/whisperTopic.cpp

2
alethzero/MainWin.cpp

@ -906,7 +906,7 @@ void Main::refreshNetwork()
if (web3()->haveNetwork())
{
map<h512, QString> clients;
for (PeerInfo const& i: ps)
for (PeerSessionInfo const& i: ps)
ui->peers->addItem(QString("[%8 %7] %3 ms - %1:%2 - %4 %5 %6")
.arg(QString::fromStdString(i.host))
.arg(i.port)

4
libp2p/Common.h

@ -116,7 +116,7 @@ typedef std::pair<std::string, u256> CapDesc;
typedef std::set<CapDesc> CapDescSet;
typedef std::vector<CapDesc> CapDescs;
struct PeerInfo
struct PeerSessionInfo
{
NodeId id;
std::string clientVersion;
@ -128,7 +128,7 @@ struct PeerInfo
std::map<std::string, std::string> notes;
};
using PeerInfos = std::vector<PeerInfo>;
using PeerSessionInfos = std::vector<PeerSessionInfo>;
}
}

342
libp2p/Host.cpp

@ -38,6 +38,13 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {}
void HostNodeTableHandler::processEvent(NodeId _n, NodeTableEventType _e)
{
m_host.onNodeTableEvent(_n, _e);
}
Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool _start):
Worker("p2p", 0),
m_clientVersion(_clientVersion),
@ -116,8 +123,8 @@ void Host::doneWorking()
for (unsigned n = 0;; n = 0)
{
{
RecursiveGuard l(x_peers);
for (auto i: m_peers)
RecursiveGuard l(x_sessions);
for (auto i: m_sessions)
if (auto p = i.second.lock())
if (p->isOpen())
{
@ -139,8 +146,8 @@ void Host::doneWorking()
m_ioService.reset();
// finally, clear out peers (in case they're lingering)
RecursiveGuard l(x_peers);
m_peers.clear();
RecursiveGuard l(x_sessions);
m_sessions.clear();
}
unsigned Host::protocolVersion() const
@ -150,12 +157,12 @@ unsigned Host::protocolVersion() const
void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
{
assert(!!_s->m_node);
assert(!!_s->m_node->id);
assert(!!_s->m_peer);
assert(!!_s->m_peer->id);
{
RecursiveGuard l(x_peers);
m_peers[_s->m_node->id] = _s;
RecursiveGuard l(x_sessions);
m_sessions[_s->m_peer->id] = _s;
}
unsigned o = (unsigned)UserPacket;
for (auto const& i: _caps)
@ -166,6 +173,34 @@ void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
}
}
void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e)
{
if (_e == NodeEntryAdded)
{
auto n = (*m_nodeTable)[_n];
if (n)
{
RecursiveGuard l(x_sessions);
auto p = m_peers[_n];
if (!p)
{
m_peers[_n] = make_shared<PeerInfo>();
p = m_peers[_n];
p->id = _n;
}
p->address = n.endpoint.tcp;
if (peerCount() < m_idealPeerCount)
connect(p);
}
}
else if (_e == NodeEntryRemoved)
{
RecursiveGuard l(x_sessions);
m_peers.erase(_n);
}
}
void Host::seal(bytes& _b)
{
_b[0] = 0x22;
@ -179,80 +214,6 @@ void Host::seal(bytes& _b)
_b[7] = len & 0xff;
}
// TODO: P2P port to NodeTable. (see noteNode calls, Session.cpp)
//shared_ptr<NodeInfo> Host::noteNode(NodeId _id, bi::tcp::endpoint _a)
//{
// RecursiveGuard l(x_peers);
// if (_a.port() < 30300 || _a.port() > 30305)
// cwarn << "Weird port being recorded: " << _a.port();
//
// if (_a.port() >= /*49152*/32768)
// {
// cwarn << "Private port being recorded - setting to 0";
// _a = bi::tcp::endpoint(_a.address(), 0);
// }
//
// unsigned i;
// if (!m_nodes.count(_id))
// {
// i = m_nodesList.size();
// m_nodesList.push_back(_id);
// m_nodes[_id] = make_shared<NodeInfo>();
// m_nodes[_id]->id = _id;
// m_nodes[_id]->index = i;
// }
// else
// i = m_nodes[_id]->index;
// m_nodes[_id]->address = _a;
// m_private.extendAll(i);
// if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking))
// m_private += i;
// else
// m_private -= i;
//
// return m_nodes[_id];
//}
// TODO: P2P base on target
// TODO: P2P store caps in NodeTable/NodeEntry
//Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
//{
// RecursiveGuard l(x_peers);
// Nodes ret;
//
// // todo: if localnetworking is enabled it should only share peers if remote
// // is within the same network as our interfaces.
// // this requires flagging nodes when we receive them as to if they're on private network
// auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted();
// for (auto i: ns)
// ret.push_back(*m_nodes[m_nodesList[i]]);
// return ret;
//}
KeyPair Host::getHostIdentifier()
{
static string s_file(getDataDir() + "/host");
static mutex s_x;
lock_guard<mutex> l(s_x);
h256 secret;
bytes b = contents(s_file);
if (b.size() == 32)
memcpy(secret.data(), b.data(), 32);
else
{
// todo: replace w/user entropy; abstract to devcrypto
std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count());
std::uniform_int_distribution<uint16_t> d(0, 255);
for (unsigned i = 0; i < 32; ++i)
secret[i] = (byte)d(s_eng);
}
if (!secret)
BOOST_THROW_EXCEPTION(crypto::InvalidState());
return move(KeyPair(move(secret)));
}
void Host::determinePublic(string const& _publicAddress, bool _upnp)
{
m_peerAddresses.clear();
@ -324,15 +285,16 @@ void Host::runAcceptor()
{
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
m_accepting = true;
m_socket.reset(new bi::tcp::socket(m_ioService));
m_tcp4Acceptor.async_accept(*m_socket, [=](boost::system::error_code ec)
bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
m_tcp4Acceptor.async_accept(*s, [=](boost::system::error_code ec)
{
bool success = false;
if (!ec)
{
try
{
doHandshake(m_socket.release());
doHandshake(s);
success = true;
}
catch (Exception const& _e)
@ -345,27 +307,29 @@ void Host::runAcceptor()
}
}
if (!success && m_socket->is_open())
if (!success && s->is_open())
{
boost::system::error_code ec;
m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket->close();
s->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
s->close();
}
m_accepting = false;
delete s;
if (ec.value() < 1)
runAcceptor();
});
}
}
void Host::doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId)
void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId)
{
try {
clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint();
} catch (...){}
auto p = std::make_shared<Session>(this, std::move(*_socket), m_nodes[_egressNodeId]);
auto p = std::make_shared<Session>(this, std::move(*_socket), m_peers[_nodeId]);
p->start();
}
@ -378,6 +342,15 @@ string Host::pocHost()
void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPeerPort, unsigned short _udpNodePort)
{
if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305)
cwarn << "Weird port being recorded: " << _tcpPeerPort;
if (_tcpPeerPort >= /*49152*/32768)
{
cwarn << "Private port being recorded - setting to 0";
_tcpPeerPort = 0;
}
boost::system::error_code ec;
bi::address addr = bi::address::from_string(_addr, ec);
if (ec)
@ -394,59 +367,25 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short
addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort))));
}
void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short _peerPort, unsigned short _nodePort) noexcept
void Host::connect(std::shared_ptr<PeerInfo> const& _n)
{
if (!m_run)
return;
assert(_node);
auto n = (*m_nodeTable)[_node];
if (havePeerSession(_n->id))
{
clog(NetWarn) << "Aborted connect. Node already connected.";
return;
}
// TODO: refactor into async_resolve
m_ioService.post([=]()
if (!m_nodeTable->haveNode(_n->id))
{
for (auto first: {true, false})
{
try
{
bi::tcp::endpoint ep;
if (first)
{
bi::tcp::resolver r(m_ioService);
ep = r.resolve({_addr, toString(_peerPort)})->endpoint();
}
else
ep = bi::tcp::endpoint(bi::address::from_string(_addr), _peerPort);
if (!n)
m_nodes[_node] = make_shared<NodeInfo>();
m_nodes[_node]->id = _node;
m_nodes[_node]->address = ep;
connect(m_nodes[_node]);
break;
}
catch (Exception const& _e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << "\n" << diagnostic_information(_e);
}
catch (exception const& e)
{
// Couldn't connect
clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")";
}
}
});
}
void Host::connect(std::shared_ptr<NodeInfo> const& _n)
{
if (!m_run)
clog(NetWarn) << "Aborted connect. Node not in node table.";
return;
}
// prevent concurrently connecting to a node
NodeInfo *nptr = _n.get();
PeerInfo *nptr = _n.get();
{
Guard l(x_pendingNodeConns);
if (m_pendingNodeConns.count(nptr))
@ -457,48 +396,32 @@ void Host::connect(std::shared_ptr<NodeInfo> const& _n)
clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged();
_n->lastAttempted = std::chrono::system_clock::now();
_n->failedAttempts++;
bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
auto n = node(_n->id);
if (n)
s->async_connect(_n->address, [=](boost::system::error_code const& ec)
s->async_connect(_n->address, [=](boost::system::error_code const& ec)
{
if (ec)
{
if (ec)
{
clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")";
_n->lastDisconnect = TCPError;
_n->lastAttempted = std::chrono::system_clock::now();
}
else
{
clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address;
_n->lastConnected = std::chrono::system_clock::now();
auto p = make_shared<Session>(this, std::move(*s), n);
p->start();
}
delete s;
Guard l(x_pendingNodeConns);
m_pendingNodeConns.erase(nptr);
});
else
clog(NetWarn) << "Aborted connect. Node not in node table.";
}
bool Host::havePeer(NodeId _id) const
{
RecursiveGuard l(x_peers);
// Remove dead peers from list.
for (auto i = m_peers.begin(); i != m_peers.end();)
if (i->second.lock().get())
++i;
clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")";
_n->lastDisconnect = TCPError;
_n->lastAttempted = std::chrono::system_clock::now();
}
else
i = m_peers.erase(i);
return !!m_peers.count(_id);
{
clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address;
_n->lastConnected = std::chrono::system_clock::now();
auto p = make_shared<Session>(this, std::move(*s), _n);
p->start();
}
delete s;
Guard l(x_pendingNodeConns);
m_pendingNodeConns.erase(nptr);
});
}
unsigned NodeInfo::fallbackSeconds() const
unsigned PeerInfo::fallbackSeconds() const
{
switch (lastDisconnect)
{
@ -524,27 +447,27 @@ unsigned NodeInfo::fallbackSeconds() const
// TODO: P2P migrate grow/prunePeers into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer.
//void Host::growPeers()
//{
// RecursiveGuard l(x_peers);
// int morePeers = (int)m_idealPeerCount - m_peers.size();
// RecursiveGuard l(x_sessions);
// int morePeers = (int)m_idealPeerCount - m_sessions.size();
// if (morePeers > 0)
// {
// auto toTry = m_ready;
// if (!m_netPrefs.localNetworking)
// toTry -= m_private;
// set<NodeInfo> ns;
// set<PeerInfo> ns;
// for (auto i: toTry)
// if (m_nodes[m_nodesList[i]]->shouldReconnect())
// ns.insert(*m_nodes[m_nodesList[i]]);
//
// if (ns.size())
// for (NodeInfo const& i: ns)
// for (PeerInfo const& i: ns)
// {
// connect(m_nodes[i.id]);
// if (!--morePeers)
// return;
// }
// else
// for (auto const& i: m_peers)
// for (auto const& i: m_sessions)
// if (auto p = i.second.lock())
// p->ensureNodesRequested();
// }
@ -552,17 +475,17 @@ unsigned NodeInfo::fallbackSeconds() const
//
//void Host::prunePeers()
//{
// RecursiveGuard l(x_peers);
// RecursiveGuard l(x_sessions);
// // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
// set<NodeId> dc;
// for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
// if (m_peers.size() - dc.size() > m_idealPeerCount)
// for (unsigned old = 15000; m_sessions.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
// if (m_sessions.size() - dc.size() > m_idealPeerCount)
// {
// // look for worst peer to kick off
// // first work out how many are old enough to kick off.
// shared_ptr<Session> worst;
// unsigned agedPeers = 0;
// for (auto i: m_peers)
// for (auto i: m_sessions)
// if (!dc.count(i.first))
// if (auto p = i.second.lock())
// if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers.
@ -578,21 +501,21 @@ unsigned NodeInfo::fallbackSeconds() const
// }
//
// // Remove dead peers from list.
// for (auto i = m_peers.begin(); i != m_peers.end();)
// for (auto i = m_sessions.begin(); i != m_sessions.end();)
// if (i->second.lock().get())
// ++i;
// else
// i = m_peers.erase(i);
// i = m_sessions.erase(i);
//}
PeerInfos Host::peers() const
PeerSessionInfos Host::peers() const
{
if (!m_run)
return PeerInfos();
return PeerSessionInfos();
std::vector<PeerInfo> ret;
RecursiveGuard l(x_peers);
for (auto& i: m_peers)
std::vector<PeerSessionInfo> ret;
RecursiveGuard l(x_sessions);
for (auto& i: m_sessions)
if (auto j = i.second.lock())
if (j->m_socket.is_open())
ret.push_back(j->m_info);
@ -615,7 +538,7 @@ void Host::run(boost::system::error_code const&)
return;
}
for (auto p: m_peers)
for (auto p: m_sessions)
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
@ -658,7 +581,8 @@ void Host::startedWorking()
runAcceptor();
if (!m_tcpPublic.address().is_unspecified())
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic));
// TODO: add m_tcpPublic endpoint; sort out endpoint stuff for nodetable
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort));
else
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303));
}
@ -676,8 +600,8 @@ void Host::doWork()
void Host::keepAlivePeers()
{
RecursiveGuard l(x_peers);
for (auto p: m_peers)
RecursiveGuard l(x_sessions);
for (auto p: m_sessions)
if (auto pp = p.second.lock())
{
if (chrono::steady_clock::now() - pp->m_lastReceived >= chrono::seconds(60))
@ -694,10 +618,10 @@ bytes Host::saveNodes() const
RLPStream nodes;
int count = 0;
{
RecursiveGuard l(x_peers);
for (auto const& i: m_nodes)
RecursiveGuard l(x_sessions);
for (auto const& i: m_peers)
{
NodeInfo const& n = *(i.second);
PeerInfo const& n = *(i.second);
// TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 &&
if (!n.dead && chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address()))
{
@ -722,7 +646,7 @@ bytes Host::saveNodes() const
void Host::restoreNodes(bytesConstRef _b)
{
RecursiveGuard l(x_peers);
RecursiveGuard l(x_sessions);
RLP r(_b);
if (r.itemCount() > 0 && r[0].isInt())
switch (r[0].toInt<int>())
@ -740,7 +664,7 @@ void Host::restoreNodes(bytesConstRef _b)
else
ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray<byte, 16>()), i[1].toInt<short>());
auto id = (NodeId)i[2];
if (!m_nodes.count(id))
if (!m_peers.count(id))
{
//// auto o = (Origin)i[3].toInt<int>();
// auto n = noteNode(id, ep);
@ -759,7 +683,7 @@ void Host::restoreNodes(bytesConstRef _b)
for (auto i: r)
{
auto id = (NodeId)i[2];
if (!m_nodes.count(id))
if (!m_peers.count(id))
{
bi::tcp::endpoint ep;
if (i[0].itemCount() == 4)
@ -770,3 +694,27 @@ void Host::restoreNodes(bytesConstRef _b)
}
}
}
KeyPair Host::getHostIdentifier()
{
static string s_file(getDataDir() + "/host");
static mutex s_x;
lock_guard<mutex> l(s_x);
h256 secret;
bytes b = contents(s_file);
if (b.size() == 32)
memcpy(secret.data(), b.data(), 32);
else
{
// todo: replace w/user entropy; abstract to devcrypto
std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count());
std::uniform_int_distribution<uint16_t> d(0, 255);
for (unsigned i = 0; i < 32; ++i)
secret[i] = (byte)d(s_eng);
}
if (!secret)
BOOST_THROW_EXCEPTION(crypto::InvalidState());
return move(KeyPair(move(secret)));
}

88
libp2p/Host.h

@ -51,7 +51,7 @@ namespace p2p
class Host;
struct NodeInfo
struct PeerInfo
{
NodeId id; ///< Their id/public key.
unsigned index; ///< Index into m_nodesList
@ -88,7 +88,7 @@ struct NodeInfo
bool isOffline() const { return lastAttempted > lastConnected; }
// p2p: Remove (in favor of lru eviction and sub-protocol ratings).
bool operator<(NodeInfo const& _n) const
bool operator<(PeerInfo const& _n) const
{
if (isOffline() != _n.isOffline())
return isOffline();
@ -108,19 +108,30 @@ struct NodeInfo
}
};
using Nodes = std::vector<NodeInfo>;
using Nodes = std::vector<PeerInfo>;
class Host;
class HostNodeTableHandler: public NodeTableEventHandler
{
HostNodeTableHandler(Host& _host);
virtual void processEvent(NodeId _n, NodeTableEventType _e);
Host& m_host;
};
/**
* @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
* @todo gracefully disconnect peer if peer already connected
* @todo determinePublic: ipv6, udp
* @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port
*/
class Host: public Worker
{
friend class HostNodeTableHandler;
friend class Session;
friend class HostCapabilityFace;
friend struct NodeInfo;
friend struct PeerInfo;
public:
/// Start server, listening for connections on the given port.
@ -142,31 +153,19 @@ public:
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
/// Manually add node.
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303);
/// Connect to a peer explicitly.
void connect(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303) noexcept;
void connect(NodeId const& _node, bi::tcp::endpoint const& _ep);
void connect(std::shared_ptr<NodeInfo> const& _n);
/// @returns true iff we have a peer of the given id.
bool havePeer(NodeId _id) const;
bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); if (m_sessions.count(_id)) return !!m_sessions[_id].lock(); else return false; }
/// Add node.
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort);
/// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
/// p2p: template?
void setIdealPeerCount(HostCapabilityFace* _cap, unsigned _n) { m_capIdealPeerCount[_cap->capDesc()] = _n; }
/// Get peer information.
PeerInfos peers() const;
PeerSessionInfos peers() const;
/// Get number of peers connected; equivalent to, but faster than, peers().size().
size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); }
/// Ping the peers to update the latency information and disconnect peers which have timed out.
void keepAlivePeers();
size_t peerCount() const { RecursiveGuard l(x_sessions); return m_peers.size(); }
/// Get the port we're listening on currently.
unsigned short listenPort() const { return m_tcpPublic.port(); }
@ -177,7 +176,8 @@ public:
/// Deserialise the data and populate the set of known peers.
void restoreNodes(bytesConstRef _b);
Nodes nodes() const { RecursiveGuard l(x_peers); Nodes ret; for (auto const& i: m_nodes) ret.push_back(*i.second); return ret; }
// TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
Nodes nodes() const { RecursiveGuard l(x_sessions); Nodes ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); }
@ -195,14 +195,20 @@ public:
void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps);
std::shared_ptr<NodeInfo> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<NodeInfo>(); }
// std::shared_ptr<PeerInfo> node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr<PeerInfo>(); }
protected:
void onNodeTableEvent(NodeId _n, NodeTableEventType _e);
private:
KeyPair getHostIdentifier();
/// Populate m_peerAddresses with available public addresses.
void determinePublic(std::string const& _publicAddress, bool _upnp);
void connect(std::shared_ptr<PeerInfo> const& _n);
/// Ping the peers to update the latency information and disconnect peers which have timed out.
void keepAlivePeers();
/// Called only from startedWorking().
void runAcceptor();
@ -223,9 +229,10 @@ private:
virtual void doneWorking();
/// Add node
void addNode(Node const& _nodeInfo) { m_nodeTable->addNode(_nodeInfo); }
void addNode(Node const& _node) { m_nodeTable->addNode(_node); }
// Nodes potentialPeers(RangeMask<unsigned> const& _known);
/// Get or create host identifier (KeyPair).
KeyPair getHostIdentifier();
bool m_run = false; ///< Whether network is running.
std::mutex x_runTimer; ///< Start/stop mutex.
@ -241,33 +248,24 @@ private:
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor.
std::unique_ptr<bi::tcp::socket> m_socket; ///< Listening socket.
std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected.
std::set<NodeInfo*> m_pendingNodeConns; /// Used only by connect(NodeInfo&) to limit concurrently connecting to same node. See connect(shared_ptr<NodeInfo>const&).
std::set<PeerInfo*> m_pendingNodeConns; /// Used only by connect(PeerInfo&) to limit concurrently connecting to same node. See connect(shared_ptr<PeerInfo>const&).
Mutex x_pendingNodeConns;
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.
std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery).
std::map<CapDesc, unsigned> m_capIdealPeerCount; ///< Ideal peer count for capability.
mutable RecursiveMutex x_peers;
std::map<NodeId, std::shared_ptr<PeerInfo>> m_peers;
mutable RecursiveMutex x_sessions;
/// The nodes to which we are currently connected.
/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::map<NodeId, std::weak_ptr<Session>> m_peers;
/// Nodes to which we may connect (or to which we have connected).
/// TODO: mutex; replace with nodeTable
std::map<NodeId, std::shared_ptr<NodeInfo> > m_nodes;
// /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same.
// std::vector<NodeId> m_nodesList;
// RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private.
mutable std::map<NodeId, std::weak_ptr<Session>> m_sessions;
unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.
@ -280,6 +278,6 @@ private:
bool m_accepting = false;
};
}
}

4
libp2p/HostCapability.cpp

@ -34,9 +34,9 @@ void HostCapabilityFace::seal(bytes& _b)
std::vector<std::shared_ptr<Session> > HostCapabilityFace::peers() const
{
RecursiveGuard l(m_host->x_peers);
RecursiveGuard l(m_host->x_sessions);
std::vector<std::shared_ptr<Session> > ret;
for (auto const& i: m_host->m_peers)
for (auto const& i: m_host->m_sessions)
if (std::shared_ptr<Session> p = i.second.lock())
if (p->m_capabilities.count(capDesc()))
ret.push_back(p);

55
libp2p/NodeTable.cpp

@ -27,7 +27,7 @@ using namespace dev::p2p;
NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {}
NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {}
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep):
NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp):
m_node(Node(_alias.pub(), bi::udp::endpoint())),
m_secret(_alias.sec()),
m_io(_io),
@ -53,11 +53,32 @@ NodeTable::~NodeTable()
m_socketPtr->disconnect();
}
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
{
auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp));
return move(addNode(node));
}
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{
Guard l(x_nodes);
shared_ptr<NodeEntry> ret = m_nodes[_node.id];
if (!ret)
{
ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
m_nodes[_node.id] = ret;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPtr->send(p);
}
return move(ret);
}
void NodeTable::join()
{
doFindNode(m_node.id);
}
list<NodeId> NodeTable::nodes() const
{
list<NodeId> nodes;
@ -84,6 +105,13 @@ Node NodeTable::operator[](NodeId _id)
return !!n ? *n : Node();
}
shared_ptr<NodeEntry> NodeTable::getNodeEntry(NodeId _id)
{
Guard l(x_nodes);
auto n = m_nodes[_id];
return !!n ? move(n) : move(shared_ptr<NodeEntry>());
}
void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const
{
FindNode p(_node.endpoint.udp, _target);
@ -240,29 +268,6 @@ void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _n
ping(_leastSeen.get());
}
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
{
auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp));
return move(addNode(node));
}
shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
{
shared_ptr<NodeEntry> ret;
Guard l(x_nodes);
if (auto n = m_nodes[_node.id])
ret = n;
else
{
ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
m_nodes[_node.id] = ret;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
p.sign(m_secret);
m_socketPtr->send(p);
}
return move(ret);
}
void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{
if (_pubk == m_node.address())

44
libp2p/NodeTable.h

@ -22,6 +22,7 @@
#pragma once
#include <algorithm>
#include <deque>
#include <boost/integer/static_log2.hpp>
#include <libdevcrypto/Common.h>
#include <libp2p/UDP.h>
@ -74,14 +75,37 @@ struct Node
*/
struct NodeEntry: public Node
{
NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw); //: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {}
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); //: Node(_pubk, NodeIPEndpoint(_udp)), distance(dist(_src.id,_pubk)) {}
NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw);
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);
const unsigned distance; ///< Node's distance from _src (see constructor).
const unsigned distance; ///< Node's distance (xor of _src as integer).
};
enum NodeTableEventType {
NodeEntryAdded,
NodeEntryRemoved
};
class NodeTable;
class NodeTableEventHandler
{
friend class NodeTable;
public:
virtual void processEvent(NodeId _n, NodeTableEventType _e) =0;
protected:
/// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable.
void processEvents() { std::list<std::pair<NodeId,NodeTableEventType>> events; { Guard l(x_events); if (!m_nodeEvents.size()) return; m_nodeEvents.unique(); for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n])); m_nodeEvents.empty(); m_events.empty(); } for (auto const& e: events) processEvent(e.first, e.second); }
/// Called by NodeTable to append event.
virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; }
Mutex x_events;
std::list<NodeId> m_nodeEvents;
std::map<NodeId,NodeTableEventType> m_events;
};
/**
* NodeTable using S/Kademlia system for node discovery and preference.
* NodeTable using modified kademlia for node discovery and preference.
* untouched buckets are refreshed if they have not been touched within an hour
*
* Thread-safety is ensured by modifying NodeEntry details via
@ -122,7 +146,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
using EvictionTimeout = std::pair<std::pair<NodeId,TimePoint>,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId.
public:
NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303, bi::tcp::endpoint _ep = bi::tcp::endpoint());
NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303);
~NodeTable();
/// Constants for Kademlia, mostly derived from address space.
@ -145,6 +169,12 @@ public:
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Set event handler for NodeEntryAdded and NodeEntryRemoved events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); }
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored.
void processEvents() { if (m_nodeEvents) m_nodeEvents->processEvents(); }
/// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint());
@ -157,7 +187,9 @@ public:
std::list<NodeId> nodes() const;
std::list<NodeEntry> state() const;
bool haveNode(NodeId _id) { Guard l(x_nodes); return !!m_nodes[_id]; }
Node operator[](NodeId _id);
std::shared_ptr<NodeEntry> getNodeEntry(NodeId _id);
protected:
struct NodeBucket
@ -207,6 +239,8 @@ protected:
/// Sends FindNeighbor packet. See doFindNode.
void requestNeighbours(NodeEntry const& _node, NodeId _target) const;
std::unique_ptr<NodeTableEventHandler> m_nodeEvents; ///< Event handler for node events.
Node m_node; ///< This node.
Secret m_secret; ///< This nodes secret key.

138
libp2p/Session.cpp

@ -36,21 +36,21 @@ using namespace dev::p2p;
#endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n):
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<PeerInfo> const& _n):
m_server(_s),
m_socket(std::move(_socket)),
m_node(_n),
m_info({m_peer->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()}),
m_peer(_n),
m_manualEndpoint(_n->address)
{
m_lastReceived = m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
}
Session::~Session()
{
// TODO: P2P revisit (refactored from previous logic)
if (m_node && !(id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead))
m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1);
if (m_peer && !(id() && !isPermanentProblem(m_peer->lastDisconnect) && !m_peer->dead))
m_peer->lastConnected = m_peer->lastAttempted - chrono::seconds(1);
// Read-chain finished for one reason or another.
for (auto& i: m_capabilities)
@ -70,35 +70,35 @@ Session::~Session()
NodeId Session::id() const
{
return m_node ? m_node->id : NodeId();
return m_peer ? m_peer->id : NodeId();
}
void Session::addRating(unsigned _r)
{
if (m_node)
if (m_peer)
{
m_node->rating += _r;
m_node->score += _r;
m_peer->rating += _r;
m_peer->score += _r;
}
}
int Session::rating() const
{
return m_node->rating;
return m_peer->rating;
}
// TODO: P2P integration: session->? should be unavailable when socket isn't open
bi::tcp::endpoint Session::endpoint() const
{
if (m_socket.is_open() && m_node)
if (m_socket.is_open() && m_peer)
try
{
return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_node->address.port());
return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port());
}
catch (...) {}
if (m_node)
return m_node->address;
if (m_peer)
return m_peer->address;
return m_manualEndpoint;
}
@ -195,10 +195,11 @@ bool Session::interpret(RLP const& _r)
return true;
}
assert(!!m_node);
assert(!!m_node->id);
assert(!!m_peer);
assert(!!m_peer->id);
if (m_server->havePeer(id))
// TODO: P2P ensure disabled logic is covered
if (false /* m_server->havePeer(id) */)
{
// Already connected.
clogS(NetWarn) << "Already connected to a peer with id" << id.abridged();
@ -217,20 +218,28 @@ bool Session::interpret(RLP const& _r)
// TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle.
// TODO: P2P remove oldid
// TODO: P2P with encrypted transport the handshake will fail and we won't get here
// m_node = m_server->noteNode(m_node->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort));
if (m_node->isOffline())
m_node->lastConnected = chrono::system_clock::now();
// m_peer = m_server->noteNode(m_peer->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort));
if (m_peer->isOffline())
m_peer->lastConnected = chrono::system_clock::now();
//
// // TODO: P2P introduce map of nodes we've given to this node (if GetPeers/Peers stays in TCP)
m_knownNodes.extendAll(m_node->index);
m_knownNodes.unionWith(m_node->index);
m_knownNodes.extendAll(m_peer->index);
m_knownNodes.unionWith(m_peer->index);
if (m_protocolVersion != m_server->protocolVersion())
{
disconnect(IncompatibleProtocol);
return true;
}
m_info = PeerInfo({id, clientVersion, m_socket.remote_endpoint().address().to_string(), listenPort, std::chrono::steady_clock::duration(), _r[3].toSet<CapDesc>(), (unsigned)m_socket.native_handle(), map<string, string>() });
// TODO: P2P migrate auth to Host and Handshake to constructor
m_info.clientVersion = clientVersion;
m_info.host = m_socket.remote_endpoint().address().to_string();
m_info.port = listenPort;
m_info.lastPing = std::chrono::steady_clock::duration();
m_info.caps = _r[3].toSet<CapDesc>();
m_info.socket = (unsigned)m_socket.native_handle();
m_info.notes = map<string, string>();
m_server->registerPeer(shared_from_this(), caps);
break;
@ -285,64 +294,63 @@ bool Session::interpret(RLP const& _r)
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_nodes.count(id) << (m_server->m_nodes.count(id) ? isPrivateAddress(m_server->m_nodes.at(id)->address.address()) : -1);
clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")";
// clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_peers.count(id) << (m_server->m_peers.count(id) ? isPrivateAddress(m_server->m_peers.at(id)->address.address()) : -1);
if (isPrivateAddress(peerAddress) && !m_server->m_netPrefs.localNetworking)
// ignore if dist(us,item) - dist(us,them) > 1
// TODO: isPrivate
if (!m_server->m_netPrefs.localNetworking && isPrivateAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto CONTINUE; // Null identity. Ignore.
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto CONTINUE; // Just our info - we already have that.
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto CONTINUE; // Just their info - we already have that.
goto LAMEPEER; // Just their info - we already have that.
// we don't worry about m_peers.count(id) now because node table will handle this and
// by default we will not blindly connect to nodes received via tcp; instead they will
// be pinged, as-is standard, by the node table and added if appropriate. unless flagged
// as required, nodes aren't connected to unless they respond via discovery; no matter if
// a node is relayed via udp or tcp.
// check that it's not us or one we already know:
if (m_server->m_nodes.count(id))
{
/* MEH. Far from an ideal solution. Leave alone for now.
// Already got this node.
// See if it's any better that ours or not...
// This could be the public address of a known node.
// SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
if (m_server->m_nodes.count(id) && isPrivateAddress(m_server->m_nodes.at(id)->address.address()) && ep.port() != 0)
// Update address if the node if we now have a public IP for it.
m_server->m_nodes[id]->address = ep;
*/
goto CONTINUE;
}
// if (m_server->m_peers.count(id))
// {
// /* MEH. Far from an ideal solution. Leave alone for now.
// // Already got this node.
// // See if it's any better that ours or not...
// // This could be the public address of a known node.
// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0)
// // Update address if the node if we now have a public IP for it.
// m_server->m_peers[id]->address = ep;
// */
// goto CONTINUE;
// }
if (!ep.port())
goto CONTINUE; // Zero port? Don't think so.
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto CONTINUE; // Private port according to IANA.
goto LAMEPEER; // Private port according to IANA.
// TODO: PoC-7:
// Technically fine, but ignore for now to avoid peers passing on incoming ports until we can be sure that doesn't happen any more.
// if (ep.port() < 30300 || ep.port() > 30305)
// goto CONTINUE; // Wierd port.
// Avoid our random other addresses that they might end up giving us.
for (auto i: m_server->m_peerAddresses)
if (ep.address() == i && ep.port() == m_server->listenPort())
goto CONTINUE;
// Check that we don't already know about this addr:port combination. If we are, assume the original is best.
// SECURITY: Not a valid assumption in general. Should compare ID origins and pick the best or note uncertainty and weight each equally.
for (auto const& i: m_server->m_nodes)
if (i.second->address == ep)
goto CONTINUE; // Same address but a different node.
// node table handles another node giving us a node which represents one of our other local network interfaces
// node table handles another node giving us a node we already know about
// OK passed all our checks. Assume it's good.
addRating(1000);
// TODO: P2P change to addNode()
// m_server->noteNode(id, ep);
m_server->addNode(Node(id, NodeIPEndpoint(bi::udp::endpoint(ep.address(), 30303), ep)));
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
@ -469,15 +477,15 @@ void Session::drop(DisconnectReason _reason)
}
catch (...) {}
if (m_node)
if (m_peer)
{
if (_reason != m_node->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested)
m_node->failedAttempts = 0;
m_node->lastDisconnect = _reason;
if (_reason != m_peer->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested)
m_peer->failedAttempts = 0;
m_peer->lastDisconnect = _reason;
if (_reason == BadProtocol)
{
m_node->rating /= 2;
m_node->score /= 2;
m_peer->rating /= 2;
m_peer->score /= 2;
}
}
m_dropped = true;

10
libp2p/Session.h

@ -39,7 +39,7 @@ namespace dev
namespace p2p
{
struct NodeInfo;
struct PeerInfo;
/**
* @brief The Session class
@ -51,7 +51,7 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace;
public:
Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<NodeInfo> const& _n);
Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<PeerInfo> const& _n);
virtual ~Session();
void start();
@ -80,7 +80,7 @@ public:
void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; }
PeerInfo const& info() const { return m_info; }
PeerSessionInfo const& info() const { return m_info; }
void ensureNodesRequested();
void serviceNodesRequest();
@ -109,10 +109,10 @@ private:
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data.
bytes m_incoming; ///< Read buffer for ingress bytes.
PeerInfo m_info; ///< Dynamic information about this peer.
PeerSessionInfo m_info; ///< Dynamic information about this peer.
unsigned m_protocolVersion = 0; ///< The protocol version of the peer.
std::shared_ptr<NodeInfo> m_node; ///< The NodeInfo object.
std::shared_ptr<PeerInfo> m_peer; ///< The PeerInfo object.
bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor.
bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.

4
libwebthree/WebThree.cpp

@ -75,7 +75,7 @@ void WebThreeDirect::setNetworkPreferences(p2p::NetworkPreferences const& _n)
startNetwork();
}
std::vector<PeerInfo> WebThreeDirect::peers()
std::vector<PeerSessionInfo> WebThreeDirect::peers()
{
return m_net.peers();
}
@ -102,5 +102,5 @@ void WebThreeDirect::restoreNodes(bytesConstRef _saved)
void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port)
{
m_net.connect(NodeId(), _seedHost, _port);
m_net.addNode(NodeId(), _seedHost, _port, _port);
}

4
libwebthree/WebThree.h

@ -84,7 +84,7 @@ public:
// Network stuff:
/// Get information on the current peer set.
std::vector<p2p::PeerInfo> peers();
std::vector<p2p::PeerSessionInfo> peers();
/// Same as peers().size(), but more efficient.
size_t peerCount() const;
@ -195,7 +195,7 @@ public:
// Peer network stuff - forward through RPCSlave, probably with P2PNetworkSlave/Master classes like Whisper & Ethereum.
/// Get information on the current peer set.
std::vector<p2p::PeerInfo> peers();
std::vector<p2p::PeerSessionInfo> peers();
/// Same as peers().size(), but more efficient.
size_t peerCount() const;

2
neth/main.cpp

@ -913,7 +913,7 @@ int main(int argc, char** argv)
// Peers
y = 1;
for (PeerInfo const& i: web3.peers())
for (PeerSessionInfo const& i: web3.peers())
{
auto s = boost::format("%1% ms - %2%:%3% - %4%") %
toString(chrono::duration_cast<chrono::milliseconds>(i.lastPing).count()) %

16
test/peer.cpp

@ -49,14 +49,14 @@ int peerTest(int argc, char** argv)
Host ph("Test", NetworkPreferences(listenPort));
if (!remoteHost.empty())
ph.connect(NodeId(), remoteHost, remotePort);
for (int i = 0; ; ++i)
{
this_thread::sleep_for(chrono::milliseconds(100));
if (!(i % 10))
ph.keepAlivePeers();
}
ph.addNode(NodeId(), remoteHost, remotePort, remotePort);
// for (int i = 0; ; ++i)
// {
// this_thread::sleep_for(chrono::milliseconds(100));
// if (!(i % 10))
// ph.keepAlivePeers();
// }
return 0;
}

2
test/whisperTopic.cpp

@ -72,7 +72,7 @@ BOOST_AUTO_TEST_CASE(topic)
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect(NodeId(), "127.0.0.1", 50303);
ph.addNode(NodeId(), "127.0.0.1", 50303, 50303);
KeyPair us = KeyPair::create();
for (int i = 0; i < 10; ++i)

Loading…
Cancel
Save