diff --git a/README.rst b/README.rst index e3de606..a3f2a22 100644 --- a/README.rst +++ b/README.rst @@ -137,10 +137,17 @@ version prior to the release of 1.0. ChangeLog ========= +Version 0.10.6 +-------------- + +* fix for rest of second part of issue `#100`_ +* check HTTP error codes from bitcoind and log appropriately +* don't error opening a new DB that has nothing written yet + Version 0.10.5 -------------- -* fix for second part of issue `#100`_ where the ElectrumX was not +* fix for some of second part of issue `#100`_ where the ElectrumX was not killable if bitcoind was unavailable diff --git a/server/block_processor.py b/server/block_processor.py index 157b5ab..6efabbb 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -151,7 +151,6 @@ class BlockProcessor(server.db.DB): self.caught_up_event = asyncio.Event() self.task_queue = asyncio.Queue() - self.stop = False # Meta self.cache_MB = env.cache_MB @@ -189,26 +188,19 @@ class BlockProcessor(server.db.DB): '''Called by the prefetcher when it first catches up.''' self.add_task(self.first_caught_up) - def on_shutdown(self): - '''Called by the controller to shut processing down.''' - async def do_nothing(): - pass - self.logger.info('preparing clean shutdown') - self.stop = True - # Ensure something is on the queue so main_loop notices self.stop - self.add_task(do_nothing) - async def main_loop(self): '''Main loop for block processing.''' await self.prefetcher.reset_height() - while not self.stop: + while True: task = await self.task_queue.get() await task() - self.logger.info('flushing state to DB for a clean shutdown...') - await self.executor(self.flush, True) - self.logger.info('shutdown complete') + def shutdown(self): + if self.height != self.db_height: + self.logger.info('flushing state to DB for a clean shutdown...') + self.flush(True) + self.logger.info('shutdown complete') async def executor(self, func, *args, **kwargs): '''Run func taking args in the executor.''' diff --git a/server/controller.py b/server/controller.py index ee8bf64..e14291a 100644 --- a/server/controller.py +++ b/server/controller.py @@ -13,6 +13,7 @@ import ssl import time from bisect import bisect_left from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from functools import partial import pylru @@ -53,6 +54,8 @@ class Controller(util.LoggedClass): # Set this event to cleanly shutdown self.shutdown_event = asyncio.Event() self.loop = asyncio.get_event_loop() + self.executor = ThreadPoolExecutor() + self.loop.set_default_executor(self.executor) self.start = time.time() self.coin = env.coin self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) @@ -215,8 +218,8 @@ class Controller(util.LoggedClass): for n in range(4): add_future(self.serve_requests()) - bp_future = asyncio.ensure_future(self.bp.main_loop()) futures = [] + add_future(self.bp.main_loop()) add_future(self.bp.prefetcher.main_loop()) add_future(await_bp_catchup()) @@ -225,35 +228,36 @@ class Controller(util.LoggedClass): self.logger.info('shutting down gracefully') self.state = self.SHUTTING_DOWN - # First tell the block processor to shut down, it may need to - # perform a lengthy flush. Then shut down the rest. - self.bp.on_shutdown() + # Close servers and sessions self.close_servers(list(self.servers.keys())) + for session in self.sessions: + self.close_session(session) + + # Cancel the futures for future in futures: future.cancel() - # Now wait for the cleanup to complete - await self.close_sessions() - if not bp_future.done(): - self.logger.info('waiting for block processor') - await bp_future + await asyncio.wait(futures) + + # Wait for the executor to finish anything it's doing + self.executor.shutdown() + self.bp.shutdown() def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' - self.logger.info('closing down {} listening servers' - .format(', '.join(kinds))) + if kinds: + self.logger.info('closing down {} listening servers' + .format(', '.join(kinds))) for kind in kinds: server = self.servers.pop(kind, None) if server: server.close() - async def close_sessions(self, secs=30): + async def wait_for_sessions(self, secs=30): if not self.sessions: return self.logger.info('waiting up to {:d} seconds for socket cleanup' .format(secs)) - for session in self.sessions: - self.close_session(session) limit = time.time() + secs while self.sessions and time.time() < limit: self.clear_stale_sessions(grace=secs//2) diff --git a/server/daemon.py b/server/daemon.py index 39cb857..783dfba 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -70,10 +70,13 @@ class Daemon(util.LoggedClass): async with self.workqueue_semaphore: url = self.urls[self.url_index] async with aiohttp.post(url, data=data) as resp: - result = processor(await resp.json()) - if self.prior_msg: - self.logger.info('connection restored') - return result + if resp.status == 200: + if self.prior_msg: + self.logger.info('connection restored') + result = processor(await resp.json()) + return result + log_error('HTTP error code {:d}: {}' + .format(resp.status, resp.reason)) except asyncio.TimeoutError: log_error('timeout error.', skip_once=True) except aiohttp.ClientHttpProcessingError: diff --git a/server/db.py b/server/db.py index 0f920d5..44dacf3 100644 --- a/server/db.py +++ b/server/db.py @@ -123,7 +123,8 @@ class DB(util.LoggedClass): .format(util.formatted_time(self.wall_time))) def read_utxo_state(self): - if self.utxo_db.is_new: + state = self.utxo_db.get(b'state') + if not state: self.db_height = -1 self.db_tx_count = 0 self.db_tip = b'\0' * 32 @@ -132,9 +133,7 @@ class DB(util.LoggedClass): self.wall_time = 0 self.first_sync = True else: - state = self.utxo_db.get(b'state') - if state: - state = ast.literal_eval(state.decode()) + state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise self.DBError('failed reading state from DB') self.db_version = state['db_version'] diff --git a/server/version.py b/server/version.py index dc25c60..83b18bf 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.10.5" +VERSION = "ElectrumX 0.10.6"