Browse Source

Clean up shutdown process of the block processor

- enable removal of the executor code
- clarify flush guarantees
patch-2
Neil Booth 7 years ago
parent
commit
530c7cac6f
  1. 3
      docs/changelog.rst
  2. 59
      electrumx/lib/server_base.py
  3. 12
      electrumx/lib/tasks.py
  4. 64
      electrumx/server/block_processor.py
  5. 8
      electrumx/server/chain_state.py
  6. 16
      electrumx/server/controller.py
  7. 5
      electrumx/server/session.py

3
docs/changelog.rst

@ -13,6 +13,9 @@ Version 1.7 (in progress)
Version 1.6.1 (in progress)
============================
* cleaner shutdown process with clear guarantees
* add Motion coin (ocruzv)
Version 1.6 (19 July 2018)
===========================

59
electrumx/lib/server_base.py

@ -12,6 +12,7 @@ import sys
import time
from functools import partial
from electrumx.lib.tasks import Tasks
from electrumx.lib.util import class_logger
@ -37,9 +38,14 @@ class ServerBase(object):
'''Save the environment, perform basic sanity checks, and set the
event loop policy.
'''
# First asyncio operation must be to set the event loop policy
# as this replaces the event loop
asyncio.set_event_loop_policy(env.loop_policy)
self.logger = class_logger(__name__, self.__class__.__name__)
self.logger.info(f'Python version: {sys.version}')
self.env = env
self.tasks = Tasks()
# Sanity checks
if sys.version_info < self.PYTHON_MIN_VERSION:
@ -53,10 +59,6 @@ class ServerBase(object):
'To continue as root anyway, restart with '
'environment variable ALLOW_ROOT non-empty')
# First asyncio operation must be to set the event loop policy
# as this replaces the event loop
asyncio.set_event_loop_policy(self.env.loop_policy)
# Trigger this event to cleanly shutdown
self.shutdown_event = asyncio.Event()
@ -69,26 +71,6 @@ class ServerBase(object):
'''Override to perform the shutdown sequence, if any.'''
pass
async def _wait_for_shutdown_event(self):
'''Wait for shutdown to be signalled, then run the shutdown sequence.

Blocks on self.shutdown_event, awaits self.shutdown() and logs
the start and the end of the sequence.
Derived classes may want to provide a shutdown() coroutine.'''
# Block here until something sets the shutdown event
await self.shutdown_event.wait()
self.logger.info('shutting down')
# Run the shutdown sequence to completion
await self.shutdown()
# Finally, work around an apparent asyncio bug that causes log
# spew on shutdown for partially opened SSL sockets
try:
del asyncio.sslproto._SSLProtocolTransport.__del__
except Exception:
# Best effort only: any failure to remove the attribute is ignored
pass
self.logger.info('shutdown complete')
def on_signal(self, signame):
'''Call on receipt of a signal to cleanly shutdown.'''
self.logger.warning('received {} signal, initiating shutdown'
@ -104,7 +86,7 @@ class ServerBase(object):
return
loop.default_exception_handler(context)
def run(self):
async def _main(self, loop):
'''Run the server application:
- record start time
@ -116,13 +98,32 @@ class ServerBase(object):
'''
self.start_time = time.time()
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
loop.set_exception_handler(self.on_exception)
loop.run_until_complete(self.start_servers())
loop.run_until_complete(self._wait_for_shutdown_event())
self.tasks.create_task(self.start_servers())
# Wait for shutdown to be signalled, and log it.
# Derived classes may want to provide a shutdown() coroutine.
await self.shutdown_event.wait()
self.logger.info('shutting down')
await self.shutdown()
# Let the loop clean itself up; prevents some silly logs
await asyncio.sleep(0.001)
# Finally, work around an apparent asyncio bug that causes log
# spew on shutdown for partially opened SSL sockets
try:
del asyncio.sslproto._SSLProtocolTransport.__del__
except Exception:
pass
self.logger.info('shutdown complete')
def run(self):
    '''Run the server application to completion.

    Drives self._main() on the current event loop until it finishes,
    then closes the loop.  The loop is closed even if _main() raises,
    so a failure does not leave the loop open; the exception still
    propagates to the caller.
    '''
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(self._main(loop))
    finally:
        # Always release the loop's resources, including on error
        loop.close()

12
electrumx/lib/tasks.py

@ -26,8 +26,6 @@
'''Concurrency via tasks and threads.'''
from concurrent.futures import ThreadPoolExecutor
from aiorpcx import TaskSet
import electrumx.lib.util as util
@ -40,12 +38,8 @@ class Tasks(object):
def __init__(self, *, loop=None):
self.tasks = TaskSet(loop=loop)
self.logger = util.class_logger(__name__, self.__class__.__name__)
# FIXME: is the executor still needed?
self.executor = ThreadPoolExecutor()
self.tasks.loop.set_default_executor(self.executor)
# Pass through until integrated
self.loop = self.tasks.loop
self.cancel_all = self.tasks.cancel_all
self.wait = self.tasks.wait
async def run_in_thread(self, func, *args):
@ -65,3 +59,9 @@ class Tasks(object):
task.result()
except Exception as e:
self.logger.exception(f'uncaught task exception: {e}')
async def cancel_all(self, wait=True):
'''Cancel all tasks in the task set.

If wait is true (the default), also wait on the task set after
cancelling; otherwise return as soon as cancellation is requested.'''
# Delegate cancellation to the underlying task set
self.tasks.cancel_all()
if wait:
await self.tasks.wait()

64
electrumx/server/block_processor.py

@ -154,6 +154,7 @@ class BlockProcessor(electrumx.server.db.DB):
self._caught_up_event = asyncio.Event()
self.task_queue = asyncio.Queue()
self.prefetcher = Prefetcher(self)
# Meta
self.cache_MB = env.cache_MB
@ -175,7 +176,10 @@ class BlockProcessor(electrumx.server.db.DB):
self.utxo_cache = {}
self.db_deletes = []
self.prefetcher = Prefetcher(self)
# If the lock is successfully acquired, in-memory chain state
# is consistent with self.height
self.state_lock = asyncio.Lock()
self.worker_task = None
def add_task(self, task):
'''Add the task to our task queue.'''
@ -201,15 +205,6 @@ class BlockProcessor(electrumx.server.db.DB):
'''
self.callbacks.append(callback)
def shutdown(self, executor):
'''Shutdown cleanly and flush to disk.

executor is shut down first; a flush is then done only if
self.height differs from self.db_height.'''
# First shut down the executor; it may be processing a block.
# Then we can flush anything remaining to disk.
executor.shutdown()
if self.height != self.db_height:
self.logger.info('flushing state to DB for a clean shutdown...')
self.flush(True)
async def check_and_advance_blocks(self, raw_blocks, first):
'''Process the list of raw blocks passed. Detects and handles
reorgs.
@ -232,7 +227,8 @@ class BlockProcessor(electrumx.server.db.DB):
if hprevs == chain:
start = time.time()
await self.tasks.run_in_thread(self.advance_blocks, blocks)
async with self.state_lock:
await self.tasks.run_in_thread(self.advance_blocks, blocks)
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
@ -253,16 +249,6 @@ class BlockProcessor(electrumx.server.db.DB):
'resetting the prefetcher')
await self.prefetcher.reset_height()
def force_chain_reorg(self, count):
'''Force a reorg of the given number of blocks.

The reorg is not run here; a call to self.reorg_chain(count=count)
is added to the task queue.

Returns True if a reorg is queued, False if not caught up.
'''
# A reorg is only queued once the caught-up event has been set
if self._caught_up_event.is_set():
self.add_task(partial(self.reorg_chain, count=count))
return True
return False
async def reorg_chain(self, count=None):
'''Handle a chain reorganisation.
@ -289,7 +275,8 @@ class BlockProcessor(electrumx.server.db.DB):
last = start + count - 1
for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.tasks.run_in_thread(self.backup_blocks, raw_blocks)
async with self.state_lock:
await self.tasks.run_in_thread(self.backup_blocks, raw_blocks)
last -= len(raw_blocks)
# Truncate header_mc: header count is 1 more than the height
self.header_mc.truncate(self.height + 1)
@ -770,8 +757,8 @@ class BlockProcessor(electrumx.server.db.DB):
self.db_height = self.height
self.db_tip = self.tip
async def _process_blocks_forever(self):
'''Loop forever processing blocks.'''
async def _process_queue(self):
'''Loop forever processing enqueued work.'''
while True:
task = await self.task_queue.get()
await task()
@ -805,13 +792,13 @@ class BlockProcessor(electrumx.server.db.DB):
self.tasks.create_task(self.prefetcher.main_loop())
await self.prefetcher.reset_height()
# Start our loop that processes blocks as they are fetched
self.tasks.create_task(self._process_blocks_forever())
self.worker_task = self.tasks.create_task(self._process_queue())
# Wait until caught up
await self._caught_up_event.wait()
# Flush everything but with first_sync->False state.
first_sync = self.first_sync
self.first_sync = False
await self.tasks.run_in_thread(self.flush, True)
self.flush(True)
if first_sync:
self.logger.info(f'{electrumx.version} synced to '
f'height {self.height:,d}')
@ -822,3 +809,28 @@ class BlockProcessor(electrumx.server.db.DB):
length = max(1, self.height - self.env.reorg_limit)
self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
self.logger.info('populated header merkle cache')
def force_chain_reorg(self, count):
'''Force a reorg of the given number of blocks.

The reorg is not run here; a call to self.reorg_chain(count=count)
is added to the task queue.

Returns True if a reorg is queued, False if not caught up.
'''
# A reorg is only queued once the caught-up event has been set
if self._caught_up_event.is_set():
self.add_task(partial(self.reorg_chain, count=count))
return True
return False
async def shutdown(self):
'''Shutdown cleanly and flush to disk.

If during initial sync ElectrumX is asked to shut down when a
large number of blocks have been processed but not written to
disk, it should write those to disk before exiting, as
otherwise a significant amount of work could be lost.

NOTE(review): indentation is not visible in this view, so it is
unclear whether the log call and the flush run only when a worker
task exists and while the state lock is held - confirm.
'''
# worker_task starts out as None and is set when the queue
# processing task is created
if self.worker_task:
# Once the state lock is acquired, in-memory chain state is
# consistent with self.height
async with self.state_lock:
# Shut down block processing
self.worker_task.cancel()
self.logger.info('flushing to DB for a clean shutdown...')
self.flush(True)

8
electrumx/server/chain_state.py

@ -16,10 +16,9 @@ class ChainState(object):
blocks, transaction history, UTXOs and the mempool.
'''
def __init__(self, env, tasks, shutdown_event):
def __init__(self, env, tasks):
self.env = env
self.tasks = tasks
self.shutdown_event = shutdown_event
self.daemon = env.coin.DAEMON(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR
self.bp = BlockProcessor(env, tasks, self.daemon)
@ -103,8 +102,9 @@ class ChainState(object):
self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url))
return self.daemon.logged_url()
def shutdown(self):
self.tasks.loop.call_soon(self.shutdown_event.set)
async def shutdown(self):
'''Shut down the block processor to flush chain state to disk.'''
await self.bp.shutdown()
async def wait_for_mempool(self):
await self.bp.catch_up_to_daemon()

16
electrumx/server/controller.py

@ -9,7 +9,6 @@ from aiorpcx import _version as aiorpcx_version
import electrumx
from electrumx.lib.server_base import ServerBase
from electrumx.lib.tasks import Tasks
from electrumx.lib.util import version_string
from electrumx.server.chain_state import ChainState
from electrumx.server.peers import PeerManager
@ -39,11 +38,10 @@ class Controller(ServerBase):
self.logger.info(f'supported protocol versions: {min_str}-{max_str}')
self.logger.info(f'event loop policy: {env.loop_policy}')
self.tasks = Tasks()
self.chain_state = ChainState(env, self.tasks, self.shutdown_event)
self.chain_state = ChainState(env, self.tasks)
self.peer_mgr = PeerManager(env, self.tasks, self.chain_state)
self.session_mgr = SessionManager(env, self.tasks, self.chain_state,
self.peer_mgr)
self.peer_mgr, self.shutdown_event)
async def start_servers(self):
'''Start the RPC server and wait for the mempool to synchronize. Then
@ -56,9 +54,9 @@ class Controller(ServerBase):
async def shutdown(self):
'''Perform the shutdown sequence.'''
# Not certain of ordering here
self.tasks.cancel_all()
# Close servers and connections - main source of new task creation
await self.session_mgr.shutdown()
await self.tasks.wait()
# Finally shut down the block processor and executor (FIXME)
self.chain_state.bp.shutdown(self.tasks.executor)
# Flush chain state to disk
await self.chain_state.shutdown()
# Cancel all tasks; this shuts down the peer manager and prefetcher
await self.tasks.cancel_all(wait=True)

5
electrumx/server/session.py

@ -97,12 +97,13 @@ class SessionManager(object):
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
def __init__(self, env, tasks, chain_state, peer_mgr):
def __init__(self, env, tasks, chain_state, peer_mgr, shutdown_event):
env.max_send = max(350000, env.max_send)
self.env = env
self.tasks = tasks
self.chain_state = chain_state
self.peer_mgr = peer_mgr
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {}
self.sessions = set()
@ -361,7 +362,7 @@ class SessionManager(object):
def rpc_stop(self):
'''Shut down the server cleanly.'''
self.chain_state.shutdown()
self.shutdown_event.set()
return 'stopping'
def rpc_getinfo(self):

Loading…
Cancel
Save