From 39af7a74630f63c57e6183aa9c14d65077d30f8c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 17 Dec 2016 16:49:49 +0900 Subject: [PATCH] Rework main block processor loop It's less awkward and more explicit. This brings back the efficiency lost in the 0.9.x series. It also removes the special case hack: both when syncing and caught up, block processing is done in the executor. Fixes #58 --- server/block_processor.py | 186 +++++++++++++++++++------------------- server/protocol.py | 6 +- 2 files changed, 96 insertions(+), 96 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index f223fcf..d427dd7 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -26,81 +26,95 @@ import server.db class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' - def __init__(self, tasks, daemon, height): + def __init__(self, coin, daemon, height): super().__init__() - self.tasks = tasks + self.coin = coin self.daemon = daemon - self.semaphore = asyncio.Semaphore() self.caught_up = False + # Access to fetched_height should be protected by the semaphore self.fetched_height = height - # A list of (blocks, size) pairs. Earliest last. - self.cache = [] + self.semaphore = asyncio.Semaphore() + self.refill_event = asyncio.Event() + # A cache queue of (blocks, size) pairs. The target cache + # size has little effect on sync time. + self.cache = asyncio.Queue() self.cache_size = 0 - # Target cache size. Has little effect on sync time. - self.target_cache_size = 10 * 1024 * 1024 + self.min_cache_size = 10 * 1024 * 1024 # This makes the first fetch be 10 blocks - self.ave_size = self.target_cache_size // 10 + self.ave_size = self.min_cache_size // 10 async def clear(self, height): '''Clear prefetched blocks and restart from the given height. Used in blockchain reorganisations. This coroutine can be called asynchronously to the _prefetch coroutine so we must - synchronize. + synchronize with a semaphore. + + Set height to -1 when shutting down to place a sentinel on the + queue to tell the block processor to shut down too. ''' with await self.semaphore: - while not self.tasks.empty(): - self.tasks.get_nowait() - self.cache = [] + while not self.cache.empty(): + self.cache.get_nowait() self.cache_size = 0 - self.fetched_height = height - self.logger.info('reset to height'.format(height)) - - def get_blocks(self): - '''Return the next list of blocks from our prefetch cache.''' - # Cache might be empty after a clear() - if self.cache: - blocks, size = self.cache.pop() - self.cache_size -= size - return blocks - return [] + if height == -1: + self.cache.put_nowait((None, 0)) + else: + self.refill_event.set() + self.fetched_height = height + self.logger.info('reset to height'.format(height)) + + async def get_blocks(self): + '''Return the next list of blocks from our prefetch cache. + + A return value of None indicates to shut down. Once caught up + an entry is queued every few seconds synchronized with mempool + refreshes to indicate a new mempool is available. Of course + the list of blocks in such a case will normally be empty.''' + blocks, size = await self.cache.get() + self.cache_size -= size + if self.cache_size < self.min_cache_size: + self.refill_event.set() + return blocks async def main_loop(self): '''Loop forever polling for more blocks.''' daemon_height = await self.daemon.height() - if daemon_height > self.fetched_height: - log_msg = 'catching up to daemon height {:,d}...' - else: + if self.fetched_height >= daemon_height: log_msg = 'caught up to daemon height {:,d}' + else: + log_msg = 'catching up to daemon height {:,d}...' self.logger.info(log_msg.format(daemon_height)) while True: try: - secs = 0 - if self.cache_size < self.target_cache_size: - if not await self._prefetch(): - self.caught_up = True - secs = 5 - self.tasks.put_nowait(None) - await asyncio.sleep(secs) + with await self.semaphore: + await self._prefetch_blocks() + await self.refill_event.wait() except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) except asyncio.CancelledError: - break + await self.clear(-1) + return + + async def _prefetch_blocks(self): + '''Prefetch some blocks and put them on the queue. - async def _prefetch(self): - '''Prefetch blocks unless the prefetch queue is full.''' - # Refresh the mempool after updating the daemon height, if and - # only if we've caught up + Repeats until the queue is full or caught up. If caught up, + sleep for a period of time before returning. + ''' daemon_height = await self.daemon.height(mempool=self.caught_up) - cache_room = self.target_cache_size // self.ave_size - with await self.semaphore: + while self.cache_size < self.min_cache_size: # Try and catch up all blocks but limit to room in cache. - # Constrain count to between 0 and 4000 regardless + # Constrain fetch count to between 0 and 2500 regardless. + cache_room = self.min_cache_size // self.ave_size count = min(daemon_height - self.fetched_height, cache_room) - count = min(4000, max(count, 0)) + count = min(2500, max(count, 0)) if not count: - return 0 + self.cache.put_nowait(([], 0)) + self.caught_up = True + await asyncio.sleep(5) + return first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) @@ -108,20 +122,24 @@ class Prefetcher(LoggedClass): self.logger.info('new block height {:,d} hash {}' .format(first + count - 1, hex_hashes[-1])) blocks = await self.daemon.raw_blocks(hex_hashes) + assert count == len(blocks) - size = sum(len(block) for block in blocks) + # Strip the unspendable genesis coinbase + if first == 0: + blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) # Update our recent average block size estimate + size = sum(len(block) for block in blocks) if count >= 10: self.ave_size = size // count else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 - self.cache.insert(0, (blocks, size)) + self.cache.put_nowait((blocks, size)) self.cache_size += size - self.fetched_height += len(blocks) + self.fetched_height += count - return count + self.refill_event.clear() class ChainError(Exception): @@ -141,9 +159,6 @@ class BlockProcessor(server.db.DB): def __init__(self, env): super().__init__(env) - # The block processor reads its tasks from this queue - self.tasks = asyncio.Queue() - # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count @@ -152,9 +167,7 @@ class BlockProcessor(server.db.DB): self.tx_count = self.db_tx_count self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) - self.caught_up = False - self._shutdown = False - self.event = asyncio.Event() + self.caught_up_event = asyncio.Event() # Meta self.utxo_MB = env.utxo_MB @@ -164,7 +177,7 @@ class BlockProcessor(server.db.DB): # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height) + self.prefetcher = Prefetcher(self.coin, self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count @@ -194,52 +207,32 @@ class BlockProcessor(server.db.DB): await self.handle_chain_reorg(set(), self.env.force_reorg) while True: - task = await self.tasks.get() - if self._shutdown: - break - blocks = self.prefetcher.get_blocks() + blocks = await self.prefetcher.get_blocks() if blocks: - start = time.time() await self.advance_blocks(blocks, touched) - if not self.first_sync: - s = '' if len(blocks) == 1 else 's' - self.logger.info('processed {:,d} block{} in {:.1f}s' - .format(len(blocks), s, - time.time() - start)) - elif not self.caught_up: - self.caught_up = True - self.first_caught_up() + elif blocks is None: + break # Shutdown + else: + self.caught_up() + self.logger.info('flushing state to DB for a clean shutdown...') self.flush(True) - self.logger.info('shut down complete') - - def shutdown(self): - '''Call to shut down the block processor.''' - self.logger.info('flushing state to DB for clean shutdown...') - self._shutdown = True - self.tasks.put_nowait(None) + self.logger.info('shutdown complete') async def advance_blocks(self, blocks, touched): - '''Strip the unspendable genesis coinbase.''' - if self.height == -1: - blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) - - def do_it(): + '''Process the list of blocks passed. Detects and handles reorgs.''' + def job(): for block in blocks: - if self._shutdown: - break self.advance_block(block, touched) + start = time.time() loop = asyncio.get_event_loop() try: - if self.caught_up: - await loop.run_in_executor(None, do_it) - else: - do_it() + await loop.run_in_executor(None, job) except ChainReorg: await self.handle_chain_reorg(touched) - if self.caught_up: + if self.caught_up_event.is_set(): # Flush everything as queries are performed on the DB and # not in-memory. await asyncio.sleep(0) @@ -248,16 +241,23 @@ class BlockProcessor(server.db.DB): self.check_cache_size() self.next_cache_check = time.time() + 60 - def first_caught_up(self): + if not self.first_sync: + s = '' if len(blocks) == 1 else 's' + self.logger.info('processed {:,d} block{} in {:.1f}s' + .format(len(blocks), s, + time.time() - start)) + + def caught_up(self): '''Called when first caught up after starting.''' - self.flush(True) - if self.first_sync: - self.logger.info('{} synced to height {:,d}' - .format(VERSION, self.height)) - self.first_sync = False + if not self.caught_up_event.is_set(): + self.flush(True) + if self.first_sync: + self.logger.info('{} synced to height {:,d}' + .format(VERSION, self.height)) + self.first_sync = False self.flush_state(self.db) self.reopen_db(False) - self.event.set() + self.caught_up_event.set() async def handle_chain_reorg(self, touched, count=None): '''Handle a chain reorganisation. diff --git a/server/protocol.py b/server/protocol.py index 120c87b..949bbc5 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -183,8 +183,8 @@ class ServerManager(util.LoggedClass): # shutdown() assumes bp.main_loop() is first add_future(self.bp.main_loop(self.mempool.touched)) add_future(self.bp.prefetcher.main_loop()) - add_future(self.irc.start(self.bp.event)) - add_future(self.start_servers(self.bp.event)) + add_future(self.irc.start(self.bp.caught_up_event)) + add_future(self.start_servers(self.bp.caught_up_event)) add_future(self.mempool.main_loop()) add_future(self.enqueue_delayed_sessions()) add_future(self.notify()) @@ -307,12 +307,12 @@ class ServerManager(util.LoggedClass): '''Call to shutdown everything. Returns when done.''' self.state = self.SHUTTING_DOWN self.close_servers(list(self.servers.keys())) - self.bp.shutdown() # Don't cancel the block processor main loop - let it close itself for future in self.futures[1:]: future.cancel() if self.sessions: await self.close_sessions() + await self.futures[0] async def close_sessions(self, secs=30): self.logger.info('cleanly closing client sessions, please wait...')