diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index a3db141..3261b5d 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -23,17 +23,15 @@ from electrumx.lib.util import chunks, formatted_time, class_logger import electrumx.server.db -RAW_BLOCKS, PREFETCHER_CAUGHT_UP, REORG_CHAIN = range(3) - - class Prefetcher(object): '''Prefetches blocks (in the forward direction only).''' - def __init__(self, daemon, coin, queue): + def __init__(self, daemon, coin, blocks_event): self.logger = class_logger(__name__, self.__class__.__name__) self.daemon = daemon self.coin = coin - self.queue = queue + self.blocks_event = blocks_event + self.blocks = [] self.caught_up = False # Access to fetched_height should be protected by the semaphore self.fetched_height = None @@ -57,11 +55,13 @@ class Prefetcher(object): except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) - def processing_blocks(self, blocks): + def get_prefetched_blocks(self): '''Called by block processor when it is processing queued blocks.''' - self.cache_size -= sum(len(block) for block in blocks) - if self.cache_size < self.min_cache_size: - self.refill_event.set() + blocks = self.blocks + self.blocks = [] + self.cache_size = 0 + self.refill_event.set() + return blocks async def reset_height(self, height): '''Reset to prefetch blocks from the block processor's height. @@ -71,6 +71,8 @@ class Prefetcher(object): must synchronize with a semaphore. ''' async with self.semaphore: + self.blocks.clear() + self.cache_size = 0 self.fetched_height = height self.refill_event.set() @@ -100,9 +102,7 @@ class Prefetcher(object): count = min(daemon_height - self.fetched_height, cache_room) count = min(500, max(count, 0)) if not count: - if not self.caught_up: - self.caught_up = True - await self.queue.put((PREFETCHER_CAUGHT_UP, )) + self.caught_up = True return False first = self.fetched_height + 1 @@ -127,9 +127,10 @@ class Prefetcher(object): else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 - await self.queue.put((RAW_BLOCKS, blocks, first)) + self.blocks.extend(blocks) self.cache_size += size self.fetched_height += count + self.blocks_event.set() self.refill_event.clear() return True @@ -159,16 +160,16 @@ class BlockProcessor(electrumx.server.db.DB): self.daemon = daemon self.notifications = notifications - # Work queue - self.queue = asyncio.Queue() self._caught_up_event = asyncio.Event() - self.prefetcher = Prefetcher(daemon, env.coin, self.queue) + self.blocks_event = asyncio.Event() + self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) # Meta self.cache_MB = env.cache_MB self.next_cache_check = 0 self.last_flush = time.time() self.touched = set() + self.reorg_count = 0 # Header merkle cache self.merkle = Merkle() @@ -197,20 +198,11 @@ class BlockProcessor(electrumx.server.db.DB): ''' self.callbacks.append(callback) - async def check_and_advance_blocks(self, raw_blocks, first): + async def check_and_advance_blocks(self, raw_blocks): '''Process the list of raw blocks passed. Detects and handles reorgs. ''' - self.prefetcher.processing_blocks(raw_blocks) - if first != self.height + 1: - # If we prefetched two sets of blocks and the first caused - # a reorg this will happen when we try to process the - # second. It should be very rare. - self.logger.warning('ignoring {:,d} blocks starting height {:,d}, ' - 'expected {:,d}'.format(len(raw_blocks), first, - self.height + 1)) - return - + first = self.height + 1 blocks = [self.coin.block(raw_block, first + n) for n, raw_block in enumerate(raw_blocks)] headers = [block.header for block in blocks] @@ -748,20 +740,28 @@ class BlockProcessor(electrumx.server.db.DB): self.db_height = self.height self.db_tip = self.tip - async def _process_queue(self): - '''Loop forever processing enqueued work.''' + async def _process_blocks(self): + '''Loop forever processing blocks as they arrive.''' while True: - work, *args = await self.queue.get() - if work == RAW_BLOCKS: - raw_blocks, first = args - await self.check_and_advance_blocks(raw_blocks, first) - elif work == PREFETCHER_CAUGHT_UP: - self._caught_up_event.set() - # Initialise the notification framework - await self.notifications.on_block(set(), self.height) - elif work == REORG_CHAIN: - count, = args - await self.reorg_chain(count) + if self.height == self.daemon.cached_height(): + if not self._caught_up_event.is_set(): + self.logger.info(f'caught up to height {self.height}') + self._caught_up_event.set() + # Flush everything but with first_sync->False state. + first_sync = self.first_sync + self.first_sync = False + self.flush(True) + if first_sync: + self.logger.info(f'{electrumx.version} synced to ' + f'height {self.height:,d}') + await self.blocks_event.wait() + self.blocks_event.clear() + if self.reorg_count: + await self.reorg_chain(self.reorg_count) + self.reorg_count = 0 + else: + blocks = self.prefetcher.get_prefetched_blocks() + await self.check_and_advance_blocks(blocks) def _on_dbs_opened(self): # An incomplete compaction needs to be cancelled otherwise @@ -792,16 +792,11 @@ class BlockProcessor(electrumx.server.db.DB): self.tasks.create_task(self.prefetcher.main_loop()) await self.prefetcher.reset_height(self.height) # Start our loop that processes blocks as they are fetched - self.worker_task = self.tasks.create_task(self._process_queue()) + self.worker_task = self.tasks.create_task(self._process_blocks()) # Wait until caught up await self._caught_up_event.wait() - # Flush everything but with first_sync->False state. - first_sync = self.first_sync - self.first_sync = False - self.flush(True) - if first_sync: - self.logger.info(f'{electrumx.version} synced to ' - f'height {self.height:,d}') + # Initialise the notification framework + await self.notifications.on_block(set(), self.height) # Reopen for serving await self.open_for_serving() @@ -816,7 +811,9 @@ class BlockProcessor(electrumx.server.db.DB): Returns True if a reorg is queued, false if not caught up. ''' if self._caught_up_event.is_set(): - self.queue.put_nowait((REORG_CHAIN, count)) + if count > 0: + self.reorg_count = count + self.blocks_event.set() return True return False