From 92ddb52f633b0541ebdc20b48c05974690d62553 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 24 Jul 2018 11:41:29 +0800 Subject: [PATCH] Update various comments --- electrumx/server/mempool.py | 51 ++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index c370d03..88dfad2 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -22,6 +22,7 @@ from electrumx.server.db import UTXO @attr.s(slots=True) class MemPoolTx(object): prevouts = attr.ib() + # A pair is a (hashX, value) tuple in_pairs = attr.ib() out_pairs = attr.ib() fee = attr.ib() @@ -32,14 +33,11 @@ class MemPool(object): '''Representation of the daemon's mempool. Updated regularly in caught-up state. Goal is to enable efficient - response to the value() and transactions() calls. + response to the calls in the external interface. To that end we + maintain the following maps: - To that end we maintain the following maps: - - tx_hash -> MemPoolTx - hashX -> set of all tx hashes in which the hashX appears - - A pair is a (hashX, value) tuple. tx hashes are binary not strings. + tx: tx_hash -> MemPoolTx + hashXs: hashX -> set of all hashes of txs touching the hashX ''' def __init__(self, coin, tasks, daemon, notifications, lookup_utxos): @@ -117,28 +115,25 @@ class MemPool(object): # Spend the prevouts unspent.difference_update(tx.prevouts) - # Save the in_pairs, compute the fee, and accept the TX + # Save the in_pairs, compute the fee and accept the TX tx.in_pairs = tuple(in_pairs) tx.fee = (sum(v for hashX, v in tx.in_pairs) - sum(v for hashX, v in tx.out_pairs)) txs[hash] = tx + for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs): touched.add(hashX) hashXs[hashX].add(hash) return deferred, {prevout: utxo_map[prevout] for prevout in unspent} - async def _refresh_hashes(self, single_pass): - '''Return a (hash set, height) pair when we're sure which height they - are for.''' - refresh_event = asyncio.Event() - loop = self.tasks.loop + async def _refresh_hashes(self, once): + '''Refresh our view of the daemon's mempool.''' for loop_count in itertools.count(): height = self.daemon.cached_height() hex_hashes = await self.daemon.mempool_hashes() if height != await self.daemon.height(): continue - loop.call_later(5, refresh_event.set) hashes = set(hex_str_to_hash(hh) for hh in hex_hashes) touched = await self._process_mempool(hashes) await self.notifications.on_mempool(touched, height) @@ -146,10 +141,9 @@ class MemPool(object): # can be expensive. if loop_count % 100 == 0: await self.tasks.run_in_thread(self._update_histogram) - if single_pass: + if once: return - await refresh_event.wait() - refresh_event.clear() + await asyncio.sleep(5) async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes @@ -201,8 +195,7 @@ class MemPool(object): hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) raw_txs = await self.daemon.getrawtransactions(hex_hashes_iter) - def deserialize_txs(): - # This function is pure + def deserialize_txs(): # This function is pure to_hashX = self.coin.hashX_from_script deserializer = self.coin.DESERIALIZER @@ -213,15 +206,11 @@ class MemPool(object): if not raw_tx: continue tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() - - # Convert the tx outputs into (hashX, value) pairs - txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) - for txout in tx.outputs) - - # Convert the tx inputs to (prev_hash, prev_idx) pairs + # Convert the inputs and outputs into (hashX, value) pairs txin_pairs = tuple((txin.prev_hash, txin.prev_idx) for txin in tx.inputs) - + txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) + for txout in tx.outputs) txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs, 0, tx_size) return txs @@ -231,7 +220,8 @@ class MemPool(object): # Determine all prevouts not in the mempool, and fetch the # UTXO information from the database. Failed prevout lookups - # return None - concurrent database updates happen + # return None - concurrent database updates happen - which is + # relied upon by _accept_transactions prevouts = tuple(prevout for tx in tx_map.values() for prevout in tx.prevouts if prevout[0] not in all_hashes) @@ -240,7 +230,10 @@ class MemPool(object): return self._accept_transactions(tx_map, utxo_map, touched) + # # External interface + # + async def start_and_wait_for_sync(self): '''Starts the mempool synchronizer. @@ -249,11 +242,11 @@ class MemPool(object): self.logger.info('beginning processing of daemon mempool. ' 'This can take some time...') start = time.time() - await self._refresh_hashes(True) + await self._refresh_hashes(once=True) elapsed = time.time() - start self.logger.info(f'synced in {elapsed:.2f}s') self.tasks.create_task(self._log_stats()) - self.tasks.create_task(self._refresh_hashes(False)) + self.tasks.create_task(self._refresh_hashes(once=False)) async def balance_delta(self, hashX): '''Return the unconfirmed amount in the mempool for hashX.