diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index eb7f4bd..c7bc41f 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -234,8 +234,9 @@ Terminating ElectrumX ===================== The preferred way to terminate the server process is to send it the -INT or TERM signals. For a daemontools supervised process this is best -done by bringing it down like so:: +**stop** RPC command, or alternatively on Unix the INT or TERM +signals. For a daemontools supervised process this can be done by +bringing it down like so:: svc -d ~/service/electrumx diff --git a/docs/RPC-INTERFACE.rst b/docs/RPC-INTERFACE.rst index 5705134..709c672 100644 --- a/docs/RPC-INTERFACE.rst +++ b/docs/RPC-INTERFACE.rst @@ -2,14 +2,26 @@ The ElectrumX RPC Interface =========================== You can query the status of a running server, and affect its behaviour -using the RPC interface. +by sending JSON RPC commands to the LocalRPC port it is listening on. +This is best done using the electrumx_rpc.py script provided. The general form of invocation is: - ``electrumx_rpc.py [arg1 [arg2...]`` + ``electrumx_rpc.py [-p PORT] [arg1 [arg2...]`` + +The port to send the commands to can be specified on the command line, +otherwise it is taken from the environment variable **RPC_PORT**, or +8000 is used if that is not set. The following commands are available: +* **stop** + + Flush all cached data to disk and shut down the server cleanly, as + if sending the KILL signal. Be patient - during initial sync + flushing all cached data to disk can take several minutes. This + command takes no arguments. + * **getinfo** Returns a summary of server state. This command takes no arguments. diff --git a/electrumx_server.py b/electrumx_server.py index d8a6029..f23542f 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -36,8 +36,9 @@ def main_loop(): def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' - logging.warning('received {} signal, shutting down'.format(signame)) - future.cancel() + logging.warning('received {} signal, initiating shutdown' + .format(signame)) + controller.initiate_shutdown() def on_exception(loop, context): '''Suppress spurious messages it appears we cannot control.''' @@ -47,8 +48,8 @@ def main_loop(): 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) - server = Controller(Env()) - future = asyncio.ensure_future(server.main_loop()) + controller = Controller(Env()) + future = asyncio.ensure_future(controller.main_loop()) # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): diff --git a/server/block_processor.py b/server/block_processor.py index 6252e1a..100b292 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -193,6 +193,7 @@ class BlockProcessor(server.db.DB): '''Called by the controller to shut processing down.''' async def do_nothing(): pass + self.logger.info('preparing clean shutdown') self.stop = True self.add_task(do_nothing) # Ensure something is on the queue diff --git a/server/controller.py b/server/controller.py index 994ec0d..46534c3 100644 --- a/server/controller.py +++ b/server/controller.py @@ -9,7 +9,6 @@ import asyncio import codecs import json import os -import _socket import ssl import time from bisect import bisect_left @@ -50,6 +49,8 @@ class Controller(util.LoggedClass): def __init__(self, env): super().__init__() + # Set this event to cleanly shutdown + self.shutdown_event = asyncio.Event() self.loop = asyncio.get_event_loop() self.start = time.time() self.coin = env.coin @@ -76,11 +77,11 @@ class Controller(util.LoggedClass): self.delayed_sessions = [] self.next_queue_id = 0 self.cache_height = 0 - self.futures = [] env.max_send = max(350000, env.max_send) self.setup_bands() # Set up the RPC request handlers - cmds = 'disconnect getinfo groups log peers reorg sessions'.split() + cmds = ('disconnect getinfo groups log peers reorg sessions stop' + .split()) self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} # Set up the ElectrumX request handlers rpcs = [ @@ -189,37 +190,75 @@ class Controller(util.LoggedClass): if session in self.sessions: await session.serve_requests() + def initiate_shutdown(self): + '''Call this function to start the shutdown process.''' + self.shutdown_event.set() + async def main_loop(self): '''Controller main loop.''' def add_future(coro): - self.futures.append(asyncio.ensure_future(coro)) - - # shutdown() assumes bp.main_loop() is first - add_future(self.bp.main_loop()) + futures.append(asyncio.ensure_future(coro)) + + async def await_bp_catchup(): + '''Wait for the block processor to catch up. + + When it has, start the servers and connect to IRC. + ''' + await self.bp.caught_up_event.wait() + self.logger.info('block processor has caught up') + add_future(self.irc.start()) + add_future(self.start_servers()) + add_future(self.mempool.main_loop()) + add_future(self.enqueue_delayed_sessions()) + add_future(self.notify()) + for n in range(4): + add_future(self.serve_requests()) + + bp_future = asyncio.ensure_future(self.bp.main_loop()) + futures = [] add_future(self.bp.prefetcher.main_loop()) - add_future(self.irc.start(self.bp.caught_up_event)) - add_future(self.start_servers(self.bp.caught_up_event)) - add_future(self.mempool.main_loop()) - add_future(self.enqueue_delayed_sessions()) - add_future(self.notify()) - for n in range(4): - add_future(self.serve_requests()) - - for future in asyncio.as_completed(self.futures): - try: - await future # Note: future is not one of self.futures - except asyncio.CancelledError: - break + add_future(await_bp_catchup()) + + # Perform a clean shutdown when this event is signalled. + await self.shutdown_event.wait() + self.logger.info('shutting down gracefully') + self.state = self.SHUTTING_DOWN + + # First tell the block processor to shut down, it may need to + # perform a lengthy flush. Then shut down the rest. + self.bp.on_shutdown() + self.close_servers(list(self.servers.keys())) + for future in futures: + future.cancel() - await self.shutdown() + # Now wait for the cleanup to complete + await self.close_sessions() + if not bp_future.done(): + self.logger.info('waiting for block processor') + await bp_future def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' + self.logger.info('closing down {} listening servers' + .format(', '.join(kinds))) for kind in kinds: server = self.servers.pop(kind, None) if server: server.close() - # Don't bother awaiting the close - we're not async + + async def close_sessions(self, secs=30): + if not self.sessions: + return + self.logger.info('waiting up to {:d} seconds for socket cleanup' + .format(secs)) + for session in self.sessions: + self.close_session(session) + limit = time.time() + secs + while self.sessions and time.time() < limit: + self.clear_stale_sessions(grace=secs//2) + await asyncio.sleep(2) + self.logger.info('{:,d} sessions remaining' + .format(len(self.sessions))) async def start_server(self, kind, *args, **kw_args): protocol_class = LocalRPC if kind == 'RPC' else ElectrumX @@ -236,12 +275,10 @@ class Controller(util.LoggedClass): self.logger.info('{} server listening on {}:{:d}' .format(kind, host, port)) - async def start_servers(self, caught_up): + async def start_servers(self): '''Start RPC, TCP and SSL servers once caught up.''' if self.env.rpc_port is not None: await self.start_server('RPC', 'localhost', self.env.rpc_port) - await caught_up.wait() - _socket.setdefaulttimeout(5) self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' .format(self.env.session_timeout)) @@ -312,31 +349,6 @@ class Controller(util.LoggedClass): self.header_cache[height] = header return header - async def shutdown(self): - '''Call to shutdown everything. Returns when done.''' - self.state = self.SHUTTING_DOWN - self.bp.on_shutdown() - self.close_servers(list(self.servers.keys())) - # Don't cancel the block processor main loop - let it close itself - for future in self.futures[1:]: - future.cancel() - if self.sessions: - await self.close_sessions() - await self.futures[0] - - async def close_sessions(self, secs=30): - self.logger.info('cleanly closing client sessions, please wait...') - for session in self.sessions: - self.close_session(session) - self.logger.info('listening sockets closed, waiting up to ' - '{:d} seconds for socket cleanup'.format(secs)) - limit = time.time() + secs - while self.sessions and time.time() < limit: - self.clear_stale_sessions(grace=secs//2) - await asyncio.sleep(2) - self.logger.info('{:,d} sessions remaining' - .format(len(self.sessions))) - def add_session(self, session): now = time.time() if now > self.next_stale_check: @@ -559,6 +571,11 @@ class Controller(util.LoggedClass): ''' return self.for_each_session(session_ids, self.toggle_logging) + async def rpc_stop(self): + '''Shut down the server cleanly.''' + self.initiate_shutdown() + return 'stopping' + async def rpc_getinfo(self): '''Return summary information about the server process.''' return self.server_summary() diff --git a/server/irc.py b/server/irc.py index 0e4ed48..31154d6 100644 --- a/server/irc.py +++ b/server/irc.py @@ -55,9 +55,8 @@ class IRC(LoggedClass): self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peers = {} - async def start(self, caught_up): - '''Start IRC connections once caught up if enabled in environment.''' - await caught_up.wait() + async def start(self): + '''Start IRC connections if enabled in environment.''' try: if self.env.irc: await self.join()