diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index b73a2cd0d..40663db8d 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -906,7 +906,7 @@ void Main::refreshNetwork() if (web3()->haveNetwork()) { map clients; - for (PeerInfo const& i: ps) + for (PeerSessionInfo const& i: ps) ui->peers->addItem(QString("[%8 %7] %3 ms - %1:%2 - %4 %5 %6") .arg(QString::fromStdString(i.host)) .arg(i.port) diff --git a/libp2p/Common.h b/libp2p/Common.h index d46c5eed1..059e1a64e 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -116,7 +116,7 @@ typedef std::pair CapDesc; typedef std::set CapDescSet; typedef std::vector CapDescs; -struct PeerInfo +struct PeerSessionInfo { NodeId id; std::string clientVersion; @@ -128,7 +128,7 @@ struct PeerInfo std::map notes; }; -using PeerInfos = std::vector; +using PeerSessionInfos = std::vector; } } diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 5f756f2a5..243738918 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -38,6 +38,13 @@ using namespace std; using namespace dev; using namespace dev::p2p; +HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {} + +void HostNodeTableHandler::processEvent(NodeId _n, NodeTableEventType _e) +{ + m_host.onNodeTableEvent(_n, _e); +} + Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool _start): Worker("p2p", 0), m_clientVersion(_clientVersion), @@ -116,8 +123,8 @@ void Host::doneWorking() for (unsigned n = 0;; n = 0) { { - RecursiveGuard l(x_peers); - for (auto i: m_peers) + RecursiveGuard l(x_sessions); + for (auto i: m_sessions) if (auto p = i.second.lock()) if (p->isOpen()) { @@ -139,8 +146,8 @@ void Host::doneWorking() m_ioService.reset(); // finally, clear out peers (in case they're lingering) - RecursiveGuard l(x_peers); - m_peers.clear(); + RecursiveGuard l(x_sessions); + m_sessions.clear(); } unsigned Host::protocolVersion() const @@ -150,12 +157,12 @@ unsigned Host::protocolVersion() const void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) { - assert(!!_s->m_node); - assert(!!_s->m_node->id); + assert(!!_s->m_peer); + assert(!!_s->m_peer->id); { - RecursiveGuard l(x_peers); - m_peers[_s->m_node->id] = _s; + RecursiveGuard l(x_sessions); + m_sessions[_s->m_peer->id] = _s; } unsigned o = (unsigned)UserPacket; for (auto const& i: _caps) @@ -166,6 +173,34 @@ void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) } } +void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e) +{ + if (_e == NodeEntryAdded) + { + auto n = (*m_nodeTable)[_n]; + if (n) + { + RecursiveGuard l(x_sessions); + auto p = m_peers[_n]; + if (!p) + { + m_peers[_n] = make_shared(); + p = m_peers[_n]; + p->id = _n; + } + p->address = n.endpoint.tcp; + + if (peerCount() < m_idealPeerCount) + connect(p); + } + } + else if (_e == NodeEntryRemoved) + { + RecursiveGuard l(x_sessions); + m_peers.erase(_n); + } +} + void Host::seal(bytes& _b) { _b[0] = 0x22; @@ -179,80 +214,6 @@ void Host::seal(bytes& _b) _b[7] = len & 0xff; } -// TODO: P2P port to NodeTable. (see noteNode calls, Session.cpp) -//shared_ptr Host::noteNode(NodeId _id, bi::tcp::endpoint _a) -//{ -// RecursiveGuard l(x_peers); -// if (_a.port() < 30300 || _a.port() > 30305) -// cwarn << "Weird port being recorded: " << _a.port(); -// -// if (_a.port() >= /*49152*/32768) -// { -// cwarn << "Private port being recorded - setting to 0"; -// _a = bi::tcp::endpoint(_a.address(), 0); -// } -// -// unsigned i; -// if (!m_nodes.count(_id)) -// { -// i = m_nodesList.size(); -// m_nodesList.push_back(_id); -// m_nodes[_id] = make_shared(); -// m_nodes[_id]->id = _id; -// m_nodes[_id]->index = i; -// } -// else -// i = m_nodes[_id]->index; -// m_nodes[_id]->address = _a; -// m_private.extendAll(i); -// if (!_a.port() || (isPrivateAddress(_a.address()) && !m_netPrefs.localNetworking)) -// m_private += i; -// else -// m_private -= i; -// -// return m_nodes[_id]; -//} - -// TODO: P2P base on target -// TODO: P2P store caps in NodeTable/NodeEntry -//Nodes Host::potentialPeers(RangeMask const& _known) -//{ -// RecursiveGuard l(x_peers); -// Nodes ret; -// -// // todo: if localnetworking is enabled it should only share peers if remote -// // is within the same network as our interfaces. -// // this requires flagging nodes when we receive them as to if they're on private network -// auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted(); -// for (auto i: ns) -// ret.push_back(*m_nodes[m_nodesList[i]]); -// return ret; -//} - -KeyPair Host::getHostIdentifier() -{ - static string s_file(getDataDir() + "/host"); - static mutex s_x; - lock_guard l(s_x); - - h256 secret; - bytes b = contents(s_file); - if (b.size() == 32) - memcpy(secret.data(), b.data(), 32); - else - { - // todo: replace w/user entropy; abstract to devcrypto - std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count()); - std::uniform_int_distribution d(0, 255); - for (unsigned i = 0; i < 32; ++i) - secret[i] = (byte)d(s_eng); - } - - if (!secret) - BOOST_THROW_EXCEPTION(crypto::InvalidState()); - return move(KeyPair(move(secret))); -} - void Host::determinePublic(string const& _publicAddress, bool _upnp) { m_peerAddresses.clear(); @@ -324,15 +285,16 @@ void Host::runAcceptor() { clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")"; m_accepting = true; - m_socket.reset(new bi::tcp::socket(m_ioService)); - m_tcp4Acceptor.async_accept(*m_socket, [=](boost::system::error_code ec) + + bi::tcp::socket* s = new bi::tcp::socket(m_ioService); + m_tcp4Acceptor.async_accept(*s, [=](boost::system::error_code ec) { bool success = false; if (!ec) { try { - doHandshake(m_socket.release()); + doHandshake(s); success = true; } catch (Exception const& _e) @@ -345,27 +307,29 @@ void Host::runAcceptor() } } - if (!success && m_socket->is_open()) + if (!success && s->is_open()) { boost::system::error_code ec; - m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - m_socket->close(); + s->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + s->close(); } m_accepting = false; + delete s; + if (ec.value() < 1) runAcceptor(); }); } } -void Host::doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId) +void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId) { try { clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint(); } catch (...){} - auto p = std::make_shared(this, std::move(*_socket), m_nodes[_egressNodeId]); + auto p = std::make_shared(this, std::move(*_socket), m_peers[_nodeId]); p->start(); } @@ -378,6 +342,15 @@ string Host::pocHost() void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPeerPort, unsigned short _udpNodePort) { + if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305) + cwarn << "Weird port being recorded: " << _tcpPeerPort; + + if (_tcpPeerPort >= /*49152*/32768) + { + cwarn << "Private port being recorded - setting to 0"; + _tcpPeerPort = 0; + } + boost::system::error_code ec; bi::address addr = bi::address::from_string(_addr, ec); if (ec) @@ -394,59 +367,25 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort)))); } -void Host::connect(NodeId const& _node, std::string const& _addr, unsigned short _peerPort, unsigned short _nodePort) noexcept +void Host::connect(std::shared_ptr const& _n) { if (!m_run) return; - assert(_node); - - auto n = (*m_nodeTable)[_node]; + if (havePeerSession(_n->id)) + { + clog(NetWarn) << "Aborted connect. Node already connected."; + return; + } - // TODO: refactor into async_resolve - m_ioService.post([=]() + if (!m_nodeTable->haveNode(_n->id)) { - for (auto first: {true, false}) - { - try - { - bi::tcp::endpoint ep; - if (first) - { - bi::tcp::resolver r(m_ioService); - ep = r.resolve({_addr, toString(_peerPort)})->endpoint(); - } - else - ep = bi::tcp::endpoint(bi::address::from_string(_addr), _peerPort); - - if (!n) - m_nodes[_node] = make_shared(); - m_nodes[_node]->id = _node; - m_nodes[_node]->address = ep; - connect(m_nodes[_node]); - break; - } - catch (Exception const& _e) - { - // Couldn't connect - clog(NetConnect) << "Bad host " << _addr << "\n" << diagnostic_information(_e); - } - catch (exception const& e) - { - // Couldn't connect - clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")"; - } - } - }); -} - -void Host::connect(std::shared_ptr const& _n) -{ - if (!m_run) + clog(NetWarn) << "Aborted connect. Node not in node table."; return; + } // prevent concurrently connecting to a node - NodeInfo *nptr = _n.get(); + PeerInfo *nptr = _n.get(); { Guard l(x_pendingNodeConns); if (m_pendingNodeConns.count(nptr)) @@ -457,48 +396,32 @@ void Host::connect(std::shared_ptr const& _n) clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged(); _n->lastAttempted = std::chrono::system_clock::now(); _n->failedAttempts++; + bi::tcp::socket* s = new bi::tcp::socket(m_ioService); - - auto n = node(_n->id); - if (n) - s->async_connect(_n->address, [=](boost::system::error_code const& ec) + s->async_connect(_n->address, [=](boost::system::error_code const& ec) + { + if (ec) { - if (ec) - { - clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")"; - _n->lastDisconnect = TCPError; - _n->lastAttempted = std::chrono::system_clock::now(); - } - else - { - clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address; - _n->lastConnected = std::chrono::system_clock::now(); - auto p = make_shared(this, std::move(*s), n); - p->start(); - } - delete s; - Guard l(x_pendingNodeConns); - m_pendingNodeConns.erase(nptr); - }); - else - clog(NetWarn) << "Aborted connect. Node not in node table."; -} - -bool Host::havePeer(NodeId _id) const -{ - RecursiveGuard l(x_peers); - - // Remove dead peers from list. - for (auto i = m_peers.begin(); i != m_peers.end();) - if (i->second.lock().get()) - ++i; + clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->address << "(" << ec.message() << ")"; + _n->lastDisconnect = TCPError; + _n->lastAttempted = std::chrono::system_clock::now(); + } else - i = m_peers.erase(i); - - return !!m_peers.count(_id); + { + clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->address; + + _n->lastConnected = std::chrono::system_clock::now(); + auto p = make_shared(this, std::move(*s), _n); + p->start(); + + } + delete s; + Guard l(x_pendingNodeConns); + m_pendingNodeConns.erase(nptr); + }); } -unsigned NodeInfo::fallbackSeconds() const +unsigned PeerInfo::fallbackSeconds() const { switch (lastDisconnect) { @@ -524,27 +447,27 @@ unsigned NodeInfo::fallbackSeconds() const // TODO: P2P migrate grow/prunePeers into 'maintainPeers' & evaluate reputation instead of availability. schedule via deadline timer. //void Host::growPeers() //{ -// RecursiveGuard l(x_peers); -// int morePeers = (int)m_idealPeerCount - m_peers.size(); +// RecursiveGuard l(x_sessions); +// int morePeers = (int)m_idealPeerCount - m_sessions.size(); // if (morePeers > 0) // { // auto toTry = m_ready; // if (!m_netPrefs.localNetworking) // toTry -= m_private; -// set ns; +// set ns; // for (auto i: toTry) // if (m_nodes[m_nodesList[i]]->shouldReconnect()) // ns.insert(*m_nodes[m_nodesList[i]]); // // if (ns.size()) -// for (NodeInfo const& i: ns) +// for (PeerInfo const& i: ns) // { // connect(m_nodes[i.id]); // if (!--morePeers) // return; // } // else -// for (auto const& i: m_peers) +// for (auto const& i: m_sessions) // if (auto p = i.second.lock()) // p->ensureNodesRequested(); // } @@ -552,17 +475,17 @@ unsigned NodeInfo::fallbackSeconds() const // //void Host::prunePeers() //{ -// RecursiveGuard l(x_peers); +// RecursiveGuard l(x_sessions); // // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. // set dc; -// for (unsigned old = 15000; m_peers.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2) -// if (m_peers.size() - dc.size() > m_idealPeerCount) +// for (unsigned old = 15000; m_sessions.size() - dc.size() > m_idealPeerCount * 2 && old > 100; old /= 2) +// if (m_sessions.size() - dc.size() > m_idealPeerCount) // { // // look for worst peer to kick off // // first work out how many are old enough to kick off. // shared_ptr worst; // unsigned agedPeers = 0; -// for (auto i: m_peers) +// for (auto i: m_sessions) // if (!dc.count(i.first)) // if (auto p = i.second.lock()) // if (/*(m_mode != NodeMode::Host || p->m_caps != 0x01) &&*/ chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. @@ -578,21 +501,21 @@ unsigned NodeInfo::fallbackSeconds() const // } // // // Remove dead peers from list. -// for (auto i = m_peers.begin(); i != m_peers.end();) +// for (auto i = m_sessions.begin(); i != m_sessions.end();) // if (i->second.lock().get()) // ++i; // else -// i = m_peers.erase(i); +// i = m_sessions.erase(i); //} -PeerInfos Host::peers() const +PeerSessionInfos Host::peers() const { if (!m_run) - return PeerInfos(); + return PeerSessionInfos(); - std::vector ret; - RecursiveGuard l(x_peers); - for (auto& i: m_peers) + std::vector ret; + RecursiveGuard l(x_sessions); + for (auto& i: m_sessions) if (auto j = i.second.lock()) if (j->m_socket.is_open()) ret.push_back(j->m_info); @@ -615,7 +538,7 @@ void Host::run(boost::system::error_code const&) return; } - for (auto p: m_peers) + for (auto p: m_sessions) if (auto pp = p.second.lock()) pp->serviceNodesRequest(); @@ -658,7 +581,8 @@ void Host::startedWorking() runAcceptor(); if (!m_tcpPublic.address().is_unspecified()) - m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort, m_tcpPublic)); + // TODO: add m_tcpPublic endpoint; sort out endpoint stuff for nodetable + m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort)); else m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303)); } @@ -676,8 +600,8 @@ void Host::doWork() void Host::keepAlivePeers() { - RecursiveGuard l(x_peers); - for (auto p: m_peers) + RecursiveGuard l(x_sessions); + for (auto p: m_sessions) if (auto pp = p.second.lock()) { if (chrono::steady_clock::now() - pp->m_lastReceived >= chrono::seconds(60)) @@ -694,10 +618,10 @@ bytes Host::saveNodes() const RLPStream nodes; int count = 0; { - RecursiveGuard l(x_peers); - for (auto const& i: m_nodes) + RecursiveGuard l(x_sessions); + for (auto const& i: m_peers) { - NodeInfo const& n = *(i.second); + PeerInfo const& n = *(i.second); // TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 && if (!n.dead && chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.address.port() > 0 && n.address.port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.address.address())) { @@ -722,7 +646,7 @@ bytes Host::saveNodes() const void Host::restoreNodes(bytesConstRef _b) { - RecursiveGuard l(x_peers); + RecursiveGuard l(x_sessions); RLP r(_b); if (r.itemCount() > 0 && r[0].isInt()) switch (r[0].toInt()) @@ -740,7 +664,7 @@ void Host::restoreNodes(bytesConstRef _b) else ep = bi::tcp::endpoint(bi::address_v6(i[0].toArray()), i[1].toInt()); auto id = (NodeId)i[2]; - if (!m_nodes.count(id)) + if (!m_peers.count(id)) { //// auto o = (Origin)i[3].toInt(); // auto n = noteNode(id, ep); @@ -759,7 +683,7 @@ void Host::restoreNodes(bytesConstRef _b) for (auto i: r) { auto id = (NodeId)i[2]; - if (!m_nodes.count(id)) + if (!m_peers.count(id)) { bi::tcp::endpoint ep; if (i[0].itemCount() == 4) @@ -770,3 +694,27 @@ void Host::restoreNodes(bytesConstRef _b) } } } + +KeyPair Host::getHostIdentifier() +{ + static string s_file(getDataDir() + "/host"); + static mutex s_x; + lock_guard l(s_x); + + h256 secret; + bytes b = contents(s_file); + if (b.size() == 32) + memcpy(secret.data(), b.data(), 32); + else + { + // todo: replace w/user entropy; abstract to devcrypto + std::mt19937_64 s_eng(time(0) + chrono::high_resolution_clock::now().time_since_epoch().count()); + std::uniform_int_distribution d(0, 255); + for (unsigned i = 0; i < 32; ++i) + secret[i] = (byte)d(s_eng); + } + + if (!secret) + BOOST_THROW_EXCEPTION(crypto::InvalidState()); + return move(KeyPair(move(secret))); +} diff --git a/libp2p/Host.h b/libp2p/Host.h index 5892b5b0d..097b07433 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -51,7 +51,7 @@ namespace p2p class Host; -struct NodeInfo +struct PeerInfo { NodeId id; ///< Their id/public key. unsigned index; ///< Index into m_nodesList @@ -88,7 +88,7 @@ struct NodeInfo bool isOffline() const { return lastAttempted > lastConnected; } // p2p: Remove (in favor of lru eviction and sub-protocol ratings). - bool operator<(NodeInfo const& _n) const + bool operator<(PeerInfo const& _n) const { if (isOffline() != _n.isOffline()) return isOffline(); @@ -108,19 +108,30 @@ struct NodeInfo } }; -using Nodes = std::vector; +using Nodes = std::vector; + +class Host; +class HostNodeTableHandler: public NodeTableEventHandler +{ + HostNodeTableHandler(Host& _host); + virtual void processEvent(NodeId _n, NodeTableEventType _e); + Host& m_host; +}; + /** * @brief The Host class * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. + * @todo gracefully disconnect peer if peer already connected * @todo determinePublic: ipv6, udp * @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port */ class Host: public Worker { + friend class HostNodeTableHandler; friend class Session; friend class HostCapabilityFace; - friend struct NodeInfo; + friend struct PeerInfo; public: /// Start server, listening for connections on the given port. @@ -142,31 +153,19 @@ public: CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } - /// Manually add node. - void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303); - - /// Connect to a peer explicitly. - void connect(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort = 30303, unsigned short _udpPort = 30303) noexcept; - void connect(NodeId const& _node, bi::tcp::endpoint const& _ep); - void connect(std::shared_ptr const& _n); - - /// @returns true iff we have a peer of the given id. - bool havePeer(NodeId _id) const; - + bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); if (m_sessions.count(_id)) return !!m_sessions[_id].lock(); else return false; } + + /// Add node. + void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort); + /// Set ideal number of peers. void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } - - /// p2p: template? - void setIdealPeerCount(HostCapabilityFace* _cap, unsigned _n) { m_capIdealPeerCount[_cap->capDesc()] = _n; } /// Get peer information. - PeerInfos peers() const; + PeerSessionInfos peers() const; /// Get number of peers connected; equivalent to, but faster than, peers().size(). - size_t peerCount() const { RecursiveGuard l(x_peers); return m_peers.size(); } - - /// Ping the peers to update the latency information and disconnect peers which have timed out. - void keepAlivePeers(); + size_t peerCount() const { RecursiveGuard l(x_sessions); return m_peers.size(); } /// Get the port we're listening on currently. unsigned short listenPort() const { return m_tcpPublic.port(); } @@ -177,7 +176,8 @@ public: /// Deserialise the data and populate the set of known peers. void restoreNodes(bytesConstRef _b); - Nodes nodes() const { RecursiveGuard l(x_peers); Nodes ret; for (auto const& i: m_nodes) ret.push_back(*i.second); return ret; } + // TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information. + Nodes nodes() const { RecursiveGuard l(x_sessions); Nodes ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; } void setNetworkPreferences(NetworkPreferences const& _p) { auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); } @@ -195,14 +195,20 @@ public: void registerPeer(std::shared_ptr _s, CapDescs const& _caps); - std::shared_ptr node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr(); } +// std::shared_ptr node(NodeId _id) const { if (m_nodes.count(_id)) return m_nodes.at(_id); return std::shared_ptr(); } + +protected: + void onNodeTableEvent(NodeId _n, NodeTableEventType _e); private: - KeyPair getHostIdentifier(); - /// Populate m_peerAddresses with available public addresses. void determinePublic(std::string const& _publicAddress, bool _upnp); + void connect(std::shared_ptr const& _n); + + /// Ping the peers to update the latency information and disconnect peers which have timed out. + void keepAlivePeers(); + /// Called only from startedWorking(). void runAcceptor(); @@ -223,9 +229,10 @@ private: virtual void doneWorking(); /// Add node - void addNode(Node const& _nodeInfo) { m_nodeTable->addNode(_nodeInfo); } + void addNode(Node const& _node) { m_nodeTable->addNode(_node); } -// Nodes potentialPeers(RangeMask const& _known); + /// Get or create host identifier (KeyPair). + KeyPair getHostIdentifier(); bool m_run = false; ///< Whether network is running. std::mutex x_runTimer; ///< Start/stop mutex. @@ -241,33 +248,24 @@ private: ba::io_service m_ioService; ///< IOService for network stuff. bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor. - std::unique_ptr m_socket; ///< Listening socket. std::unique_ptr m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. - std::set m_pendingNodeConns; /// Used only by connect(NodeInfo&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). + std::set m_pendingNodeConns; /// Used only by connect(PeerInfo&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). Mutex x_pendingNodeConns; bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint. KeyPair m_key; ///< Our unique ID. std::shared_ptr m_nodeTable; ///< Node table (uses kademlia-like discovery). - std::map m_capIdealPeerCount; ///< Ideal peer count for capability. - - mutable RecursiveMutex x_peers; + std::map> m_peers; + + mutable RecursiveMutex x_sessions; + /// The nodes to which we are currently connected. /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. - mutable std::map> m_peers; - - /// Nodes to which we may connect (or to which we have connected). - /// TODO: mutex; replace with nodeTable - std::map > m_nodes; - -// /// A list of node IDs. This contains every index from m_nodes; the order is guaranteed to remain the same. -// std::vector m_nodesList; - -// RangeMask m_private; ///< Indices into m_nodesList over to which nodes are private. + mutable std::map> m_sessions; unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. @@ -280,6 +278,6 @@ private: bool m_accepting = false; }; - + } } diff --git a/libp2p/HostCapability.cpp b/libp2p/HostCapability.cpp index 0728bef2c..8ff74d3b6 100644 --- a/libp2p/HostCapability.cpp +++ b/libp2p/HostCapability.cpp @@ -34,9 +34,9 @@ void HostCapabilityFace::seal(bytes& _b) std::vector > HostCapabilityFace::peers() const { - RecursiveGuard l(m_host->x_peers); + RecursiveGuard l(m_host->x_sessions); std::vector > ret; - for (auto const& i: m_host->m_peers) + for (auto const& i: m_host->m_sessions) if (std::shared_ptr p = i.second.lock()) if (p->m_capabilities.count(capDesc())) ret.push_back(p); diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 1b74abbda..b384c42d2 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -27,7 +27,7 @@ using namespace dev::p2p; NodeEntry::NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw): Node(_pubk, _gw), distance(NodeTable::dist(_src.id,_pubk)) {} NodeEntry::NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp): Node(_pubk, NodeIPEndpoint(_udp)), distance(NodeTable::dist(_src.id,_pubk)) {} -NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp, bi::tcp::endpoint _ep): +NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udp): m_node(Node(_alias.pub(), bi::udp::endpoint())), m_secret(_alias.sec()), m_io(_io), @@ -53,11 +53,32 @@ NodeTable::~NodeTable() m_socketPtr->disconnect(); } +shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp) +{ + auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp)); + return move(addNode(node)); +} + +shared_ptr NodeTable::addNode(Node const& _node) +{ + Guard l(x_nodes); + shared_ptr ret = m_nodes[_node.id]; + if (!ret) + { + ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); + m_nodes[_node.id] = ret; + PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); + p.sign(m_secret); + m_socketPtr->send(p); + } + return move(ret); +} + void NodeTable::join() { doFindNode(m_node.id); } - + list NodeTable::nodes() const { list nodes; @@ -84,6 +105,13 @@ Node NodeTable::operator[](NodeId _id) return !!n ? *n : Node(); } +shared_ptr NodeTable::getNodeEntry(NodeId _id) +{ + Guard l(x_nodes); + auto n = m_nodes[_id]; + return !!n ? move(n) : move(shared_ptr()); +} + void NodeTable::requestNeighbours(NodeEntry const& _node, NodeId _target) const { FindNode p(_node.endpoint.udp, _target); @@ -240,29 +268,6 @@ void NodeTable::evict(shared_ptr _leastSeen, shared_ptr _n ping(_leastSeen.get()); } -shared_ptr NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp) -{ - auto node = Node(_pubk, NodeIPEndpoint(_udp, _tcp)); - return move(addNode(node)); -} - -shared_ptr NodeTable::addNode(Node const& _node) -{ - shared_ptr ret; - Guard l(x_nodes); - if (auto n = m_nodes[_node.id]) - ret = n; - else - { - ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); - m_nodes[_node.id] = ret; - PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); - p.sign(m_secret); - m_socketPtr->send(p); - } - return move(ret); -} - void NodeTable::noteNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) { if (_pubk == m_node.address()) diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 79ce6a9e3..7f75bf27e 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -22,6 +22,7 @@ #pragma once #include +#include #include #include #include @@ -74,14 +75,37 @@ struct Node */ struct NodeEntry: public Node { - NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw); //: Node(_pubk, _gw), distance(dist(_src.id,_pubk)) {} - NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); //: Node(_pubk, NodeIPEndpoint(_udp)), distance(dist(_src.id,_pubk)) {} + NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw); + NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp); - const unsigned distance; ///< Node's distance from _src (see constructor). + const unsigned distance; ///< Node's distance (xor of _src as integer). +}; + +enum NodeTableEventType { + NodeEntryAdded, + NodeEntryRemoved +}; +class NodeTable; +class NodeTableEventHandler +{ + friend class NodeTable; +public: + virtual void processEvent(NodeId _n, NodeTableEventType _e) =0; + +protected: + /// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable. + void processEvents() { std::list> events; { Guard l(x_events); if (!m_nodeEvents.size()) return; m_nodeEvents.unique(); for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n])); m_nodeEvents.empty(); m_events.empty(); } for (auto const& e: events) processEvent(e.first, e.second); } + + /// Called by NodeTable to append event. + virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; } + + Mutex x_events; + std::list m_nodeEvents; + std::map m_events; }; /** - * NodeTable using S/Kademlia system for node discovery and preference. + * NodeTable using modified kademlia for node discovery and preference. * untouched buckets are refreshed if they have not been touched within an hour * * Thread-safety is ensured by modifying NodeEntry details via @@ -122,7 +146,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this using EvictionTimeout = std::pair,NodeId>; ///< First NodeId may be evicted and replaced with second NodeId. public: - NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303, bi::tcp::endpoint _ep = bi::tcp::endpoint()); + NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _udpPort = 30303); ~NodeTable(); /// Constants for Kademlia, mostly derived from address space. @@ -145,6 +169,12 @@ public: static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } + /// Set event handler for NodeEntryAdded and NodeEntryRemoved events. + void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); } + + /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. + void processEvents() { if (m_nodeEvents) m_nodeEvents->processEvents(); } + /// Add node. Node will be pinged if it's not already known. std::shared_ptr addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint()); @@ -157,7 +187,9 @@ public: std::list nodes() const; std::list state() const; + bool haveNode(NodeId _id) { Guard l(x_nodes); return !!m_nodes[_id]; } Node operator[](NodeId _id); + std::shared_ptr getNodeEntry(NodeId _id); protected: struct NodeBucket @@ -207,6 +239,8 @@ protected: /// Sends FindNeighbor packet. See doFindNode. void requestNeighbours(NodeEntry const& _node, NodeId _target) const; + std::unique_ptr m_nodeEvents; ///< Event handler for node events. + Node m_node; ///< This node. Secret m_secret; ///< This nodes secret key. diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index c6797b69c..0ef06ab50 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -36,21 +36,21 @@ using namespace dev::p2p; #endif #define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " -Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n): +Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n): m_server(_s), m_socket(std::move(_socket)), - m_node(_n), + m_info({m_peer->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}), + m_peer(_n), m_manualEndpoint(_n->address) { m_lastReceived = m_connect = std::chrono::steady_clock::now(); - m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}); } Session::~Session() { // TODO: P2P revisit (refactored from previous logic) - if (m_node && !(id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead)) - m_node->lastConnected = m_node->lastAttempted - chrono::seconds(1); + if (m_peer && !(id() && !isPermanentProblem(m_peer->lastDisconnect) && !m_peer->dead)) + m_peer->lastConnected = m_peer->lastAttempted - chrono::seconds(1); // Read-chain finished for one reason or another. for (auto& i: m_capabilities) @@ -70,35 +70,35 @@ Session::~Session() NodeId Session::id() const { - return m_node ? m_node->id : NodeId(); + return m_peer ? m_peer->id : NodeId(); } void Session::addRating(unsigned _r) { - if (m_node) + if (m_peer) { - m_node->rating += _r; - m_node->score += _r; + m_peer->rating += _r; + m_peer->score += _r; } } int Session::rating() const { - return m_node->rating; + return m_peer->rating; } // TODO: P2P integration: session->? should be unavailable when socket isn't open bi::tcp::endpoint Session::endpoint() const { - if (m_socket.is_open() && m_node) + if (m_socket.is_open() && m_peer) try { - return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_node->address.port()); + return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port()); } catch (...) {} - if (m_node) - return m_node->address; + if (m_peer) + return m_peer->address; return m_manualEndpoint; } @@ -195,10 +195,11 @@ bool Session::interpret(RLP const& _r) return true; } - assert(!!m_node); - assert(!!m_node->id); + assert(!!m_peer); + assert(!!m_peer->id); - if (m_server->havePeer(id)) + // TODO: P2P ensure disabled logic is covered + if (false /* m_server->havePeer(id) */) { // Already connected. clogS(NetWarn) << "Already connected to a peer with id" << id.abridged(); @@ -217,20 +218,28 @@ bool Session::interpret(RLP const& _r) // TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle. // TODO: P2P remove oldid // TODO: P2P with encrypted transport the handshake will fail and we won't get here -// m_node = m_server->noteNode(m_node->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort)); - if (m_node->isOffline()) - m_node->lastConnected = chrono::system_clock::now(); +// m_peer = m_server->noteNode(m_peer->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort)); + if (m_peer->isOffline()) + m_peer->lastConnected = chrono::system_clock::now(); // // // TODO: P2P introduce map of nodes we've given to this node (if GetPeers/Peers stays in TCP) - m_knownNodes.extendAll(m_node->index); - m_knownNodes.unionWith(m_node->index); + m_knownNodes.extendAll(m_peer->index); + m_knownNodes.unionWith(m_peer->index); if (m_protocolVersion != m_server->protocolVersion()) { disconnect(IncompatibleProtocol); return true; } - m_info = PeerInfo({id, clientVersion, m_socket.remote_endpoint().address().to_string(), listenPort, std::chrono::steady_clock::duration(), _r[3].toSet(), (unsigned)m_socket.native_handle(), map() }); + + // TODO: P2P migrate auth to Host and Handshake to constructor + m_info.clientVersion = clientVersion; + m_info.host = m_socket.remote_endpoint().address().to_string(); + m_info.port = listenPort; + m_info.lastPing = std::chrono::steady_clock::duration(); + m_info.caps = _r[3].toSet(); + m_info.socket = (unsigned)m_socket.native_handle(); + m_info.notes = map(); m_server->registerPeer(shared_from_this(), caps); break; @@ -285,64 +294,63 @@ bool Session::interpret(RLP const& _r) } auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); NodeId id = _r[i][2].toHash(); - clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_nodes.count(id) << (m_server->m_nodes.count(id) ? isPrivateAddress(m_server->m_nodes.at(id)->address.address()) : -1); + + clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")"; +// clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_peers.count(id) << (m_server->m_peers.count(id) ? isPrivateAddress(m_server->m_peers.at(id)->address.address()) : -1); - if (isPrivateAddress(peerAddress) && !m_server->m_netPrefs.localNetworking) + // ignore if dist(us,item) - dist(us,them) > 1 + + // TODO: isPrivate + if (!m_server->m_netPrefs.localNetworking && isPrivateAddress(peerAddress)) goto CONTINUE; // Private address. Ignore. if (!id) - goto CONTINUE; // Null identity. Ignore. + goto LAMEPEER; // Null identity. Ignore. if (m_server->id() == id) - goto CONTINUE; // Just our info - we already have that. + goto LAMEPEER; // Just our info - we already have that. if (id == this->id()) - goto CONTINUE; // Just their info - we already have that. + goto LAMEPEER; // Just their info - we already have that. + // we don't worry about m_peers.count(id) now because node table will handle this and + // by default we will not blindly connect to nodes received via tcp; instead they will + // be pinged, as-is standard, by the node table and added if appropriate. unless flagged + // as required, nodes aren't connected to unless they respond via discovery; no matter if + // a node is relayed via udp or tcp. // check that it's not us or one we already know: - if (m_server->m_nodes.count(id)) - { - /* MEH. Far from an ideal solution. Leave alone for now. - // Already got this node. - // See if it's any better that ours or not... - // This could be the public address of a known node. - // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. - if (m_server->m_nodes.count(id) && isPrivateAddress(m_server->m_nodes.at(id)->address.address()) && ep.port() != 0) - // Update address if the node if we now have a public IP for it. - m_server->m_nodes[id]->address = ep; - */ - goto CONTINUE; - } +// if (m_server->m_peers.count(id)) +// { +// /* MEH. Far from an ideal solution. Leave alone for now. +// // Already got this node. +// // See if it's any better that ours or not... +// // This could be the public address of a known node. +// // SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector. +// if (m_server->m_peers.count(id) && isPrivateAddress(m_server->m_peers.at(id)->address.address()) && ep.port() != 0) +// // Update address if the node if we now have a public IP for it. +// m_server->m_peers[id]->address = ep; +// */ +// goto CONTINUE; +// } if (!ep.port()) - goto CONTINUE; // Zero port? Don't think so. + goto LAMEPEER; // Zero port? Don't think so. if (ep.port() >= /*49152*/32768) - goto CONTINUE; // Private port according to IANA. + goto LAMEPEER; // Private port according to IANA. - // TODO: PoC-7: - // Technically fine, but ignore for now to avoid peers passing on incoming ports until we can be sure that doesn't happen any more. -// if (ep.port() < 30300 || ep.port() > 30305) -// goto CONTINUE; // Wierd port. - - // Avoid our random other addresses that they might end up giving us. - for (auto i: m_server->m_peerAddresses) - if (ep.address() == i && ep.port() == m_server->listenPort()) - goto CONTINUE; - - // Check that we don't already know about this addr:port combination. If we are, assume the original is best. - // SECURITY: Not a valid assumption in general. Should compare ID origins and pick the best or note uncertainty and weight each equally. - for (auto const& i: m_server->m_nodes) - if (i.second->address == ep) - goto CONTINUE; // Same address but a different node. + // node table handles another node giving us a node which represents one of our other local network interfaces + // node table handles another node giving us a node we already know about // OK passed all our checks. Assume it's good. addRating(1000); // TODO: P2P change to addNode() -// m_server->noteNode(id, ep); + m_server->addNode(Node(id, NodeIPEndpoint(bi::udp::endpoint(ep.address(), 30303), ep))); + clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")"; CONTINUE:; + LAMEPEER:; } break; default: @@ -469,15 +477,15 @@ void Session::drop(DisconnectReason _reason) } catch (...) {} - if (m_node) + if (m_peer) { - if (_reason != m_node->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) - m_node->failedAttempts = 0; - m_node->lastDisconnect = _reason; + if (_reason != m_peer->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested) + m_peer->failedAttempts = 0; + m_peer->lastDisconnect = _reason; if (_reason == BadProtocol) { - m_node->rating /= 2; - m_node->score /= 2; + m_peer->rating /= 2; + m_peer->score /= 2; } } m_dropped = true; diff --git a/libp2p/Session.h b/libp2p/Session.h index 19dc60a28..9c0472a81 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -39,7 +39,7 @@ namespace dev namespace p2p { -struct NodeInfo; +struct PeerInfo; /** * @brief The Session class @@ -51,7 +51,7 @@ class Session: public std::enable_shared_from_this friend class HostCapabilityFace; public: - Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr const& _n); + Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr const& _n); virtual ~Session(); void start(); @@ -80,7 +80,7 @@ public: void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; } - PeerInfo const& info() const { return m_info; } + PeerSessionInfo const& info() const { return m_info; } void ensureNodesRequested(); void serviceNodesRequest(); @@ -109,10 +109,10 @@ private: std::array m_data; ///< Buffer for ingress packet data. bytes m_incoming; ///< Read buffer for ingress bytes. - PeerInfo m_info; ///< Dynamic information about this peer. + PeerSessionInfo m_info; ///< Dynamic information about this peer. unsigned m_protocolVersion = 0; ///< The protocol version of the peer. - std::shared_ptr m_node; ///< The NodeInfo object. + std::shared_ptr m_peer; ///< The PeerInfo object. bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor. bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in. diff --git a/libwebthree/WebThree.cpp b/libwebthree/WebThree.cpp index fd51b5947..40e878858 100644 --- a/libwebthree/WebThree.cpp +++ b/libwebthree/WebThree.cpp @@ -75,7 +75,7 @@ void WebThreeDirect::setNetworkPreferences(p2p::NetworkPreferences const& _n) startNetwork(); } -std::vector WebThreeDirect::peers() +std::vector WebThreeDirect::peers() { return m_net.peers(); } @@ -102,5 +102,5 @@ void WebThreeDirect::restoreNodes(bytesConstRef _saved) void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port) { - m_net.connect(NodeId(), _seedHost, _port); + m_net.addNode(NodeId(), _seedHost, _port, _port); } diff --git a/libwebthree/WebThree.h b/libwebthree/WebThree.h index ec7bf2406..3a9fbbb30 100644 --- a/libwebthree/WebThree.h +++ b/libwebthree/WebThree.h @@ -84,7 +84,7 @@ public: // Network stuff: /// Get information on the current peer set. - std::vector peers(); + std::vector peers(); /// Same as peers().size(), but more efficient. size_t peerCount() const; @@ -195,7 +195,7 @@ public: // Peer network stuff - forward through RPCSlave, probably with P2PNetworkSlave/Master classes like Whisper & Ethereum. /// Get information on the current peer set. - std::vector peers(); + std::vector peers(); /// Same as peers().size(), but more efficient. size_t peerCount() const; diff --git a/neth/main.cpp b/neth/main.cpp index 8552177ce..2bcbb5e04 100644 --- a/neth/main.cpp +++ b/neth/main.cpp @@ -913,7 +913,7 @@ int main(int argc, char** argv) // Peers y = 1; - for (PeerInfo const& i: web3.peers()) + for (PeerSessionInfo const& i: web3.peers()) { auto s = boost::format("%1% ms - %2%:%3% - %4%") % toString(chrono::duration_cast(i.lastPing).count()) % diff --git a/test/peer.cpp b/test/peer.cpp index d13ea97f3..5c11d4cfb 100644 --- a/test/peer.cpp +++ b/test/peer.cpp @@ -49,14 +49,14 @@ int peerTest(int argc, char** argv) Host ph("Test", NetworkPreferences(listenPort)); if (!remoteHost.empty()) - ph.connect(NodeId(), remoteHost, remotePort); - - for (int i = 0; ; ++i) - { - this_thread::sleep_for(chrono::milliseconds(100)); - if (!(i % 10)) - ph.keepAlivePeers(); - } + ph.addNode(NodeId(), remoteHost, remotePort, remotePort); + +// for (int i = 0; ; ++i) +// { +// this_thread::sleep_for(chrono::milliseconds(100)); +// if (!(i % 10)) +// ph.keepAlivePeers(); +// } return 0; } diff --git a/test/whisperTopic.cpp b/test/whisperTopic.cpp index 8ecabde9b..ea46b162c 100644 --- a/test/whisperTopic.cpp +++ b/test/whisperTopic.cpp @@ -72,7 +72,7 @@ BOOST_AUTO_TEST_CASE(topic) this_thread::sleep_for(chrono::milliseconds(500)); ph.start(); this_thread::sleep_for(chrono::milliseconds(500)); - ph.connect(NodeId(), "127.0.0.1", 50303); + ph.addNode(NodeId(), "127.0.0.1", 50303, 50303); KeyPair us = KeyPair::create(); for (int i = 0; i < 10; ++i)