From 370cceab83ce312ff3240bc9717293ac054b8f62 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 13 Oct 2016 22:10:34 +0900 Subject: [PATCH] Clean up RPC handling Remove class Handle exceptions properly by cancelling tasks Log what is happening Generalise send interface --- server/server.py | 157 ++++++++++++++++++++--------------------------- server_main.py | 2 + 2 files changed, 67 insertions(+), 92 deletions(-) diff --git a/server/server.py b/server/server.py index 01ee46d..1e96080 100644 --- a/server/server.py +++ b/server/server.py @@ -19,27 +19,37 @@ class Server(object): def __init__(self, env): self.env = env self.db = DB(env) - self.rpc = RPC(env) - self.block_cache = BlockCache(env, self.db, self.rpc) - - def async_tasks(self): - return [ + self.block_cache = BlockCache(env, self.db) + self.tasks = [ asyncio.ensure_future(self.block_cache.catch_up()), asyncio.ensure_future(self.block_cache.process_cache()), ] + loop = asyncio.get_event_loop() + for signame in ('SIGINT', 'SIGTERM'): + loop.add_signal_handler(getattr(signal, signame), + partial(self.on_signal, signame)) + + def on_signal(self, signame): + logging.warning('received {} signal, preparing to shut down' + .format(signame)) + for task in self.tasks: + task.cancel() + + def async_tasks(self): + return self.tasks + class BlockCache(object): '''Requests blocks ahead of time from the daemon. Serves them to the blockchain processor.''' - def __init__(self, env, db, rpc): + def __init__(self, env, db): self.logger = logging.getLogger('BlockCache') self.logger.setLevel(logging.INFO) self.db = db - self.rpc = rpc - self.stop = False + self.rpc_url = env.rpc_url # Cache target size is in MB. Has little effect on sync time. self.cache_limit = 10 self.daemon_height = 0 @@ -49,37 +59,26 @@ class BlockCache(object): self.recent_sizes = [] self.ave_size = 0 - loop = asyncio.get_event_loop() - for signame in ('SIGINT', 'SIGTERM'): - loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) - - def on_signal(self, signame): - logging.warning('Received {} signal, preparing to shut down' - .format(signame)) - self.blocks = [] - self.stop = True + self.logger.info('using RPC URL {}'.format(self.rpc_url)) async def process_cache(self): - while not self.stop: + while True: await asyncio.sleep(1) while self.blocks: self.db.process_block(self.blocks.pop(), self.daemon_height) # Release asynchronous block fetching await asyncio.sleep(0) - self.db.flush_all(self.daemon_height) - async def catch_up(self): self.logger.info('catching up, block cache limit {:d}MB...' .format(self.cache_limit)) - while await self.maybe_prefill(): - await asyncio.sleep(1) - - if not self.stop: + try: + while await self.maybe_prefill(): + await asyncio.sleep(1) self.logger.info('caught up to height {:d}' .format(self.daemon_height)) + finally: self.db.flush_all(self.daemon_height) def cache_used(self): @@ -96,35 +95,26 @@ class BlockCache(object): processing.''' cache_limit = self.cache_limit * 1024 * 1024 while True: - if self.stop: - return False - cache_used = self.cache_used() if cache_used > cache_limit: return True # Keep going by getting a whole new cache_limit of blocks - self.daemon_height = await self.rpc.rpc_single('getblockcount') + self.daemon_height = await self.send_single('getblockcount') max_count = min(self.daemon_height - self.fetched_height, 4000) count = min(max_count, self.prefill_count(cache_limit)) - if not count or self.stop: + if not count: return False # Done catching up first = self.fetched_height + 1 param_lists = [[height] for height in range(first, first + count)] - hashes = await self.rpc.rpc_multi('getblockhash', param_lists) - - if self.stop: - return False + hashes = await self.send_vector('getblockhash', param_lists) # Hashes is an array of hex strings param_lists = [(h, False) for h in hashes] - blocks = await self.rpc.rpc_multi('getblock', param_lists) + blocks = await self.send_vector('getblock', param_lists) self.fetched_height += count - if self.stop: - return False - # Convert hex string to bytes and put in memoryview blocks = [bytes.fromhex(block) for block in blocks] # Reverse order and place at front of list @@ -138,64 +128,47 @@ class BlockCache(object): self.recent_sizes = self.recent_sizes[excess:] self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - -class RPC(object): - - def __init__(self, env): - self.logger = logging.getLogger('RPC') - self.logger.setLevel(logging.INFO) - self.rpc_url = env.rpc_url - self.logger.info('using RPC URL {}'.format(self.rpc_url)) - - async def rpc_multi(self, method, param_lists): - payload = [{'method': method, 'params': param_list} - for param_list in param_lists] - while True: - dresults = await self.daemon(payload) - errs = [dresult['error'] for dresult in dresults] - if not any(errs): - return [dresult['result'] for dresult in dresults] - for err in errs: - if err.get('code') == -28: - self.logger.warning('daemon still warming up...') - secs = 10 - break - else: - self.logger.error('daemon returned errors: {}'.format(errs)) - secs = 0 - self.logger.info('sleeping {:d} seconds and trying again...' - .format(secs)) - await asyncio.sleep(secs) - - - async def rpc_single(self, method, params=None): + async def send_single(self, method, params=None): payload = {'method': method} if params: payload['params'] = params - while True: - dresult = await self.daemon(payload) - err = dresult['error'] - if not err: - return dresult['result'] - if err.get('code') == -28: - self.logger.warning('daemon still warming up...') - secs = 10 - else: - self.logger.error('daemon returned error: {}'.format(err)) - secs = 0 - self.logger.info('sleeping {:d} seconds and trying again...' - .format(secs)) - await asyncio.sleep(secs) - - async def daemon(self, payload): + result, = await self.send((payload, )) + return result + + async def send_many(self, mp_pairs): + payload = [{'method': method, 'params': params} + for method, params in mp_pairs] + return await self.send(payload) + + async def send_vector(self, method, params_list): + payload = [{'method': method, 'params': params} + for params in params_list] + return await self.send(payload) + + async def send(self, payload): + assert isinstance(payload, (tuple, list)) + data = json.dumps(payload) while True: try: - async with aiohttp.ClientSession() as session: - async with session.post(self.rpc_url, - data=json.dumps(payload)) as resp: - return await resp.json() + async with aiohttp.request('POST', self.rpc_url, + data = data) as resp: + result = await resp.json() + except asyncio.CancelledError: + raise except Exception as e: - self.logger.error('aiohttp error: {}'.format(e)) + msg = 'aiohttp error: {}'.format(e) + secs = 3 + else: + errs = tuple(item['error'] for item in result) + if not any(errs): + return tuple(item['result'] for item in result) + if any(err.get('code') == -28 for err in errs): + msg = 'daemon still warming up...' + secs = 10 + else: + msg = 'daemon errors: {}'.format(errs) + secs = 1 - self.logger.info('sleeping 1 second and trying again...') - await asyncio.sleep(1) + self.logger.error('{}. Sleeping {:d}s and trying again...' + .format(msg, secs)) + await asyncio.sleep(secs) diff --git a/server_main.py b/server_main.py index 0ebd347..3166d24 100755 --- a/server_main.py +++ b/server_main.py @@ -28,6 +28,8 @@ def main_loop(): loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.gather(*tasks)) + except asyncio.CancelledError: + logging.warning('task cancelled; asyncio event loop closing') finally: loop.close()