Browse Source

lifecycle management of handshake sessions

cl-refactor
subtly 10 years ago
parent
commit
529fda3a95
  1. 86
      libp2p/Host.cpp
  2. 5
      libp2p/Host.h
  3. 2
      libp2p/RLPxHandshake.cpp
  4. 2
      libp2p/RLPxHandshake.h

86
libp2p/Host.cpp

@ -121,6 +121,23 @@ void Host::doneWorking()
for (auto const& h: m_capabilities) for (auto const& h: m_capabilities)
h.second->onStopping(); h.second->onStopping();
// disconnect pending handshake, before peers, as a handshake may create a peer
for (unsigned n = 0;; n = 0)
{
{
Guard l(x_connecting);
for (auto i: m_connecting)
if (auto h = i.lock())
{
h->cancel();
n++;
}
}
if (!n)
break;
m_ioService.poll();
}
// disconnect peers // disconnect peers
for (unsigned n = 0;; n = 0) for (unsigned n = 0;; n = 0)
{ {
@ -157,7 +174,7 @@ unsigned Host::protocolVersion() const
return 3; return 3;
} }
bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint)
{ {
/// Get or create Peer /// Get or create Peer
shared_ptr<Peer> p; shared_ptr<Peer> p;
@ -171,7 +188,6 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
if (p->isOffline()) if (p->isOffline())
p->m_lastConnected = std::chrono::system_clock::now(); p->m_lastConnected = std::chrono::system_clock::now();
p->m_failedAttempts = 0; p->m_failedAttempts = 0;
// TODO: update pendingconns w/session-weak-ptr for graceful shutdown (otherwise this line isn't safe)
p->endpoint.tcp.address(_endpoint.address()); p->endpoint.tcp.address(_endpoint.address());
auto protocolVersion = _rlp[0].toInt<unsigned>(); auto protocolVersion = _rlp[0].toInt<unsigned>();
@ -191,18 +207,20 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
if (protocolVersion != this->protocolVersion()) if (protocolVersion != this->protocolVersion())
{ {
ps->disconnect(IncompatibleProtocol); ps->disconnect(IncompatibleProtocol);
return false; return;
} }
{ {
RecursiveGuard l(x_sessions); RecursiveGuard l(x_sessions);
if (m_sessions.count(_id) && !!m_sessions[_id].lock()) if (m_sessions.count(_id) && !!m_sessions[_id].lock())
{ if (auto s = m_sessions[_id].lock())
// Already connected. if(s->isConnected())
clog(NetWarn) << "Session already exists for peer with id" << _id.abridged(); {
ps->disconnect(DuplicatePeer); // Already connected.
return false; clog(NetWarn) << "Session already exists for peer with id" << _id.abridged();
} ps->disconnect(DuplicatePeer);
return;
}
m_sessions[_id] = ps; m_sessions[_id] = ps;
} }
ps->start(); ps->start();
@ -215,7 +233,6 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
} }
clog(NetNote) << "p2p.host.peer.register" << _id.abridged(); clog(NetNote) << "p2p.host.peer.register" << _id.abridged();
StructuredLogger::p2pConnected(_id.abridged(), ps->m_peer->peerEndpoint(), ps->m_peer->m_lastConnected, clientVersion, peerCount()); StructuredLogger::p2pConnected(_id.abridged(), ps->m_peer->peerEndpoint(), ps->m_peer->m_lastConnected, clientVersion, peerCount());
return true;
} }
void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
@ -340,22 +357,6 @@ void Host::runAcceptor()
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")"; clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
m_accepting = true; m_accepting = true;
// socket is created outside of acceptor-callback
// An allocated socket is necessary as asio can use the socket
// until the callback succeeds or fails.
//
// Until callback succeeds or fails, we can't dealloc it.
//
// Callback is guaranteed to be called via asio or when
// m_tcp4Acceptor->stop() is called by Host.
//
// All exceptions are caught so they don't halt asio and so the
// socket is deleted.
//
// It's possible for an accepted connection to return an error in which
// case the socket may be open and must be closed to prevent asio from
// processing socket events after socket is deallocated.
auto socket = make_shared<RLPXSocket>(new bi::tcp::socket(m_ioService)); auto socket = make_shared<RLPXSocket>(new bi::tcp::socket(m_ioService));
m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec) m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec)
{ {
@ -367,6 +368,7 @@ void Host::runAcceptor()
{ {
// incoming connection; we don't yet know nodeid // incoming connection; we don't yet know nodeid
auto handshake = make_shared<RLPXHandshake>(this, socket); auto handshake = make_shared<RLPXHandshake>(this, socket);
m_connecting.push_back(handshake);
handshake->start(); handshake->start();
success = true; success = true;
} }
@ -435,15 +437,14 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short
void Host::connect(std::shared_ptr<Peer> const& _p) void Host::connect(std::shared_ptr<Peer> const& _p)
{ {
for (unsigned i = 0; i < 200; i++)
if (isWorking() && !m_run)
this_thread::sleep_for(chrono::milliseconds(50));
if (!m_run) if (!m_run)
return; return;
_p->m_lastAttempted = std::chrono::system_clock::now();
if (havePeerSession(_p->id)) if (havePeerSession(_p->id))
{ {
clog(NetWarn) << "Aborted connect. Node already connected."; clog(NetConnect) << "Aborted connect. Node already connected.";
return; return;
} }
@ -475,10 +476,15 @@ void Host::connect(std::shared_ptr<Peer> const& _p)
} }
else else
{ {
clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint(); clog(NetConnect) << "Connecting to" << _p->id.abridged() << "@" << _p->peerEndpoint();
auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id); auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
{
Guard l(x_connecting);
m_connecting.push_back(handshake);
}
handshake->start(); handshake->start();
} }
Guard l(x_pendingNodeConns); Guard l(x_pendingNodeConns);
m_pendingPeerConns.erase(nptr); m_pendingPeerConns.erase(nptr);
}); });
@ -527,26 +533,30 @@ void Host::run(boost::system::error_code const&)
m_nodeTable->processEvents(); m_nodeTable->processEvents();
// cleanup zombies
{
Guard l(x_connecting);
m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.lock(); });
}
for (auto p: m_sessions) for (auto p: m_sessions)
if (auto pp = p.second.lock()) if (auto pp = p.second.lock())
pp->serviceNodesRequest(); pp->serviceNodesRequest();
// keepAlivePeers(); keepAlivePeers();
// disconnectLatePeers(); // disconnectLatePeers();
auto c = peerCount(); if (peerCount() < m_idealPeerCount)
if (m_idealPeerCount && !c) {
for (auto p: m_peers) for (auto p: m_peers)
if (p.second->shouldReconnect()) if (p.second->shouldReconnect())
{ {
// TODO p2p: fixme
p.second->m_lastAttempted = std::chrono::system_clock::now();
connect(p.second); connect(p.second);
break; break;
} }
if (c < m_idealPeerCount)
m_nodeTable->discover(); m_nodeTable->discover();
}
auto runcb = [this](boost::system::error_code const& error) { run(error); }; auto runcb = [this](boost::system::error_code const& error) { run(error); };
m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval)); m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));

5
libp2p/Host.h

@ -151,7 +151,7 @@ public:
NodeId id() const { return m_alias.pub(); } NodeId id() const { return m_alias.pub(); }
/// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error. /// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error.
bool startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint); void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint);
protected: protected:
void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e); void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e);
@ -224,6 +224,9 @@ private:
/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::map<NodeId, std::weak_ptr<Session>> m_sessions; mutable std::map<NodeId, std::weak_ptr<Session>> m_sessions;
mutable RecursiveMutex x_sessions; mutable RecursiveMutex x_sessions;
std::list<std::weak_ptr<RLPXHandshake>> m_connecting; ///< Pending connections.
Mutex x_connecting;
unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.

2
libp2p/RLPxHandshake.cpp

@ -168,6 +168,8 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
else else
clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake"; clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake";
/// This pointer will be freed if there is an error otherwise
/// it will be passed to Host which will take ownership.
m_io = new RLPXFrameIO(*this); m_io = new RLPXFrameIO(*this);
// old packet format // old packet format

2
libp2p/RLPxHandshake.h

@ -72,6 +72,8 @@ public:
/// Start handshake. /// Start handshake.
void start() { transition(); } void start() { transition(); }
void cancel() { m_nextState = Error; }
protected: protected:
/// Write Auth message to socket and transitions to AckAuth. /// Write Auth message to socket and transitions to AckAuth.
void writeAuth(); void writeAuth();

Loading…
Cancel
Save