diff --git a/server/block_processor.py b/server/block_processor.py index 7397582..9319fe4 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -49,16 +49,12 @@ class Prefetcher(LoggedClass): self.semaphore = asyncio.Semaphore() self.queue = asyncio.Queue() self.queue_size = 0 + self.fetched_height = height + self.mempool = [] # Target cache size. Has little effect on sync time. self.target_cache_size = 10 * 1024 * 1024 - self.fetched_height = height - self.recent_sizes = [0] - - async def get_blocks(self): - '''Returns a list of prefetched blocks.''' - blocks, total_size = await self.queue.get() - self.queue_size -= total_size - return blocks + # First fetch to be 10 blocks + self.ave_size = self.target_cache_size // 10 async def clear(self, height): '''Clear prefetched blocks and restart from the given height. @@ -73,49 +69,73 @@ class Prefetcher(LoggedClass): self.queue_size = 0 self.fetched_height = height + async def get_blocks(self): + '''Returns a list of prefetched blocks and the mempool.''' + blocks, height, size = await self.queue.get() + self.queue_size -= size + if height == self.daemon.cached_height(): + return blocks, self.mempool + else: + return blocks, None + async def start(self): '''Loop forever polling for more blocks.''' - self.logger.info('looping forever prefetching blocks...') + self.logger.info('starting prefetch loop...') while True: - while self.queue_size < self.target_cache_size: - try: - with await self.semaphore: - await self._prefetch() - except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) - await asyncio.sleep(2) - - def _prefill_count(self, room): - ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - count = room // ave_size if ave_size else 0 - return max(count, 10) + try: + if await self._caught_up(): + await asyncio.sleep(5) + else: + await asyncio.sleep(0) + except DaemonError as e: + self.logger.info('ignoring daemon errors: {}'.format(e)) + + async def _caught_up(self): + '''Poll for new blocks and mempool state. + + Mempool is only queried if caught up with daemon.''' + with await self.semaphore: + blocks, size = await self._prefetch() + self.fetched_height += len(blocks) + caught_up = self.fetched_height == self.daemon.cached_height() + if caught_up: + self.mempool = await self.daemon.mempool_hashes() + + # Wake up block processor if we have something + if blocks or caught_up: + self.queue.put_nowait((blocks, self.fetched_height, size)) + self.queue_size += size + + return caught_up async def _prefetch(self): - '''Prefetch blocks if there are any to prefetch.''' + '''Prefetch blocks unless the prefetch queue is full.''' + if self.queue_size >= self.target_cache_size: + return [], 0 + daemon_height = await self.daemon.height() - max_count = min(daemon_height - self.fetched_height, 4000) - count = min(max_count, self._prefill_count(self.target_cache_size)) + cache_room = self.target_cache_size // self.ave_size + + # Try and catch up all blocks but limit to room in cache. + # Constrain count to between 0 and 4000 regardless + count = min(daemon_height - self.fetched_height, cache_room) + count = min(4000, max(count, 0)) if not count: - return 0 + return [], 0 + first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) - if not hex_hashes: - self.logger.error('requested {:,d} hashes, got none'.format(count)) - return 0 - blocks = await self.daemon.raw_blocks(hex_hashes) - sizes = [len(block) for block in blocks] - total_size = sum(sizes) - self.queue.put_nowait((blocks, total_size)) - self.queue_size += total_size - self.fetched_height += len(blocks) - # Keep 50 most recent block sizes for fetch count estimation - self.recent_sizes.extend(sizes) - excess = len(self.recent_sizes) - 50 - if excess > 0: - self.recent_sizes = self.recent_sizes[excess:] - return count + size = sum(len(block) for block in blocks) + + # Update our recent average block size estimate + if count >= 10: + self.ave_size = size // count + else: + self.ave_size = (size + (10 - count) * self.ave_size) // 10 + + return blocks, size class BlockProcessor(LoggedClass): @@ -224,7 +244,7 @@ class BlockProcessor(LoggedClass): async def wait_for_blocks(self): '''Loop forever processing blocks in the forward direction.''' while True: - blocks = await self.prefetcher.get_blocks() + blocks, mempool = await self.prefetcher.get_blocks() for block in blocks: if not self.advance_block(block): await self.handle_chain_reorg()