Browse Source

Give sockets a grace period to close

Defaults to 15 seconds for now.
Update docs on systemd and process limits.
Fixes #62
master
Neil Booth 8 years ago
parent
commit
99d7e65de8
  1. 19
      docs/HOWTO.rst
  2. 3
      lib/jsonrpc.py
  3. 49
      server/protocol.py

19
docs/HOWTO.rst

@ -75,6 +75,21 @@ on an SSD::
mkdir /path/to/db_directory mkdir /path/to/db_directory
chown electrumx /path/to/db_directory chown electrumx /path/to/db_directory
Process limits
--------------
You should ensure the ElectrumX process has a large open file limit.
During sync it should not need more than about 1,024 open files. When
serving it will use approximately 256 for LevelDB plus the number of
incoming connections. It is not unusual to have 1,000 to 2,000
connections being served, so I suggest you set your open files limit
to at least 2,500.
Note that setting the limit in your shell does NOT affect ElectrumX
unless you are invoking ElectrumX directly from your shell. If you
are using systemd, you need to set it in the .service file (see
samples/systemd/electrumx.service in the ElectrumX source).
Using daemontools Using daemontools
----------------- -----------------
@ -158,6 +173,10 @@ Once configured, you may want to start ElectrumX at boot::
systemctl enable electrumx systemctl enable electrumx
systemd is aggressive in shutting down processes. ElectrumX can need
several minutes to flush cached data to disk during sync. You should
set TimeoutStopSec to at least 10 mins in your .service file.
Sync Progress Sync Progress
============= =============

3
lib/jsonrpc.py

@ -145,6 +145,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.start = time.time() self.start = time.time()
self.stop = 0
self.last_recv = self.start self.last_recv = self.start
self.bandwidth_start = self.start self.bandwidth_start = self.start
self.bandwidth_interval = 3600 self.bandwidth_interval = 3600
@ -195,9 +196,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
pass pass
def close_connection(self): def close_connection(self):
self.stop = time.time()
if self.transport: if self.transport:
self.transport.close() self.transport.close()
self.socket.shutdown(socket.SHUT_RDWR)
def using_bandwidth(self, amount): def using_bandwidth(self, amount):
now = time.time() now = time.time()

49
server/protocol.py

@ -462,11 +462,12 @@ class ServerManager(util.LoggedClass):
self.logger.info('cleanly closing client sessions, please wait...') self.logger.info('cleanly closing client sessions, please wait...')
for session in self.sessions: for session in self.sessions:
self.close_session(session) self.close_session(session)
self.logger.info('server listening sockets closed, waiting ' self.logger.info('listening sockets closed, waiting up to '
'{:d} seconds for socket cleanup'.format(secs)) '{:d} seconds for socket cleanup'.format(secs))
limit = time.time() + secs limit = time.time() + secs
while self.sessions and time.time() < limit: while self.sessions and time.time() < limit:
await asyncio.sleep(4) self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining' self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions))) .format(len(self.sessions)))
@ -474,7 +475,10 @@ class ServerManager(util.LoggedClass):
# Some connections are acknowledged after the servers are closed # Some connections are acknowledged after the servers are closed
if not self.servers: if not self.servers:
return return
self.clear_stale_sessions() now = time.time()
if now > self.next_stale_check:
self.next_stale_check = now + 60
self.clear_stale_sessions()
group = self.groups[int(session.start - self.start) // 60] group = self.groups[int(session.start - self.start) // 60]
group.add(session) group.add(session)
self.sessions[session] = group self.sessions[session] = group
@ -496,23 +500,30 @@ class ServerManager(util.LoggedClass):
session.log_me = not session.log_me session.log_me = not session.log_me
return 'log {:d}: {}'.format(session.id_, session.log_me) return 'log {:d}: {}'.format(session.id_, session.log_me)
def clear_stale_sessions(self): def clear_stale_sessions(self, grace=15):
'''Cut off sessions that haven't done anything for 10 minutes.''' '''Cut off sessions that haven't done anything for 10 minutes. Force
close stubborn connections that won't close cleanly after a
short grace period.
'''
now = time.time() now = time.time()
if now > self.next_stale_check: shutdown_cutoff = now - grace
self.next_stale_check = now + 60 stale_cutoff = now - self.env.session_timeout
# Clear out empty groups
for key in [k for k, v in self.groups.items() if not v]: stale = []
del self.groups[key] for session in self.sessions:
cutoff = now - self.env.session_timeout if session.is_closing():
stale = [session for session in self.sessions if session.stop <= shutdown_cutoff and session.socket:
if session.last_recv < cutoff # Should trigger a call to connection_lost very soon
and not session.is_closing()] self.socket.shutdown(socket.SHUT_RDWR)
for session in stale: else:
self.close_session(session) if session.last_recv < stale_cutoff:
if stale: self.close_session(session)
self.logger.info('closing stale connections {}' stale.append(session.id_)
.format([session.id_ for session in stale])) if stale:
self.logger.info('closing stale connections {}'.format(stale))
# Clear out empty groups
for key in [k for k, v in self.groups.items() if not v]:
del self.groups[key]
def new_subscription(self): def new_subscription(self):
if self.subscription_count >= self.max_subs: if self.subscription_count >= self.max_subs:

Loading…
Cancel
Save