From 897e68d20cc12006a6176c5eeeea6fbc93db2b1d Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 20 Oct 2016 18:18:55 +0900 Subject: [PATCH] Move some daemon logic to daemon.py --- server/controller.py | 24 ++++++++---------------- server/daemon.py | 40 ++++++++++++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/server/controller.py b/server/controller.py index 1f58959..5c880e0 100644 --- a/server/controller.py +++ b/server/controller.py @@ -164,14 +164,13 @@ class BlockCache(LoggedClass): self.daemon = daemon # Target cache size. Has little effect on sync time. self.target_cache_size = 10 * 1024 * 1024 - self.daemon_height = 0 self.fetched_height = db.height self.queue = asyncio.Queue() self.queue_size = 0 self.recent_sizes = [0] def flush_db(self): - self.db.flush(self.daemon_height, True) + self.db.flush(self.daemon.cached_height(), True) async def process_blocks(self): try: @@ -179,13 +178,13 @@ class BlockCache(LoggedClass): blocks, total_size = await self.queue.get() self.queue_size -= total_size for block in blocks: - self.db.process_block(block, self.daemon_height) + self.db.process_block(block, self.daemon.cached_height()) # Release asynchronous block fetching await asyncio.sleep(0) - if self.db.height == self.daemon_height: + if self.db.height == self.daemon.cached_height(): self.logger.info('caught up to height {:d}' - .format(self.daemon_height)) + .format(self.db_height)) self.flush_db() finally: self.flush_db() @@ -210,26 +209,19 @@ class BlockCache(LoggedClass): async def maybe_prefetch(self): '''Prefetch blocks if there are any to prefetch.''' - daemon = self.daemon while self.queue_size < self.target_cache_size: # Keep going by getting a whole new cache_limit of blocks - self.daemon_height = await daemon.send_single('getblockcount') - max_count = min(self.daemon_height - self.fetched_height, 4000) + daemon_height = await self.daemon.height() + max_count = min(daemon_height - self.fetched_height, 4000) count = min(max_count, self.prefill_count(self.target_cache_size)) if not count: break first = self.fetched_height + 1 - param_lists = [[height] for height in range(first, first + count)] - hashes = await daemon.send_vector('getblockhash', param_lists) + hashes = await self.daemon.block_hex_hashes(first, count) + blocks = await self.daemon.raw_blocks(hashes) - # Hashes is an array of hex strings - param_lists = [(h, False) for h in hashes] - blocks = await daemon.send_vector('getblock', param_lists) self.fetched_height += count - - # Convert hex string to bytes - blocks = [bytes.fromhex(block) for block in blocks] sizes = [len(block) for block in blocks] total_size = sum(sizes) self.queue.put_nowait((blocks, total_size)) diff --git a/server/daemon.py b/server/daemon.py index c60a4c8..96a07bc 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -23,6 +23,7 @@ class Daemon(LoggedClass): def __init__(self, url): super().__init__() self.url = url + self._height = None self.logger.info('connecting to daemon at URL {}'.format(url)) async def send_single(self, method, params=None): @@ -33,14 +34,18 @@ class Daemon(LoggedClass): return result async def send_many(self, mp_pairs): - payload = [{'method': method, 'params': params} - for method, params in mp_pairs] - return await self.send(payload) + if mp_pairs: + payload = [{'method': method, 'params': params} + for method, params in mp_pairs] + return await self.send(payload) + return [] async def send_vector(self, method, params_list): - payload = [{'method': method, 'params': params} - for params in params_list] - return await self.send(payload) + if params_list: + payload = [{'method': method, 'params': params} + for params in params_list] + return await self.send(payload) + return [] async def send(self, payload): assert isinstance(payload, (tuple, list)) @@ -68,3 +73,26 @@ class Daemon(LoggedClass): self.logger.error('{}. Sleeping {:d}s and trying again...' .format(msg, secs)) await asyncio.sleep(secs) + + async def block_hex_hashes(self, first, count): + '''Return the hex hashes of count block starting at height first.''' + param_lists = [[height] for height in range(first, first + count)] + return await self.send_vector('getblockhash', param_lists) + + async def raw_blocks(self, hex_hashes): + '''Return the raw binary blocks with the given hex hashes.''' + param_lists = [(h, False) for h in hex_hashes] + blocks = await self.send_vector('getblock', param_lists) + # Convert hex string to bytes + return [bytes.fromhex(block) for block in blocks] + + async def height(self): + '''Query the daemon for its current height.''' + self._height = await self.send_single('getblockcount') + return self._height + + def cached_height(self): + '''Return the cached daemon height. + + If the daemon has not been queried yet this returns None.''' + return self._height