From 99d7e65de88e89ffa2cea831ca0eae37d7aed7f0 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sat, 10 Dec 2016 08:42:09 +0900
Subject: [PATCH] Give sockets a grace period to close

Defaults to 15 seconds for now.
Update docs on systemd and process limits.

Fixes #62
---
 docs/HOWTO.rst     | 19 ++++++++++++++++++
 lib/jsonrpc.py     |  3 ++-
 server/protocol.py | 49 ++++++++++++++++++++++++++++------------------
 3 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst
index 20cae89..00cdb49 100644
--- a/docs/HOWTO.rst
+++ b/docs/HOWTO.rst
@@ -75,6 +75,21 @@ on an SSD::
     mkdir /path/to/db_directory
     chown electrumx /path/to/db_directory
 
+Process limits
+--------------
+
+You should ensure the ElectrumX process has a large open file limit.
+During sync it should not need more than about 1,024 open files. When
+serving it will use approximately 256 for LevelDB plus the number of
+incoming connections. It is not unusual to have 1,000 to 2,000
+connections being served, so I suggest you set your open files limit
+to at least 2,500.
+
+Note that setting the limit in your shell does NOT affect ElectrumX
+unless you are invoking ElectrumX directly from your shell. If you
+are using systemd, you need to set it in the .service file (see
+samples/systemd/electrumx.service in the ElectrumX source).
+
 Using daemontools
 -----------------
 
@@ -158,6 +173,10 @@ Once configured, you may want to start ElectrumX at boot::
 
     systemctl enable electrumx
 
+systemd is aggressive in shutting down processes. ElectrumX can need
+several minutes to flush cached data to disk during sync. You should
+set TimeoutStopSec to at least 10 mins in your .service file.
+
 Sync Progress
 =============
 
diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py
index 4f30282..c13f2b0 100644
--- a/lib/jsonrpc.py
+++ b/lib/jsonrpc.py
@@ -145,6 +145,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
     def __init__(self):
         super().__init__()
         self.start = time.time()
+        self.stop = 0
         self.last_recv = self.start
         self.bandwidth_start = self.start
         self.bandwidth_interval = 3600
@@ -195,9 +196,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         pass
 
     def close_connection(self):
+        self.stop = time.time()
         if self.transport:
             self.transport.close()
-        self.socket.shutdown(socket.SHUT_RDWR)
 
     def using_bandwidth(self, amount):
         now = time.time()
diff --git a/server/protocol.py b/server/protocol.py
index 0adece4..61766a4 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -462,11 +462,12 @@ class ServerManager(util.LoggedClass):
         self.logger.info('cleanly closing client sessions, please wait...')
         for session in self.sessions:
             self.close_session(session)
-        self.logger.info('server listening sockets closed, waiting '
+        self.logger.info('listening sockets closed, waiting up to '
                          '{:d} seconds for socket cleanup'.format(secs))
         limit = time.time() + secs
         while self.sessions and time.time() < limit:
-            await asyncio.sleep(4)
+            self.clear_stale_sessions(grace=secs//2)
+            await asyncio.sleep(2)
             self.logger.info('{:,d} sessions remaining'
                              .format(len(self.sessions)))
 
@@ -474,7 +475,10 @@ class ServerManager(util.LoggedClass):
         # Some connections are acknowledged after the servers are closed
         if not self.servers:
             return
-        self.clear_stale_sessions()
+        now = time.time()
+        if now > self.next_stale_check:
+            self.next_stale_check = now + 60
+            self.clear_stale_sessions()
         group = self.groups[int(session.start - self.start) // 60]
         group.add(session)
         self.sessions[session] = group
@@ -496,23 +500,30 @@ class ServerManager(util.LoggedClass):
             session.log_me = not session.log_me
             return 'log {:d}: {}'.format(session.id_, session.log_me)
 
-    def clear_stale_sessions(self):
-        '''Cut off sessions that haven't done anything for 10 minutes.'''
+    def clear_stale_sessions(self, grace=15):
+        '''Cut off sessions that haven't done anything for 10 minutes. Force
+        close stubborn connections that won't close cleanly after a
+        short grace period.
+        '''
         now = time.time()
-        if now > self.next_stale_check:
-            self.next_stale_check = now + 60
-            # Clear out empty groups
-            for key in [k for k, v in self.groups.items() if not v]:
-                del self.groups[key]
-            cutoff = now - self.env.session_timeout
-            stale = [session for session in self.sessions
-                     if session.last_recv < cutoff
-                     and not session.is_closing()]
-            for session in stale:
-                self.close_session(session)
-            if stale:
-                self.logger.info('closing stale connections {}'
-                                 .format([session.id_ for session in stale]))
+        shutdown_cutoff = now - grace
+        stale_cutoff = now - self.env.session_timeout
+
+        stale = []
+        for session in self.sessions:
+            if session.is_closing():
+                if session.stop <= shutdown_cutoff and session.socket:
+                    # Should trigger a call to connection_lost very soon
+                    session.socket.shutdown(socket.SHUT_RDWR)
+            else:
+                if session.last_recv < stale_cutoff:
+                    self.close_session(session)
+                    stale.append(session.id_)
+        if stale:
+            self.logger.info('closing stale connections {}'.format(stale))
+        # Clear out empty groups
+        for key in [k for k, v in self.groups.items() if not v]:
+            del self.groups[key]
 
     def new_subscription(self):
         if self.subscription_count >= self.max_subs: