From c715ae624966641bce847ccedd5dff19f86b3b57 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sat, 21 Jul 2018 08:39:56 +0800
Subject: [PATCH] Rework mempool and notification code

Clarifies the mempool interface to look more like what it would
in its own process
---
 docs/changelog.rst                  |   1 +
 electrumx/server/block_processor.py |  12 ++-
 electrumx/server/chain_state.py     |  18 ++--
 electrumx/server/controller.py      |  58 +++++++++-
 electrumx/server/daemon.py          |   9 --
 electrumx/server/mempool.py         | 158 +++++++++++++---------------
 electrumx/server/session.py         |  10 +-
 7 files changed, 146 insertions(+), 120 deletions(-)

diff --git a/docs/changelog.rst b/docs/changelog.rst
index 2606757..2c89dab 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -14,6 +14,7 @@ Version 1.6.1 (in progress)
 ============================
 
 * cleaner shutdown process with clear guarantees
+* cleaner mempool and notification handling
 * aiohttp min version requirement raised to 2.0
 * onion peers are ignored if no tor proxy is available
 * add Motion coin (ocruzv)
diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py
index 0aa7872..04811df 100644
--- a/electrumx/server/block_processor.py
+++ b/electrumx/server/block_processor.py
@@ -152,11 +152,12 @@ class BlockProcessor(electrumx.server.db.DB):
     Coordinate backing up in case of chain reorganisations.
     '''
 
-    def __init__(self, env, tasks, daemon):
+    def __init__(self, env, tasks, daemon, notifications):
         super().__init__(env)
 
         self.tasks = tasks
         self.daemon = daemon
+        self.notifications = notifications
 
         # Work queue
         self.queue = asyncio.Queue()
@@ -168,7 +169,6 @@ class BlockProcessor(electrumx.server.db.DB):
         self.next_cache_check = 0
         self.last_flush = time.time()
         self.touched = set()
-        self.callbacks = []
 
         # Header merkle cache
         self.merkle = Merkle()
@@ -226,9 +226,9 @@ class BlockProcessor(electrumx.server.db.DB):
             self.logger.info('processed {:,d} block{} in {:.1f}s'
                              .format(len(blocks), s,
                                      time.time() - start))
-            for callback in self.callbacks:
-                callback(self.touched)
-            self.touched.clear()
+            if self._caught_up_event.is_set():
+                await self.notifications.on_block(self.touched, self.height)
+            self.touched = set()
         elif hprevs[0] != chain[0]:
             await self.reorg_chain()
         else:
@@ -758,6 +758,8 @@ class BlockProcessor(electrumx.server.db.DB):
             await self.check_and_advance_blocks(raw_blocks, first)
         elif work == PREFETCHER_CAUGHT_UP:
             self._caught_up_event.set()
+            # Initialise the notification framework
+            await self.notifications.on_block(set(), self.height)
         elif work == REORG_CHAIN:
             count, = args
             await self.reorg_chain(count)
diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py
index 77b04b4..5609ed8 100644
--- a/electrumx/server/chain_state.py
+++ b/electrumx/server/chain_state.py
@@ -17,18 +17,17 @@ class ChainState(object):
     blocks, transaction history, UTXOs and the mempool.
     '''
 
-    def __init__(self, env, tasks):
+    def __init__(self, env, tasks, notifications):
         self._env = env
         self._tasks = tasks
         self._daemon = env.coin.DAEMON(env)
         BlockProcessor = env.coin.BLOCK_PROCESSOR
-        self._bp = BlockProcessor(env, tasks, self._daemon)
-        self._mempool = MemPool(env.coin, self, tasks,
-                                self._bp.add_new_block_callback)
+        self._bp = BlockProcessor(env, tasks, self._daemon, notifications)
+        self._mempool = MemPool(env.coin, self, tasks, notifications)
         self._history_cache = pylru.lrucache(256)
 
         # External interface: pass-throughs for mempool.py
-        self.cached_mempool_hashes = self._daemon.cached_mempool_hashes
+        self.cached_height = self._daemon.cached_height
         self.getrawtransactions = self._daemon.getrawtransactions
         self.utxo_lookup = self._bp.db_utxo_lookup
         # External interface pass-throughs for session.py
@@ -44,7 +43,7 @@ class ChainState(object):
     async def broadcast_transaction(self, raw_tx):
         return await self._daemon.sendrawtransaction([raw_tx])
 
-    async def daemon_request(self, method, args):
+    async def daemon_request(self, method, args=()):
         return await getattr(self._daemon, method)(*args)
 
     def db_height(self):
@@ -109,9 +108,4 @@ class ChainState(object):
 
     async def wait_for_mempool(self):
         await self._bp.catch_up_to_daemon()
-        # Tell the daemon to fetch the mempool going forwards, trigger
-        # an initial fetch, and wait for the mempool to synchronize
-        mempool_refresh_event = asyncio.Event()
-        self._daemon._mempool_refresh_event = mempool_refresh_event
-        self._tasks.create_task(self._daemon.height())
-        await self._mempool.start_and_wait(mempool_refresh_event)
+        await self._mempool.start_and_wait_for_sync()
diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py
index 02e16a3..c182d1e 100644
--- a/electrumx/server/controller.py
+++ b/electrumx/server/controller.py
@@ -15,6 +15,58 @@ from electrumx.server.peers import PeerManager
 from electrumx.server.session import SessionManager
 
 
+class Notifications(object):
+    # hashX notifications come from two sources: new blocks and
+    # mempool refreshes.  The logic in daemon.py only gets new mempool
+    # hashes after getting the latest height.
+    #
+    # A user with a pending transaction is notified after the block it
+    # gets in is processed.  Block processing can take an extended
+    # time, and any mempool refreshes during that time will not have
+    # the transaction in the mempool any more, which would cause a
+    # redundant notification.  To avoid this, mempool touches are not
+    # notified whilst a block is being processed, but combined with
+    # the block notification when it is made.  We do not pause mempool
+    # processing
+
+    def __init__(self):
+        self._touched_mp = {}
+        self._touched_bp = {}
+        self._highest_block = 0
+
+    async def _maybe_notify(self):
+        tmp, tbp = self._touched_mp, self._touched_bp
+        common = set(tmp).intersection(tbp)
+        if common:
+            height = max(common)
+        elif tmp and max(tmp) == self._highest_block:
+            height = self._highest_block
+        else:
+            # Either we are processing a block and waiting for it to
+            # come in, or we have had no mempool update for the
+            # current block
+            return
+        touched = tmp.pop(height)
+        touched.update(tbp.pop(height, set()))
+        for old in [h for h in tmp if h <= height]:
+            del tmp[old]
+        for old in [h for h in tbp if h <= height]:
+            del tbp[old]
+        await self.notify_sessions(touched, height)
+
+    async def on_mempool(self, touched, height):
+        self._touched_mp[height] = touched
+        await self._maybe_notify()
+
+    async def on_block(self, touched, height):
+        self._touched_bp[height] = touched
+        self._highest_block = height
+        await self._maybe_notify()
+
+    async def notify_sessions(self, touched, height):
+        pass
+
+
 class Controller(ServerBase):
     '''Manages server initialisation and stutdown.
 
@@ -39,10 +91,12 @@ class Controller(ServerBase):
         self.logger.info(f'event loop policy: {env.loop_policy}')
         self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
 
-        self.chain_state = ChainState(env, self.tasks)
+        notifications = Notifications()
+        self.chain_state = ChainState(env, self.tasks, notifications)
         self.peer_mgr = PeerManager(env, self.tasks, self.chain_state)
         self.session_mgr = SessionManager(env, self.tasks, self.chain_state,
-                                          self.peer_mgr, self.shutdown_event)
+                                          self.peer_mgr, notifications,
+                                          self.shutdown_event)
 
     async def start_servers(self):
         '''Start the RPC server and wait for the mempool to synchronize.  Then
diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py
index 2425867..67f4e61 100644
--- a/electrumx/server/daemon.py
+++ b/electrumx/server/daemon.py
@@ -40,8 +40,6 @@ class Daemon(object):
         self.coin = env.coin
         self.set_urls(env.coin.daemon_urls(env.daemon_url))
         self._height = None
-        self._mempool_hashes = set()
-        self._mempool_refresh_event = None
         # Limit concurrent RPC calls to this number.
         # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
         self.workqueue_semaphore = asyncio.Semaphore(value=10)
@@ -275,15 +273,8 @@ class Daemon(object):
     async def height(self):
         '''Query the daemon for its current height.'''
         self._height = await self._send_single('getblockcount')
-        if self._mempool_refresh_event:
-            self._mempool_hashes = set(await self.mempool_hashes())
-            self._mempool_refresh_event.set()
         return self._height
 
-    def cached_mempool_hashes(self):
-        '''Return the cached mempool hashes.'''
-        return self._mempool_hashes
-
     def cached_height(self):
         '''Return the cached daemon height.
 
diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py
index b2b12cb..671ad6e 100644
--- a/electrumx/server/mempool.py
+++ b/electrumx/server/mempool.py
@@ -14,7 +14,6 @@ from collections import defaultdict
 
 from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
 from electrumx.lib.util import class_logger
-from electrumx.server.daemon import DaemonError
 from electrumx.server.db import UTXO, DB
 
 
@@ -32,31 +31,91 @@ class MemPool(object):
     A pair is a (hashX, value) tuple.  tx hashes are hex strings.
     '''
 
-    def __init__(self, coin, chain_state, tasks, add_new_block_callback):
+    def __init__(self, coin, chain_state, tasks, notifications):
         self.logger = class_logger(__name__, self.__class__.__name__)
         self.coin = coin
         self.chain_state = chain_state
         self.tasks = tasks
-        self.touched = set()
-        self.stop = False
+        self.notifications = notifications
         self.txs = {}
         self.hashXs = defaultdict(set)  # None can be a key
         self.fee_histogram = defaultdict(int)
         self.compact_fee_histogram = []
        self.histogram_time = 0
-        add_new_block_callback(self.on_new_block)
+        self.next_log = 0
 
-    async def start_and_wait(self, mempool_refresh_event):
+    async def start_and_wait_for_sync(self):
         '''Creates the mempool synchronization task, and waits for it to
         first synchronize before returning.'''
         self.logger.info('beginning processing of daemon mempool.  '
                          'This can take some time...')
-        synchronized = asyncio.Event()
-        self.tasks.create_task(self._synchronize(
-            mempool_refresh_event, synchronized))
-        await synchronized.wait()
+        await self._synchronize(True)
+        self.tasks.create_task(self._synchronize_forever())
 
-    def _resync_daemon_hashes(self, unprocessed, unfetched):
+    async def _synchronize_forever(self):
+        while True:
+            await asyncio.sleep(5)
+            await self._synchronize(False)
+
+    async def _refresh_hashes(self):
+        '''Return daemon hashes when we're sure which height they are
+        good for.'''
+        height = self.chain_state.cached_height()
+        daemon_request = self.chain_state.daemon_request
+        while True:
+            hashes = await daemon_request('mempool_hashes')
+            later_height = await daemon_request('height')
+            if height == later_height:
+                return set(hashes), height
+            height = later_height
+
+    async def _synchronize(self, first_time):
+        '''Asynchronously maintain mempool status with daemon.
+
+        Processes the mempool each time the mempool refresh event is
+        signalled.
+        '''
+        unprocessed = {}
+        unfetched = set()
+        touched = set()
+        txs = self.txs
+        next_refresh = 0
+        fetch_size = 800
+        process_some = self._async_process_some(fetch_size // 2)
+
+        while True:
+            now = time.time()
+            # If processing a large mempool, a block being found might
+            # shrink our work considerably, so refresh our view every 20s
+            if now > next_refresh:
+                hashes, height = await self._refresh_hashes()
+                self._resync_hashes(hashes, unprocessed, unfetched, touched)
+                next_refresh = time.time() + 20
+
+            # Log progress of initial sync
+            todo = len(unfetched) + len(unprocessed)
+            if first_time:
+                pct = (len(txs) - todo) * 100 // len(txs) if txs else 0
+                self.logger.info(f'catchup {pct:d}% complete '
+                                 f'({todo:,d} txs left)')
+            if not todo:
+                break
+
+            # FIXME: parallelize
+            if unfetched:
+                count = min(len(unfetched), fetch_size)
+                hex_hashes = [unfetched.pop() for n in range(count)]
+                unprocessed.update(await self.fetch_raw_txs(hex_hashes))
+            if unprocessed:
+                await process_some(unprocessed, touched)
+
+        if now >= self.next_log:
+            self.logger.info('{:,d} txs touching {:,d} addresses'
+                             .format(len(txs), len(self.hashXs)))
+            self.next_log = time.time() + 150
+        await self.notifications.on_mempool(touched, height)
+
+    def _resync_hashes(self, hashes, unprocessed, unfetched, touched):
         '''Re-sync self.txs with the list of hashes in the daemon's mempool.
 
         Additionally, remove gone hashes from unprocessed and
@@ -64,10 +123,7 @@ class MemPool(object):
         '''
         txs = self.txs
         hashXs = self.hashXs
-        touched = self.touched
         fee_hist = self.fee_histogram
-
-        hashes = self.chain_state.cached_mempool_hashes()
         gone = set(txs).difference(hashes)
         for hex_hash in gone:
             unfetched.discard(hex_hash)
@@ -92,69 +148,12 @@ class MemPool(object):
         for hex_hash in new:
             txs[hex_hash] = None
 
-    async def _synchronize(self, mempool_refresh_event, synchronized):
-        '''Asynchronously maintain mempool status with daemon.
-
-        Processes the mempool each time the mempool refresh event is
-        signalled.
-        '''
-        unprocessed = {}
-        unfetched = set()
-        txs = self.txs
-        fetch_size = 800
-        process_some = self._async_process_some(fetch_size // 2)
-        next_log = 0
-        loops = -1  # Zero during initial catchup
-
-        while True:
-            # Avoid double notifications if processing a block
-            if self.touched and not self.chain_state.processing_new_block():
-                self.notify_sessions(self.touched)
-                self.touched.clear()
-
-            # Log progress / state
-            todo = len(unfetched) + len(unprocessed)
-            if loops == 0:
-                pct = (len(txs) - todo) * 100 // len(txs) if txs else 0
-                self.logger.info('catchup {:d}% complete '
-                                 '({:,d} txs left)'.format(pct, todo))
-            if not todo:
-                loops += 1
-                if loops > 0:
-                    synchronized.set()
-            now = time.time()
-            if now >= next_log and loops:
-                self.logger.info('{:,d} txs touching {:,d} addresses'
-                                 .format(len(txs), len(self.hashXs)))
-                next_log = now + 150
-
-            try:
-                if not todo:
-                    await mempool_refresh_event.wait()
-
-                self._resync_daemon_hashes(unprocessed, unfetched)
-                mempool_refresh_event.clear()
-
-                if unfetched:
-                    count = min(len(unfetched), fetch_size)
-                    hex_hashes = [unfetched.pop() for n in range(count)]
-                    unprocessed.update(await self.fetch_raw_txs(hex_hashes))
-
-                if unprocessed:
-                    await process_some(unprocessed)
-            except DaemonError as e:
-                self.logger.info('ignoring daemon error: {}'.format(e))
-            except asyncio.CancelledError:
-                # This aids clean shutdowns
-                self.stop = True
-                break
-
     def _async_process_some(self, limit):
         pending = []
         txs = self.txs
         fee_hist = self.fee_histogram
 
-        async def process(unprocessed):
+        async def process(unprocessed, touched):
             nonlocal pending
 
             raw_txs = {}
@@ -174,7 +173,6 @@ class MemPool(object):
             pending.extend(deferred)
 
             hashXs = self.hashXs
-            touched = self.touched
             for hex_hash, item in result.items():
                 if hex_hash in txs:
                     txs[hex_hash] = item
@@ -188,17 +186,6 @@ class MemPool(object):
 
         return process
 
-    def on_new_block(self, touched):
-        '''Called after processing one or more new blocks.
-
-        Touched is a set of hashXs touched by the transactions in the
-        block.  Caller must be aware it is modified by this function.
-        '''
-        # Minor race condition here with mempool processor thread
-        touched.update(self.touched)
-        self.touched.clear()
-        self.notify_sessions(touched)
-
     async def fetch_raw_txs(self, hex_hashes):
         '''Fetch a list of mempool transactions.'''
         raw_txs = await self.chain_state.getrawtransactions(hex_hashes)
@@ -241,9 +228,6 @@ class MemPool(object):
         utxo_lookup = self.chain_state.utxo_lookup
 
         for item in pending:
-            if self.stop:
-                break
-
             tx_hash, old_txin_pairs, txout_pairs, tx_size = item
             if tx_hash not in txs:
                 continue
diff --git a/electrumx/server/session.py b/electrumx/server/session.py
index fa47e84..81196fe 100644
--- a/electrumx/server/session.py
+++ b/electrumx/server/session.py
@@ -97,12 +97,14 @@ class SessionManager(object):
 
     CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
 
-    def __init__(self, env, tasks, chain_state, peer_mgr, shutdown_event):
+    def __init__(self, env, tasks, chain_state, peer_mgr, notifications,
+                 shutdown_event):
         env.max_send = max(350000, env.max_send)
         self.env = env
         self.tasks = tasks
         self.chain_state = chain_state
         self.peer_mgr = peer_mgr
+        self.notifications = notifications
         self.shutdown_event = shutdown_event
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.servers = {}
@@ -123,8 +125,7 @@ class SessionManager(object):
         self.mn_cache = []
         # Event triggered when electrumx is listening for incoming requests.
         self.server_listening = asyncio.Event()
-        # FIXME
-        chain_state._mempool.notify_sessions = self.notify_sessions
+        notifications.notify_sessions = self.notify_sessions
         # Set up the RPC request handlers
         cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
                 'reorg sessions stop'.split())
@@ -431,12 +432,11 @@ class SessionManager(object):
         '''The number of connections that we've sent something to.'''
         return len(self.sessions)
 
-    def notify_sessions(self, touched):
+    async def notify_sessions(self, touched, height):
         '''Notify sessions about height changes and touched addresses.'''
         self.chain_state.invalidate_history_cache(touched)
         # Height notifications are synchronous.  Those sessions with
         # touched addresses are scheduled for asynchronous completion
-        height = self.chain_state.db_height()
         for session in self.sessions:
             if isinstance(session, LocalRPC):
                 continue
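
Editor's note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the notification-merging behaviour that the new Notifications class in controller.py provides: mempool touches for a height are held back until the block at that height has been processed, so a client whose pending transaction is mined receives one combined notification rather than a redundant pair. The merging logic mirrors the class added by the patch; the PrintingNotifications subclass, the demo() coroutine and the example hashX strings are assumptions made purely for illustration (in the patch itself, SessionManager binds its own notify_sessions over the base method).

import asyncio


class Notifications:
    '''Mirrors the merging logic of the Notifications class in controller.py.'''

    def __init__(self):
        self._touched_mp = {}     # height -> hashXs touched by mempool refreshes
        self._touched_bp = {}     # height -> hashXs touched by processed blocks
        self._highest_block = 0

    async def _maybe_notify(self):
        tmp, tbp = self._touched_mp, self._touched_bp
        common = set(tmp).intersection(tbp)
        if common:
            height = max(common)
        elif tmp and max(tmp) == self._highest_block:
            height = self._highest_block
        else:
            # A block is still being processed, or no mempool refresh has
            # been seen for the current block yet - defer the notification
            return
        touched = tmp.pop(height)
        touched.update(tbp.pop(height, set()))
        for old in [h for h in tmp if h <= height]:
            del tmp[old]
        for old in [h for h in tbp if h <= height]:
            del tbp[old]
        await self.notify_sessions(touched, height)

    async def on_mempool(self, touched, height):
        self._touched_mp[height] = touched
        await self._maybe_notify()

    async def on_block(self, touched, height):
        self._touched_bp[height] = touched
        self._highest_block = height
        await self._maybe_notify()

    async def notify_sessions(self, touched, height):
        pass


class PrintingNotifications(Notifications):
    # Hypothetical stand-in for SessionManager.notify_sessions
    async def notify_sessions(self, touched, height):
        print(f'notify height {height}: {sorted(touched)}')


async def demo():
    n = PrintingNotifications()
    await n.on_block(set(), 5)         # initialise, as block_processor does on catch-up
    await n.on_mempool({'a1'}, 5)      # notifies {'a1'} at height 5 immediately
    await n.on_mempool({'a2'}, 6)      # deferred: block 6 has not been processed yet
    await n.on_block({'a2', 'a3'}, 6)  # one combined notification for height 6


asyncio.run(demo())

Running the sketch prints a single notification for height 5, nothing for the deferred mempool touch at height 6, and then one combined notification for height 6 covering both the mempool and block touches.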