diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 5377d80b8..e8e206233 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -121,6 +121,23 @@ void Host::doneWorking() for (auto const& h: m_capabilities) h.second->onStopping(); + // disconnect pending handshake, before peers, as a handshake may create a peer + for (unsigned n = 0;; n = 0) + { + { + Guard l(x_connecting); + for (auto i: m_connecting) + if (auto h = i.lock()) + { + h->cancel(); + n++; + } + } + if (!n) + break; + m_ioService.poll(); + } + // disconnect peers for (unsigned n = 0;; n = 0) { @@ -157,7 +174,7 @@ unsigned Host::protocolVersion() const return 3; } -bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) +void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) { /// Get or create Peer shared_ptr p; @@ -171,7 +188,6 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io if (p->isOffline()) p->m_lastConnected = std::chrono::system_clock::now(); p->m_failedAttempts = 0; - // TODO: update pendingconns w/session-weak-ptr for graceful shutdown (otherwise this line isn't safe) p->endpoint.tcp.address(_endpoint.address()); auto protocolVersion = _rlp[0].toInt(); @@ -191,18 +207,20 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io if (protocolVersion != this->protocolVersion()) { ps->disconnect(IncompatibleProtocol); - return false; + return; } { RecursiveGuard l(x_sessions); if (m_sessions.count(_id) && !!m_sessions[_id].lock()) - { - // Already connected. - clog(NetWarn) << "Session already exists for peer with id" << _id.abridged(); - ps->disconnect(DuplicatePeer); - return false; - } + if (auto s = m_sessions[_id].lock()) + if(s->isConnected()) + { + // Already connected. + clog(NetWarn) << "Session already exists for peer with id" << _id.abridged(); + ps->disconnect(DuplicatePeer); + return; + } m_sessions[_id] = ps; } ps->start(); @@ -215,7 +233,6 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io } clog(NetNote) << "p2p.host.peer.register" << _id.abridged(); StructuredLogger::p2pConnected(_id.abridged(), ps->m_peer->peerEndpoint(), ps->m_peer->m_lastConnected, clientVersion, peerCount()); - return true; } void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) @@ -340,22 +357,6 @@ void Host::runAcceptor() clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")"; m_accepting = true; - // socket is created outside of acceptor-callback - // An allocated socket is necessary as asio can use the socket - // until the callback succeeds or fails. - // - // Until callback succeeds or fails, we can't dealloc it. - // - // Callback is guaranteed to be called via asio or when - // m_tcp4Acceptor->stop() is called by Host. - // - // All exceptions are caught so they don't halt asio and so the - // socket is deleted. - // - // It's possible for an accepted connection to return an error in which - // case the socket may be open and must be closed to prevent asio from - // processing socket events after socket is deallocated. - auto socket = make_shared(new bi::tcp::socket(m_ioService)); m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec) { @@ -367,6 +368,7 @@ void Host::runAcceptor() { // incoming connection; we don't yet know nodeid auto handshake = make_shared(this, socket); + m_connecting.push_back(handshake); handshake->start(); success = true; } @@ -435,15 +437,14 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short void Host::connect(std::shared_ptr const& _p) { - for (unsigned i = 0; i < 200; i++) - if (isWorking() && !m_run) - this_thread::sleep_for(chrono::milliseconds(50)); if (!m_run) return; + _p->m_lastAttempted = std::chrono::system_clock::now(); + if (havePeerSession(_p->id)) { - clog(NetWarn) << "Aborted connect. Node already connected."; + clog(NetConnect) << "Aborted connect. Node already connected."; return; } @@ -475,10 +476,15 @@ void Host::connect(std::shared_ptr const& _p) } else { - clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint(); + clog(NetConnect) << "Connecting to" << _p->id.abridged() << "@" << _p->peerEndpoint(); auto handshake = make_shared(this, socket, _p->id); + { + Guard l(x_connecting); + m_connecting.push_back(handshake); + } handshake->start(); } + Guard l(x_pendingNodeConns); m_pendingPeerConns.erase(nptr); }); @@ -527,26 +533,30 @@ void Host::run(boost::system::error_code const&) m_nodeTable->processEvents(); + // cleanup zombies + { + Guard l(x_connecting); + m_connecting.remove_if([](std::weak_ptr h){ return h.lock(); }); + } + for (auto p: m_sessions) if (auto pp = p.second.lock()) pp->serviceNodesRequest(); -// keepAlivePeers(); + keepAlivePeers(); // disconnectLatePeers(); - auto c = peerCount(); - if (m_idealPeerCount && !c) + if (peerCount() < m_idealPeerCount) + { for (auto p: m_peers) if (p.second->shouldReconnect()) { - // TODO p2p: fixme - p.second->m_lastAttempted = std::chrono::system_clock::now(); connect(p.second); break; } - - if (c < m_idealPeerCount) + m_nodeTable->discover(); + } auto runcb = [this](boost::system::error_code const& error) { run(error); }; m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval)); diff --git a/libp2p/Host.h b/libp2p/Host.h index 00fcc0943..ccb34b064 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -151,7 +151,7 @@ public: NodeId id() const { return m_alias.pub(); } /// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error. - bool startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint); + void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint); protected: void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e); @@ -224,6 +224,9 @@ private: /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. mutable std::map> m_sessions; mutable RecursiveMutex x_sessions; + + std::list> m_connecting; ///< Pending connections. + Mutex x_connecting; unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. diff --git a/libp2p/RLPxHandshake.cpp b/libp2p/RLPxHandshake.cpp index fa346ec09..4f5c70802 100644 --- a/libp2p/RLPxHandshake.cpp +++ b/libp2p/RLPxHandshake.cpp @@ -168,6 +168,8 @@ void RLPXHandshake::transition(boost::system::error_code _ech) else clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake"; + /// This pointer will be freed if there is an error otherwise + /// it will be passed to Host which will take ownership. m_io = new RLPXFrameIO(*this); // old packet format diff --git a/libp2p/RLPxHandshake.h b/libp2p/RLPxHandshake.h index 8459de053..aac8f4b5a 100644 --- a/libp2p/RLPxHandshake.h +++ b/libp2p/RLPxHandshake.h @@ -72,6 +72,8 @@ public: /// Start handshake. void start() { transition(); } + void cancel() { m_nextState = Error; } + protected: /// Write Auth message to socket and transitions to AckAuth. void writeAuth();