From 6aef79461f7e768dc063b5a8df74e98b4efad401 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 27 Nov 2016 12:12:20 +0900 Subject: [PATCH] Don't shut down block processor by cancellation The block processor needs to be able to close cleanly, and not mid-block. In order to be able to yield whilst processing blocks we cannot forcefully close its coroutine with a cancellation. --- server/block_processor.py | 162 +++++++++++++++++++++----------------- server/db.py | 2 - server/protocol.py | 6 +- 3 files changed, 93 insertions(+), 77 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 362f76f..3e8e2f8 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -23,6 +23,10 @@ from lib.util import chunks, formatted_time, LoggedClass import server.db +# Tasks placed on task queue +BLOCKS, CAUGHT_UP = range(2) + + class ChainError(Exception): pass @@ -30,17 +34,19 @@ class ChainError(Exception): class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' - def __init__(self, daemon, height): + def __init__(self, tasks, daemon, height): super().__init__() + self.tasks = tasks self.daemon = daemon self.semaphore = asyncio.Semaphore() - self.queue = asyncio.Queue() - self.queue_size = 0 self.caught_up = False self.fetched_height = height + # A list of (blocks, size) pairs. Earliest last. + self.cache = [] + self.cache_size = 0 # Target cache size. Has little effect on sync time. 
         self.target_cache_size = 10 * 1024 * 1024
-        # First fetch to be 10 blocks
+        # This makes the first fetch be 10 blocks
         self.ave_size = self.target_cache_size // 10
 
     async def clear(self, height):
@@ -53,19 +59,19 @@ class Prefetcher(LoggedClass):
         with await self.semaphore:
             while not self.queue.empty():
                 self.queue.get_nowait()
-            self.queue_size = 0
+            self.cache = []
+            self.cache_size = 0
             self.fetched_height = height
-            self.caught_up = False
+            self.logger.info('reset to height {:,d}'.format(height))
 
-    async def get_blocks(self):
-        '''Blocking function that returns prefetched blocks.
-
-        The returned result empty just once - when the prefetcher
-        has caught up with the daemon.
-        '''
-        blocks, size = await self.queue.get()
-        self.queue_size -= size
-        return blocks
+    def get_blocks(self):
+        '''Return the next list of blocks from our prefetch cache.'''
+        # Cache might be empty after a clear()
+        if self.cache:
+            blocks, size = self.cache.pop()
+            self.cache_size -= size
+            return blocks
+        return []
 
     async def main_loop(self):
         '''Loop forever polling for more blocks.'''
@@ -73,9 +79,15 @@
                          .format(await self.daemon.height()))
         while True:
            try:
-                with await self.semaphore:
-                    await self._prefetch()
-                await asyncio.sleep(5 if self.caught_up else 0)
+                secs = 0
+                if self.cache_size < self.target_cache_size:
+                    if await self._prefetch():
+                        self.tasks.put_nowait(BLOCKS)
+                    else:
+                        self.tasks.put_nowait(CAUGHT_UP)
+                        self.caught_up = True
+                        secs = 5
+                await asyncio.sleep(secs)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
             except asyncio.CancelledError:
@@ -83,40 +95,37 @@
 
     async def _prefetch(self):
         '''Prefetch blocks unless the prefetch queue is full.'''
-        if self.queue_size >= self.target_cache_size:
-            return
-
         daemon_height = await self.daemon.height()
         cache_room = self.target_cache_size // self.ave_size
-
-        # Try and catch up all blocks but limit to room in cache.
- # Constrain count to between 0 and 4000 regardless - count = min(daemon_height - self.fetched_height, cache_room) - count = min(4000, max(count, 0)) - if not count: - # Indicate when we have caught up for the first time only - if not self.caught_up: - self.caught_up = True - self.queue.put_nowait(([], 0)) - return + with await self.semaphore: + # Try and catch up all blocks but limit to room in cache. + # Constrain count to between 0 and 4000 regardless + count = min(daemon_height - self.fetched_height, cache_room) + count = min(4000, max(count, 0)) + if not count: + return 0 + + first = self.fetched_height + 1 + hex_hashes = await self.daemon.block_hex_hashes(first, count) + if self.caught_up: + self.logger.info('new block height {:,d} hash {}' + .format(first + count - 1, hex_hashes[-1])) + blocks = await self.daemon.raw_blocks(hex_hashes) - first = self.fetched_height + 1 - hex_hashes = await self.daemon.block_hex_hashes(first, count) - if self.caught_up: - self.logger.info('new block height {:,d} hash {}' - .format(first + count - 1, hex_hashes[-1])) - blocks = await self.daemon.raw_blocks(hex_hashes) - size = sum(len(block) for block in blocks) - - # Update our recent average block size estimate - if count >= 10: - self.ave_size = size // count - else: - self.ave_size = (size + (10 - count) * self.ave_size) // 10 + size = sum(len(block) for block in blocks) + + # Update our recent average block size estimate + if count >= 10: + self.ave_size = size // count + else: + self.ave_size = (size + (10 - count) * self.ave_size) // 10 - self.fetched_height += len(blocks) - self.queue.put_nowait((blocks, size)) - self.queue_size += size + self.cache.insert(0, (blocks, size)) + self.cache_size += size + self.fetched_height += len(blocks) + + return count class ChainReorg(Exception): @@ -135,6 +144,9 @@ class BlockProcessor(server.db.DB): self.client = client + # The block processor reads its tasks from this queue + self.tasks = asyncio.Queue() + # These are our state 
as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count @@ -144,6 +156,7 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up = False + self._shutdown = False self.event = asyncio.Event() # Meta @@ -154,7 +167,7 @@ class BlockProcessor(server.db.DB): # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.prefetcher = Prefetcher(self.daemon, self.height) + self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count @@ -176,32 +189,37 @@ class BlockProcessor(server.db.DB): async def main_loop(self): '''Main loop for block processing.''' - try: - # Simulate a reorg if requested - if self.env.force_reorg > 0: - self.logger.info('DEBUG: simulating reorg of {:,d} blocks' - .format(self.env.force_reorg)) - await self.handle_chain_reorg(self.env.force_reorg, set()) - - while True: - await self._wait_for_update() - except asyncio.CancelledError: - pass - - async def shutdown(self): - '''Shut down the DB cleanly.''' - self.logger.info('flushing state to DB for clean shutdown...') + + # Simulate a reorg if requested + if self.env.force_reorg > 0: + self.logger.info('DEBUG: simulating reorg of {:,d} blocks' + .format(self.env.force_reorg)) + await self.handle_chain_reorg(self.env.force_reorg, set()) + + while True: + task = await self.tasks.get() + if self._shutdown: + break + if task == BLOCKS: + await self.advance_blocks() + else: + assert task == CAUGHT_UP + if not self.caught_up: + self.caught_up = True + self.first_caught_up() + self.flush(True) - async def _wait_for_update(self): - '''Wait for the prefetcher to deliver blocks. 
+ def shutdown(self): + '''Call to shut down the block processor.''' + self.logger.info('flushing state to DB for clean shutdown...') + self._shutdown = True + # Ensure we don't sit waiting for a task + self.tasks.put_nowait(BLOCKS) - Blocks are only processed in the forward direction. - ''' - blocks = await self.prefetcher.get_blocks() - if not blocks: - self.first_caught_up() - return + async def advance_blocks(self): + '''Take blocks from the prefetcher and process them.''' + blocks = self.prefetcher.get_blocks() '''Strip the unspendable genesis coinbase.''' if self.height == -1: @@ -226,7 +244,6 @@ class BlockProcessor(server.db.DB): def first_caught_up(self): '''Called when first caught up after start, or after a reorg.''' - self.caught_up = True if self.first_sync: self.first_sync = False self.logger.info('{} synced to height {:,d}' @@ -253,7 +270,6 @@ class BlockProcessor(server.db.DB): self.backup_blocks(blocks, touched) await self.prefetcher.clear(self.height) - self.logger.info('prefetcher reset') async def reorg_hashes(self, count): '''Return the list of hashes to back up beacuse of a reorg. diff --git a/server/db.py b/server/db.py index 677ca72..4c2f64a 100644 --- a/server/db.py +++ b/server/db.py @@ -270,8 +270,6 @@ class DB(LoggedClass): cursor += size file_pos += size - os.sync() - def read_headers(self, start, count): '''Requires count >= 0.''' # Read some from disk diff --git a/server/protocol.py b/server/protocol.py index 7e95a19..bd84a1a 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -254,6 +254,7 @@ class ServerManager(util.LoggedClass): def add_future(coro): self.futures.append(asyncio.ensure_future(coro)) + # shutdown() assumes bp.main_loop() is first add_future(self.bp.main_loop()) add_future(self.bp.prefetcher.main_loop()) add_future(self.mempool.main_loop(self.bp.event)) @@ -316,7 +317,9 @@ class ServerManager(util.LoggedClass): async def shutdown(self): '''Call to shutdown the servers. 
Returns when done.''' - for future in self.futures: + self.bp.shutdown() + # Don't cancel the block processor main loop - let it close itself + for future in self.futures[1:]: future.cancel() for server in self.servers: server.close() @@ -326,7 +329,6 @@ class ServerManager(util.LoggedClass): await asyncio.sleep(0) if self.sessions: await self.close_sessions() - await self.bp.shutdown() async def close_sessions(self, secs=60): self.logger.info('cleanly closing client sessions, please wait...')