From 13a8b62d8ceb8d77c89f7f7eb72ffdb4b130984f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 12 Aug 2018 23:44:18 +0900 Subject: [PATCH 01/14] Daemon constructor passed coin and URLs directly --- electrumx/server/controller.py | 6 ++++-- electrumx/server/daemon.py | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 5b156c2..6588ea9 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -92,9 +92,11 @@ class Controller(ServerBase): self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks') notifications = Notifications() - daemon = env.coin.DAEMON(env) - db = DB(env) + Daemon = env.coin.DAEMON BlockProcessor = env.coin.BLOCK_PROCESSOR + + daemon = Daemon(env.coin, env.daemon_url) + db = DB(env) bp = BlockProcessor(env, db, daemon, notifications) # Set ourselves up to implement the MemPoolAPI diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index be22e99..721987e 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -37,14 +37,14 @@ class Daemon(object): class DaemonWarmingUpError(Exception): '''Raised when the daemon returns an error in its results.''' - def __init__(self, env): + def __init__(self, coin, urls, max_workqueue=10): + self.coin = coin self.logger = class_logger(__name__, self.__class__.__name__) - self.coin = env.coin - self.set_urls(env.coin.daemon_urls(env.daemon_url)) + self.set_urls(coin.daemon_urls(urls)) self._height = None # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 - self.workqueue_semaphore = asyncio.Semaphore(value=10) + self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) self.down = False self.last_error_time = 0 self.req_id = 0 From 9ebd2e86e878f8a4c4c4f17d70a3a75ffb0269a5 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 12 Aug 2018 23:47:52 +0900 Subject: [PATCH 02/14] Use a counter for ID counting --- electrumx/server/daemon.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index 721987e..a217897 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -9,6 +9,7 @@ daemon.''' import asyncio +import itertools import json import time from calendar import timegm @@ -33,6 +34,7 @@ class Daemon(object): WARMING_UP = -28 RPC_MISC_ERROR = -1 + id_counter = itertools.count() class DaemonWarmingUpError(Exception): '''Raised when the daemon returns an error in its results.''' @@ -47,14 +49,8 @@ class Daemon(object): self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) self.down = False self.last_error_time = 0 - self.req_id = 0 self._available_rpcs = {} # caches results for _is_rpc_available() - def next_req_id(self): - '''Retrns the next request ID.''' - self.req_id += 1 - return self.req_id - def set_urls(self, urls): '''Set the URLS to the given list, and switch to the first one.''' if not urls: @@ -162,7 +158,7 @@ class Daemon(object): raise self.DaemonWarmingUpError raise DaemonError(err) - payload = {'method': method, 'id': self.next_req_id()} + payload = {'method': method, 'id': next(self.id_counter)} if params: payload['params'] = params return await self._send(payload, processor) @@ -181,7 +177,7 @@ class Daemon(object): return [item['result'] for item in result] raise DaemonError(errs) - payload = [{'method': method, 'params': p, 'id': self.next_req_id()} + payload = [{'method': method, 'params': p, 'id': next(self.id_counter)} for p in params_iterable] if payload: return await self._send(payload, processor) From 6dafbfd455cbf590cb41970f0886c96ae10b84e4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 12 Aug 2018 23:51:36 +0900 Subject: [PATCH 03/14] Make down and last_error_time locals --- electrumx/server/daemon.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index a217897..c3d3e1a 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -47,8 +47,6 @@ class Daemon(object): # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) - self.down = False - self.last_error_time = 0 self._available_rpcs = {} # caches results for _is_rpc_available() def set_urls(self, urls): @@ -99,17 +97,20 @@ class Daemon(object): are raise through DaemonError. ''' def log_error(error): - self.down = True + nonlocal down, last_error_time + down = True now = time.time() - prior_time = self.last_error_time + prior_time = last_error_time if now - prior_time > 60: - self.last_error_time = now + last_error_time = now if prior_time and self.failover(): secs = 0 else: self.logger.error('{} Retrying occasionally...' .format(error)) + down = False + last_error_time = 0 data = json.dumps(payload) secs = 1 max_secs = 4 @@ -118,9 +119,7 @@ class Daemon(object): result = await self._send_data(data) if not isinstance(result, tuple): result = processor(result) - if self.down: - self.down = False - self.last_error_time = 0 + if down: self.logger.info('connection restored') return result log_error('HTTP error code {:d}: {}' From 4e40e26ac4784b8d3b540d7a47ae5c32695d7f6f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 10:19:08 +0900 Subject: [PATCH 04/14] Move from .format to f'' strings --- electrumx/server/daemon.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index c3d3e1a..65067d4 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -56,9 +56,9 @@ class Daemon(object): self.urls = urls self.url_index = 0 for n, url in enumerate(urls): - self.logger.info('daemon #{:d} at {}{}' - .format(n + 1, self.logged_url(url), - '' if n else ' (current)')) + status = '' if n else ' (current)' + logged_url = self.logged_url(url) + self.logger.info(f'daemon #{n + 1} at {logged_url}{status}') def url(self): '''Returns the current daemon URL.''' @@ -71,7 +71,7 @@ class Daemon(object): ''' if len(self.urls) > 1: self.url_index = (self.url_index + 1) % len(self.urls) - self.logger.info('failing over to {}'.format(self.logged_url())) + self.logger.info(f'failing over to {self.logged_url()}') return True return False @@ -106,8 +106,7 @@ class Daemon(object): if prior_time and self.failover(): secs = 0 else: - self.logger.error('{} Retrying occasionally...' - .format(error)) + self.logger.error(f'{error} Retrying occasionally...') down = False last_error_time = 0 @@ -122,8 +121,7 @@ class Daemon(object): if down: self.logger.info('connection restored') return result - log_error('HTTP error code {:d}: {}' - .format(result[0], result[1])) + log_error(f'HTTP error code {result[0]}: {result[1]}') except asyncio.TimeoutError: log_error('timeout error.') except aiohttp.ServerDisconnectedError: @@ -202,10 +200,9 @@ class Daemon(object): # probably because we did not provide arguments available = True else: - self.logger.warning('error (code {:d}: {}) when testing ' - 'RPC availability of method {}' - .format(error_code, err.get("message"), - method)) + self.logger.warning(f'error (code {error_code}: ' + f'{err.get("message")}) testing ' + f'RPC availability of method {method}') available = False self._available_rpcs[method] = available return available From 08347fe2751e38e8e72efe15f17701c5f6ee19d8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 10:23:30 +0900 Subject: [PATCH 05/14] Simplify _is_rpc_available --- electrumx/server/daemon.py | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index 65067d4..a33a44c 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -33,7 +33,6 @@ class Daemon(object): '''Handles connections to a daemon at the given URL.''' WARMING_UP = -28 - RPC_MISC_ERROR = -1 id_counter = itertools.count() class DaemonWarmingUpError(Exception): @@ -47,7 +46,7 @@ class Daemon(object): # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) - self._available_rpcs = {} # caches results for _is_rpc_available() + self.available_rpcs = {} def set_urls(self, urls): '''Set the URLS to the given list, and switch to the first one.''' @@ -185,26 +184,16 @@ class Daemon(object): Results are cached and the daemon will generally not be queried with the same method more than once.''' - available = self._available_rpcs.get(method, None) + available = self.available_rpcs.get(method) if available is None: + available = True try: await self._send_single(method) - available = True except DaemonError as e: err = e.args[0] error_code = err.get("code") - if error_code == JSONRPC.METHOD_NOT_FOUND: - available = False - elif error_code == self.RPC_MISC_ERROR: - # method found but exception was thrown in command handling - # probably because we did not provide arguments - available = True - else: - self.logger.warning(f'error (code {error_code}: ' - f'{err.get("message")}) testing ' - f'RPC availability of method {method}') - available = False - self._available_rpcs[method] = available + available = error_code != JSONRPC.METHOD_NOT_FOUND + self.available_rpcs[method] = available return available async def block_hex_hashes(self, first, count): From 61711fcfd7bfdbb449a16a5c16853fc19d50d8f3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 10:47:22 +0900 Subject: [PATCH 06/14] Clean up daemon URL handling and interface --- electrumx/lib/coins.py | 4 ---- electrumx/server/daemon.py | 28 ++++++++++++++-------------- electrumx/server/session.py | 2 +- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/electrumx/lib/coins.py b/electrumx/lib/coins.py index 905b423..2cce41d 100644 --- a/electrumx/lib/coins.py +++ b/electrumx/lib/coins.py @@ -111,10 +111,6 @@ class Coin(object): url = 'http://' + url return url + '/' - @classmethod - def daemon_urls(cls, urls): - return [cls.sanitize_url(url) for url in urls.split(',')] - @classmethod def genesis_block(cls, block): '''Check the Genesis block is the right one for this coin. diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index a33a44c..0136dda 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -38,31 +38,36 @@ class Daemon(object): class DaemonWarmingUpError(Exception): '''Raised when the daemon returns an error in its results.''' - def __init__(self, coin, urls, max_workqueue=10): + def __init__(self, coin, url, max_workqueue=10): self.coin = coin self.logger = class_logger(__name__, self.__class__.__name__) - self.set_urls(coin.daemon_urls(urls)) + self.set_url(url) self._height = None # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) self.available_rpcs = {} - def set_urls(self, urls): + def set_url(self, url): '''Set the URLS to the given list, and switch to the first one.''' - if not urls: - raise DaemonError('no daemon URLs provided') - self.urls = urls - self.url_index = 0 + urls = url.split(',') + urls = [self.coin.sanitize_url(url) for url in urls] for n, url in enumerate(urls): status = '' if n else ' (current)' logged_url = self.logged_url(url) self.logger.info(f'daemon #{n + 1} at {logged_url}{status}') + self.url_index = 0 + self.urls = urls - def url(self): + def current_url(self): '''Returns the current daemon URL.''' return self.urls[self.url_index] + def logged_url(self, url=None): + '''The host and port part, for logging.''' + url = url or self.current_url() + return url[url.rindex('@') + 1:] + def failover(self): '''Call to fail-over to the next daemon URL. @@ -81,7 +86,7 @@ class Daemon(object): async def _send_data(self, data): async with self.workqueue_semaphore: async with self.client_session() as session: - async with session.post(self.url(), data=data) as resp: + async with session.post(self.current_url(), data=data) as resp: # If bitcoind can't find a tx, for some reason # it returns 500 but fills out the JSON. # Should still return 200 IMO. @@ -139,11 +144,6 @@ class Daemon(object): await asyncio.sleep(secs) secs = min(max_secs, secs * 2, 1) - def logged_url(self, url=None): - '''The host and port part, for logging.''' - url = url or self.url() - return url[url.rindex('@') + 1:] - async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' def processor(result): diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 37290cf..fd19dd4 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -383,7 +383,7 @@ class SessionManager(object): '''Replace the daemon URL.''' daemon_url = daemon_url or self.env.daemon_url try: - self.daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) + self.daemon.set_url(daemon_url) except Exception as e: raise RPCError(BAD_REQUEST, f'an error occured: {e!r}') return f'now using daemon at {self.daemon.logged_url()}' From 92e8cff7702f7d3cc1af905c792650b6e9ad86ca Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 13:26:29 +0900 Subject: [PATCH 07/14] Improve daemon API for broadcasting a tx --- electrumx/server/daemon.py | 4 ++-- electrumx/server/session.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index 0136dda..c733ef9 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -249,9 +249,9 @@ class Daemon(object): # Convert hex strings to bytes return [hex_to_bytes(tx) if tx else None for tx in txs] - async def sendrawtransaction(self, params): + async def broadcast_transaction(self, raw_tx): '''Broadcast a transaction to the network.''' - return await self._send_single('sendrawtransaction', params) + return await self._send_single('sendrawtransaction', (raw_tx, )) async def height(self): '''Query the daemon for its current height.''' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index fd19dd4..c45fb1d 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -535,7 +535,7 @@ class SessionManager(object): return electrum_header async def broadcast_transaction(self, raw_tx): - hex_hash = await self.daemon.sendrawtransaction([raw_tx]) + hex_hash = await self.daemon.broadcast_transaction(raw_tx) self.txs_sent += 1 return hex_hash @@ -1144,7 +1144,7 @@ class ElectrumX(SessionBase): except DaemonError as e: error, = e.args message = error['message'] - self.logger.info(f'sendrawtransaction: {message}') + self.logger.info(f'error sending transaction: {message}') raise RPCError(BAD_REQUEST, 'the transaction was rejected by ' f'network rules.\n\n{message}\n[{raw_tx}]') From 3f69595fbd176771b1c68ef2dd42351a384bb5d6 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 14:44:35 +0900 Subject: [PATCH 08/14] Improve estimatefee API --- electrumx/server/daemon.py | 14 +++++++++----- electrumx/server/session.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index c733ef9..e642aa9 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -216,12 +216,16 @@ class Daemon(object): '''Update our record of the daemon's mempool hashes.''' return await self._send_single('getrawmempool') - async def estimatefee(self, params): - '''Return the fee estimate for the given parameters.''' + async def estimatefee(self, block_count): + '''Return the fee estimate for the block count. Units are whole + currency units per KB, e.g. 0.00000995, or -1 if no estimate + is available. + ''' + args = (block_count, ) if await self._is_rpc_available('estimatesmartfee'): - estimate = await self._send_single('estimatesmartfee', params) + estimate = await self._send_single('estimatesmartfee', args) return estimate.get('feerate', -1) - return await self._send_single('estimatefee', params) + return await self._send_single('estimatefee', args) async def getnetworkinfo(self): '''Return the result of the 'getnetworkinfo' RPC call.''' @@ -280,7 +284,7 @@ class FakeEstimateFeeDaemon(Daemon): '''Daemon that simulates estimatefee and relayfee RPC calls. Coin that wants to use this daemon must define ESTIMATE_FEE & RELAY_FEE''' - async def estimatefee(self, params): + async def estimatefee(self, block_count): '''Return the fee estimate for the given parameters.''' return self.coin.ESTIMATE_FEE diff --git a/electrumx/server/session.py b/electrumx/server/session.py index c45fb1d..c379f45 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -1088,7 +1088,7 @@ class ElectrumX(SessionBase): number: the number of blocks ''' number = non_negative_integer(number) - return await self.daemon_request('estimatefee', [number]) + return await self.daemon_request('estimatefee', number) async def ping(self): '''Serves as a connection keep-alive mechanism and for the client to From ab2691563f7c2c9993d4127df87379836084c5a6 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 18:33:26 +0900 Subject: [PATCH 09/14] Improve daemon error handling --- electrumx/server/daemon.py | 59 +++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index e642aa9..5a7ee99 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -29,6 +29,10 @@ class DaemonError(Exception): '''Raised when the daemon returns an error in its results.''' +class WorkQueueFullError(Exception): + pass + + class Daemon(object): '''Handles connections to a daemon at the given URL.''' @@ -87,12 +91,16 @@ class Daemon(object): async with self.workqueue_semaphore: async with self.client_session() as session: async with session.post(self.current_url(), data=data) as resp: - # If bitcoind can't find a tx, for some reason - # it returns 500 but fills out the JSON. - # Should still return 200 IMO. - if resp.status in (200, 404, 500): + kind = resp.headers.get('Content-Type', None) + if kind == 'application/json': return await resp.json() - return (resp.status, resp.reason) + # bitcoind's HTTP protocol "handling" is a bad joke + text = await resp.text() + if 'Work queue depth exceeded' in text: + raise WorkQueueFullError + text = text.strip() or resp.reason + self.logger.error(text) + raise DaemonError(text) async def _send(self, payload, processor): '''Send a payload to be converted to JSON. @@ -101,48 +109,45 @@ class Daemon(object): are raise through DaemonError. ''' def log_error(error): - nonlocal down, last_error_time - down = True + nonlocal last_error_log, secs now = time.time() - prior_time = last_error_time - if now - prior_time > 60: + if now - last_error_log > 60: last_error_time = now - if prior_time and self.failover(): - secs = 0 - else: - self.logger.error(f'{error} Retrying occasionally...') + self.logger.error(f'{error} Retrying occasionally...') + if secs == max_secs and self.failover(): + secs = 0.25 - down = False - last_error_time = 0 + on_good_message = None + last_error_log = 0 data = json.dumps(payload) - secs = 1 + secs = 0.25 max_secs = 4 while True: try: result = await self._send_data(data) - if not isinstance(result, tuple): - result = processor(result) - if down: - self.logger.info('connection restored') - return result - log_error(f'HTTP error code {result[0]}: {result[1]}') + result = processor(result) + if on_good_message: + self.logger.info(on_good_message) + return result except asyncio.TimeoutError: log_error('timeout error.') except aiohttp.ServerDisconnectedError: log_error('disconnected.') + on_good_message = 'connection restored' except aiohttp.ClientPayloadError: log_error('payload encoding error.') except aiohttp.ClientConnectionError: log_error('connection problem - is your daemon running?') + on_good_message = 'connection restored' except self.DaemonWarmingUpError: log_error('starting up checking blocks.') - except (asyncio.CancelledError, DaemonError): - raise - except Exception as e: - self.logger.exception(f'uncaught exception: {e}') + on_good_message = 'running normally' + except WorkQueueFullError: + log_error('work queue full.') + on_good_message = 'running normally' await asyncio.sleep(secs) - secs = min(max_secs, secs * 2, 1) + secs = min(max_secs, secs * 2) async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' From b087d1492b2be00eb92289f21113649b080ebddc Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 18:48:07 +0900 Subject: [PATCH 10/14] Controller tests daemon connectivity and auth first Server base doesn't need a task group --- electrumx/lib/server_base.py | 13 ++++++------- electrumx/server/block_processor.py | 5 +---- electrumx/server/controller.py | 5 ++++- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index db7a213..f3688d6 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -13,7 +13,7 @@ import sys import time from functools import partial -from aiorpcx import TaskGroup +from aiorpcx import spawn from electrumx.lib.util import class_logger @@ -93,12 +93,11 @@ class ServerBase(object): loop.set_exception_handler(self.on_exception) shutdown_event = asyncio.Event() - async with TaskGroup() as group: - server_task = await group.spawn(self.serve(shutdown_event)) - # Wait for shutdown, log on receipt of the event - await shutdown_event.wait() - self.logger.info('shutting down') - server_task.cancel() + server_task = await spawn(self.serve(shutdown_event)) + # Wait for shutdown, log on receipt of the event + await shutdown_event.wait() + self.logger.info('shutting down') + server_task.cancel() # Prevent some silly logs await asyncio.sleep(0.01) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index bea879e..e658fe5 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -650,10 +650,7 @@ class BlockProcessor(object): could be lost. ''' self._caught_up_event = caught_up_event - async with TaskGroup() as group: - await group.spawn(self._first_open_dbs()) - # Ensure cached_height is set - await group.spawn(self.daemon.height()) + await self._first_open_dbs() try: async with TaskGroup() as group: await group.spawn(self.prefetcher.main_loop(self.height)) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 6588ea9..09d022b 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -112,10 +112,13 @@ class Controller(ServerBase): session_mgr = SessionManager(env, db, bp, daemon, mempool, notifications, shutdown_event) + # Test daemon authentication, and also ensure it has a cached + # height. Do this before entering the task group. + await daemon.height() + caught_up_event = Event() serve_externally_event = Event() synchronized_event = Event() - async with TaskGroup() as group: await group.spawn(session_mgr.serve(serve_externally_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) From ab0e9eb1236d1a5130c7560b9ba89db7f71b2310 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 19:17:52 +0900 Subject: [PATCH 11/14] Improve retry logic --- electrumx/server/daemon.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index 5a7ee99..ecad41c 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -42,14 +42,17 @@ class Daemon(object): class DaemonWarmingUpError(Exception): '''Raised when the daemon returns an error in its results.''' - def __init__(self, coin, url, max_workqueue=10): + def __init__(self, coin, url, max_workqueue=10, init_retry=0.25, + max_retry=4.0): self.coin = coin self.logger = class_logger(__name__, self.__class__.__name__) self.set_url(url) - self._height = None # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=max_workqueue) + self.init_retry = init_retry + self.max_retry = max_retry + self._height = None self.available_rpcs = {} def set_url(self, url): @@ -109,19 +112,18 @@ class Daemon(object): are raise through DaemonError. ''' def log_error(error): - nonlocal last_error_log, secs + nonlocal last_error_log, retry now = time.time() if now - last_error_log > 60: last_error_time = now self.logger.error(f'{error} Retrying occasionally...') - if secs == max_secs and self.failover(): - secs = 0.25 + if retry == self.max_retry and self.failover(): + retry = 0 on_good_message = None last_error_log = 0 data = json.dumps(payload) - secs = 0.25 - max_secs = 4 + retry = self.init_retry while True: try: result = await self._send_data(data) @@ -146,8 +148,8 @@ class Daemon(object): log_error('work queue full.') on_good_message = 'running normally' - await asyncio.sleep(secs) - secs = min(max_secs, secs * 2) + await asyncio.sleep(retry) + retry = max(min(self.max_retry, retry * 2), self.init_retry) async def _send_single(self, method, params=None): '''Send a single request to the daemon.''' From 1b618d19d40a537bdbc3cfa3019952e4029a1f3e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 22:37:33 +0900 Subject: [PATCH 12/14] Remove dead code At least, I believe it is --- electrumx/server/daemon.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index ecad41c..92fe05f 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -136,8 +136,6 @@ class Daemon(object): except aiohttp.ServerDisconnectedError: log_error('disconnected.') on_good_message = 'connection restored' - except aiohttp.ClientPayloadError: - log_error('payload encoding error.') except aiohttp.ClientConnectionError: log_error('connection problem - is your daemon running?') on_good_message = 'connection restored' From 6950fca7cbf9a26d6dc9e98ebfa6df1a02be6d08 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 19:24:59 +0900 Subject: [PATCH 13/14] Make WarmingUpError file-scope --- electrumx/server/daemon.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index 92fe05f..c21bc5a 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -29,8 +29,12 @@ class DaemonError(Exception): '''Raised when the daemon returns an error in its results.''' +class WarmingUpError(Exception): + '''Internal - when the daemon is warming up.''' + + class WorkQueueFullError(Exception): - pass + '''Internal - when the daemon's work queue is full.''' class Daemon(object): @@ -39,9 +43,6 @@ class Daemon(object): WARMING_UP = -28 id_counter = itertools.count() - class DaemonWarmingUpError(Exception): - '''Raised when the daemon returns an error in its results.''' - def __init__(self, coin, url, max_workqueue=10, init_retry=0.25, max_retry=4.0): self.coin = coin @@ -139,7 +140,7 @@ class Daemon(object): except aiohttp.ClientConnectionError: log_error('connection problem - is your daemon running?') on_good_message = 'connection restored' - except self.DaemonWarmingUpError: + except WarmingUpError: log_error('starting up checking blocks.') on_good_message = 'running normally' except WorkQueueFullError: @@ -156,7 +157,7 @@ class Daemon(object): if not err: return result['result'] if err.get('code') == self.WARMING_UP: - raise self.DaemonWarmingUpError + raise WarmingUpError raise DaemonError(err) payload = {'method': method, 'id': next(self.id_counter)} @@ -173,7 +174,7 @@ class Daemon(object): def processor(result): errs = [item['error'] for item in result if item['error']] if any(err.get('code') == self.WARMING_UP for err in errs): - raise self.DaemonWarmingUpError + raise WarmingUpError if not errs or replace_errs: return [item['result'] for item in result] raise DaemonError(errs) From 374ec8f26c6104282ae25317b0ec21c4414640dc Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 13 Aug 2018 18:39:11 +0900 Subject: [PATCH 14/14] Add daemon tests --- tests/server/test_daemon.py | 489 ++++++++++++++++++++++++++++++++++++ 1 file changed, 489 insertions(+) create mode 100644 tests/server/test_daemon.py diff --git a/tests/server/test_daemon.py b/tests/server/test_daemon.py new file mode 100644 index 0000000..2a905a9 --- /dev/null +++ b/tests/server/test_daemon.py @@ -0,0 +1,489 @@ +import aiohttp +import asyncio +import json +import logging + +import pytest + +from aiorpcx import ( + JSONRPCv1, JSONRPCLoose, RPCError, ignore_after, + Request, Batch, +) +from electrumx.lib.coins import BitcoinCash, CoinError, Bitzeny +from electrumx.server.daemon import ( + Daemon, FakeEstimateFeeDaemon, DaemonError +) + + +coin = BitcoinCash + +# These should be full, canonical URLs +urls = ['http://rpc_user:rpc_pass@127.0.0.1:8332/', + 'http://rpc_user:rpc_pass@192.168.0.1:8332/'] + + +@pytest.fixture(params=[BitcoinCash, Bitzeny]) +def daemon(request): + coin = request.param + return coin.DAEMON(coin, ','.join(urls)) + + +class ResponseBase(object): + + def __init__(self, headers, status): + self.headers = headers + self.status = status + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + pass + + +class JSONResponse(ResponseBase): + + def __init__(self, result, msg_id, status=200): + super().__init__({'Content-Type': 'application/json'}, status) + self.result = result + self.msg_id = msg_id + + async def json(self): + if isinstance(self.msg_id, int): + message = JSONRPCv1.response_message(self.result, self.msg_id) + else: + parts = [JSONRPCv1.response_message(item, msg_id) + for item, msg_id in zip(self.result, self.msg_id)] + message = JSONRPCv1.batch_message_from_parts(parts) + return json.loads(message.decode()) + + +class HTMLResponse(ResponseBase): + + def __init__(self, text, reason, status): + super().__init__({'Content-Type': 'text/html; charset=ISO-8859-1'}, + status) + self._text = text + self.reason = reason + + async def text(self): + return self._text + + +class ClientSessionBase(object): + + def __enter__(self): + self.prior_class = aiohttp.ClientSession + aiohttp.ClientSession = lambda: self + + def __exit__(self, exc_type, exc_value, traceback): + aiohttp.ClientSession = self.prior_class + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + pass + + +class ClientSessionGood(ClientSessionBase): + '''Imitate aiohttp for testing purposes.''' + + def __init__(self, *triples): + self.triples = triples # each a (method, args, result) + self.count = 0 + self.expected_url = urls[0] + + def post(self, url, data=""): + assert url == self.expected_url + request, request_id = JSONRPCLoose.message_to_item(data.encode()) + method, args, result = self.triples[self.count] + self.count += 1 + if isinstance(request, Request): + assert request.method == method + assert request.args == args + return JSONResponse(result, request_id) + else: + assert isinstance(request, Batch) + for request, args in zip(request, args): + assert request.method == method + assert request.args == args + return JSONResponse(result, request_id) + + +class ClientSessionBadAuth(ClientSessionBase): + + def post(self, url, data=""): + return HTMLResponse('', 'Unauthorized', 401) + + +class ClientSessionWorkQueueFull(ClientSessionGood): + + def post(self, url, data=""): + self.post = super().post + return HTMLResponse('Work queue depth exceeded', + 'Internal server error', 500) + + +class ClientSessionNoConnection(ClientSessionGood): + + def __init__(self, *args): + self.args = args + + async def __aenter__(self): + aiohttp.ClientSession = lambda: ClientSessionGood(*self.args) + raise aiohttp.ClientConnectionError + + +class ClientSessionPostError(ClientSessionGood): + + def __init__(self, exception, *args): + self.exception = exception + self.args = args + + def post(self, url, data=""): + aiohttp.ClientSession = lambda: ClientSessionGood(*self.args) + raise self.exception + + +class ClientSessionFailover(ClientSessionGood): + + def post(self, url, data=""): + # If not failed over; simulate disconnecting + if url == self.expected_url: + raise aiohttp.ServerDisconnectedError + else: + self.expected_url = urls[1] + return super().post(url, data) + + +def in_caplog(caplog, message, count=1): + return sum(message in record.message + for record in caplog.records) == count + +# +# Tests +# + +def test_set_urls_bad(): + with pytest.raises(CoinError): + Daemon(coin, '') + with pytest.raises(CoinError): + Daemon(coin, 'a') + + +def test_set_urls_one(caplog): + with caplog.at_level(logging.INFO): + daemon = Daemon(coin, urls[0]) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 1 + logged_url = daemon.logged_url() + assert logged_url == '127.0.0.1:8332/' + assert in_caplog(caplog, f'daemon #1 at {logged_url} (current)') + + +def test_set_urls_two(caplog): + with caplog.at_level(logging.INFO): + daemon = Daemon(coin, ','.join(urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + logged_url = daemon.logged_url() + assert logged_url == '127.0.0.1:8332/' + assert in_caplog(caplog, f'daemon #1 at {logged_url} (current)') + assert in_caplog(caplog, 'daemon #2 at 192.168.0.1:8332') + + +def test_set_urls_short(): + no_prefix_urls = ['/'.join(part for part in url.split('/')[2:]) + for url in urls] + daemon = Daemon(coin, ','.join(no_prefix_urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + + no_slash_urls = [url[:-1] for url in urls] + daemon = Daemon(coin, ','.join(no_slash_urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + + no_port_urls = [url[:url.rfind(':')] for url in urls] + daemon = Daemon(coin, ','.join(no_port_urls)) + assert daemon.current_url() == urls[0] + assert len(daemon.urls) == 2 + + +def test_failover_good(caplog): + daemon = Daemon(coin, ','.join(urls)) + with caplog.at_level(logging.INFO): + result = daemon.failover() + assert result is True + assert daemon.current_url() == urls[1] + logged_url = daemon.logged_url() + assert in_caplog(caplog, f'failing over to {logged_url}') + # And again + result = daemon.failover() + assert result is True + assert daemon.current_url() == urls[0] + + +def test_failover_fail(caplog): + daemon = Daemon(coin, urls[0]) + with caplog.at_level(logging.INFO): + result = daemon.failover() + assert result is False + assert daemon.current_url() == urls[0] + assert not in_caplog(caplog, f'failing over') + + +@pytest.mark.asyncio +async def test_height(daemon): + assert daemon.cached_height() is None + height = 300 + with ClientSessionGood(('getblockcount', [], height)): + assert await daemon.height() == height + assert daemon.cached_height() == height + + +@pytest.mark.asyncio +async def test_broadcast_transaction(daemon): + raw_tx = 'deadbeef' + tx_hash = 'hash' + with ClientSessionGood(('sendrawtransaction', [raw_tx], tx_hash)): + assert await daemon.broadcast_transaction(raw_tx) == tx_hash + + +@pytest.mark.asyncio +async def test_relayfee(daemon): + response = {"relayfee": sats, "other:": "cruft"} + with ClientSessionGood(('getnetworkinfo', [], response)): + assert await daemon.getnetworkinfo() == response + + +@pytest.mark.asyncio +async def test_relayfee(daemon): + if isinstance(daemon, FakeEstimateFeeDaemon): + sats = daemon.coin.ESTIMATE_FEE + else: + sats = 2 + response = {"relayfee": sats, "other:": "cruft"} + with ClientSessionGood(('getnetworkinfo', [], response)): + assert await daemon.relayfee() == sats + + +@pytest.mark.asyncio +async def test_mempool_hashes(daemon): + hashes = ['hex_hash1', 'hex_hash2'] + with ClientSessionGood(('getrawmempool', [], hashes)): + assert await daemon.mempool_hashes() == hashes + + +@pytest.mark.asyncio +async def test_deserialised_block(daemon): + block_hash = 'block_hash' + result = {'some': 'mess'} + with ClientSessionGood(('getblock', [block_hash, True], result)): + assert await daemon.deserialised_block(block_hash) == result + + +@pytest.mark.asyncio +async def test_estimatefee(daemon): + method_not_found = RPCError(JSONRPCv1.METHOD_NOT_FOUND, 'nope') + if isinstance(daemon, FakeEstimateFeeDaemon): + result = daemon.coin.ESTIMATE_FEE + else: + result = -1 + with ClientSessionGood( + ('estimatesmartfee', [], method_not_found), + ('estimatefee', [2], result) + ): + assert await daemon.estimatefee(2) == result + + +@pytest.mark.asyncio +async def test_estimatefee_smart(daemon): + bad_args = RPCError(JSONRPCv1.INVALID_ARGS, 'bad args') + if isinstance(daemon, FakeEstimateFeeDaemon): + return + rate = 0.0002 + result = {'feerate': rate} + with ClientSessionGood( + ('estimatesmartfee', [], bad_args), + ('estimatesmartfee', [2], result) + ): + assert await daemon.estimatefee(2) == rate + + # Test the rpc_available_cache is used + with ClientSessionGood(('estimatesmartfee', [2], result)): + assert await daemon.estimatefee(2) == rate + + +@pytest.mark.asyncio +async def test_getrawtransaction(daemon): + hex_hash = 'deadbeef' + simple = 'tx_in_hex' + verbose = {'hex': hex_hash, 'other': 'cruft'} + # Test False is converted to 0 - old daemon's reject False + with ClientSessionGood(('getrawtransaction', [hex_hash, 0], simple)): + assert await daemon.getrawtransaction(hex_hash) == simple + + # Test True is converted to 1 + with ClientSessionGood(('getrawtransaction', [hex_hash, 1], verbose)): + assert await daemon.getrawtransaction( + hex_hash, True) == verbose + + +# Batch tests + +@pytest.mark.asyncio +async def test_empty_send(daemon): + first = 5 + count = 0 + with ClientSessionGood(('getblockhash', [], [])): + assert await daemon.block_hex_hashes(first, count) == [] + + +@pytest.mark.asyncio +async def test_block_hex_hashes(daemon): + first = 5 + count = 3 + hashes = [f'hex_hash{n}' for n in range(count)] + with ClientSessionGood(('getblockhash', + [[n] for n in range(first, first + count)], + hashes)): + assert await daemon.block_hex_hashes(first, count) == hashes + + +@pytest.mark.asyncio +async def test_raw_blocks(daemon): + count = 3 + hex_hashes = [f'hex_hash{n}' for n in range(count)] + args_list = [[hex_hash, False] for hex_hash in hex_hashes] + iterable = (hex_hash for hex_hash in hex_hashes) + blocks = ["00", "019a", "02fe"] + blocks_raw = [bytes.fromhex(block) for block in blocks] + with ClientSessionGood(('getblock', args_list, blocks)): + assert await daemon.raw_blocks(iterable) == blocks_raw + + +@pytest.mark.asyncio +async def test_get_raw_transactions(daemon): + hex_hashes = ['deadbeef0', 'deadbeef1'] + args_list = [[hex_hash, 0] for hex_hash in hex_hashes] + raw_txs_hex = ['fffefdfc', '0a0b0c0d'] + raw_txs = [bytes.fromhex(raw_tx) for raw_tx in raw_txs_hex] + # Test 0 - old daemon's reject False + with ClientSessionGood(('getrawtransaction', args_list, raw_txs_hex)): + assert await daemon.getrawtransactions(hex_hashes) == raw_txs + + # Test one error + tx_not_found = RPCError(-1, 'some error message') + results = ['ff0b7d', tx_not_found] + raw_txs = [bytes.fromhex(results[0]), None] + with ClientSessionGood(('getrawtransaction', args_list, results)): + assert await daemon.getrawtransactions(hex_hashes) == raw_txs + + +# Other tests + +@pytest.mark.asyncio +async def test_bad_auth(daemon, caplog): + with pytest.raises(DaemonError) as e: + with ClientSessionBadAuth(): + await daemon.height() + + assert "Unauthorized" in e.value.args[0] + assert in_caplog(caplog, "Unauthorized") + + +@pytest.mark.asyncio +async def test_workqueue_depth(daemon, caplog): + daemon.init_retry = 0.01 + height = 125 + with caplog.at_level(logging.INFO): + with ClientSessionWorkQueueFull(('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "work queue full") + assert in_caplog(caplog, "running normally") + + +@pytest.mark.asyncio +async def test_connection_error(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionNoConnection(('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "connection problem - is your daemon running?") + assert in_caplog(caplog, "connection restored") + + +@pytest.mark.asyncio +async def test_timeout_error(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionPostError(asyncio.TimeoutError, + ('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "timeout error") + + +@pytest.mark.asyncio +async def test_disconnected(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionPostError(aiohttp.ServerDisconnectedError, + ('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "disconnected") + assert in_caplog(caplog, "connection restored") + + +@pytest.mark.asyncio +async def test_warming_up(daemon, caplog): + warming_up = RPCError(-28, 'reading block index') + height = 100 + daemon.init_retry = 0.01 + with caplog.at_level(logging.INFO): + with ClientSessionGood( + ('getblockcount', [], warming_up), + ('getblockcount', [], height) + ): + assert await daemon.height() == height + + assert in_caplog(caplog, "starting up checking blocks") + assert in_caplog(caplog, "running normally") + + +@pytest.mark.asyncio +async def test_warming_up_batch(daemon, caplog): + warming_up = RPCError(-28, 'reading block index') + first = 5 + count = 1 + daemon.init_retry = 0.01 + hashes = ['hex_hash5'] + with caplog.at_level(logging.INFO): + with ClientSessionGood(('getblockhash', [[first]], [warming_up]), + ('getblockhash', [[first]], hashes)): + assert await daemon.block_hex_hashes(first, count) == hashes + + assert in_caplog(caplog, "starting up checking blocks") + assert in_caplog(caplog, "running normally") + + +@pytest.mark.asyncio +async def test_failover(daemon, caplog): + height = 100 + daemon.init_retry = 0.01 + daemon.max_retry = 0.04 + with caplog.at_level(logging.INFO): + with ClientSessionFailover(('getblockcount', [], height)): + await daemon.height() == height + + assert in_caplog(caplog, "disconnected", 3) + assert in_caplog(caplog, "failing over") + assert in_caplog(caplog, "connection restored")