From 9fbbc8bfdb35d0c4607ea6ca7fe6bfd2dddbbdc8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 15:59:42 +0900 Subject: [PATCH] Clean up daemon interface. --- server/block_processor.py | 2 +- server/daemon.py | 90 ++++++++++++++++++++++----------------- server/protocol.py | 3 +- 3 files changed, 52 insertions(+), 43 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 6a64d15..ec9891e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -89,7 +89,7 @@ class Prefetcher(LoggedClass): else: await asyncio.sleep(0) except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) + self.logger.info('ignoring daemon error: {}'.format(e)) async def _caught_up(self): '''Poll for new blocks and mempool state. diff --git a/server/daemon.py b/server/daemon.py index e2406d1..b745076 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -24,70 +24,80 @@ class DaemonError(Exception): class Daemon(util.LoggedClass): '''Handles connections to a daemon at the given URL.''' + WARMING_UP = -28 + def __init__(self, url): super().__init__() self.url = url self._height = None self.logger.info('connecting to daemon at URL {}'.format(url)) - async def send_single(self, method, params=None): - payload = {'method': method} - if params: - payload['params'] = params - result, = await self.send((payload, )) - return result - - async def send_many(self, mp_pairs): - if mp_pairs: - payload = [{'method': method, 'params': params} - for method, params in mp_pairs] - return await self.send(payload) - return [] - - async def send_vector(self, method, params_list): - if params_list: - payload = [{'method': method, 'params': params} - for params in params_list] - return await self.send(payload) - return [] + @classmethod + def is_warming_up(cls, err): + if not isinstance(err, list): + err = [err] + return any(elt.get('code') == cls.WARMING_UP for elt in err) async def send(self, payload): - assert isinstance(payload, (tuple, list)) + '''Send a payload to be converted to JSON.''' data = json.dumps(payload) + secs = 1 while True: try: async with aiohttp.post(self.url, data=data) as resp: result = await resp.json() - except asyncio.CancelledError: - raise - except Exception as e: - msg = 'aiohttp error: {}'.format(e) - secs = 3 - else: - errs = tuple(item['error'] for item in result) - if not any(errs): - return tuple(item['result'] for item in result) - if any(err.get('code') == -28 for err in errs): - msg = 'daemon still warming up.' - secs = 30 - else: - raise DaemonError(errs) + if not self.is_warming_up(result): + return result + msg = 'daemon is still warming up' + except aiohttp.DisconnectedError as e: + msg = '{}: {}'.format(e.__class__.__name__, e) + secs = min(180, secs * 2) self.logger.error('{}. Sleeping {:d}s and trying again...' .format(msg, secs)) await asyncio.sleep(secs) + async def send_single(self, method, params=None): + '''Send a single request to the daemon.''' + payload = {'method': method} + if params: + payload['params'] = params + item = await self.send(payload) + if item['error']: + raise DaemonError(item['error']) + return item['result'] + + async def send_many(self, mp_iterable): + '''Send several requests at once. + + The results are returned as a tuple.''' + payload = tuple({'method': m, 'params': p} for m, p in mp_iterable) + if payload: + items = await self.send(payload) + errs = tuple(item['error'] for item in items) + if any(errs): + raise DaemonError(errs) + return tuple(item['result'] for item in items) + return () + + async def send_vector(self, method, params_iterable): + '''Send several requests of the same method. + + The results are returned as a tuple.''' + return await self.send_many((method, params) + for params in params_iterable) + async def block_hex_hashes(self, first, count): '''Return the hex hashes of count block starting at height first.''' - param_lists = [[height] for height in range(first, first + count)] - return await self.send_vector('getblockhash', param_lists) + params_iterable = ((h, ) for h in range(first, first + count)) + return await self.send_vector('getblockhash', params_iterable) async def raw_blocks(self, hex_hashes): '''Return the raw binary blocks with the given hex hashes.''' - param_lists = [(h, False) for h in hex_hashes] - blocks = await self.send_vector('getblock', param_lists) + params_iterable = ((h, False) for h in hex_hashes) + blocks = await self.send_vector('getblock', params_iterable) # Convert hex string to bytes - return [bytes.fromhex(block) for block in blocks] + return tuple(bytes.fromhex(block) for block in blocks) async def mempool_hashes(self): '''Return the hashes of the txs in the daemon's mempool.''' diff --git a/server/protocol.py b/server/protocol.py index 69105e2..5c960d1 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -404,8 +404,7 @@ class ElectrumX(JSONRPC): self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: - errors = e.args[0] - error = errors[0] + error = e.args[0] message = error['message'] self.logger.info('sendrawtransaction: {}'.format(message)) if 'non-mandatory-script-verify-flag' in message: