Browse Source

Update various comments

patch-2
Neil Booth 7 years ago
parent
commit
92ddb52f63
  1. 51
      electrumx/server/mempool.py

51
electrumx/server/mempool.py

@ -22,6 +22,7 @@ from electrumx.server.db import UTXO
@attr.s(slots=True) @attr.s(slots=True)
class MemPoolTx(object): class MemPoolTx(object):
prevouts = attr.ib() prevouts = attr.ib()
# A pair is a (hashX, value) tuple
in_pairs = attr.ib() in_pairs = attr.ib()
out_pairs = attr.ib() out_pairs = attr.ib()
fee = attr.ib() fee = attr.ib()
@ -32,14 +33,11 @@ class MemPool(object):
'''Representation of the daemon's mempool. '''Representation of the daemon's mempool.
Updated regularly in caught-up state. Goal is to enable efficient Updated regularly in caught-up state. Goal is to enable efficient
response to the value() and transactions() calls. response to the calls in the external interface. To that end we
maintain the following maps:
To that end we maintain the following maps: tx: tx_hash -> MemPoolTx
hashXs: hashX -> set of all hashes of txs touching the hashX
tx_hash -> MemPoolTx
hashX -> set of all tx hashes in which the hashX appears
A pair is a (hashX, value) tuple. tx hashes are binary not strings.
''' '''
def __init__(self, coin, tasks, daemon, notifications, lookup_utxos): def __init__(self, coin, tasks, daemon, notifications, lookup_utxos):
@ -117,28 +115,25 @@ class MemPool(object):
# Spend the prevouts # Spend the prevouts
unspent.difference_update(tx.prevouts) unspent.difference_update(tx.prevouts)
# Save the in_pairs, compute the fee, and accept the TX # Save the in_pairs, compute the fee and accept the TX
tx.in_pairs = tuple(in_pairs) tx.in_pairs = tuple(in_pairs)
tx.fee = (sum(v for hashX, v in tx.in_pairs) - tx.fee = (sum(v for hashX, v in tx.in_pairs) -
sum(v for hashX, v in tx.out_pairs)) sum(v for hashX, v in tx.out_pairs))
txs[hash] = tx txs[hash] = tx
for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs): for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs):
touched.add(hashX) touched.add(hashX)
hashXs[hashX].add(hash) hashXs[hashX].add(hash)
return deferred, {prevout: utxo_map[prevout] for prevout in unspent} return deferred, {prevout: utxo_map[prevout] for prevout in unspent}
async def _refresh_hashes(self, single_pass): async def _refresh_hashes(self, once):
'''Return a (hash set, height) pair when we're sure which height they '''Refresh our view of the daemon's mempool.'''
are for.'''
refresh_event = asyncio.Event()
loop = self.tasks.loop
for loop_count in itertools.count(): for loop_count in itertools.count():
height = self.daemon.cached_height() height = self.daemon.cached_height()
hex_hashes = await self.daemon.mempool_hashes() hex_hashes = await self.daemon.mempool_hashes()
if height != await self.daemon.height(): if height != await self.daemon.height():
continue continue
loop.call_later(5, refresh_event.set)
hashes = set(hex_str_to_hash(hh) for hh in hex_hashes) hashes = set(hex_str_to_hash(hh) for hh in hex_hashes)
touched = await self._process_mempool(hashes) touched = await self._process_mempool(hashes)
await self.notifications.on_mempool(touched, height) await self.notifications.on_mempool(touched, height)
@ -146,10 +141,9 @@ class MemPool(object):
# can be expensive. # can be expensive.
if loop_count % 100 == 0: if loop_count % 100 == 0:
await self.tasks.run_in_thread(self._update_histogram) await self.tasks.run_in_thread(self._update_histogram)
if single_pass: if once:
return return
await refresh_event.wait() await asyncio.sleep(5)
refresh_event.clear()
async def _process_mempool(self, all_hashes): async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes # Re-sync with the new set of hashes
@ -201,8 +195,7 @@ class MemPool(object):
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
raw_txs = await self.daemon.getrawtransactions(hex_hashes_iter) raw_txs = await self.daemon.getrawtransactions(hex_hashes_iter)
def deserialize_txs(): def deserialize_txs(): # This function is pure
# This function is pure
to_hashX = self.coin.hashX_from_script to_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER deserializer = self.coin.DESERIALIZER
@ -213,15 +206,11 @@ class MemPool(object):
if not raw_tx: if not raw_tx:
continue continue
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Convert the tx outputs into (hashX, value) pairs
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
for txout in tx.outputs)
# Convert the tx inputs to (prev_hash, prev_idx) pairs
txin_pairs = tuple((txin.prev_hash, txin.prev_idx) txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs) for txin in tx.inputs)
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
for txout in tx.outputs)
txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs, txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
0, tx_size) 0, tx_size)
return txs return txs
@ -231,7 +220,8 @@ class MemPool(object):
# Determine all prevouts not in the mempool, and fetch the # Determine all prevouts not in the mempool, and fetch the
# UTXO information from the database. Failed prevout lookups # UTXO information from the database. Failed prevout lookups
# return None - concurrent database updates happen # return None - concurrent database updates happen - which is
# relied upon by _accept_transactions
prevouts = tuple(prevout for tx in tx_map.values() prevouts = tuple(prevout for tx in tx_map.values()
for prevout in tx.prevouts for prevout in tx.prevouts
if prevout[0] not in all_hashes) if prevout[0] not in all_hashes)
@ -240,7 +230,10 @@ class MemPool(object):
return self._accept_transactions(tx_map, utxo_map, touched) return self._accept_transactions(tx_map, utxo_map, touched)
#
# External interface # External interface
#
async def start_and_wait_for_sync(self): async def start_and_wait_for_sync(self):
'''Starts the mempool synchronizer. '''Starts the mempool synchronizer.
@ -249,11 +242,11 @@ class MemPool(object):
self.logger.info('beginning processing of daemon mempool. ' self.logger.info('beginning processing of daemon mempool. '
'This can take some time...') 'This can take some time...')
start = time.time() start = time.time()
await self._refresh_hashes(True) await self._refresh_hashes(once=True)
elapsed = time.time() - start elapsed = time.time() - start
self.logger.info(f'synced in {elapsed:.2f}s') self.logger.info(f'synced in {elapsed:.2f}s')
self.tasks.create_task(self._log_stats()) self.tasks.create_task(self._log_stats())
self.tasks.create_task(self._refresh_hashes(False)) self.tasks.create_task(self._refresh_hashes(once=False))
async def balance_delta(self, hashX): async def balance_delta(self, hashX):
'''Return the unconfirmed amount in the mempool for hashX. '''Return the unconfirmed amount in the mempool for hashX.

Loading…
Cancel
Save