From ace80c7b871c78dd8d8feef01ee5bfe5d5ab5373 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 20 Oct 2016 18:36:30 +0900 Subject: [PATCH] Split out the prefetcher. --- server/block_processor.py | 127 +++++++++++++++++++++----------------- server/controller.py | 7 +-- 2 files changed, 73 insertions(+), 61 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index a9f8456..e605867 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -7,31 +7,90 @@ from server.daemon import DaemonError from lib.util import LoggedClass +class Prefetcher(LoggedClass): + '''Prefetches blocks (in the forward direction only).''' + + def __init__(self, daemon, height): + super().__init__() + self.daemon = daemon + self.queue = asyncio.Queue() + self.queue_semaphore = asyncio.Semaphore() + self.queue_size = 0 + # Target cache size. Has little effect on sync time. + self.target_cache_size = 10 * 1024 * 1024 + self.fetched_height = height + self.recent_sizes = [0] + + async def get_blocks(self): + '''Returns a list of prefetched blocks.''' + blocks, total_size = await self.queue.get() + self.queue_size -= total_size + return blocks + + async def start(self): + '''Loops forever polling for more blocks.''' + self.logger.info('prefetching blocks...') + while True: + while self.queue_size < self.target_cache_size: + try: + await self._prefetch() + except DaemonError as e: + self.logger.info('ignoring daemon errors: {}'.format(e)) + await asyncio.sleep(2) + + def _prefill_count(self, room): + ave_size = sum(self.recent_sizes) // len(self.recent_sizes) + count = room // ave_size if ave_size else 0 + return max(count, 10) + + async def _prefetch(self): + '''Prefetch blocks if there are any to prefetch.''' + daemon_height = await self.daemon.height() + max_count = min(daemon_height - self.fetched_height, 4000) + count = min(max_count, self._prefill_count(self.target_cache_size)) + first = self.fetched_height + 1 + hashes = await self.daemon.block_hex_hashes(first, count) + if not hashes: + return + + blocks = await self.daemon.raw_blocks(hashes) + sizes = [len(block) for block in blocks] + total_size = sum(sizes) + self.queue.put_nowait((blocks, total_size)) + self.queue_size += total_size + self.fetched_height += len(blocks) + + # Keep 50 most recent block sizes for fetch count estimation + self.recent_sizes.extend(sizes) + excess = len(self.recent_sizes) - 50 + if excess > 0: + self.recent_sizes = self.recent_sizes[excess:] + + class BlockProcessor(LoggedClass): - '''Requests and caches blocks ahead of time from the daemon. Serves - them to the blockchain processor. Coordinates backing up in case of - block chain reorganisations. + '''Process blocks and update the DB state to match. + + Employ a prefetcher to prefetch blocks in batches for processing. + Coordinate backing up in case of chain reorganisations. ''' def __init__(self, db, daemon): super().__init__() self.db = db self.daemon = daemon - # Target cache size. Has little effect on sync time. - self.target_cache_size = 10 * 1024 * 1024 - self.fetched_height = db.height - self.queue = asyncio.Queue() - self.queue_size = 0 - self.recent_sizes = [0] + self.prefetcher = Prefetcher(daemon, db.height) + + def coros(self): + return [self.start(), self.prefetcher.start()] def flush_db(self): self.db.flush(self.daemon.cached_height(), True) - async def process_blocks(self): + async def start(self): + '''Loop forever processing blocks in the appropriate direction.''' try: while True: - blocks, total_size = await self.queue.get() - self.queue_size -= total_size + blocks = await self.prefetcher.get_blocks() for block in blocks: self.db.process_block(block, self.daemon.cached_height()) # Release asynchronous block fetching @@ -43,47 +102,3 @@ class BlockProcessor(LoggedClass): self.flush_db() finally: self.flush_db() - - async def prefetcher(self): - '''Loops forever polling for more blocks.''' - self.logger.info('prefetching blocks...') - while True: - try: - await self.maybe_prefetch() - except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) - await asyncio.sleep(2) - - def cache_used(self): - return sum(len(block) for block in self.blocks) - - def prefill_count(self, room): - ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - count = room // ave_size if ave_size else 0 - return max(count, 10) - - async def maybe_prefetch(self): - '''Prefetch blocks if there are any to prefetch.''' - while self.queue_size < self.target_cache_size: - # Keep going by getting a whole new cache_limit of blocks - daemon_height = await self.daemon.height() - max_count = min(daemon_height - self.fetched_height, 4000) - count = min(max_count, self.prefill_count(self.target_cache_size)) - if not count: - break - - first = self.fetched_height + 1 - hashes = await self.daemon.block_hex_hashes(first, count) - blocks = await self.daemon.raw_blocks(hashes) - - self.fetched_height += count - sizes = [len(block) for block in blocks] - total_size = sum(sizes) - self.queue.put_nowait((blocks, total_size)) - self.queue_size += total_size - - # Keep 50 most recent block sizes for fetch count estimation - self.recent_sizes.extend(sizes) - excess = len(self.recent_sizes) - 50 - if excess > 0: - self.recent_sizes = self.recent_sizes[excess:] diff --git a/server/controller.py b/server/controller.py index 5515de8..f0f4240 100644 --- a/server/controller.py +++ b/server/controller.py @@ -27,7 +27,7 @@ class Controller(LoggedClass): self.env = env self.db = DB(env) self.daemon = Daemon(env.daemon_url) - self.block_cache = BlockProcessor(self.db, self.daemon) + self.block_processor = BlockProcessor(self.db, self.daemon) self.servers = [] self.sessions = set() self.addresses = {} @@ -61,10 +61,7 @@ class Controller(LoggedClass): self.logger.info('SSL server listening on {}:{:d}' .format(env.host, env.ssl_port)) - coros = [ - self.block_cache.prefetcher(), - self.block_cache.process_blocks(), - ] + coros = self.block_processor.coros() for coro in coros: asyncio.ensure_future(coro)