
Merge branch 'utxo_cache' into develop

Neil Booth 8 years ago
commit 1599cd6672

Changed files:
  1. query.py       (2 changed lines)
  2. server/db.py   (457 changed lines)
  3. server/env.py  (2 changed lines)

query.py  (2 changed lines)

@@ -33,7 +33,7 @@ def main():
     n = None
     for n, utxo in enumerate(db.get_utxos(hash168, limit)):
         print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}'
-              .format(n, bytes(reversed(utxo.tx_hash)).hex(),
+              .format(n + 1, bytes(reversed(utxo.tx_hash)).hex(),
                       utxo.tx_pos, utxo.height, utxo.value))
     if n is None:
         print('No UTXOs')
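The fix above keeps n as a None sentinel and adds 1 only for display. An equivalent variant (not what the commit does, shown for comparison) would pass start=1 to enumerate and use 0 as the sentinel:

    n = 0
    for n, utxo in enumerate(db.get_utxos(hash168, limit), start=1):
        print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}'
              .format(n, bytes(reversed(utxo.tx_hash)).hex(),
                      utxo.tx_pos, utxo.height, utxo.value))
    if n == 0:
        print('No UTXOs')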

server/db.py  (457 changed lines)

@@ -31,6 +31,250 @@ def to_4_bytes(value):
 def from_4_bytes(b):
     return struct.unpack('<I', b)[0]
+
+
+class UTXOCache(object):
+    '''An in-memory UTXO cache, representing all changes to UTXO state
+    since the last DB flush.
+
+    We want to store millions, perhaps tens of millions, of these in
+    memory for optimal performance during initial sync, because then
+    it is possible to spend UTXOs without ever going to the database
+    (other than as an entry in the address history, and there is only
+    one such entry per TX, not per UTXO).  So store them in a Python
+    dictionary with binary keys and values.
+
+      Key:   TX_HASH + TX_IDX          (32 + 2 = 34 bytes)
+      Value: HASH168 + TX_NUM + VALUE  (21 + 4 + 8 = 33 bytes)
+
+    That's 67 bytes of raw data.  Assume 100 bytes per UTXO accounting
+    for Python datastructure overhead, then perhaps 20 million UTXOs
+    can fit in 2GB of RAM.  There are approximately 42 million UTXOs
+    on bitcoin mainnet at height 433,000.
+
+    Semantics:
+
+      add:   Add it to the cache dictionary.
+
+      spend: Remove it if in the cache dictionary.  Otherwise it's
+             been flushed to the DB.  Each UTXO is responsible for
+             two entries in the DB stored using compressed keys.
+             Mark both for deletion in the next flush of the
+             in-memory UTXO cache.
+
+    A UTXO is stored in the DB in 2 "tables":
+
+      1. The output value and tx number.  Must be keyed with a
+         hash168 prefix so the unspent outputs and balance of an
+         arbitrary address can be looked up with a simple key
+         traversal.
+
+         Key:   b'u' + hash168 + compressed_tx_hash + tx_idx
+         Value: a (tx_num, value) pair
+
+      2. Given a prevout, we need to be able to look up the UTXO key
+         to remove it.  As the DB entry is keyed by hash168, and the
+         hash168 is not part of the prevout, we need a hash168 lookup.
+
+         Key:   b'h' + compressed_tx_hash + tx_idx
+         Value: a (hash168, tx_num) pair
+
+    The compressed TX hash is just the first few bytes of the hash of
+    the TX the UTXO is in (and needn't be the same number of bytes in
+    each table).  As this is not unique there will be collisions;
+    tx_num is stored to resolve them.  The collision rate is around
+    0.02% for the hash168 table, and almost zero for the UTXO table
+    (there are around 100 collisions in the whole bitcoin blockchain).
+    '''
+
+    def __init__(self, parent, db, coin):
+        self.logger = logging.getLogger('UTXO')
+        self.logger.setLevel(logging.INFO)
+        self.parent = parent
+        self.coin = coin
+        self.cache = {}
+        self.db = db
+        self.db_cache = {}
+        # Statistics
+        self.adds = 0
+        self.cache_hits = 0
+        self.db_deletes = 0
+
+    def size_MB(self):
+        '''Returns the approximate size of the cache, in MB.'''
+        return (len(self.cache) + len(self.db_cache)) * 100 // 1048576
+
+    def add_many(self, tx_hash, tx_num, txouts):
+        '''Add a sequence of UTXOs to the cache, return the set of hash168s
+        seen.
+
+        Pass the hash of the TX it appears in, its TX number, and the
+        TX outputs.
+        '''
+        parse_script = ScriptPubKey.from_script
+        pack = struct.pack
+        tx_numb = pack('<I', tx_num)
+        hash168s = set()
+
+        self.adds += len(txouts)
+        for idx, txout in enumerate(txouts):
+            # Get the hash168.  Ignore scripts we can't grok.
+            pk = parse_script(txout.pk_script, self.coin)
+            hash168 = pk.hash168
+            if not hash168:
+                continue
+
+            hash168s.add(hash168)
+            key = tx_hash + pack('<H', idx)
+            if key in self.cache:
+                logging.info('duplicate tx hash {}'
+                             .format(bytes(reversed(tx_hash)).hex()))
+
+            # b''.join avoids this: https://bugs.python.org/issue13298
+            self.cache[key] = b''.join(
+                (hash168, tx_numb, pack('<Q', txout.value)))
+
+        return hash168s
+
+    def spend(self, prevout):
+        '''Spend a UTXO and return the address spent.
+
+        If the UTXO is not in the cache it must be on disk.
+        '''
+        # Fast track: the UTXO is in the cache
+        pack = struct.pack
+        key = b''.join((prevout.hash, pack('<H', prevout.n)))
+        value = self.cache.pop(key, None)
+        if value:
+            self.cache_hits += 1
+            return value[:21]
+
+        # Oh well.  Find and remove it from the DB.
+        hash168 = self.hash168(prevout.hash, prevout.n)
+        if not hash168:
+            return None
+
+        self.db_deletes += 1
+
+        # Read the UTXO through the cache from the disk.  We have to
+        # go through the cache because compressed keys can collide.
+        key = (b'u' + hash168 + prevout.hash[:UTXO_TX_HASH_LEN]
+               + pack('<H', prevout.n))
+        data = self.cache_get(key)
+        if data is None:
+            # Uh-oh, this should not happen...
+            self.logger.error('found no UTXO for {} / {:d} key {}'
+                              .format(bytes(reversed(prevout.hash)).hex(),
+                                      prevout.n, bytes(key).hex()))
+            return hash168
+
+        if len(data) == 12:
+            (tx_num, ) = struct.unpack('<I', data[:4])
+            self.cache_delete(key)
+            return hash168
+
+        # Resolve the compressed key collision.  These should be
+        # extremely rare.
+        assert len(data) % 12 == 0
+        for n in range(0, len(data), 12):
+            (tx_num, ) = struct.unpack('<I', data[n:n+4])
+            tx_hash, height = self.parent.get_tx_hash(tx_num)
+            if prevout.hash == tx_hash:
+                data = data[:n] + data[n + 12:]
+                self.cache_write(key, data)
+                return hash168
+
+        raise Exception('could not resolve UTXO key collision')
+
+    def hash168(self, tx_hash, idx):
+        '''Return the hash168 paid to by the given TXO.
+
+        Refers to the database.  Returns None if not found (which
+        indicates a non-standard script).
+        '''
+        key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + struct.pack('<H', idx)
+        data = self.cache_get(key)
+        if data is None:
+            # Assuming the DB is not corrupt, this indicates a
+            # successful spend of a non-standard script
+            # self.logger.info('ignoring spend of non-standard UTXO {} / {:d}'
+            #                  .format(bytes(reversed(tx_hash)).hex(), idx))
+            return None
+
+        if len(data) == 25:
+            self.cache_delete(key)
+            return data[:21]
+
+        assert len(data) % 25 == 0
+
+        # Resolve the compressed key collision using the TX number
+        for n in range(0, len(data), 25):
+            (tx_num, ) = struct.unpack('<I', data[n+21:n+25])
+            my_hash, height = self.parent.get_tx_hash(tx_num)
+            if my_hash == tx_hash:
+                self.cache_write(key, data[:n] + data[n+25:])
+                return data[n:n+21]
+
+        raise Exception('could not resolve hash168 collision')
+
+    def cache_write(self, key, value):
+        '''Cache write of a (key, value) pair to the DB.'''
+        assert bool(value)
+        self.db_cache[key] = value
+
+    def cache_delete(self, key):
+        '''Cache deletion of a key from the DB.'''
+        self.db_cache[key] = None
+
+    def cache_get(self, key):
+        '''Fetch a value from the DB through our write cache.'''
+        value = self.db_cache.get(key)
+        if value:
+            return value
+        return self.db.get(key)
+
+    def flush(self, batch):
+        '''Flush the cached DB writes and UTXO set to the batch.'''
+        # Care is needed because the writes generated by flushing the
+        # UTXO state may have keys in common with our write cache or
+        # may be in the DB already.
+        hcolls = ucolls = 0
+        new_utxos = len(self.cache)
+        for cache_key, cache_value in self.cache.items():
+            # First write to the hash168 lookup table
+            key = b'h' + cache_key[:ADDR_TX_HASH_LEN] + cache_key[-2:]
+            value = cache_value[:25]
+            prior_value = self.cache_get(key)
+            if prior_value:   # Should rarely happen
+                hcolls += 1
+                value += prior_value
+            self.cache_write(key, value)
+
+            # Next write the UTXO table
+            key = (b'u' + cache_value[:21] + cache_key[:UTXO_TX_HASH_LEN]
+                   + cache_key[-2:])
+            value = cache_value[-12:]
+            prior_value = self.cache_get(key)
+            if prior_value:   # Should almost never happen
+                ucolls += 1
+                value += prior_value
+            self.cache_write(key, value)
+
+        # GC-ing this now can only help the levelDB write.
+        self.cache = {}
+
+        # Now we can update the batch.
+        for key, value in self.db_cache.items():
+            if value:
+                batch.put(key, value)
+            else:
+                batch.delete(key)
+        self.db_cache = {}
+
+        self.logger.info('UTXO cache adds: {:,d} spends: {:,d}'
+                         .format(self.adds, self.cache_hits))
+        self.logger.info('UTXO DB adds: {:,d} spends: {:,d}. '
+                         'Collisions: hash168: {:,d} UTXO: {:,d}'
+                         .format(new_utxos, self.db_deletes,
+                                 hcolls, ucolls))
+        self.adds = self.cache_hits = self.db_deletes = 0
+
+
 class DB(object):
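To make the binary layout described in the UTXOCache docstring concrete, here is a minimal sketch of packing and unpacking one cache entry; the helper name and sample values are illustrative only, while the field widths follow the docstring:

    import struct

    def pack_cache_entry(tx_hash, tx_idx, hash168, tx_num, value):
        '''Build the (key, value) pair the cache holds for one UTXO.'''
        key = tx_hash + struct.pack('<H', tx_idx)    # 32 + 2 = 34 bytes
        val = b''.join((hash168,                     # 21 bytes
                        struct.pack('<I', tx_num),   # + 4 bytes
                        struct.pack('<Q', value)))   # + 8 = 33 bytes
        return key, val

    key, val = pack_cache_entry(bytes(32), 0, bytes(21), 1, 5000000000)
    assert len(key) + len(val) == 67                 # raw bytes per UTXO
    hash168 = val[:21]
    tx_num, = struct.unpack('<I', val[21:25])
    value, = struct.unpack('<Q', val[25:])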

@@ -49,9 +293,9 @@ class DB(object):
         self.logger.setLevel(logging.INFO)
         self.coin = env.coin
-        self.flush_size = env.flush_size
-        self.logger.info('using flush size of {:,d} entries'
-                         .format(self.flush_size))
+        self.flush_MB = env.flush_MB
+        self.logger.info('flushing after cache reaches {:,d} MB'
+                         .format(self.flush_MB))
 
         self.tx_counts = array.array('I')
         self.tx_hash_file_size = 4*1024*1024
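The flush trigger changes from a raw entry count (FLUSH_SIZE) to an approximate memory budget in MB, using the 100-bytes-per-entry estimate from size_MB(). A sketch of the arithmetic with illustrative numbers:

    entries = 20 * 1000 * 1000            # cached UTXOs plus pending DB writes
    approx_MB = entries * 100 // 1048576  # ~1907 MB at 100 bytes per entry
    flush_MB = 1000                       # the FLUSH_MB default from env.py
    should_flush = approx_MB > flush_MB   # True here, so a flush would run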

@@ -59,23 +303,7 @@ class DB(object):
         self.headers = []
         self.tx_hashes = []
         self.history = defaultdict(list)
-        self.writes_avoided = 0
-        self.read_cache_hits = 0
-        self.write_cache_hits = 0
-        self.hcolls = 0
-
-        # Things put in a batch are not visible until the batch is
-        # written, so use a cache.
-        # Semantics: a key/value pair in this dictionary represents the
-        # in-memory state of the DB.  Anything in this dictionary will
-        # be written at the next flush.
-        self.write_cache = {}
-        # Read cache: a key/value pair in this dictionary represents
-        # something read from the DB; it is on-disk as of the prior
-        # flush.  If a key is in write_cache that value is more
-        # recent.  Any key in write_cache and not in read_cache has
-        # never hit the disk.
-        self.read_cache = {}
         self.history_size = 0
 
         db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
         try:

@@ -92,6 +320,8 @@ class DB(object):
         self.txcount_file = self.open_file('txcount')
         self.read_db()
 
+        self.utxo_cache = UTXOCache(self, self.db, self.coin)
+
         # Note that DB_HEIGHT is the height of the next block to be written.
         # So an empty DB has a DB_HEIGHT of 0 not -1.
         self.tx_count = self.db_tx_count
def open_db(self, db_name, create):
return plyvel.DB(db_name, create_if_missing=create,
error_if_exists=create,
compression=None)
# lru_cache_size=256*1024*1024)
error_if_exists=create, compression=None)
def init_db(self):
self.db_height = 0

@@ -114,18 +342,19 @@ class DB(object):
         self.flush_count = 0
         self.wall_time = 0
         self.tip = self.coin.GENESIS_HASH
-        self.put(self.GENESIS_KEY, unhexlify(self.tip))
+        self.db.put(self.GENESIS_KEY, unhexlify(self.tip))
 
     def read_db(self):
-        genesis_hash = hexlify(self.get(self.GENESIS_KEY))
+        db = self.db
+        genesis_hash = hexlify(db.get(self.GENESIS_KEY))
         if genesis_hash != self.coin.GENESIS_HASH:
             raise self.Error('DB genesis hash {} does not match coin {}'
                              .format(genesis_hash, self.coin.GENESIS_HASH))
-        self.db_height = from_4_bytes(self.get(self.HEIGHT_KEY))
-        self.db_tx_count = from_4_bytes(self.get(self.TX_COUNT_KEY))
-        self.flush_count = from_4_bytes(self.get(self.FLUSH_COUNT_KEY))
-        self.wall_time = from_4_bytes(self.get(self.WALL_TIME_KEY))
-        self.tip = hexlify(self.get(self.TIP_KEY))
+        self.db_height = from_4_bytes(db.get(self.HEIGHT_KEY))
+        self.db_tx_count = from_4_bytes(db.get(self.TX_COUNT_KEY))
+        self.flush_count = from_4_bytes(db.get(self.FLUSH_COUNT_KEY))
+        self.wall_time = from_4_bytes(db.get(self.WALL_TIME_KEY))
+        self.tip = hexlify(db.get(self.TIP_KEY))
         self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
                          'flush count: {:,d} sync time: {}'
                          .format(self.coin.NAME, self.coin.NET,

@@ -138,33 +367,6 @@ class DB(object):
                 wall_time // 86400, (wall_time % 86400) // 3600,
                 (wall_time % 3600) // 60, wall_time % 60)
 
-    def get(self, key):
-        # Get a key from write_cache, then read_cache, then the DB
-        value = self.write_cache.get(key)
-        if not value:
-            value = self.read_cache.get(key)
-            if not value:
-                value = self.db.get(key)
-                self.read_cache[key] = value
-            else:
-                self.read_cache_hits += 1
-        else:
-            self.write_cache_hits += 1
-        return value
-
-    def put(self, key, value):
-        assert(bool(value))
-        self.write_cache[key] = value
-
-    def delete(self, key):
-        # Deleting an on-disk key requires a later physical delete.
-        # If it's not on-disk we can just drop it entirely.
-        if self.read_cache.get(key) is None:
-            self.writes_avoided += 1
-            self.write_cache.pop(key, None)
-        else:
-            self.write_cache[key] = None
-
     def flush(self):
         '''Flush out all cached state.'''
         flush_start = time.time()
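The removed get/put/delete methods implemented a write-through cache in which a value of None marks a key for physical deletion at the next flush; UTXOCache keeps the same convention in cache_delete() and flush(). A condensed sketch of that convention against a LevelDB-style batch (the function name is illustrative):

    def apply_pending(db_cache, batch):
        '''Apply cached puts and pending deletes to a write batch.'''
        for key, value in db_cache.items():
            if value is None:
                batch.delete(key)       # marked deleted in memory
            else:
                batch.put(key, value)   # most recent in-memory value
        db_cache.clear()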

@@ -181,9 +383,8 @@ class DB(object):
         # time.
         self.flush_to_fs()
         with self.db.write_batch(transaction=True) as batch:
-            self.flush_cache(batch)
+            self.utxo_cache.flush(batch)
             self.flush_history(batch)
             self.logger.info('flushed history...')
             self.flush_state(batch)
             self.logger.info('committing transaction...')
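All three flush stages now go into one transactional write batch, so a crash mid-flush leaves the database at its prior flushed state. The shape of the pattern, using plyvel's real write_batch API (the stage calls are shown as free functions for brevity):

    with db.write_batch(transaction=True) as batch:
        utxo_cache.flush(batch)   # UTXO puts and deletes
        flush_history(batch)      # address history appends
        flush_state(batch)        # height, tx count, tip, wall time
    # the batch commits atomically when the with-block exits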

@@ -204,13 +405,6 @@ class DB(object):
                          .format(txs_per_sec, this_txs_per_sec,
                                  self.formatted_wall_time()))
 
-        # Note this preserves semantics and hopefully saves time
-        self.read_cache = self.write_cache
-        self.write_cache = {}
-        self.writes_avoided = 0
-        self.read_cache_hits = 0
-        self.write_cache_hits = 0
-
     def flush_to_fs(self):
         '''Flush the things stored on the filesystem.'''
         self.write_headers()

@@ -219,6 +413,7 @@ class DB(object):
         os.sync()
 
     def update_wall_time(self, dest):
+        '''Put the wall time to dest - a DB or batch.'''
         now = time.time()
         self.wall_time += now - self.last_flush
         self.last_flush = now

@@ -234,23 +429,6 @@ class DB(object):
         self.update_wall_time(batch)
         self.flush_count += 1
 
-    def flush_cache(self, batch):
-        '''Flushes the UTXO write cache.'''
-        deletes = writes = 0
-        for n, (key, value) in enumerate(self.write_cache.items()):
-            if value is None:
-                batch.delete(key)
-                deletes += 1
-            else:
-                batch.put(key, value)
-                writes += 1
-
-        self.logger.info('flushed UTXO cache.  Hits: {:,d}/{:,d} '
-                         'writes: {:,d} deletes: {:,d} elided: {:,d}'
-                         .format(self.write_cache_hits,
-                                 self.read_cache_hits, writes, deletes,
-                                 self.writes_avoided))
-
     def flush_history(self, batch):
         # Drop any None entry
         self.history.pop(None, None)

@@ -260,99 +438,12 @@ class DB(object):
             key = b'H' + hash168 + flush_id
             batch.put(key, array.array('I', hist).tobytes())
 
-        self.history = defaultdict(list)
-
-    def get_hash168(self, tx_hash, idx, delete=True):
-        key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + struct.pack('<H', idx)
-        data = self.get(key)
-        if data is None:
-            return None
-
-        if len(data) == 25:
-            if delete:
-                self.delete(key)
-            return data[:21]
-
-        assert len(data) % 25 == 0
-        self.hcolls += 1
-        if self.hcolls % 1000 == 0:
-            self.logger.info('{} total hash168 compressed key collisions'
-                             .format(self.hcolls))
-        for n in range(0, len(data), 25):
-            (tx_num, ) = struct.unpack('<I', data[n+21 : n+25])
-            my_hash, height = self.get_tx_hash(tx_num)
-            if my_hash == tx_hash:
-                if delete:
-                    self.put(key, data[:n] + data[n+25:])
-                return data[n : n+21]
-
-        raise Exception('could not resolve hash168 collision')
+        self.logger.info('flushed {:,d} history entries ({:,d} MB)...'
+                         .format(self.history_size,
+                                 self.history_size * 4 // 1048576))
 
-    def spend_utxo(self, prevout):
-        hash168 = self.get_hash168(prevout.hash, prevout.n)
-        if hash168 is None:
-            # This indicates a successful spend of a non-standard script
-            # self.logger.info('ignoring spend of non-standard UTXO {}/{:d} '
-            #                  'at height {:d}'
-            #                  .format(bytes(reversed(prevout.hash)).hex(),
-            #                          prevout.n, self.height))
-            return None
-
-        key = (b'u' + hash168 + prevout.hash[:UTXO_TX_HASH_LEN]
-               + struct.pack('<H', prevout.n))
-        data = self.get(key)
-        if data is None:
-            # Uh-oh, this should not happen.  It may be recoverable...
-            self.logger.error('found no UTXO for {} / {:d} key {}'
-                              .format(bytes(reversed(prevout.hash)).hex(),
-                                      prevout.n, bytes(key).hex()))
-            return hash168
-
-        if len(data) == 12:
-            (tx_num, ) = struct.unpack('<I', data[:4])
-            self.delete(key)
-        else:
-            # This should almost never happen
-            assert len(data) % (4 + 8) == 0
-            for n in range(0, len(data), 12):
-                (tx_num, ) = struct.unpack('<I', data[n:n+4])
-                tx_hash, height = self.get_tx_hash(tx_num)
-                if prevout.hash == tx_hash:
-                    break
-            else:
-                raise Exception('could not resolve UTXO key collision')
-
-            data = data[:n] + data[n + 12:]
-            self.put(key, data)
-
-        return hash168
-
-    def put_utxo(self, tx_hash, idx, txout):
-        pk = ScriptPubKey.from_script(txout.pk_script, self.coin)
-        if not pk.hash168:
-            return None
-
-        pack = struct.pack
-        idxb = pack('<H', idx)
-        txcb = pack('<I', self.tx_count)
-
-        # First write the hash168 lookup
-        key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idxb
-        # b''.join avoids this: https://bugs.python.org/issue13298
-        value = b''.join((pk.hash168, txcb))
-        prior_value = self.get(key)
-        if prior_value:   # Should almost never happen
-            value += prior_value
-        self.put(key, value)
-
-        # Next write the UTXO
-        key = b'u' + pk.hash168 + tx_hash[:UTXO_TX_HASH_LEN] + idxb
-        value = txcb + pack('<Q', txout.value)
-        prior_value = self.get(key)
-        if prior_value:   # Should almost never happen
-            value += prior_value
-        self.put(key, value)
-
-        return pk.hash168
+        self.history = defaultdict(list)
+        self.history_size = 0
 
     def open_file(self, filename, truncate=False, create=False):
         try:

@@ -416,20 +507,22 @@ class DB(object):
             self.process_tx(tx_hash, tx)
 
         # Flush if we're getting full
-        if len(self.write_cache) + len(self.history) > self.flush_size:
+        if self.utxo_cache.size_MB() + hist_MB > self.flush_MB:
             self.flush()
 
     def process_tx(self, tx_hash, tx):
-        hash168s = set()
+        cache = self.utxo_cache
+        tx_num = self.tx_count
+
+        # Add the outputs as new UTXOs; spend the inputs
+        hash168s = cache.add_many(tx_hash, tx_num, tx.outputs)
+
         if not tx.is_coinbase:
             for txin in tx.inputs:
-                hash168s.add(self.spend_utxo(txin.prevout))
-
-        for idx, txout in enumerate(tx.outputs):
-            hash168s.add(self.put_utxo(tx_hash, idx, txout))
+                hash168s.add(cache.spend(txin.prevout))
 
         for hash168 in hash168s:
-            self.history[hash168].append(self.tx_count)
+            self.history[hash168].append(tx_num)
         self.history_size += len(hash168s)
         self.tx_count += 1
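Condensed, the new per-transaction flow adds all outputs to the cache in one call, spends each input from the cache or DB, and appends one history entry per touched address. A simplified standalone sketch; note that cache.spend() can return None for non-standard scripts, which flush_history() later drops:

    def process_tx_sketch(cache, history, tx_num, tx_hash, tx):
        hash168s = cache.add_many(tx_hash, tx_num, tx.outputs)
        if not tx.is_coinbase:
            for txin in tx.inputs:
                hash168s.add(cache.spend(txin.prevout))
        for hash168 in hash168s:
            history[hash168].append(tx_num)   # one entry per TX, not per UTXO
        return tx_num + 1                     # the next transaction number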

server/env.py  (2 changed lines)

@@ -20,7 +20,7 @@ class Env(object):
         network = self.default('NETWORK', 'mainnet')
         self.coin = Coin.lookup_coin_class(coin_name, network)
         self.db_dir = self.required('DB_DIRECTORY')
-        self.flush_size = self.integer('FLUSH_SIZE', 1000000)
+        self.flush_MB = self.integer('FLUSH_MB', 1000)
         self.rpc_url = self.build_rpc_url()
 
     def default(self, envvar, default):
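The integer() helper's body is not shown in this hunk; a plausible sketch of how FLUSH_MB reaches the DB, where the helper implementation is an assumption inferred from its call signature:

    import os

    def integer(envvar, default):
        '''Assumed behaviour: read an int from the environment, else default.'''
        value = os.environ.get(envvar)
        return default if value is None else int(value)

    flush_MB = integer('FLUSH_MB', 1000)   # cache memory budget in MB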
