Browse Source

Rework clean shutdown logic

So that the main block processor future is cancellable.
We wait for the executor and then flush anything unflushed.
Resolves the rest of the second part of #100
master
Neil Booth 8 years ago
parent
commit
131601a7b2
  1. 20
      server/block_processor.py
  2. 32
      server/controller.py
  3. 7
      server/db.py

20
server/block_processor.py

@ -151,7 +151,6 @@ class BlockProcessor(server.db.DB):
self.caught_up_event = asyncio.Event() self.caught_up_event = asyncio.Event()
self.task_queue = asyncio.Queue() self.task_queue = asyncio.Queue()
self.stop = False
# Meta # Meta
self.cache_MB = env.cache_MB self.cache_MB = env.cache_MB
@ -189,26 +188,19 @@ class BlockProcessor(server.db.DB):
'''Called by the prefetcher when it first catches up.''' '''Called by the prefetcher when it first catches up.'''
self.add_task(self.first_caught_up) self.add_task(self.first_caught_up)
def on_shutdown(self):
'''Called by the controller to shut processing down.'''
async def do_nothing():
pass
self.logger.info('preparing clean shutdown')
self.stop = True
# Ensure something is on the queue so main_loop notices self.stop
self.add_task(do_nothing)
async def main_loop(self): async def main_loop(self):
'''Main loop for block processing.''' '''Main loop for block processing.'''
await self.prefetcher.reset_height() await self.prefetcher.reset_height()
while not self.stop: while True:
task = await self.task_queue.get() task = await self.task_queue.get()
await task() await task()
self.logger.info('flushing state to DB for a clean shutdown...') def shutdown(self):
await self.executor(self.flush, True) if self.height != self.db_height:
self.logger.info('shutdown complete') self.logger.info('flushing state to DB for a clean shutdown...')
self.flush(True)
self.logger.info('shutdown complete')
async def executor(self, func, *args, **kwargs): async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.''' '''Run func taking args in the executor.'''

32
server/controller.py

@ -13,6 +13,7 @@ import ssl
import time import time
from bisect import bisect_left from bisect import bisect_left
from collections import defaultdict from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial from functools import partial
import pylru import pylru
@ -53,6 +54,8 @@ class Controller(util.LoggedClass):
# Set this event to cleanly shutdown # Set this event to cleanly shutdown
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor()
self.loop.set_default_executor(self.executor)
self.start = time.time() self.start = time.time()
self.coin = env.coin self.coin = env.coin
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
@ -215,8 +218,8 @@ class Controller(util.LoggedClass):
for n in range(4): for n in range(4):
add_future(self.serve_requests()) add_future(self.serve_requests())
bp_future = asyncio.ensure_future(self.bp.main_loop())
futures = [] futures = []
add_future(self.bp.main_loop())
add_future(self.bp.prefetcher.main_loop()) add_future(self.bp.prefetcher.main_loop())
add_future(await_bp_catchup()) add_future(await_bp_catchup())
@ -225,35 +228,36 @@ class Controller(util.LoggedClass):
self.logger.info('shutting down gracefully') self.logger.info('shutting down gracefully')
self.state = self.SHUTTING_DOWN self.state = self.SHUTTING_DOWN
# First tell the block processor to shut down, it may need to # Close servers and sessions
# perform a lengthy flush. Then shut down the rest.
self.bp.on_shutdown()
self.close_servers(list(self.servers.keys())) self.close_servers(list(self.servers.keys()))
for session in self.sessions:
self.close_session(session)
# Cancel the futures
for future in futures: for future in futures:
future.cancel() future.cancel()
# Now wait for the cleanup to complete await asyncio.wait(futures)
await self.close_sessions()
if not bp_future.done(): # Wait for the executor to finish anything it's doing
self.logger.info('waiting for block processor') self.executor.shutdown()
await bp_future self.bp.shutdown()
def close_servers(self, kinds): def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).''' '''Close the servers of the given kinds (TCP etc.).'''
self.logger.info('closing down {} listening servers' if kinds:
.format(', '.join(kinds))) self.logger.info('closing down {} listening servers'
.format(', '.join(kinds)))
for kind in kinds: for kind in kinds:
server = self.servers.pop(kind, None) server = self.servers.pop(kind, None)
if server: if server:
server.close() server.close()
async def close_sessions(self, secs=30): async def wait_for_sessions(self, secs=30):
if not self.sessions: if not self.sessions:
return return
self.logger.info('waiting up to {:d} seconds for socket cleanup' self.logger.info('waiting up to {:d} seconds for socket cleanup'
.format(secs)) .format(secs))
for session in self.sessions:
self.close_session(session)
limit = time.time() + secs limit = time.time() + secs
while self.sessions and time.time() < limit: while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2) self.clear_stale_sessions(grace=secs//2)

7
server/db.py

@ -123,7 +123,8 @@ class DB(util.LoggedClass):
.format(util.formatted_time(self.wall_time))) .format(util.formatted_time(self.wall_time)))
def read_utxo_state(self): def read_utxo_state(self):
if self.utxo_db.is_new: state = self.utxo_db.get(b'state')
if not state:
self.db_height = -1 self.db_height = -1
self.db_tx_count = 0 self.db_tx_count = 0
self.db_tip = b'\0' * 32 self.db_tip = b'\0' * 32
@ -132,9 +133,7 @@ class DB(util.LoggedClass):
self.wall_time = 0 self.wall_time = 0
self.first_sync = True self.first_sync = True
else: else:
state = self.utxo_db.get(b'state') state = ast.literal_eval(state.decode())
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict): if not isinstance(state, dict):
raise self.DBError('failed reading state from DB') raise self.DBError('failed reading state from DB')
self.db_version = state['db_version'] self.db_version = state['db_version']

Loading…
Cancel
Save