Browse Source

Tweak the shutdown process

Clean it up a bit and make it harder to do wrongly.
master
Neil Booth 8 years ago
parent
commit
4729ba2e21
  1. 7
      server/block_processor.py
  2. 29
      server/controller.py

7
server/block_processor.py

@ -196,11 +196,14 @@ class BlockProcessor(server.db.DB):
task = await self.task_queue.get() task = await self.task_queue.get()
await task() await task()
def shutdown(self): def shutdown(self, executor):
'''Shutdown cleanly and flush to disk.'''
# First stut down the executor; it may be processing a block.
# Then we can flush anything remaining to disk.
executor.shutdown()
if self.height != self.db_height: if self.height != self.db_height:
self.logger.info('flushing state to DB for a clean shutdown...') self.logger.info('flushing state to DB for a clean shutdown...')
self.flush(True) self.flush(True)
self.logger.info('shutdown complete')
async def executor(self, func, *args, **kwargs): async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.''' '''Run func taking args in the executor.'''

29
server/controller.py

@ -225,7 +225,13 @@ class Controller(util.LoggedClass):
# Perform a clean shutdown when this event is signalled. # Perform a clean shutdown when this event is signalled.
await self.shutdown_event.wait() await self.shutdown_event.wait()
self.logger.info('shutting down gracefully')
self.logger.info('shutting down')
await self.shutdown(futures)
self.logger.info('shutdown complete')
async def shutdown(self, futures):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN self.state = self.SHUTTING_DOWN
# Close servers and sessions # Close servers and sessions
@ -237,11 +243,12 @@ class Controller(util.LoggedClass):
for future in futures: for future in futures:
future.cancel() future.cancel()
await asyncio.wait(futures) # Wait for all futures to finish
while any(not future.done() for future in futures):
await asyncio.sleep(1)
# Wait for the executor to finish anything it's doing # Finally shut down the block processor and executor
self.executor.shutdown() self.bp.shutdown(self.executor)
self.bp.shutdown()
def close_servers(self, kinds): def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).''' '''Close the servers of the given kinds (TCP etc.).'''
@ -253,18 +260,6 @@ class Controller(util.LoggedClass):
if server: if server:
server.close() server.close()
async def wait_for_sessions(self, secs=30):
if not self.sessions:
return
self.logger.info('waiting up to {:d} seconds for socket cleanup'
.format(secs))
limit = time.time() + secs
while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
async def start_server(self, kind, *args, **kw_args): async def start_server(self, kind, *args, **kw_args):
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
protocol = partial(protocol_class, self, self.bp, self.env, kind) protocol = partial(protocol_class, self, self.bp, self.env, kind)

Loading…
Cancel
Save