From 56274fb9eac01f57f868aee403d484198b2a3df6 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 12 Nov 2016 07:43:24 +0900 Subject: [PATCH 1/3] Prefix internal methods with an underscore --- server/daemon.py | 41 ++++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/server/daemon.py b/server/daemon.py index 7a886fe..b8a166b 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -45,7 +45,7 @@ class Daemon(util.LoggedClass): .format(height)) self._height = height - async def post(self, data): + async def _post(self, data): '''Send data to the daemon and handle the response.''' async with self.workqueue_semaphore: async with aiohttp.post(self.url, data=data) as resp: @@ -66,7 +66,7 @@ class Daemon(util.LoggedClass): raise DaemonWarmingUpError raise DaemonError(err) - async def send(self, payload): + async def _send(self, payload): '''Send a payload to be converted to JSON. Handles temporary connection issues. Daemon reponse errors @@ -77,7 +77,7 @@ class Daemon(util.LoggedClass): prior_msg = None while True: try: - result = await self.post(data) + result = await self._post(data) if prior_msg: self.logger.info('connection successfully restored') return result @@ -105,34 +105,29 @@ class Daemon(util.LoggedClass): count += 1 secs = min(16, secs * 2) - async def send_single(self, method, params=None): + async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' payload = {'method': method} if params: payload['params'] = params - return await self.send(payload) + return await self._send(payload) - async def send_many(self, mp_iterable): - '''Send several requests at once.''' - payload = [{'method': m, 'params': p} for m, p in mp_iterable] + async def _send_vector(self, method, params_iterable): + '''Send several requests of the same method.''' + payload = [{'method': method, 'params': p} for p in params_iterable] if payload: - return await self.send(payload) + return await self._send(payload) return [] - async def send_vector(self, method, params_iterable): - '''Send several requests of the same method.''' - return await self.send_many((method, params) - for params in params_iterable) - async def block_hex_hashes(self, first, count): '''Return the hex hashes of count block starting at height first.''' params_iterable = ((h, ) for h in range(first, first + count)) - return await self.send_vector('getblockhash', params_iterable) + return await self._send_vector('getblockhash', params_iterable) async def raw_blocks(self, hex_hashes): '''Return the raw binary blocks with the given hex hashes.''' params_iterable = ((h, False) for h in hex_hashes) - blocks = await self.send_vector('getblock', params_iterable) + blocks = await self._send_vector('getblock', params_iterable) # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] @@ -140,39 +135,39 @@ class Daemon(util.LoggedClass): '''Return the hashes of the txs in the daemon's mempool.''' if self.debug_caught_up: return [] - return await self.send_single('getrawmempool') + return await self._send_single('getrawmempool') async def estimatefee(self, params): '''Return the fee estimate for the given parameters.''' - return await self.send_single('estimatefee', params) + return await self._send_single('estimatefee', params) async def relayfee(self): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' - net_info = await self.send_single('getnetworkinfo') + net_info = await self._send_single('getnetworkinfo') return net_info['relayfee'] async def getrawtransaction(self, hex_hash): '''Return the serialized raw transaction with the given hash.''' - return await self.send_single('getrawtransaction', (hex_hash, 0)) + return await self._send_single('getrawtransaction', (hex_hash, 0)) async def getrawtransactions(self, hex_hashes): '''Return the serialized raw transactions with the given hashes. Breaks large requests up. Yields after each sub request.''' params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) - txs = await self.send_vector('getrawtransaction', params_iterable) + txs = await self._send_vector('getrawtransaction', params_iterable) # Convert hex strings to bytes return [bytes.fromhex(tx) for tx in txs] async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.''' - return await self.send_single('sendrawtransaction', params) + return await self._send_single('sendrawtransaction', params) async def height(self): '''Query the daemon for its current height.''' if not self.debug_caught_up: - self._height = await self.send_single('getblockcount') + self._height = await self._send_single('getblockcount') return self._height def cached_height(self): From f05e51df2ae0aa3191caee6fa7146a3e7a901332 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 12 Nov 2016 08:09:40 +0900 Subject: [PATCH 2/3] Skip logging transient errors just once --- server/daemon.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/server/daemon.py b/server/daemon.py index b8a166b..c4a3cc6 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -72,38 +72,43 @@ class Daemon(util.LoggedClass): Handles temporary connection issues. Daemon reponse errors are raise through DaemonError. ''' - data = json.dumps(payload) secs = 1 prior_msg = None + skip_count = 10 + + async def sleep(msg, *, skip_once=False): + skip_count -= 1 + if skip_once and msg != prior_msg: + skip_count = 1 + elif msg != prior_msg or skip_count == 0: + self.logger.error('{}. Retrying between sleeps...' + .format(msg)) + skip_count = 10 + prior_msg = msg + await asyncio.sleep(secs) + secs = min(16, secs * 2) + + data = json.dumps(payload) while True: try: result = await self._post(data) if prior_msg: self.logger.info('connection successfully restored') return result + except (asyncio.CancelledError, DaemonError): + raise except asyncio.TimeoutError: - msg = 'timeout error' + sleep('timeout error', skip_once=True) except aiohttp.ClientHttpProcessingError: - msg = 'HTTP error' + sleep('HTTP error', skip_once=True) except aiohttp.ServerDisconnectedError: - msg = 'disconnected' + sleep('disconnected', skip_once=True) except aiohttp.ClientConnectionError: - msg = 'connection problem - is your daemon running?' + sleep('connection problem - is your daemon running?') except DaemonWarmingUpError: - msg = 'still starting up checking blocks...' - except (asyncio.CancelledError, DaemonError): - raise + sleep('still starting up checking blocks...') except Exception as e: - msg = ('request gave unexpected error: {}'.format(e)) - - if msg != prior_msg or count == 10: - self.logger.error('{}. Retrying between sleeps...' - .format(msg)) - prior_msg = msg - count = 0 - await asyncio.sleep(secs) - count += 1 - secs = min(16, secs * 2) + sleep('request gave unexpected error: {}'.format(e)) async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' From 409ea2c78456392f81b19ea45d5d71646a6de3db Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 12 Nov 2016 08:47:11 +0900 Subject: [PATCH 3/3] Daemon's send takes a processor Simplest way to process the result and still be able to raise exceptions in the daemon request loop. In particular, we can pass missing txs through as None in getrawtransactions. Fixes #9 --- server/daemon.py | 102 ++++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 50 deletions(-) diff --git a/server/daemon.py b/server/daemon.py index c4a3cc6..4696d76 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -45,83 +45,84 @@ class Daemon(util.LoggedClass): .format(height)) self._height = height - async def _post(self, data): - '''Send data to the daemon and handle the response.''' - async with self.workqueue_semaphore: - async with aiohttp.post(self.url, data=data) as resp: - result = await resp.json() - - if isinstance(result, list): - errs = [item['error'] for item in result] - if not any(errs): - return [item['result'] for item in result] - if any(err.get('code') == self.WARMING_UP for err in errs if err): - raise DaemonWarmingUpError - raise DaemonError(errs) - else: - err = result['error'] - if not err: - return result['result'] - if err.get('code') == self.WARMING_UP: - raise DaemonWarmingUpError - raise DaemonError(err) - - async def _send(self, payload): + async def _send(self, payload, processor): '''Send a payload to be converted to JSON. Handles temporary connection issues. Daemon reponse errors are raise through DaemonError. ''' - secs = 1 prior_msg = None - skip_count = 10 + skip_count = None - async def sleep(msg, *, skip_once=False): - skip_count -= 1 - if skip_once and msg != prior_msg: + def log_error(msg, skip_once=False): + if skip_once and skip_count is None: skip_count = 1 - elif msg != prior_msg or skip_count == 0: + if msg != prior_msg or skip_count == 0: + skip_count = 10 + prior_msg = msg self.logger.error('{}. Retrying between sleeps...' .format(msg)) - skip_count = 10 - prior_msg = msg - await asyncio.sleep(secs) - secs = min(16, secs * 2) + skip_count -= 1 data = json.dumps(payload) + secs = 1 while True: try: - result = await self._post(data) - if prior_msg: - self.logger.info('connection successfully restored') - return result + async with self.workqueue_semaphore: + async with aiohttp.post(self.url, data=data) as resp: + result = processor(await resp.json()) + if prior_msg: + self.logger.info('connection restored') + return result except (asyncio.CancelledError, DaemonError): raise except asyncio.TimeoutError: - sleep('timeout error', skip_once=True) + log_error('timeout error', skip_once=True) except aiohttp.ClientHttpProcessingError: - sleep('HTTP error', skip_once=True) + log_error('HTTP error', skip_once=True) except aiohttp.ServerDisconnectedError: - sleep('disconnected', skip_once=True) + log_error('disconnected', skip_once=True) except aiohttp.ClientConnectionError: - sleep('connection problem - is your daemon running?') + log_error('connection problem - is your daemon running?') except DaemonWarmingUpError: - sleep('still starting up checking blocks...') + log_error('still starting up checking blocks...') except Exception as e: - sleep('request gave unexpected error: {}'.format(e)) + log_error('request gave unexpected error: {}'.format(e)) + await asyncio.sleep(secs) + secs = min(16, secs * 2) async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' + def processor(result): + err = result['error'] + if not err: + return result['result'] + if err.get('code') == self.WARMING_UP: + raise DaemonWarmingUpError + raise DaemonError(err) + payload = {'method': method} if params: payload['params'] = params - return await self._send(payload) + return await self._send(payload, processor) + + async def _send_vector(self, method, params_iterable, replace_errs=False): + '''Send several requests of the same method. + + The result will be an array of the same length as params_iterable. + If replace_errs is true, any item with an error is returned as None, + othewise an exception is raised.''' + def processor(result): + errs = [item['error'] for item in result if item['error']] + if not errs or replace_errs: + return [item['result'] for item in result] + if any(err.get('code') == self.WARMING_UP for err in errs): + raise DaemonWarmingUpError + raise DaemonError(errs) - async def _send_vector(self, method, params_iterable): - '''Send several requests of the same method.''' payload = [{'method': method, 'params': p} for p in params_iterable] if payload: - return await self._send(payload) + return await self._send(payload, processor) return [] async def block_hex_hashes(self, first, count): @@ -156,14 +157,15 @@ class Daemon(util.LoggedClass): '''Return the serialized raw transaction with the given hash.''' return await self._send_single('getrawtransaction', (hex_hash, 0)) - async def getrawtransactions(self, hex_hashes): + async def getrawtransactions(self, hex_hashes, replace_errs=True): '''Return the serialized raw transactions with the given hashes. - Breaks large requests up. Yields after each sub request.''' + Replaces errors with None by default.''' params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) - txs = await self._send_vector('getrawtransaction', params_iterable) + txs = await self._send_vector('getrawtransaction', params_iterable, + replace_errs=replace_errs) # Convert hex strings to bytes - return [bytes.fromhex(tx) for tx in txs] + return [bytes.fromhex(tx) if tx else None for tx in txs] async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.'''