From 131601a7b2c9da4a53cb370d2d55a5bbb5dbd7e0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 16 Jan 2017 21:13:01 +0900 Subject: [PATCH 1/3] Rework clean shutdown logic So that the main block processor future is cancellable. We wait for the executor and then flush anything unflushed. Resolves the rest of the second part of #100 --- server/block_processor.py | 20 ++++++-------------- server/controller.py | 32 ++++++++++++++++++-------------- server/db.py | 7 +++---- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 157b5ab..6efabbb 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -151,7 +151,6 @@ class BlockProcessor(server.db.DB): self.caught_up_event = asyncio.Event() self.task_queue = asyncio.Queue() - self.stop = False # Meta self.cache_MB = env.cache_MB @@ -189,26 +188,19 @@ class BlockProcessor(server.db.DB): '''Called by the prefetcher when it first catches up.''' self.add_task(self.first_caught_up) - def on_shutdown(self): - '''Called by the controller to shut processing down.''' - async def do_nothing(): - pass - self.logger.info('preparing clean shutdown') - self.stop = True - # Ensure something is on the queue so main_loop notices self.stop - self.add_task(do_nothing) - async def main_loop(self): '''Main loop for block processing.''' await self.prefetcher.reset_height() - while not self.stop: + while True: task = await self.task_queue.get() await task() - self.logger.info('flushing state to DB for a clean shutdown...') - await self.executor(self.flush, True) - self.logger.info('shutdown complete') + def shutdown(self): + if self.height != self.db_height: + self.logger.info('flushing state to DB for a clean shutdown...') + self.flush(True) + self.logger.info('shutdown complete') async def executor(self, func, *args, **kwargs): '''Run func taking args in the executor.''' diff --git a/server/controller.py b/server/controller.py index ee8bf64..e14291a 100644 --- a/server/controller.py +++ b/server/controller.py @@ -13,6 +13,7 @@ import ssl import time from bisect import bisect_left from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from functools import partial import pylru @@ -53,6 +54,8 @@ class Controller(util.LoggedClass): # Set this event to cleanly shutdown self.shutdown_event = asyncio.Event() self.loop = asyncio.get_event_loop() + self.executor = ThreadPoolExecutor() + self.loop.set_default_executor(self.executor) self.start = time.time() self.coin = env.coin self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) @@ -215,8 +218,8 @@ class Controller(util.LoggedClass): for n in range(4): add_future(self.serve_requests()) - bp_future = asyncio.ensure_future(self.bp.main_loop()) futures = [] + add_future(self.bp.main_loop()) add_future(self.bp.prefetcher.main_loop()) add_future(await_bp_catchup()) @@ -225,35 +228,36 @@ class Controller(util.LoggedClass): self.logger.info('shutting down gracefully') self.state = self.SHUTTING_DOWN - # First tell the block processor to shut down, it may need to - # perform a lengthy flush. Then shut down the rest. - self.bp.on_shutdown() + # Close servers and sessions self.close_servers(list(self.servers.keys())) + for session in self.sessions: + self.close_session(session) + + # Cancel the futures for future in futures: future.cancel() - # Now wait for the cleanup to complete - await self.close_sessions() - if not bp_future.done(): - self.logger.info('waiting for block processor') - await bp_future + await asyncio.wait(futures) + + # Wait for the executor to finish anything it's doing + self.executor.shutdown() + self.bp.shutdown() def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' - self.logger.info('closing down {} listening servers' - .format(', '.join(kinds))) + if kinds: + self.logger.info('closing down {} listening servers' + .format(', '.join(kinds))) for kind in kinds: server = self.servers.pop(kind, None) if server: server.close() - async def close_sessions(self, secs=30): + async def wait_for_sessions(self, secs=30): if not self.sessions: return self.logger.info('waiting up to {:d} seconds for socket cleanup' .format(secs)) - for session in self.sessions: - self.close_session(session) limit = time.time() + secs while self.sessions and time.time() < limit: self.clear_stale_sessions(grace=secs//2) diff --git a/server/db.py b/server/db.py index 0f920d5..44dacf3 100644 --- a/server/db.py +++ b/server/db.py @@ -123,7 +123,8 @@ class DB(util.LoggedClass): .format(util.formatted_time(self.wall_time))) def read_utxo_state(self): - if self.utxo_db.is_new: + state = self.utxo_db.get(b'state') + if not state: self.db_height = -1 self.db_tx_count = 0 self.db_tip = b'\0' * 32 @@ -132,9 +133,7 @@ class DB(util.LoggedClass): self.wall_time = 0 self.first_sync = True else: - state = self.utxo_db.get(b'state') - if state: - state = ast.literal_eval(state.decode()) + state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise self.DBError('failed reading state from DB') self.db_version = state['db_version'] From 65f927122b985a9af434031f1685af905badd8f6 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 16 Jan 2017 21:21:26 +0900 Subject: [PATCH 2/3] daemon: properly check for HTTP status codes --- server/daemon.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/server/daemon.py b/server/daemon.py index 39cb857..783dfba 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -70,10 +70,13 @@ class Daemon(util.LoggedClass): async with self.workqueue_semaphore: url = self.urls[self.url_index] async with aiohttp.post(url, data=data) as resp: - result = processor(await resp.json()) - if self.prior_msg: - self.logger.info('connection restored') - return result + if resp.status == 200: + if self.prior_msg: + self.logger.info('connection restored') + result = processor(await resp.json()) + return result + log_error('HTTP error code {:d}: {}' + .format(resp.status, resp.reason)) except asyncio.TimeoutError: log_error('timeout error.', skip_once=True) except aiohttp.ClientHttpProcessingError: From 1a9ac22fa0d245e00ea16d58a511224f5dc65483 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 16 Jan 2017 21:24:59 +0900 Subject: [PATCH 3/3] Prepare 0.10.6 --- README.rst | 9 ++++++++- server/version.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index e3de606..a3f2a22 100644 --- a/README.rst +++ b/README.rst @@ -137,10 +137,17 @@ version prior to the release of 1.0. ChangeLog ========= +Version 0.10.6 +-------------- + +* fix for rest of second part of issue `#100`_ +* check HTTP error codes from bitcoind and log appropriately +* don't error opening a new DB that has nothing written yet + Version 0.10.5 -------------- -* fix for second part of issue `#100`_ where the ElectrumX was not +* fix for some of second part of issue `#100`_ where the ElectrumX was not killable if bitcoind was unavailable diff --git a/server/version.py b/server/version.py index dc25c60..83b18bf 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.10.5" +VERSION = "ElectrumX 0.10.6"