diff --git a/README.rst b/README.rst index ff5eeda..e3de606 100644 --- a/README.rst +++ b/README.rst @@ -124,9 +124,6 @@ Roadmap Post-1.0 - Python 3.6, which has several performance improvements relevant to ElectrumX - UTXO root logic and implementation -- improve DB abstraction so LMDB is not penalized -- investigate effects of cache defaults and DB configuration defaults - on sync time and simplify / optimize the default config accordingly - potentially move some functionality to C or C++ @@ -140,6 +137,13 @@ version prior to the release of 1.0. ChangeLog ========= +Version 0.10.5 +-------------- + +* fix for second part of issue `#100`_ where the ElectrumX was not + killable if bitcoind was unavailable + + Version 0.10.4 -------------- @@ -254,6 +258,7 @@ Version 0.9.17 .. _#93: https://github.com/kyuupichan/electrumx/issues/93 .. _#94: https://github.com/kyuupichan/electrumx/issues/94 .. _#99: https://github.com/kyuupichan/electrumx/issues/99 +.. _#100: https://github.com/kyuupichan/electrumx/issues/100 .. _#101: https://github.com/kyuupichan/electrumx/issues/101 .. _#102: https://github.com/kyuupichan/electrumx/issues/102 .. _#103: https://github.com/kyuupichan/electrumx/issues/103 diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index eb7f4bd..8f81b4b 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -45,16 +45,15 @@ recommend having at least 30-40GB of free space before starting. Database Engine =============== -You can choose from RocksDB, LevelDB or LMDB to store transaction -information on disk. Currently, the fastest seems to be RocksDB with -LevelDB being slightly slower. LMDB is slowest but that is because the -code needs reworking to be better usable with LMDB. +You can choose from LevelDB and RocksDB to store transaction +information on disk. The time taken and DB size is not significantly +different. We tried to support LMDB but its history write performance +was much worse. You will need to install one of: + `plyvel `_ for LevelDB + `pyrocksdb `_ for RocksDB -+ `lmdb `_ for LMDB Running ======= @@ -234,8 +233,9 @@ Terminating ElectrumX ===================== The preferred way to terminate the server process is to send it the -INT or TERM signals. For a daemontools supervised process this is best -done by bringing it down like so:: +**stop** RPC command, or alternatively on Unix the INT or TERM +signals. For a daemontools supervised process this can be done by +bringing it down like so:: svc -d ~/service/electrumx diff --git a/docs/RPC-INTERFACE.rst b/docs/RPC-INTERFACE.rst index 5705134..709c672 100644 --- a/docs/RPC-INTERFACE.rst +++ b/docs/RPC-INTERFACE.rst @@ -2,14 +2,26 @@ The ElectrumX RPC Interface =========================== You can query the status of a running server, and affect its behaviour -using the RPC interface. +by sending JSON RPC commands to the LocalRPC port it is listening on. +This is best done using the electrumx_rpc.py script provided. The general form of invocation is: - ``electrumx_rpc.py [arg1 [arg2...]`` + ``electrumx_rpc.py [-p PORT] [arg1 [arg2...]`` + +The port to send the commands to can be specified on the command line, +otherwise it is taken from the environment variable **RPC_PORT**, or +8000 is used if that is not set. The following commands are available: +* **stop** + + Flush all cached data to disk and shut down the server cleanly, as + if sending the KILL signal. Be patient - during initial sync + flushing all cached data to disk can take several minutes. This + command takes no arguments. + * **getinfo** Returns a summary of server state. This command takes no arguments. diff --git a/electrumx_server.py b/electrumx_server.py index d8a6029..f23542f 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -36,8 +36,9 @@ def main_loop(): def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' - logging.warning('received {} signal, shutting down'.format(signame)) - future.cancel() + logging.warning('received {} signal, initiating shutdown' + .format(signame)) + controller.initiate_shutdown() def on_exception(loop, context): '''Suppress spurious messages it appears we cannot control.''' @@ -47,8 +48,8 @@ def main_loop(): 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) - server = Controller(Env()) - future = asyncio.ensure_future(server.main_loop()) + controller = Controller(Env()) + future = asyncio.ensure_future(controller.main_loop()) # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): diff --git a/server/block_processor.py b/server/block_processor.py index 6252e1a..157b5ab 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -15,7 +15,7 @@ import time from collections import defaultdict from functools import partial -from server.daemon import Daemon, DaemonError +from server.daemon import DaemonError from server.version import VERSION from lib.hash import hash_to_str from lib.util import chunks, formatted_time, LoggedClass @@ -138,8 +138,9 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env): + def __init__(self, env, daemon): super().__init__(env) + self.daemon = daemon # These are our state as we move ahead of DB state self.fs_height = self.db_height @@ -148,7 +149,6 @@ class BlockProcessor(server.db.DB): self.tip = self.db_tip self.tx_count = self.db_tx_count - self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up_event = asyncio.Event() self.task_queue = asyncio.Queue() self.stop = False @@ -193,8 +193,10 @@ class BlockProcessor(server.db.DB): '''Called by the controller to shut processing down.''' async def do_nothing(): pass + self.logger.info('preparing clean shutdown') self.stop = True - self.add_task(do_nothing) # Ensure something is on the queue + # Ensure something is on the queue so main_loop notices self.stop + self.add_task(do_nothing) async def main_loop(self): '''Main loop for block processing.''' diff --git a/server/controller.py b/server/controller.py index 994ec0d..ee8bf64 100644 --- a/server/controller.py +++ b/server/controller.py @@ -9,7 +9,6 @@ import asyncio import codecs import json import os -import _socket import ssl import time from bisect import bisect_left @@ -22,6 +21,7 @@ from lib.jsonrpc import JSONRPC, RPCError, RequestBase from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor +from server.daemon import Daemon from server.irc import IRC from server.session import LocalRPC, ElectrumX from server.mempool import MemPool @@ -50,11 +50,13 @@ class Controller(util.LoggedClass): def __init__(self, env): super().__init__() + # Set this event to cleanly shutdown + self.shutdown_event = asyncio.Event() self.loop = asyncio.get_event_loop() self.start = time.time() self.coin = env.coin - self.bp = BlockProcessor(env) - self.daemon = self.bp.daemon + self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) + self.bp = BlockProcessor(env, self.daemon) self.mempool = MemPool(self.bp) self.irc = IRC(env) self.env = env @@ -76,11 +78,11 @@ class Controller(util.LoggedClass): self.delayed_sessions = [] self.next_queue_id = 0 self.cache_height = 0 - self.futures = [] env.max_send = max(350000, env.max_send) self.setup_bands() # Set up the RPC request handlers - cmds = 'disconnect getinfo groups log peers reorg sessions'.split() + cmds = ('disconnect getinfo groups log peers reorg sessions stop' + .split()) self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} # Set up the ElectrumX request handlers rpcs = [ @@ -189,37 +191,75 @@ class Controller(util.LoggedClass): if session in self.sessions: await session.serve_requests() + def initiate_shutdown(self): + '''Call this function to start the shutdown process.''' + self.shutdown_event.set() + async def main_loop(self): '''Controller main loop.''' def add_future(coro): - self.futures.append(asyncio.ensure_future(coro)) - - # shutdown() assumes bp.main_loop() is first - add_future(self.bp.main_loop()) + futures.append(asyncio.ensure_future(coro)) + + async def await_bp_catchup(): + '''Wait for the block processor to catch up. + + When it has, start the servers and connect to IRC. + ''' + await self.bp.caught_up_event.wait() + self.logger.info('block processor has caught up') + add_future(self.irc.start()) + add_future(self.start_servers()) + add_future(self.mempool.main_loop()) + add_future(self.enqueue_delayed_sessions()) + add_future(self.notify()) + for n in range(4): + add_future(self.serve_requests()) + + bp_future = asyncio.ensure_future(self.bp.main_loop()) + futures = [] add_future(self.bp.prefetcher.main_loop()) - add_future(self.irc.start(self.bp.caught_up_event)) - add_future(self.start_servers(self.bp.caught_up_event)) - add_future(self.mempool.main_loop()) - add_future(self.enqueue_delayed_sessions()) - add_future(self.notify()) - for n in range(4): - add_future(self.serve_requests()) - - for future in asyncio.as_completed(self.futures): - try: - await future # Note: future is not one of self.futures - except asyncio.CancelledError: - break + add_future(await_bp_catchup()) + + # Perform a clean shutdown when this event is signalled. + await self.shutdown_event.wait() + self.logger.info('shutting down gracefully') + self.state = self.SHUTTING_DOWN + + # First tell the block processor to shut down, it may need to + # perform a lengthy flush. Then shut down the rest. + self.bp.on_shutdown() + self.close_servers(list(self.servers.keys())) + for future in futures: + future.cancel() - await self.shutdown() + # Now wait for the cleanup to complete + await self.close_sessions() + if not bp_future.done(): + self.logger.info('waiting for block processor') + await bp_future def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' + self.logger.info('closing down {} listening servers' + .format(', '.join(kinds))) for kind in kinds: server = self.servers.pop(kind, None) if server: server.close() - # Don't bother awaiting the close - we're not async + + async def close_sessions(self, secs=30): + if not self.sessions: + return + self.logger.info('waiting up to {:d} seconds for socket cleanup' + .format(secs)) + for session in self.sessions: + self.close_session(session) + limit = time.time() + secs + while self.sessions and time.time() < limit: + self.clear_stale_sessions(grace=secs//2) + await asyncio.sleep(2) + self.logger.info('{:,d} sessions remaining' + .format(len(self.sessions))) async def start_server(self, kind, *args, **kw_args): protocol_class = LocalRPC if kind == 'RPC' else ElectrumX @@ -236,12 +276,10 @@ class Controller(util.LoggedClass): self.logger.info('{} server listening on {}:{:d}' .format(kind, host, port)) - async def start_servers(self, caught_up): + async def start_servers(self): '''Start RPC, TCP and SSL servers once caught up.''' if self.env.rpc_port is not None: await self.start_server('RPC', 'localhost', self.env.rpc_port) - await caught_up.wait() - _socket.setdefaulttimeout(5) self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' .format(self.env.session_timeout)) @@ -312,31 +350,6 @@ class Controller(util.LoggedClass): self.header_cache[height] = header return header - async def shutdown(self): - '''Call to shutdown everything. Returns when done.''' - self.state = self.SHUTTING_DOWN - self.bp.on_shutdown() - self.close_servers(list(self.servers.keys())) - # Don't cancel the block processor main loop - let it close itself - for future in self.futures[1:]: - future.cancel() - if self.sessions: - await self.close_sessions() - await self.futures[0] - - async def close_sessions(self, secs=30): - self.logger.info('cleanly closing client sessions, please wait...') - for session in self.sessions: - self.close_session(session) - self.logger.info('listening sockets closed, waiting up to ' - '{:d} seconds for socket cleanup'.format(secs)) - limit = time.time() + secs - while self.sessions and time.time() < limit: - self.clear_stale_sessions(grace=secs//2) - await asyncio.sleep(2) - self.logger.info('{:,d} sessions remaining' - .format(len(self.sessions))) - def add_session(self, session): now = time.time() if now > self.next_stale_check: @@ -559,6 +572,11 @@ class Controller(util.LoggedClass): ''' return self.for_each_session(session_ids, self.toggle_logging) + async def rpc_stop(self): + '''Shut down the server cleanly.''' + self.initiate_shutdown() + return 'stopping' + async def rpc_getinfo(self): '''Return summary information about the server process.''' return self.server_summary() diff --git a/server/daemon.py b/server/daemon.py index c456f44..39cb857 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -10,6 +10,7 @@ daemon.''' import asyncio import json +import traceback import aiohttp @@ -82,11 +83,14 @@ class Daemon(util.LoggedClass): except aiohttp.ClientConnectionError: log_error('connection problem - is your daemon running?') except self.DaemonWarmingUpError: - log_error('still starting up checking blocks.') + log_error('starting up checking blocks.') except (asyncio.CancelledError, DaemonError): raise except Exception as e: + self.log_error(traceback.format_exc()) + self.log_error('response was: {}'.format(resp)) log_error('request gave unexpected error: {}.'.format(e)) + if secs >= max_secs and len(self.urls) > 1: self.url_index = (self.url_index + 1) % len(self.urls) logged_url = self.logged_url(self.urls[self.url_index]) diff --git a/server/irc.py b/server/irc.py index 0e4ed48..31154d6 100644 --- a/server/irc.py +++ b/server/irc.py @@ -55,9 +55,8 @@ class IRC(LoggedClass): self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peers = {} - async def start(self, caught_up): - '''Start IRC connections once caught up if enabled in environment.''' - await caught_up.wait() + async def start(self): + '''Start IRC connections if enabled in environment.''' try: if self.env.irc: await self.join() diff --git a/server/version.py b/server/version.py index 6384119..dc25c60 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.10.4" +VERSION = "ElectrumX 0.10.5"