From 94c09508fd9d7495e91f57c333a56220a9060961 Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 10 Jan 2015 19:45:20 +0100 Subject: [PATCH] Merging in new data structure for nodes from node-table. End result will be consolidation into NodeId, Node (id and endpoints), NodeEntry (as in table), and Peer (connected node as in host). Rename PeerInfo to PeerSessionInfo. Rename NodeInfo to PeerInfo. PeerSessionInfo which is information about the Peer connection and will be split/merged into Node and PeerInfo. Add node-table callbacks for Host to perform connect node if there are not enough nodes. --- alethzero/MainWin.cpp | 2 +- libp2p/Common.h | 4 +- libp2p/Host.cpp | 342 ++++++++++++++++---------------------- libp2p/Host.h | 88 +++++----- libp2p/HostCapability.cpp | 4 +- libp2p/NodeTable.cpp | 55 +++--- libp2p/NodeTable.h | 44 ++++- libp2p/Session.cpp | 138 +++++++-------- libp2p/Session.h | 10 +- libwebthree/WebThree.cpp | 4 +- libwebthree/WebThree.h | 4 +- neth/main.cpp | 2 +- test/peer.cpp | 16 +- test/whisperTopic.cpp | 2 +- 14 files changed, 354 insertions(+), 361 deletions(-) diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index b73a2cd0d..40663db8d 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -906,7 +906,7 @@ void Main::refreshNetwork() if (web3()->haveNetwork()) { map clients; - for (PeerInfo const& i: ps) + for (PeerSessionInfo const& i: ps) ui->peers->addItem(QString("[%8 %7] %3 ms - %1:%2 - %4 %5 %6") .arg(QString::fromStdString(i.host)) .arg(i.port) diff --git a/libp2p/Common.h b/libp2p/Common.h index d46c5eed1..059e1a64e 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -116,7 +116,7 @@ typedef std::pair CapDesc; typedef std::set CapDescSet; typedef std::vector CapDescs; -struct PeerInfo +struct PeerSessionInfo { NodeId id; std::string clientVersion; @@ -128,7 +128,7 @@ struct PeerInfo std::map notes; }; -using PeerInfos = std::vector; +using PeerSessionInfos = std::vector; } } diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 5f756f2a5..243738918 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -38,6 +38,13 @@ using namespace std; using namespace dev; using namespace dev::p2p; +HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {} + +void HostNodeTableHandler::processEvent(NodeId _n, NodeTableEventType _e) +{ + m_host.onNodeTableEvent(_n, _e); +} + Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool _start): Worker("p2p", 0), m_clientVersion(_clientVersion), @@ -116,8 +123,8 @@ void Host::doneWorking() for (unsigned n = 0;; n = 0) { { - RecursiveGuard l(x_peers); - for (auto i: m_peers) + RecursiveGuard l(x_sessions); + for (auto i: m_sessions) if (auto p = i.second.lock()) if (p->isOpen()) { @@ -139,8 +146,8 @@ void Host::doneWorking() m_ioService.reset(); // finally, clear out peers (in case they're lingering) - RecursiveGuard l(x_peers); - m_peers.clear(); + RecursiveGuard l(x_sessions); + m_sessions.clear(); } unsigned Host::protocolVersion() const @@ -150,12 +157,12 @@ unsigned Host::protocolVersion() const void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) { - assert(!!_s->m_node); - assert(!!_s->m_node->id); + assert(!!_s->m_peer); + assert(!!_s->m_peer->id); { - RecursiveGuard l(x_peers); - m_peers[_s->m_node->id] = _s; + RecursiveGuard l(x_sessions); + m_sessions[_s->m_peer->id] = _s; } unsigned o = (unsigned)UserPacket; for (auto const& i: _caps) @@ -166,6 +173,34 @@ void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) } } +void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e) +{ + if (_e == NodeEntryAdded) + { + auto n = (*m_nodeTable)[_n]; + if (n) + { + RecursiveGuard l(x_sessions); + auto p = m_peers[_n]; + if (!p) + { + m_peers[_n] = make_shared(); + p = m_peers[_n]; + p->id = _n; + } + p->address = n.endpoint.tcp; + + if (peerCount() < m_idealPeerCount) + connect(p); + } + } + else if (_e == NodeEntryRemoved) + { + RecursiveGuard l(x_sessions); + m_peers.erase(_n); + } +} + void Host::seal(bytes& _b) { _b[0] = 0x22; @@ -179,80 +214,6 @@ void Host::seal(bytes& _b) _b[7] = len & 0xff; } -// TODO: P2P port to NodeTable. (see noteNode calls, Session.cpp) -//shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a) -//{ -// RecursiveGuard l(x_peers); -// if (_a.port() < 30300 || _a.port() > 30305) -// cwarn << "Weird port being recorded: " << _a.port(); -// -// if (_a.port() >= /*49152*/32768) -// { -// cwarn << "Private port being recorded - setting to 0"; -// _a = bi::tcp::endpoint(_a.address(), 0); -// } -// -// unsigned i; -// if (!m_nodes.count(_id)) -// { -// i = m_nodesList.size(); -// m_nodesList.push_back(_id); -// m_nodes[_id] = make_shared(); -// m_nodes[_id]->id = _id; -// m_nodes[_id]->index = i; -// } -// else -// i = m_nodes[_id]->index; -// m_nodes[_id]->address = _a; -// m_private.extendAll(i); -// if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking)) -// m_private += i; -// else -// m_private -= i; -// -// return m_nodes[_id]; -//} - -// TODO: P2P base on target -// TODO: P2P store caps in NodeTable/NodeEntry -//Nodes Host::potentialPeers(RangeMask const& _known) -//{ -// RecursiveGuard l(x_peers); -// Nodes ret; -// -// // todo: if localnetworking is enabled it should only share peers if remote -// // is within the same network as our interfaces. -// // this requires flagging nodes when we receive them as to if they're on private network -// auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted(); -// for (auto i: ns) -// ret.push_back(*m_nodes[m_nodesList[i]]); -// return ret; -//} - -KeyPair Host::getHostIdentifier() -{ - static string s_file(getDataDir() + "/host"); - static mutex s_x; - lock_guard l(s_x); - - h256 secret; - bytes b = contents(s_file); - if (b.size() == 32) - memcpy(secret.data(), b.data(), 32); - else - { - // todo: replace w/user entropy; abstract to devcrypto - std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count()); - std::uniform_int_distribution d(0, 255); - for (unsigned i = 0; i < 32; ++i) - secret[i] = (byte)d(s_eng); - } - - if (!secret) - BOOST_THROW_EXCEPTION(crypto::InvalidState()); - return move(KeyPair(move(secret))); -} - void Host::determinePublic(string const& _publicAddress, bool _upnp) { m_peerAddresses.clear(); @@ -324,15 +285,16 @@ void Host::runAcceptor() { clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")"; m_accepting = true; - m_socket.reset(new bi::tcp::socket(m_ioService)); - m_tcp4Acceptor.async_accept(*m_socket, [=](boost::system::error_code ec) + + bi::tcp::socket* s = new bi::tcp::socket(m_ioService); + m_tcp4Acceptor.async_accept(*s, [=](boost::system::error_code ec) { bool success = false; if (!ec) { try { - doHandshake(m_socket.release()); + doHandshake(s); success = true; } catch (Exception const& _e) @@ -345,27 +307,29 @@ void Host::runAcceptor() } } - if (!success && m_socket->is_open()) + if (!success && s->is_open()) { boost::system::error_code ec; - m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - m_socket->close(); + s->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + s->close(); } m_accepting = false; + delete s; + if (ec.value() < 1) runAcceptor(); }); } } -void Host::doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId) +void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId) { try { clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint(); } catch (...){} - auto p = std::make_shared(this, std::move(*_socket), m_nodes[_egressNodeId]); + auto p = std::make_shared(this, std::move(*_socket), m_peers[_nodeId]); p->start(); } @@ -378,6 +342,15 @@ string Host::pocHost() void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPeerPort, unsigned short _udpNodePort) { + if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305) + cwarn << "Weird port being recorded: " << _tcpPeerPort; + + if (_tcpPeerPort >= /*49152*/32768) + { + cwarn << "Private port being recorded - setting to 0"; + _tcpPeerPort = 0; + } + boost::system::error_code ec; bi::address addr = bi::address::from_string(_addr, ec); if (ec) @@ -394,59 +367,25 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort)))); } -void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short _peerPort, unsigned short _nodePort) noexcept +void Host::connect(std::shared_ptr const& _n) { if (!m_run) return; - assert(_node); - - auto n = (*m_nodeTable)[_node]; + if (havePeerSession(_n->id)) + { + clog(NetWarn) << "Aborted connect. Node already connected."; + return; + } - // TODO: refactor into async_resolve - m_ioService.post([=]() + if (!m_nodeTable->haveNode(_n->id)) { - for (auto first: {true, false}) - { - try - { - bi::tcp::endpoint ep; - if (first) - { - bi::tcp::resolver r(m_ioService); - ep = r.resolve({_addr, toString(_peerPort)})->endpoint(); - } - else - ep = bi::tcp::endpoint(bi::address::from_string(_addr), _peerPort); - - if (!n) - m_nodes[_node] = make_shared(); - m_nodes[_node]->id = _node; - m_nodes[_node]->address = ep; - connect(m_nodes[_node]); - break; - } - catch (Exception const& _e) - { - // Couldn't connect - clog(NetConnect) << "Bad host " << _addr << "\n" << diagnostic_information(_e); - } - catch (exception const& e) - { - // Couldn't connect - clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")"; - } - } - }); -} - -void Host::connect(std::shared_ptr const& _n) -{ - if (!m_run) + clog(NetWarn) << "Aborted connect. Node not in node table."; return; + } // prevent concurrently connecting to a node - NodeInfo *nptr = _n.get(); + PeerInfo *nptr = _n.get(); { Guard l(x_pendingNodeConns); if (m_pendingNodeConns.count(nptr)) @@ -457,48 +396,32 @@ void Host::connect(std::shared_ptr const& _n) clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged(); _n->lastAttempted = std::chrono::system_clock::now(); _n->failedAttempts++; + bi::tcp::socket* s = new bi::tcp::socket(m_ioService); - - auto n = node(_n->id); - if (n) - s->async_connect(_n->address, [=](boost::system::error_code const& ec) + s->async_connect(_n->address, [=](boost::system::error_code const& ec) + { + if (ec) { - if (ec) - { - clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")"; - _n->lastDisconnect = TCPError; - _n->lastAttempted = std::chrono::system_clock::now(); - } - else - { - clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address; - _n->lastConnected = std::chrono::system_clock::now(); - auto p = make_shared(this, std::move(*s), n); - p->start(); - } - delete s; - Guard l(x_pendingNodeConns); - m_pendingNodeConns.erase(nptr); - }); - else - clog(NetWarn) << "Aborted connect. Node not in node table."; -} - -bool Host::havePeer(NodeId _id) const -{ - RecursiveGuard l(x_peers); - - // Remove dead peers from list. - for (auto i = m_peers.begin(); i != m_peers.end();) - if (i->second.lock().get()) - ++i; + clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")"; + _n->lastDisconnect = TCPError; + _n->lastAttempted = std::chrono::system_clock::now(); + } else - i = m_peers.erase(i); - - return !!m_peers.count(_id); + { + clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address; + + _n->lastConnected = std::chrono::system_clock::now(); + auto p = make_shared(this, std::move(*s), _n); + p->start(); + + } + delete s; + Guard l(x_pendingNodeConns); + m_pendingNodeConns.erase(nptr); + }); } -unsigned NodeInfo::fallbackSeconds() const +unsigned PeerInfo::fallbackSeconds() const { switch (lastDisconnect) { @@ -524,27 +447,27 @@ unsigned NodeInfo::fallbackSeconds() const // TODO: P2P migrate grow/prunePeers into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer. //void Host::growPeers() //{ -// RecursiveGuard l(x_peers); -// int morePeers = (int)m_idealPeerCount - m_peers.size(); +// RecursiveGuard l(x_sessions); +// int morePeers = (int)m_idealPeerCount - m_sessions.size(); // if (morePeers > 0) // { // auto toTry = m_ready; // if (!m_netPrefs.localNetworking) // toTry -= m_private; -// set ns; +// set ns; // for (auto i: toTry) // if (m_nodes[m_nodesList[i]]->shouldReconnect()) // ns.insert(*m_nodes[m_nodesList[i]]); // // if (ns.size()) -// for (NodeInfo const& i: ns) +// for (PeerInfo const& i: ns) // { // connect(m_nodes[i.id]); // if (!--morePeers) // return; // } // else -// for (auto const& i: m_peers) +// for (auto const& i: m_sessions) // if (auto p = i.second.lock()) // p->ensureNodesRequested(); // } @@ -552,17 +475,17 @@ unsigned NodeInfo::fallbackSeconds() const // //void Host::prunePeers() //{ -// RecursiveGuard l(x_peers); +// RecursiveGuard l(x_sessions); // // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. // set dc; -// for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2) -// if (m_peers.size() - dc.size() > m_idealPeerCount) +// for (unsigned old = 15000; m_sessions.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2) +// if (m_sessions.size() - dc.size() > m_idealPeerCount) // { // // look for worst peer to kick off // // first work out how many are old enough to kick off. // shared_ptr worst; // unsigned agedPeers = 0; -// for (auto i: m_peers) +// for (auto i: m_sessions) // if (!dc.count(i.first)) // if (auto p = i.second.lock()) // if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. @@ -578,21 +501,21 @@ unsigned NodeInfo::fallbackSeconds() const // } // // // Remove dead peers from list. -// for (auto i = m_peers.begin(); i != m_peers.end();) +// for (auto i = m_sessions.begin(); i != m_sessions.end();) // if (i->second.lock().get()) // ++i; // else -// i = m_peers.erase(i); +// i = m_sessions.erase(i); //} -PeerInfos Host::peers() const +PeerSessionInfos Host::peers() const { if (!m_run) - return PeerInfos(); + return PeerSessionInfos(); - std::vector ret; - RecursiveGuard l(x_peers); - for (auto& i: m_peers) + std::vector ret; + RecursiveGuard l(x_sessions); + for (auto& i: m_sessions) if (auto j = i.second.lock()) if (j->m_socket.is_open()) ret.push_back(j->m_info); @@ -615,7 +538,7 @@ void Host::run(boost::system::error_code const&) return; } - for (auto p: m_peers) + for (auto p: m_sessions) if (auto pp = p.second.lock()) pp->serviceNodesRequest(); @@ -658,7 +581,8 @@ void Host::startedWorking() runAcceptor(); if (!m_tcpPublic.address().is_unspecified()) - m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic)); + // TODO: add m_tcpPublic endpoint; sort out endpoint stuff for nodetable + m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort)); else m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303)); } @@ -676,8 +600,8 @@ void Host::doWork() void Host::keepAlivePeers() { - RecursiveGuard l(x_peers); - for (auto p: m_peers) + RecursiveGuard l(x_sessions); + for (auto p: m_sessions) if (auto pp = p.second.lock()) { if (chrono::steady_clock::now() - pp->m_lastReceived >= chrono::seconds(60)) @@ -694,10 +618,10 @@ bytes Host::saveNodes() const RLPStream nodes; int count = 0; { - RecursiveGuard l(x_peers); - for (auto const& i: m_nodes) + RecursiveGuard l(x_sessions); + for (auto const& i: m_peers) { - NodeInfo const& n = *(i.second); + PeerInfo const& n = *(i.second); // TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 && if (!n.dead && chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address())) { @@ -722,7 +646,7 @@ bytes Host::saveNodes() const void Host::restoreNodes(bytesConstRef _b) { - RecursiveGuard l(x_peers); + RecursiveGuard l(x_sessions); RLP r(_b); if (r.itemCount() > 0 && r[0].isInt()) switch (r[0].toInt()) @@ -740,7 +664,7 @@ void Host::restoreNodes(bytesConstRef _b) else ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray()), i[1].toInt()); auto id = (NodeId)i[2]; - if (!m_nodes.count(id)) + if (!m_peers.count(id)) { //// auto o = (Origin)i[3].toInt(); // auto n = noteNode(id, ep); @@ -759,7 +683,7 @@ void Host::restoreNodes(bytesConstRef _b) for (auto i: r) { auto id = (NodeId)i[2]; - if (!m_nodes.count(id)) + if (!m_peers.count(id)) { bi::tcp::endpoint ep; if (i[0].itemCount() == 4) @@ -770,3 +694,27 @@ void Host::restoreNodes(bytesConstRef _b) } } } + +KeyPair Host::getHostIdentifier() +{ + static string s_file(getDataDir() + "/host"); + static mutex s_x; + lock_guard l(s_x); + + h256 secret; + bytes b = contents(s_file); + if (b.size() == 32) + memcpy(secret.data(), b.data(), 32); + else + { + // todo: replace w/user entropy; abstract to devcrypto + std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count()); + std::uniform_int_distribution d(0, 255); + for (unsigned i = 0; i < 32; ++i) + secret[i] = (byte)d(s_eng); + } + + if (!secret) + BOOST_THROW_EXCEPTION(crypto::InvalidState()); + return move(KeyPair(move(secret))); +} diff --git a/libp2p/Host.h b/libp2p/Host.h index 5892b5b0d..097b07433 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -51,7 +51,7 @@ namespace p2p class Host; -struct NodeInfo +struct PeerInfo { NodeId id; ///< Their id/public key. unsigned index; ///< Index into m_nodesList @@ -88,7 +88,7 @@ struct NodeInfo bool isOffline() const { return lastAttempted > lastConnected; } // p2p: Remove (in favor of lru eviction and sub-protocol ratings). - bool operator<(NodeInfo const& _n) const + bool operator<(PeerInfo const& _n) const { if (isOffline() != _n.isOffline()) return isOffline(); @@ -108,19 +108,30 @@ struct NodeInfo } }; -using Nodes = std::vector; +using Nodes = std::vector; + +class Host; +class HostNodeTableHandler: public NodeTableEventHandler +{ + HostNodeTableHandler(Host& _host); + virtual void processEvent(NodeId _n, NodeTableEventType _e); + Host& m_host; +}; + /** * @brief The Host class * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. + * @todo gracefully disconnect peer if peer already connected * @todo determinePublic: ipv6, udp * @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port */ class Host: public Worker { + friend class HostNodeTableHandler; friend class Session; friend class HostCapabilityFace; - friend struct NodeInfo; + friend struct PeerInfo; public: /// Start server, listening for connections on the given port. @@ -142,31 +153,19 @@ public: CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } - /// Manually add node. - void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303); - - /// Connect to a peer explicitly. - void connect(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303) noexcept; - void connect(NodeId const& _node, bi::tcp::endpoint const& _ep); - void connect(std::shared_ptr const& _n); - - /// @returns true iff we have a peer of the given id. - bool havePeer(NodeId _id) const; - + bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); if (m_sessions.count(_id)) return !!m_sessions[_id].lock(); else return false; } + + /// Add node. + void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort); + /// Set ideal number of peers. void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } - - /// p2p: template? - void setIdealPeerCount(HostCapabilityFace* _cap, unsigned _n) { m_capIdealPeerCount[_cap->capDesc()] = _n; } /// Get peer information. - PeerInfos peers() const; + PeerSessionInfos peers() const; /// Get number of peers connected; equivalent to, but faster than, peers().size(). - size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); } - - /// Ping the peers to update the latency information and disconnect peers which have timed out. - void keepAlivePeers(); + size_t peerCount() const { RecursiveGuard l(x_sessions); return m_peers.size(); } /// Get the port we're listening on currently. unsigned short listenPort() const { return m_tcpPublic.port(); } @@ -177,7 +176,8 @@ public: /// Deserialise the data and populate the set of known peers. void restoreNodes(bytesConstRef _b); - Nodes nodes() const { RecursiveGuard l(x_peers); Nodes ret; for (auto const& i: m_nodes) ret.push_back(*i.second); return ret; } + // TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information. + Nodes nodes() const { RecursiveGuard l(x_sessions); Nodes ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; } void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); } @@ -195,14 +195,20 @@ public: void registerPeer(std::shared_ptr _s, CapDescs const& _caps); - std::shared_ptr node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr(); } +// std::shared_ptr node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr(); } + +protected: + void onNodeTableEvent(NodeId _n, NodeTableEventType _e); private: - KeyPair getHostIdentifier(); - /// Populate m_peerAddresses with available public addresses. void determinePublic(std::string const& _publicAddress, bool _upnp); + void connect(std::shared_ptr const& _n); + + /// Ping the peers to update the latency information and disconnect peers which have timed out. + void keepAlivePeers(); + /// Called only from startedWorking(). void runAcceptor(); @@ -223,9 +229,10 @@ private: virtual void doneWorking(); /// Add node - void addNode(Node const& _nodeInfo) { m_nodeTable->addNode(_nodeInfo); } + void addNode(Node const& _node) { m_nodeTable->addNode(_node); } -// Nodes potentialPeers(RangeMask const& _known); + /// Get or create host identifier (KeyPair). + KeyPair getHostIdentifier(); bool m_run = false; ///< Whether network is running. std::mutex x_runTimer; ///< Start/stop mutex. @@ -241,33 +248,24 @@ private: ba::io_service m_ioService; ///< IOService for network stuff. bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor. - std::unique_ptr m_socket; ///< Listening socket. std::unique_ptr m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. - std::set m_pendingNodeConns; /// Used only by connect(NodeInfo&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). + std::set m_pendingNodeConns; /// Used only by connect(PeerInfo&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). Mutex x_pendingNodeConns; bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint. KeyPair m_key; ///< Our unique ID. std::shared_ptr m_nodeTable; ///< Node table (uses kademlia-like discovery). - std::map m_capIdealPeerCount; ///< Ideal peer count for capability. - - mutable RecursiveMutex x_peers; + std::map> m_peers; + + mutable RecursiveMutex x_sessions; + /// The nodes to which we are currently connected. /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. - mutable std::map> m_peers; - - /// Nodes to which we may connect (or to which we have connected). - /// TODO: mutex; replace with nodeTable - std::map > m_nodes; - -// /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same. -// std::vector m_nodesList; - -// RangeMask m_private; ///< Indices into m_nodesList over to which nodes are private. + mutable std::map> m_sessions; unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. @@ -280,6 +278,6 @@ private: bool m_accepting = false; }; - + } } diff --git a/libp2p/HostCapability.cpp b/libp2p/HostCapability.cpp index 0728bef2c..8ff74d3b6 100644 --- a/libp2p/HostCapability.cpp +++ b/libp2p/HostCapability.cpp @@ -34,9 +34,9 @@ void HostCapabilityFace::seal(bytes& _b) std::vector > HostCapabilityFace::peers() const { - RecursiveGuard l(m_host->x_peers); + RecursiveGuard l(m_host->x_sessions); std::vector > ret; - for (auto const& i: m_host->m_peers) + for (auto const& i: m_host->m_sessions) if (std::shared_ptr p = i.second.lock()) if (p->m_capabilities.count(capDesc())) ret.push_back(p); diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 1b74abbda..b384c42d2 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -27,7 +27,7 @@ using namespace dev::p2p; NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {} NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {} -NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep): +NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp): m_node(Node(_alias.pub(), bi::udp::endpoint())), m_secret(_alias.sec()), m_io(_io), @@ -53,11 +53,32 @@ NodeTable::~NodeTable() m_socketPtr->disconnect(); } +shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp) +{ + auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp)); + return move(addNode(node)); +} + +shared_ptr NodeTable::addNode(Node const& _node) +{ + Guard l(x_nodes); + shared_ptr ret = m_nodes[_node.id]; + if (!ret) + { + ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); + m_nodes[_node.id] = ret; + PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); + p.sign(m_secret); + m_socketPtr->send(p); + } + return move(ret); +} + void NodeTable::join() { doFindNode(m_node.id); } - + list NodeTable::nodes() const { list nodes; @@ -84,6 +105,13 @@ Node NodeTable::operator[](NodeId _id) return !!n ? *n : Node(); } +shared_ptr NodeTable::getNodeEntry(NodeId _id) +{ + Guard l(x_nodes); + auto n = m_nodes[_id]; + return !!n ? move(n) : move(shared_ptr()); +} + void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const { FindNode p(_node.endpoint.udp, _target); @@ -240,29 +268,6 @@ void NodeTable::evict(shared_ptr _leastSeen, shared_ptr _n ping(_leastSeen.get()); } -shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp) -{ - auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp)); - return move(addNode(node)); -} - -shared_ptr NodeTable::addNode(Node const& _node) -{ - shared_ptr ret; - Guard l(x_nodes); - if (auto n = m_nodes[_node.id]) - ret = n; - else - { - ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); - m_nodes[_node.id] = ret; - PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); - p.sign(m_secret); - m_socketPtr->send(p); - } - return move(ret); -} - void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) { if (_pubk == m_node.address()) diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 79ce6a9e3..7f75bf27e 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -22,6 +22,7 @@ #pragma once #include +#include #include #include #include @@ -74,14 +75,37 @@ struct Node */ struct NodeEntry: public Node { - NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw); //: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {} - NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); //: Node(_pubk, NodeIPEndpoint(_udp)), distance(dist(_src.id,_pubk)) {} + NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw); + NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); - const unsigned distance; ///< Node's distance from _src (see constructor). + const unsigned distance; ///< Node's distance (xor of _src as integer). +}; + +enum NodeTableEventType { + NodeEntryAdded, + NodeEntryRemoved +}; +class NodeTable; +class NodeTableEventHandler +{ + friend class NodeTable; +public: + virtual void processEvent(NodeId _n, NodeTableEventType _e) =0; + +protected: + /// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable. + void processEvents() { std::list> events; { Guard l(x_events); if (!m_nodeEvents.size()) return; m_nodeEvents.unique(); for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n])); m_nodeEvents.empty(); m_events.empty(); } for (auto const& e: events) processEvent(e.first, e.second); } + + /// Called by NodeTable to append event. + virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; } + + Mutex x_events; + std::list m_nodeEvents; + std::map m_events; }; /** - * NodeTable using S/Kademlia system for node discovery and preference. + * NodeTable using modified kademlia for node discovery and preference. * untouched buckets are refreshed if they have not been touched within an hour * * Thread-safety is ensured by modifying NodeEntry details via @@ -122,7 +146,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this using EvictionTimeout = std::pair,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId. public: - NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303, bi::tcp::endpoint _ep = bi::tcp::endpoint()); + NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303); ~NodeTable(); /// Constants for Kademlia, mostly derived from address space. @@ -145,6 +169,12 @@ public: static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } + /// Set event handler for NodeEntryAdded and NodeEntryRemoved events. + void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); } + + /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. + void processEvents() { if (m_nodeEvents) m_nodeEvents->processEvents(); } + /// Add node. Node will be pinged if it's not already known. std::shared_ptr addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint()); @@ -157,7 +187,9 @@ public: std::list nodes() const; std::list state() const; + bool haveNode(NodeId _id) { Guard l(x_nodes); return !!m_nodes[_id]; } Node operator[](NodeId _id); + std::shared_ptr getNodeEntry(NodeId _id); protected: struct NodeBucket @@ -207,6 +239,8 @@ protected: /// Sends FindNeighbor packet. See doFindNode. void requestNeighbours(NodeEntry const& _node, NodeId _target) const; + std::unique_ptr m_nodeEvents; ///< Event handler for node events. + Node m_node; ///< This node. Secret m_secret; ///< This nodes secret key. diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index c6797b69c..0ef06ab50 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -36,21 +36,21 @@ using namespace dev::p2p; #endif #define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " -Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n): +Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n): m_server(_s), m_socket(std::move(_socket)), - m_node(_n), + m_info({m_peer->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}), + m_peer(_n), m_manualEndpoint(_n->address) { m_lastReceived = m_connect = std::chrono::steady_clock::now(); - m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}); } Session::~Session() { // TODO: P2P revisit (refactored from previous logic) - if (m_node && !(id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead)) - m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1); + if (m_peer && !(id() && !isPermanentProblem(m_peer->lastDisconnect) && !m_peer->dead)) + m_peer->lastConnected = m_peer->lastAttempted - chrono::seconds(1); // Read-chain finished for one reason or another. for (auto& i: m_capabilities) @@ -70,35 +70,35 @@ Session::~Session() NodeId Session::id() const { - return m_node ? m_node->id : NodeId(); + return m_peer ? m_peer->id : NodeId(); } void Session::addRating(unsigned _r) { - if (m_node) + if (m_peer) { - m_node->rating += _r; - m_node->score += _r; + m_peer->rating += _r; + m_peer->score += _r; } } int Session::rating() const { - return m_node->rating; + return m_peer->rating; } // TODO: P2P integration: session->? should be unavailable when socket isn't open bi::tcp::endpoint Session::endpoint() const { - if (m_socket.is_open() && m_node) + if (m_socket.is_open() && m_peer) try { - return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_node->address.port()); + return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port()); } catch (...) {} - if (m_node) - return m_node->address; + if (m_peer) + return m_peer->address; return m_manualEndpoint; } @@ -195,10 +195,11 @@ bool Session::interpret(RLP const& _r) return true; } - assert(!!m_node); - assert(!!m_node->id); + assert(!!m_peer); + assert(!!m_peer->id); - if (m_server->havePeer(id)) + // TODO: P2P ensure disabled logic is covered + if (false /* m_server->havePeer(id) */) { // Already connected. clogS(NetWarn) << "Already connected to a peer with id" << id.abridged(); @@ -217,20 +218,28 @@ bool Session::interpret(RLP const& _r) // TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle. // TODO: P2P remove oldid // TODO: P2P with encrypted transport the handshake will fail and we won't get here -// m_node = m_server->noteNode(m_node->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort)); - if (m_node->isOffline()) - m_node->lastConnected = chrono::system_clock::now(); +// m_peer = m_server->noteNode(m_peer->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort)); + if (m_peer->isOffline()) + m_peer->lastConnected = chrono::system_clock::now(); // // // TODO: P2P introduce map of nodes we've given to this node (if GetPeers/Peers stays in TCP) - m_knownNodes.extendAll(m_node->index); - m_knownNodes.unionWith(m_node->index); + m_knownNodes.extendAll(m_peer->index); + m_knownNodes.unionWith(m_peer->index); if (m_protocolVersion != m_server->protocolVersion()) { disconnect(IncompatibleProtocol); return true; } - m_info = PeerInfo({id, clientVersion, m_socket.remote_endpoint().address().to_string(), listenPort, std::chrono::steady_clock::duration(), _r[3].toSet(), (unsigned)m_socket.native_handle(), map() }); + + // TODO: P2P migrate auth to Host and Handshake to constructor + m_info.clientVersion = clientVersion; + m_info.host = m_socket.remote_endpoint().address().to_string(); + m_info.port = listenPort; + m_info.lastPing = std::chrono::steady_clock::duration(); + m_info.caps = _r[3].toSet(); + m_info.socket = (unsigned)m_socket.native_handle(); + m_info.notes = map(); m_server->registerPeer(shared_from_this(), caps); break; @@ -285,64 +294,63 @@ bool Session::interpret(RLP const& _r) } auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); NodeId id = _r[i][2].toHash(); - clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_nodes.count(id) << (m_server->m_nodes.count(id) ? isPrivateAddress(m_server->m_nodes.at(id)->address.address()) : -1); + + clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")"; +// clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_peers.count(id) << (m_server->m_peers.count(id) ? isPrivateAddress(m_server->m_peers.at(id)->address.address()) : -1); - if (isPrivateAddress(peerAddress) && !m_server->m_netPrefs.localNetworking) + // ignore if dist(us,item) - dist(us,them) > 1 + + // TODO: isPrivate + if (!m_server->m_netPrefs.localNetworking && isPrivateAddress(peerAddress)) goto CONTINUE; // Private address. Ignore. if (!id) - goto CONTINUE; // Null identity. Ignore. + goto LAMEPEER; // Null identity. Ignore. if (m_server->id() == id) - goto CONTINUE; // Just our info - we already have that. + goto LAMEPEER; // Just our info - we already have that. if (id == this->id()) - goto CONTINUE; // Just their info - we already have that. + goto LAMEPEER; // Just their info - we already have that. + // we don't worry about m_peers.count(id) now because node table will handle this and + // by default we will not blindly connect to nodes received via tcp; instead they will + // be pinged, as-is standard, by the node table and added if appropriate. unless flagged + // as required, nodes aren't connected to unless they respond via discovery; no matter if + // a node is relayed via udp or tcp. // check that it's not us or one we already know: - if (m_server->m_nodes.count(id)) - { - /* MEH. Far from an ideal solution. Leave alone for now. - // Already got this node. - // See if it's any better that ours or not... - // This could be the public address of a known node. - // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. - if (m_server->m_nodes.count(id) && isPrivateAddress(m_server->m_nodes.at(id)->address.address()) && ep.port() != 0) - // Update address if the node if we now have a public IP for it. - m_server->m_nodes[id]->address = ep; - */ - goto CONTINUE; - } +// if (m_server->m_peers.count(id)) +// { +// /* MEH. Far from an ideal solution. Leave alone for now. +// // Already got this node. +// // See if it's any better that ours or not... +// // This could be the public address of a known node. +// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. +// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0) +// // Update address if the node if we now have a public IP for it. +// m_server->m_peers[id]->address = ep; +// */ +// goto CONTINUE; +// } if (!ep.port()) - goto CONTINUE; // Zero port? Don't think so. + goto LAMEPEER; // Zero port? Don't think so. if (ep.port() >= /*49152*/32768) - goto CONTINUE; // Private port according to IANA. + goto LAMEPEER; // Private port according to IANA. - // TODO: PoC-7: - // Technically fine, but ignore for now to avoid peers passing on incoming ports until we can be sure that doesn't happen any more. -// if (ep.port() < 30300 || ep.port() > 30305) -// goto CONTINUE; // Wierd port. - - // Avoid our random other addresses that they might end up giving us. - for (auto i: m_server->m_peerAddresses) - if (ep.address() == i && ep.port() == m_server->listenPort()) - goto CONTINUE; - - // Check that we don't already know about this addr:port combination. If we are, assume the original is best. - // SECURITY: Not a valid assumption in general. Should compare ID origins and pick the best or note uncertainty and weight each equally. - for (auto const& i: m_server->m_nodes) - if (i.second->address == ep) - goto CONTINUE; // Same address but a different node. + // node table handles another node giving us a node which represents one of our other local network interfaces + // node table handles another node giving us a node we already know about // OK passed all our checks. Assume it's good. addRating(1000); // TODO: P2P change to addNode() -// m_server->noteNode(id, ep); + m_server->addNode(Node(id, NodeIPEndpoint(bi::udp::endpoint(ep.address(), 30303), ep))); + clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")"; CONTINUE:; + LAMEPEER:; } break; default: @@ -469,15 +477,15 @@ void Session::drop(DisconnectReason _reason) } catch (...) {} - if (m_node) + if (m_peer) { - if (_reason != m_node->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) - m_node->failedAttempts = 0; - m_node->lastDisconnect = _reason; + if (_reason != m_peer->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) + m_peer->failedAttempts = 0; + m_peer->lastDisconnect = _reason; if (_reason == BadProtocol) { - m_node->rating /= 2; - m_node->score /= 2; + m_peer->rating /= 2; + m_peer->score /= 2; } } m_dropped = true; diff --git a/libp2p/Session.h b/libp2p/Session.h index 19dc60a28..9c0472a81 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -39,7 +39,7 @@ namespace dev namespace p2p { -struct NodeInfo; +struct PeerInfo; /** * @brief The Session class @@ -51,7 +51,7 @@ class Session: public std::enable_shared_from_this friend class HostCapabilityFace; public: - Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr const& _n); + Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr const& _n); virtual ~Session(); void start(); @@ -80,7 +80,7 @@ public: void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; } - PeerInfo const& info() const { return m_info; } + PeerSessionInfo const& info() const { return m_info; } void ensureNodesRequested(); void serviceNodesRequest(); @@ -109,10 +109,10 @@ private: std::array m_data; ///< Buffer for ingress packet data. bytes m_incoming; ///< Read buffer for ingress bytes. - PeerInfo m_info; ///< Dynamic information about this peer. + PeerSessionInfo m_info; ///< Dynamic information about this peer. unsigned m_protocolVersion = 0; ///< The protocol version of the peer. - std::shared_ptr m_node; ///< The NodeInfo object. + std::shared_ptr m_peer; ///< The PeerInfo object. bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor. bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in. diff --git a/libwebthree/WebThree.cpp b/libwebthree/WebThree.cpp index fd51b5947..40e878858 100644 --- a/libwebthree/WebThree.cpp +++ b/libwebthree/WebThree.cpp @@ -75,7 +75,7 @@ void WebThreeDirect::setNetworkPreferences(p2p::NetworkPreferences const& _n) startNetwork(); } -std::vector WebThreeDirect::peers() +std::vector WebThreeDirect::peers() { return m_net.peers(); } @@ -102,5 +102,5 @@ void WebThreeDirect::restoreNodes(bytesConstRef _saved) void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port) { - m_net.connect(NodeId(), _seedHost, _port); + m_net.addNode(NodeId(), _seedHost, _port, _port); } diff --git a/libwebthree/WebThree.h b/libwebthree/WebThree.h index ec7bf2406..3a9fbbb30 100644 --- a/libwebthree/WebThree.h +++ b/libwebthree/WebThree.h @@ -84,7 +84,7 @@ public: // Network stuff: /// Get information on the current peer set. - std::vector peers(); + std::vector peers(); /// Same as peers().size(), but more efficient. size_t peerCount() const; @@ -195,7 +195,7 @@ public: // Peer network stuff - forward through RPCSlave, probably with P2PNetworkSlave/Master classes like Whisper & Ethereum. /// Get information on the current peer set. - std::vector peers(); + std::vector peers(); /// Same as peers().size(), but more efficient. size_t peerCount() const; diff --git a/neth/main.cpp b/neth/main.cpp index 8552177ce..2bcbb5e04 100644 --- a/neth/main.cpp +++ b/neth/main.cpp @@ -913,7 +913,7 @@ int main(int argc, char** argv) // Peers y = 1; - for (PeerInfo const& i: web3.peers()) + for (PeerSessionInfo const& i: web3.peers()) { auto s = boost::format("%1% ms - %2%:%3% - %4%") % toString(chrono::duration_cast(i.lastPing).count()) % diff --git a/test/peer.cpp b/test/peer.cpp index d13ea97f3..5c11d4cfb 100644 --- a/test/peer.cpp +++ b/test/peer.cpp @@ -49,14 +49,14 @@ int peerTest(int argc, char** argv) Host ph("Test", NetworkPreferences(listenPort)); if (!remoteHost.empty()) - ph.connect(NodeId(), remoteHost, remotePort); - - for (int i = 0; ; ++i) - { - this_thread::sleep_for(chrono::milliseconds(100)); - if (!(i % 10)) - ph.keepAlivePeers(); - } + ph.addNode(NodeId(), remoteHost, remotePort, remotePort); + +// for (int i = 0; ; ++i) +// { +// this_thread::sleep_for(chrono::milliseconds(100)); +// if (!(i % 10)) +// ph.keepAlivePeers(); +// } return 0; } diff --git a/test/whisperTopic.cpp b/test/whisperTopic.cpp index 8ecabde9b..ea46b162c 100644 --- a/test/whisperTopic.cpp +++ b/test/whisperTopic.cpp @@ -72,7 +72,7 @@ BOOST_AUTO_TEST_CASE(topic) this_thread::sleep_for(chrono::milliseconds(500)); ph.start(); this_thread::sleep_for(chrono::milliseconds(500)); - ph.connect(NodeId(), "127.0.0.1", 50303); + ph.addNode(NodeId(), "127.0.0.1", 50303, 50303); KeyPair us = KeyPair::create(); for (int i = 0; i < 10; ++i)