From a813eaf5f57be847fb0c350bb776522f2b032375 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 20 Oct 2016 18:24:23 +0900 Subject: [PATCH] Rename BlockCache and put in own file --- server/block_processor.py | 89 +++++++++++++++++++++++++++++++++++++++ server/controller.py | 86 ++----------------------------------- 2 files changed, 92 insertions(+), 83 deletions(-) create mode 100644 server/block_processor.py diff --git a/server/block_processor.py b/server/block_processor.py new file mode 100644 index 0000000..a9f8456 --- /dev/null +++ b/server/block_processor.py @@ -0,0 +1,89 @@ +# See the file "LICENSE" for information about the copyright +# and warranty status of this software. + +import asyncio + +from server.daemon import DaemonError +from lib.util import LoggedClass + + +class BlockProcessor(LoggedClass): + '''Requests and caches blocks ahead of time from the daemon. Serves + them to the blockchain processor. Coordinates backing up in case of + block chain reorganisations. + ''' + + def __init__(self, db, daemon): + super().__init__() + self.db = db + self.daemon = daemon + # Target cache size. Has little effect on sync time. + self.target_cache_size = 10 * 1024 * 1024 + self.fetched_height = db.height + self.queue = asyncio.Queue() + self.queue_size = 0 + self.recent_sizes = [0] + + def flush_db(self): + self.db.flush(self.daemon.cached_height(), True) + + async def process_blocks(self): + try: + while True: + blocks, total_size = await self.queue.get() + self.queue_size -= total_size + for block in blocks: + self.db.process_block(block, self.daemon.cached_height()) + # Release asynchronous block fetching + await asyncio.sleep(0) + + if self.db.height == self.daemon.cached_height(): + self.logger.info('caught up to height {:d}' + .format(self.db_height)) + self.flush_db() + finally: + self.flush_db() + + async def prefetcher(self): + '''Loops forever polling for more blocks.''' + self.logger.info('prefetching blocks...') + while True: + try: + await self.maybe_prefetch() + except DaemonError as e: + self.logger.info('ignoring daemon errors: {}'.format(e)) + await asyncio.sleep(2) + + def cache_used(self): + return sum(len(block) for block in self.blocks) + + def prefill_count(self, room): + ave_size = sum(self.recent_sizes) // len(self.recent_sizes) + count = room // ave_size if ave_size else 0 + return max(count, 10) + + async def maybe_prefetch(self): + '''Prefetch blocks if there are any to prefetch.''' + while self.queue_size < self.target_cache_size: + # Keep going by getting a whole new cache_limit of blocks + daemon_height = await self.daemon.height() + max_count = min(daemon_height - self.fetched_height, 4000) + count = min(max_count, self.prefill_count(self.target_cache_size)) + if not count: + break + + first = self.fetched_height + 1 + hashes = await self.daemon.block_hex_hashes(first, count) + blocks = await self.daemon.raw_blocks(hashes) + + self.fetched_height += count + sizes = [len(block) for block in blocks] + total_size = sum(sizes) + self.queue.put_nowait((blocks, total_size)) + self.queue_size += total_size + + # Keep 50 most recent block sizes for fetch count estimation + self.recent_sizes.extend(sizes) + excess = len(self.recent_sizes) - 50 + if excess > 0: + self.recent_sizes = self.recent_sizes[excess:] diff --git a/server/controller.py b/server/controller.py index 5c880e0..5515de8 100644 --- a/server/controller.py +++ b/server/controller.py @@ -7,6 +7,7 @@ import traceback from functools import partial from server.daemon import Daemon, DaemonError +from server.block_processor import BlockProcessor from server.db import DB from server.protocol import ElectrumX, LocalRPC from lib.hash import (sha256, double_sha256, hash_to_str, @@ -19,14 +20,14 @@ class Controller(LoggedClass): def __init__(self, loop, env): '''Create up the controller. - Creates DB, Daemon and BlockCache instances. + Creates DB, Daemon and BlockProcessor instances. ''' super().__init__() self.loop = loop self.env = env self.db = DB(env) self.daemon = Daemon(env.daemon_url) - self.block_cache = BlockCache(self.db, self.daemon) + self.block_cache = BlockProcessor(self.db, self.daemon) self.servers = [] self.sessions = set() self.addresses = {} @@ -151,84 +152,3 @@ class Controller(LoggedClass): '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one per peer.''' return self.peers - -class BlockCache(LoggedClass): - '''Requests and caches blocks ahead of time from the daemon. Serves - them to the blockchain processor. Coordinates backing up in case of - block chain reorganisations. - ''' - - def __init__(self, db, daemon): - super().__init__() - self.db = db - self.daemon = daemon - # Target cache size. Has little effect on sync time. - self.target_cache_size = 10 * 1024 * 1024 - self.fetched_height = db.height - self.queue = asyncio.Queue() - self.queue_size = 0 - self.recent_sizes = [0] - - def flush_db(self): - self.db.flush(self.daemon.cached_height(), True) - - async def process_blocks(self): - try: - while True: - blocks, total_size = await self.queue.get() - self.queue_size -= total_size - for block in blocks: - self.db.process_block(block, self.daemon.cached_height()) - # Release asynchronous block fetching - await asyncio.sleep(0) - - if self.db.height == self.daemon.cached_height(): - self.logger.info('caught up to height {:d}' - .format(self.db_height)) - self.flush_db() - finally: - self.flush_db() - - async def prefetcher(self): - '''Loops forever polling for more blocks.''' - self.logger.info('prefetching blocks...') - while True: - try: - await self.maybe_prefetch() - except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) - await asyncio.sleep(2) - - def cache_used(self): - return sum(len(block) for block in self.blocks) - - def prefill_count(self, room): - ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - count = room // ave_size if ave_size else 0 - return max(count, 10) - - async def maybe_prefetch(self): - '''Prefetch blocks if there are any to prefetch.''' - while self.queue_size < self.target_cache_size: - # Keep going by getting a whole new cache_limit of blocks - daemon_height = await self.daemon.height() - max_count = min(daemon_height - self.fetched_height, 4000) - count = min(max_count, self.prefill_count(self.target_cache_size)) - if not count: - break - - first = self.fetched_height + 1 - hashes = await self.daemon.block_hex_hashes(first, count) - blocks = await self.daemon.raw_blocks(hashes) - - self.fetched_height += count - sizes = [len(block) for block in blocks] - total_size = sum(sizes) - self.queue.put_nowait((blocks, total_size)) - self.queue_size += total_size - - # Keep 50 most recent block sizes for fetch count estimation - self.recent_sizes.extend(sizes) - excess = len(self.recent_sizes) - 50 - if excess > 0: - self.recent_sizes = self.recent_sizes[excess:]