Browse Source

Initial attempt at mempool

master
Neil Booth 8 years ago
parent
commit
48b8b9332e
  1. 200
      server/block_processor.py
  2. 68
      server/cache.py
  3. 17
      server/daemon.py
  4. 21
      server/protocol.py

200
server/block_processor.py

@ -21,6 +21,7 @@ from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError
from lib.hash import hash_to_str
from lib.script import ScriptPubKey
from lib.tx import Deserializer
from lib.util import chunks, LoggedClass
from server.storage import open_db
@ -50,7 +51,7 @@ class Prefetcher(LoggedClass):
self.queue = asyncio.Queue()
self.queue_size = 0
self.fetched_height = height
self.mempool = []
self.mempool_hashes = []
# Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024
# First fetch to be 10 blocks
@ -74,7 +75,7 @@ class Prefetcher(LoggedClass):
blocks, height, size = await self.queue.get()
self.queue_size -= size
if height == self.daemon.cached_height():
return blocks, self.mempool
return blocks, self.mempool_hashes
else:
return blocks, None
@ -99,7 +100,7 @@ class Prefetcher(LoggedClass):
self.fetched_height += len(blocks)
caught_up = self.fetched_height == self.daemon.cached_height()
if caught_up:
self.mempool = await self.daemon.mempool_hashes()
self.mempool_hashes = await self.daemon.mempool_hashes()
# Wake up block processor if we have something
if blocks or caught_up:
@ -137,6 +138,142 @@ class Prefetcher(LoggedClass):
return blocks, size
class MissingUTXOError(Exception):
pass
class MemPool(LoggedClass):
'''Representation of the daemon's mempool.
Updated regularly in caught-up state. Goal is to enable efficient
response to the value() and transactions() calls.
To that end we maintain the following maps:
tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
hash168 -> set of all tx hashes in which the hash168 appears
A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
tx's txins are unconfirmed. tx hashes are hex strings.
'''
def __init__(self, bp):
super().__init__()
self.txs = {}
self.hash168s = defaultdict(set) # None can be a key
self.bp = bp
async def update(self, hex_hashes):
'''Update state given the current mempool to the passed set of hashes.
Remove transactions that are no longer in our mempool.
Request new transactions we don't have then add to our mempool.
'''
hex_hashes = set(hex_hashes)
touched = set()
if not self.txs:
self.logger.info('initial fetch of {:,d} daemon mempool txs'
.format(len(hex_hashes)))
# Remove gone items
gone = set(self.txs).difference(hex_hashes)
for hex_hash in gone:
txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
hash168s = set(hash168 for hash168, value in txin_pairs)
hash168s.update(hash168 for hash168, value in txout_pairs)
for hash168 in hash168s:
self.hash168s[hash168].remove(hex_hash)
touched.update(hash168s)
if gone:
self.logger.info('{:,d} entries removed from mempool'
.format(len(gone)))
# Get the raw transactions for the new hashes. Ignore the
# ones the daemon no longer has (it will return None). Put
# them into a dictionary of hex hash to deserialized tx.
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes
# The mempool is unordered, so process all outputs first so
# that looking for inputs has full info.
parse_script = ScriptPubKey.from_script
coin = self.bp.coin
utxo_lookup = self.bp.utxo_cache.lookup
def txout_pair(txout):
return (parse_script(txout.pk_script, coin).hash168, txout.value)
for hex_hash, tx in new_txs.items():
txout_pairs = tuple(txout_pair(txout) for txout in tx.outputs)
self.txs[hex_hash] = [None, txout_pairs, None]
def txin_info(txin):
hex_hash = hash_to_str(txin.prev_hash)
mempool_entry = self.txs.get(hex_hash)
if mempool_entry:
return mempool_entry[1][txin.prev_idx], True
entry = utxo_lookup(txin.prev_hash, txin.prev_idx)
if entry == NO_CACHE_ENTRY:
# Not possible unless daemon is lying or we're corrupted?
self.logger.warning('no UTXO found for {} / {}'
.format(hash_to_str(txin.prev_hash),
txin.prev_idx))
raise MissingUTXOError
return (entry[:21], struct.unpack('<Q', entry[-8:])), False
# Now add the inputs
for hex_hash, tx in new_txs.items():
txout_pairs = self.txs[hex_hash][1]
try:
infos = (txin_info(txin) for txin in tx.inputs)
txin_pairs, unconfs = zip(*infos)
except MissingUTXOError:
# If we were missing a UTXO for some reason drop this tx
del self.txs[hex_hash]
continue
self.txs[hex_hash] = [txin_pairs, txout_pairs, any(unconfs)]
# Update touched and self.hash168s for the new tx
for hash168, value in txin_pairs:
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
for hash168, value in txout_pairs:
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
self.logger.info('{:,d} entries in mempool for {:,d} addresses'
.format(len(self.txs), len(self.hash168s)))
# Might include a None
return touched
def transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is confirmed.
'''
for hex_hash in self.hash168s[hash168]:
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
tx_fee = (sum(v for hash168, v in txin_pairs)
- sum(v for hash168, v in txout_pairs))
yield (hex_hash, tx_fee, unconfirmed)
def value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
value = 0
for tx_hash in self.hash168s[hash168]:
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
value += sum(v for h168, v in txout_pairs if h168 == hash168)
return value
class BlockProcessor(LoggedClass):
'''Process blocks and update the DB state to match.
@ -153,6 +290,8 @@ class BlockProcessor(LoggedClass):
self.daemon = daemon
self.on_update = on_update
self.mempool = MemPool(self)
self.touched = set()
# Meta
self.utxo_MB = env.utxo_MB
@ -232,18 +371,19 @@ class BlockProcessor(LoggedClass):
Blocks are only processed in the forward direction. The
prefetcher only provides a non-None mempool when caught up.
'''
all_touched = [set()]
blocks, mempool = await self.prefetcher.get_blocks()
blocks, mempool_hashes = await self.prefetcher.get_blocks()
caught_up = mempool_hashes is not None
for block in blocks:
touched = self.advance_block(block)
if touched is None:
all_touched.append(await self.handle_chain_reorg())
mempool = None
break
all_touched.append(touched)
if self.advance_block(block, caught_up):
await self.handle_chain_reorg()
return
await asyncio.sleep(0) # Yield
if mempool is not None:
if caught_up:
await self.caught_up(mempool_hashes)
async def caught_up(self, mempool_hashes):
'''Called after each deamon poll if caught up.'''
# Caught up to daemon height. Flush everything as queries
# are performed on the DB and not in-memory.
self.flush(True)
@ -251,8 +391,9 @@ class BlockProcessor(LoggedClass):
self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height))
if self.on_update:
await self.on_update(self.height, set.union(*all_touched))
self.touched.update(await self.mempool.update(mempool_hashes))
await self.on_update(self.height, self.touched)
self.touched = set()
async def force_chain_reorg(self, to_genesis):
try:
@ -266,20 +407,17 @@ class BlockProcessor(LoggedClass):
self.flush(True)
self.logger.info('finding common height...')
touched = set()
hashes = await self.reorg_hashes(to_genesis)
# Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
touched.update(self.backup_blocks(blocks))
self.backup_blocks(blocks)
self.logger.info('backed up to height {:,d}'.format(self.height))
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
return touched
async def reorg_hashes(self, to_genesis):
'''Return the list of hashes to back up beacuse of a reorg.
@ -565,7 +703,7 @@ class BlockProcessor(LoggedClass):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def advance_block(self, block):
def advance_block(self, block, update_touched):
# We must update the fs_cache before calling advance_txs() as
# the UTXO cache uses the fs_cache via get_tx_hash() to
# resolve compressed key collisions
@ -590,7 +728,8 @@ class BlockProcessor(LoggedClass):
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(utxo_MB >= self.utxo_MB)
return touched
if update_touched:
self.touched.update(touched)
def advance_txs(self, tx_hashes, txs, touched):
put_utxo = self.utxo_cache.put
@ -661,9 +800,9 @@ class BlockProcessor(LoggedClass):
self.logger.info('backed up to height {:,d}'.format(self.height))
self.touched.update(touched)
flush_history = partial(self.backup_history, hash168s=touched)
self.flush(True, flush_history=flush_history)
return touched
def backup_txs(self, tx_hashes, txs, touched):
# Prevout values, in order down the block (coinbase first if present)
@ -704,9 +843,24 @@ class BlockProcessor(LoggedClass):
assert isinstance(limit, int) and limit >= 0
return limit
def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is confirmed.
'''
return self.mempool.transactions(hash168)
def mempool_value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
return self.mempool.value(hash168)
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of transactions that touched the address,
height) tuples of confirmed transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
@ -756,7 +910,7 @@ class BlockProcessor(LoggedClass):
hash168 = None
if 0 <= index <= 65535:
idx_packed = struct.pack('<H', index)
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed)
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
if hash168 == NO_CACHE_ENTRY:
hash168 = None
return hash168

68
server/cache.py

@ -95,38 +95,40 @@ class UTXOCache(LoggedClass):
self.cache_spends = 0
self.db_deletes = 0
def spend(self, prev_hash, prev_idx):
'''Spend a UTXO and return the cache's value.
def lookup(self, prev_hash, prev_idx):
'''Given a prevout, return a pair (hash168, value).
If the UTXO is not in the cache it must be on disk.
'''
# Fast track is it's in the cache
pack = struct.pack
idx_packed = pack('<H', prev_idx)
value = self.cache.pop(prev_hash + idx_packed, None)
If the UTXO is not found, returns (None, None).'''
# Fast track is it being in the cache
idx_packed = struct.pack('<H', prev_idx)
value = self.cache.get(prev_hash + idx_packed, None)
if value:
self.cache_spends += 1
return value
return self.db_lookup(prev_hash, idx_packed, False)
# Oh well. Find and remove it from the DB.
hash168 = self.hash168(prev_hash, idx_packed, True)
def db_lookup(self, tx_hash, idx_packed, delete=True):
'''Return a UTXO from the DB. Remove it if delete is True.
Return NO_CACHE_ENTRY if it is not in the DB.'''
hash168 = self.hash168(tx_hash, idx_packed, delete)
if not hash168:
return NO_CACHE_ENTRY
self.db_deletes += 1
# Read the UTXO through the cache from the disk. We have to
# go through the cache because compressed keys can collide.
key = b'u' + hash168 + prev_hash[:UTXO_TX_HASH_LEN] + idx_packed
key = b'u' + hash168 + tx_hash[:UTXO_TX_HASH_LEN] + idx_packed
data = self.cache_get(key)
if data is None:
# Uh-oh, this should not happen...
self.logger.error('found no UTXO for {} / {:d} key {}'
.format(hash_to_str(prev_hash), prev_idx,
.format(hash_to_str(tx_hash),
struct.unpack('<H', idx_packed),
bytes(key).hex()))
return NO_CACHE_ENTRY
if len(data) == 12:
if delete:
self.db_deletes += 1
self.cache_delete(key)
return hash168 + data
@ -135,26 +137,42 @@ class UTXOCache(LoggedClass):
assert len(data) % 12 == 0
for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4])
tx_hash, height = self.parent.get_tx_hash(tx_num)
if prev_hash == tx_hash:
result = hash168 + data[n: n+12]
data = data[:n] + data[n+12:]
self.cache_write(key, data)
this_tx_hash, height = self.parent.get_tx_hash(tx_num)
if tx_hash == this_tx_hash:
result = hash168 + data[n:n+12]
if delete:
self.db_deletes += 1
self.cache_write(key, data[:n] + data[n+12:])
return result
raise Exception('could not resolve UTXO key collision')
def hash168(self, tx_hash, idx_packed, delete=False):
def spend(self, prev_hash, prev_idx):
'''Spend a UTXO and return the cache's value.
If the UTXO is not in the cache it must be on disk.
'''
# Fast track is it being in the cache
idx_packed = struct.pack('<H', prev_idx)
value = self.cache.pop(prev_hash + idx_packed, None)
if value:
self.cache_spends += 1
return value
return self.db_lookup(prev_hash, idx_packed)
def hash168(self, tx_hash, idx_packed, delete=True):
'''Return the hash168 paid to by the given TXO.
Refers to the database. Returns None if not found (which is
indicates a non-standard script).
Look it up in the DB and removes it if delete is True. Return
None if not found.
'''
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idx_packed
data = self.cache_get(key)
if data is None:
# Assuming the DB is not corrupt, this indicates a
# successful spend of a non-standard script
# Assuming the DB is not corrupt, if delete is True this
# indicates a successful spend of a non-standard script
# as we don't currently record those
return None
if len(data) == 25:

17
server/daemon.py

@ -13,7 +13,7 @@ import json
import aiohttp
from lib.util import LoggedClass
import lib.util as util
class DaemonError(Exception):
@ -21,7 +21,7 @@ class DaemonError(Exception):
cannot be remedied by retrying.'''
class Daemon(LoggedClass):
class Daemon(util.LoggedClass):
'''Handles connections to a daemon at the given URL.'''
def __init__(self, url):
@ -107,6 +107,19 @@ class Daemon(LoggedClass):
'''Return the serialized raw transaction with the given hash.'''
return await self.send_single('getrawtransaction', (hex_hash, 0))
async def getrawtransactions(self, hex_hashes):
'''Return the serialized raw transactions with the given hashes.
Breaks large requests up. Yields after each sub request.'''
param_lists = tuple((hex_hash, 0) for hex_hash in hex_hashes)
raw_txs = []
for chunk in util.chunks(param_lists, 10000):
txs = await self.send_vector('getrawtransaction', chunk)
# Convert hex strings to bytes
raw_txs.append(tuple(bytes.fromhex(tx) for tx in txs))
await asyncio.sleep(0)
return sum(raw_txs, ())
async def sendrawtransaction(self, params):
'''Broadcast a transaction to the network.'''
return await self.send_single('sendrawtransaction', params)

21
server/protocol.py

@ -255,9 +255,15 @@ class ElectrumX(JSONRPC):
@classmethod
def address_status(cls, hash168):
'''Returns status as 32 bytes.'''
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = cls.BLOCK_PROCESSOR.get_history(hash168)
mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed)
for hex_hash, tx_fee, unconfirmed in mempool)
if status:
return sha256(status.encode()).hex()
return None
@ -297,11 +303,16 @@ class ElectrumX(JSONRPC):
@classmethod
def get_history(cls, hash168):
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None)
return [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
]
mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168)
conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history)
unconf = tuple({'tx_hash': hex, 'height': -unconfirmed, 'fee': fee}
for hex, tx_fee, unconfirmed in mempool)
return conf + unconf
@classmethod
def get_chunk(cls, index):
@ -315,7 +326,7 @@ class ElectrumX(JSONRPC):
@classmethod
def get_balance(cls, hash168):
confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168)
unconfirmed = -1 # FIXME
unconfirmed = cls.BLOCK_PROCESSOR.mempool_value(hash168)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@classmethod

Loading…
Cancel
Save