Browse Source

Clean up shutdown logic and logging

Add RPC "stop" command, and document it.

Fixes the 2nd part of #100
master
Neil Booth 8 years ago
parent
commit
8d58d2a0e7
  1. 5
      docs/HOWTO.rst
  2. 16
      docs/RPC-INTERFACE.rst
  3. 9
      electrumx_server.py
  4. 1
      server/block_processor.py
  5. 117
      server/controller.py
  6. 5
      server/irc.py

5
docs/HOWTO.rst

@ -234,8 +234,9 @@ Terminating ElectrumX
===================== =====================
The preferred way to terminate the server process is to send it the The preferred way to terminate the server process is to send it the
INT or TERM signals. For a daemontools supervised process this is best **stop** RPC command, or alternatively on Unix the INT or TERM
done by bringing it down like so:: signals. For a daemontools supervised process this can be done by
bringing it down like so::
svc -d ~/service/electrumx svc -d ~/service/electrumx

16
docs/RPC-INTERFACE.rst

@ -2,14 +2,26 @@ The ElectrumX RPC Interface
=========================== ===========================
You can query the status of a running server, and affect its behaviour You can query the status of a running server, and affect its behaviour
using the RPC interface. by sending JSON RPC commands to the LocalRPC port it is listening on.
This is best done using the electrumx_rpc.py script provided.
The general form of invocation is: The general form of invocation is:
``electrumx_rpc.py <command> [arg1 [arg2...]`` ``electrumx_rpc.py [-p PORT] <command> [arg1 [arg2...]``
The port to send the commands to can be specified on the command line,
otherwise it is taken from the environment variable **RPC_PORT**, or
8000 is used if that is not set.
The following commands are available: The following commands are available:
* **stop**
Flush all cached data to disk and shut down the server cleanly, as
if sending the KILL signal. Be patient - during initial sync
flushing all cached data to disk can take several minutes. This
command takes no arguments.
* **getinfo** * **getinfo**
Returns a summary of server state. This command takes no arguments. Returns a summary of server state. This command takes no arguments.

9
electrumx_server.py

@ -36,8 +36,9 @@ def main_loop():
def on_signal(signame): def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.''' '''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, shutting down'.format(signame)) logging.warning('received {} signal, initiating shutdown'
future.cancel() .format(signame))
controller.initiate_shutdown()
def on_exception(loop, context): def on_exception(loop, context):
'''Suppress spurious messages it appears we cannot control.''' '''Suppress spurious messages it appears we cannot control.'''
@ -47,8 +48,8 @@ def main_loop():
'accept_connection2()' in repr(context.get('task'))): 'accept_connection2()' in repr(context.get('task'))):
loop.default_exception_handler(context) loop.default_exception_handler(context)
server = Controller(Env()) controller = Controller(Env())
future = asyncio.ensure_future(server.main_loop()) future = asyncio.ensure_future(controller.main_loop())
# Install signal handlers # Install signal handlers
for signame in ('SIGINT', 'SIGTERM'): for signame in ('SIGINT', 'SIGTERM'):

1
server/block_processor.py

@ -193,6 +193,7 @@ class BlockProcessor(server.db.DB):
'''Called by the controller to shut processing down.''' '''Called by the controller to shut processing down.'''
async def do_nothing(): async def do_nothing():
pass pass
self.logger.info('preparing clean shutdown')
self.stop = True self.stop = True
self.add_task(do_nothing) # Ensure something is on the queue self.add_task(do_nothing) # Ensure something is on the queue

117
server/controller.py

@ -9,7 +9,6 @@ import asyncio
import codecs import codecs
import json import json
import os import os
import _socket
import ssl import ssl
import time import time
from bisect import bisect_left from bisect import bisect_left
@ -50,6 +49,8 @@ class Controller(util.LoggedClass):
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
# Set this event to cleanly shutdown
self.shutdown_event = asyncio.Event()
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.start = time.time() self.start = time.time()
self.coin = env.coin self.coin = env.coin
@ -76,11 +77,11 @@ class Controller(util.LoggedClass):
self.delayed_sessions = [] self.delayed_sessions = []
self.next_queue_id = 0 self.next_queue_id = 0
self.cache_height = 0 self.cache_height = 0
self.futures = []
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.setup_bands() self.setup_bands()
# Set up the RPC request handlers # Set up the RPC request handlers
cmds = 'disconnect getinfo groups log peers reorg sessions'.split() cmds = ('disconnect getinfo groups log peers reorg sessions stop'
.split())
self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
# Set up the ElectrumX request handlers # Set up the ElectrumX request handlers
rpcs = [ rpcs = [
@ -189,37 +190,75 @@ class Controller(util.LoggedClass):
if session in self.sessions: if session in self.sessions:
await session.serve_requests() await session.serve_requests()
def initiate_shutdown(self):
'''Call this function to start the shutdown process.'''
self.shutdown_event.set()
async def main_loop(self): async def main_loop(self):
'''Controller main loop.''' '''Controller main loop.'''
def add_future(coro): def add_future(coro):
self.futures.append(asyncio.ensure_future(coro)) futures.append(asyncio.ensure_future(coro))
# shutdown() assumes bp.main_loop() is first async def await_bp_catchup():
add_future(self.bp.main_loop()) '''Wait for the block processor to catch up.
When it has, start the servers and connect to IRC.
'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
add_future(self.irc.start())
add_future(self.start_servers())
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
add_future(self.notify())
for n in range(4):
add_future(self.serve_requests())
bp_future = asyncio.ensure_future(self.bp.main_loop())
futures = []
add_future(self.bp.prefetcher.main_loop()) add_future(self.bp.prefetcher.main_loop())
add_future(self.irc.start(self.bp.caught_up_event)) add_future(await_bp_catchup())
add_future(self.start_servers(self.bp.caught_up_event))
add_future(self.mempool.main_loop()) # Perform a clean shutdown when this event is signalled.
add_future(self.enqueue_delayed_sessions()) await self.shutdown_event.wait()
add_future(self.notify()) self.logger.info('shutting down gracefully')
for n in range(4): self.state = self.SHUTTING_DOWN
add_future(self.serve_requests())
# First tell the block processor to shut down, it may need to
for future in asyncio.as_completed(self.futures): # perform a lengthy flush. Then shut down the rest.
try: self.bp.on_shutdown()
await future # Note: future is not one of self.futures self.close_servers(list(self.servers.keys()))
except asyncio.CancelledError: for future in futures:
break future.cancel()
await self.shutdown() # Now wait for the cleanup to complete
await self.close_sessions()
if not bp_future.done():
self.logger.info('waiting for block processor')
await bp_future
def close_servers(self, kinds): def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).''' '''Close the servers of the given kinds (TCP etc.).'''
self.logger.info('closing down {} listening servers'
.format(', '.join(kinds)))
for kind in kinds: for kind in kinds:
server = self.servers.pop(kind, None) server = self.servers.pop(kind, None)
if server: if server:
server.close() server.close()
# Don't bother awaiting the close - we're not async
async def close_sessions(self, secs=30):
if not self.sessions:
return
self.logger.info('waiting up to {:d} seconds for socket cleanup'
.format(secs))
for session in self.sessions:
self.close_session(session)
limit = time.time() + secs
while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
async def start_server(self, kind, *args, **kw_args): async def start_server(self, kind, *args, **kw_args):
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
@ -236,12 +275,10 @@ class Controller(util.LoggedClass):
self.logger.info('{} server listening on {}:{:d}' self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port)) .format(kind, host, port))
async def start_servers(self, caught_up): async def start_servers(self):
'''Start RPC, TCP and SSL servers once caught up.''' '''Start RPC, TCP and SSL servers once caught up.'''
if self.env.rpc_port is not None: if self.env.rpc_port is not None:
await self.start_server('RPC', 'localhost', self.env.rpc_port) await self.start_server('RPC', 'localhost', self.env.rpc_port)
await caught_up.wait()
_socket.setdefaulttimeout(5)
self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
.format(self.env.session_timeout)) .format(self.env.session_timeout))
@ -312,31 +349,6 @@ class Controller(util.LoggedClass):
self.header_cache[height] = header self.header_cache[height] = header
return header return header
async def shutdown(self):
'''Call to shutdown everything. Returns when done.'''
self.state = self.SHUTTING_DOWN
self.bp.on_shutdown()
self.close_servers(list(self.servers.keys()))
# Don't cancel the block processor main loop - let it close itself
for future in self.futures[1:]:
future.cancel()
if self.sessions:
await self.close_sessions()
await self.futures[0]
async def close_sessions(self, secs=30):
self.logger.info('cleanly closing client sessions, please wait...')
for session in self.sessions:
self.close_session(session)
self.logger.info('listening sockets closed, waiting up to '
'{:d} seconds for socket cleanup'.format(secs))
limit = time.time() + secs
while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
def add_session(self, session): def add_session(self, session):
now = time.time() now = time.time()
if now > self.next_stale_check: if now > self.next_stale_check:
@ -559,6 +571,11 @@ class Controller(util.LoggedClass):
''' '''
return self.for_each_session(session_ids, self.toggle_logging) return self.for_each_session(session_ids, self.toggle_logging)
async def rpc_stop(self):
'''Shut down the server cleanly.'''
self.initiate_shutdown()
return 'stopping'
async def rpc_getinfo(self): async def rpc_getinfo(self):
'''Return summary information about the server process.''' '''Return summary information about the server process.'''
return self.server_summary() return self.server_summary()

5
server/irc.py

@ -55,9 +55,8 @@ class IRC(LoggedClass):
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {} self.peers = {}
async def start(self, caught_up): async def start(self):
'''Start IRC connections once caught up if enabled in environment.''' '''Start IRC connections if enabled in environment.'''
await caught_up.wait()
try: try:
if self.env.irc: if self.env.irc:
await self.join() await self.join()

Loading…
Cancel
Save