Browse Source

Make mempool processing more properly asynchronous

patch-2
Neil Booth 7 years ago
parent
commit
9bd9476a54
  1. 2
      electrumx/server/controller.py
  2. 88
      electrumx/server/db.py
  3. 101
      electrumx/server/mempool.py

2
electrumx/server/controller.py

@ -99,7 +99,7 @@ class Controller(ServerBase):
BlockProcessor = env.coin.BLOCK_PROCESSOR BlockProcessor = env.coin.BLOCK_PROCESSOR
self.bp = BlockProcessor(env, self.tasks, daemon, notifications) self.bp = BlockProcessor(env, self.tasks, daemon, notifications)
self.mempool = MemPool(env.coin, self.tasks, daemon, notifications, self.mempool = MemPool(env.coin, self.tasks, daemon, notifications,
self.bp.db_utxo_lookup) self.bp.lookup_utxos)
self.chain_state = ChainState(env, self.tasks, daemon, self.bp, self.chain_state = ChainState(env, self.tasks, daemon, self.bp,
notifications) notifications)
self.peer_mgr = PeerManager(env, self.tasks, self.chain_state) self.peer_mgr = PeerManager(env, self.tasks, self.chain_state)

88
electrumx/server/db.py

@ -35,9 +35,6 @@ class DB(object):
DB_VERSIONS = [6] DB_VERSIONS = [6]
class MissingUTXOError(Exception):
'''Raised if a mempool tx input UTXO couldn't be found.'''
class DBError(Exception): class DBError(Exception):
'''Raised on general DB errors generally indicating corruption.''' '''Raised on general DB errors generally indicating corruption.'''
@ -399,43 +396,52 @@ class DB(object):
tx_hash, height = self.fs_tx_hash(tx_num) tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value) yield UTXO(tx_num, tx_pos, tx_hash, height, value)
def db_utxo_lookup(self, tx_hash, tx_idx): async def lookup_utxos(self, prevouts):
'''Given a prevout return a (hashX, value) pair. '''For each prevout, lookup it up in the DB and return a (hashX,
value) pair or None if not found.
Raises MissingUTXOError if the UTXO is not found. Used by the Used by the mempool code.
mempool code.
''' '''
idx_packed = pack('<H', tx_idx) def lookup_hashXs():
hashX, tx_num_packed = self._db_hashX(tx_hash, idx_packed) '''Return (hashX, suffix) pairs, or None if not found,
if not hashX: for each prevout.
# This can happen when the daemon is a block ahead of us '''
# and has mempool txs spending outputs from that new block def lookup_hashX(tx_hash, tx_idx):
raise self.MissingUTXOError idx_packed = pack('<H', tx_idx)
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: hashX
key = b'u' + hashX + idx_packed + tx_num_packed prefix = b'h' + tx_hash[:4] + idx_packed
db_value = self.utxo_db.get(key)
if not db_value: # Find which entry, if any, the TX_HASH matches.
raise self.DBError('UTXO {} / {:,d} in one table only' for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
.format(hash_to_hex_str(tx_hash), tx_idx)) tx_num_packed = db_key[-4:]
value, = unpack('<Q', db_value) tx_num, = unpack('<I', tx_num_packed)
return hashX, value hash, height = self.fs_tx_hash(tx_num)
if hash == tx_hash:
def _db_hashX(self, tx_hash, idx_packed): return hashX, idx_packed + tx_num_packed
'''Return (hashX, tx_num_packed) for the given TXO. return None, None
return [lookup_hashX(*prevout) for prevout in prevouts]
Both are None if not found.'''
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num def lookup_utxos(hashX_pairs):
# Value: hashX def lookup_utxo(hashX, suffix):
prefix = b'h' + tx_hash[:4] + idx_packed if not hashX:
# This can happen when the daemon is a block ahead
# Find which entry, if any, the TX_HASH matches. # of us and has mempool txs spending outputs from
for db_key, hashX in self.utxo_db.iterator(prefix=prefix): # that new block
tx_num_packed = db_key[-4:] return None
tx_num, = unpack('<I', tx_num_packed) # Key: b'u' + address_hashX + tx_idx + tx_num
hash, height = self.fs_tx_hash(tx_num) # Value: the UTXO value as a 64-bit unsigned integer
if hash == tx_hash: key = b'u' + hashX + suffix
return hashX, tx_num_packed db_value = self.utxo_db.get(key)
if not db_value:
return None, None # This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs
return None
value, = unpack('<Q', db_value)
return hashX, value
return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs]
run_in_thread = self.tasks.run_in_thread
hashX_pairs = await run_in_thread(lookup_hashXs)
return await run_in_thread(lookup_utxos, hashX_pairs)

101
electrumx/server/mempool.py

@ -14,7 +14,7 @@ from collections import defaultdict
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
from electrumx.lib.util import class_logger from electrumx.lib.util import class_logger
from electrumx.server.db import UTXO, DB from electrumx.server.db import UTXO
class MemPool(object): class MemPool(object):
@ -31,10 +31,10 @@ class MemPool(object):
A pair is a (hashX, value) tuple. tx hashes are hex strings. A pair is a (hashX, value) tuple. tx hashes are hex strings.
''' '''
def __init__(self, coin, tasks, daemon, notifications, utxo_lookup): def __init__(self, coin, tasks, daemon, notifications, lookup_utxos):
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.coin = coin self.coin = coin
self.utxo_lookup = utxo_lookup self.lookup_utxos = lookup_utxos
self.tasks = tasks self.tasks = tasks
self.daemon = daemon self.daemon = daemon
self.notifications = notifications self.notifications = notifications
@ -142,7 +142,6 @@ class MemPool(object):
def _async_process_some(self, limit): def _async_process_some(self, limit):
pending = [] pending = []
txs = self.txs txs = self.txs
fee_hist = self.fee_histogram
async def process(unprocessed, touched): async def process(unprocessed, touched):
nonlocal pending nonlocal pending
@ -159,21 +158,8 @@ class MemPool(object):
deferred = pending deferred = pending
pending = [] pending = []
result, deferred = await self.tasks.run_in_thread( deferred = await self._process_raw_txs(raw_txs, deferred, touched)
self._process_raw_txs, raw_txs, deferred)
pending.extend(deferred) pending.extend(deferred)
hashXs = self.hashXs
for hex_hash, item in result.items():
if hex_hash in txs:
txs[hex_hash] = item
txin_pairs, txout_pairs, tx_fee, tx_size = item
fee_rate = tx_fee // tx_size
fee_hist[fee_rate] += tx_size
for hashX, value in itertools.chain(txin_pairs,
txout_pairs):
touched.add(hashX)
hashXs[hashX].add(hex_hash)
return process return process
@ -185,22 +171,15 @@ class MemPool(object):
# evicted or they got in a block. # evicted or they got in a block.
return {hh: raw for hh, raw in zip(hex_hashes, raw_txs) if raw} return {hh: raw for hh, raw in zip(hex_hashes, raw_txs) if raw}
def _process_raw_txs(self, raw_tx_map, pending): async def _process_raw_txs(self, raw_tx_map, pending, touched):
'''Process the dictionary of raw transactions and return a dictionary '''Process the dictionary of raw transactions and return a dictionary
of updates to apply to self.txs. of updates to apply to self.txs.
This runs in the executor so should not update any member
variables it doesn't own. Atomic reads of self.txs that do
not depend on the result remaining the same are fine.
''' '''
script_hashX = self.coin.hashX_from_script script_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER deserializer = self.coin.DESERIALIZER
txs = self.txs
# Deserialize each tx and put it in a pending list # Deserialize each tx and put it in a pending list
for tx_hash, raw_tx in raw_tx_map.items(): for tx_hash, raw_tx in raw_tx_map.items():
if tx_hash not in txs:
continue
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
# Convert the tx outputs into (hashX, value) pairs # Convert the tx outputs into (hashX, value) pairs
@ -213,48 +192,54 @@ class MemPool(object):
pending.append((tx_hash, txin_pairs, txout_pairs, tx_size)) pending.append((tx_hash, txin_pairs, txout_pairs, tx_size))
# Now process what we can # The transaction inputs can be from other mempool
result = {} # transactions (which may or may not be processed yet) or are
# otherwise presumably in the DB.
txs = self.txs
db_prevouts = [(hex_str_to_hash(prev_hash), prev_idx)
for item in pending
for (prev_hash, prev_idx) in item[1]
if prev_hash not in txs]
# If a lookup fails, it returns a None entry
db_utxos = await self.lookup_utxos(db_prevouts)
db_utxo_map = {(hash_to_hex_str(prev_hash), prev_idx): db_utxo
for (prev_hash, prev_idx), db_utxo
in zip(db_prevouts, db_utxos)}
deferred = [] deferred = []
utxo_lookup = self.utxo_lookup hashXs = self.hashXs
fee_hist = self.fee_histogram
for item in pending: for item in pending:
tx_hash, old_txin_pairs, txout_pairs, tx_size = item tx_hash, previns, txout_pairs, tx_size = item
if tx_hash not in txs: if tx_hash not in txs:
continue continue
mempool_missing = False
txin_pairs = [] txin_pairs = []
try: try:
for prev_hex_hash, prev_idx in old_txin_pairs: for previn in previns:
tx_info = txs.get(prev_hex_hash, 0) utxo = db_utxo_map.get(previn)
if tx_info is None: if not utxo:
tx_info = result.get(prev_hex_hash) prev_hash, prev_index = previn
if not tx_info: # This can raise a KeyError or TypeError
mempool_missing = True utxo = txs[prev_hash][1][prev_index]
continue txin_pairs.append(utxo)
if tx_info: except (KeyError, TypeError):
txin_pairs.append(tx_info[1][prev_idx])
elif not mempool_missing:
prev_hash = hex_str_to_hash(prev_hex_hash)
txin_pairs.append(utxo_lookup(prev_hash, prev_idx))
except (DB.MissingUTXOError, DB.DBError):
# DBError can happen when flushing a newly processed
# block. MissingUTXOError typically happens just
# after the daemon has accepted a new block and the
# new mempool has deps on new txs in that block.
continue
if mempool_missing:
deferred.append(item) deferred.append(item)
else: continue
# Compute fee
tx_fee = (sum(v for hashX, v in txin_pairs) -
sum(v for hashX, v in txout_pairs))
result[tx_hash] = (txin_pairs, txout_pairs, tx_fee, tx_size)
return result, deferred # Compute fee
tx_fee = (sum(v for hashX, v in txin_pairs) -
sum(v for hashX, v in txout_pairs))
fee_rate = tx_fee // tx_size
fee_hist[fee_rate] += tx_size
txs[tx_hash] = (txin_pairs, txout_pairs, tx_fee, tx_size)
for hashX, value in itertools.chain(txin_pairs, txout_pairs):
touched.add(hashX)
hashXs[hashX].add(tx_hash)
return deferred
async def _raw_transactions(self, hashX): async def _raw_transactions(self, hashX):
'''Returns an iterable of (hex_hash, raw_tx) pairs for all '''Returns an iterable of (hex_hash, raw_tx) pairs for all

Loading…
Cancel
Save