Browse Source

Move DB UTXO code into one place.

master
Neil Booth 8 years ago
parent
commit
858bac217d
  1. 2
      server/block_processor.py
  2. 204
      server/db.py

2
server/block_processor.py

@ -337,7 +337,7 @@ class BlockProcessor(server.db.DB):
self.wall_time += now - self.last_flush self.wall_time += now - self.last_flush
self.last_flush = now self.last_flush = now
self.last_flush_tx_count = self.tx_count self.last_flush_tx_count = self.tx_count
self.utxo_write_state(batch) self.write_utxo_state(batch)
def assert_flushed(self): def assert_flushed(self):
'''Asserts state is fully flushed.''' '''Asserts state is fully flushed.'''

204
server/db.py

@ -142,54 +142,6 @@ class DB(util.LoggedClass):
self.logger.info('sync time so far: {}' self.logger.info('sync time so far: {}'
.format(util.formatted_time(self.wall_time))) .format(util.formatted_time(self.wall_time)))
def read_utxo_state(self):
state = self.utxo_db.get(b'state')
if not state:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.db_version = max(self.DB_VERSIONS)
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise self.DBError('failed reading state from DB')
self.db_version = state['db_version']
if self.db_version not in self.DB_VERSIONS:
raise self.DBError('your DB version is {} but this software '
'only handles versions {}'
.format(self.db_version, self.DB_VERSIONS))
# backwards compat
genesis_hash = state['genesis']
if isinstance(genesis_hash, bytes):
genesis_hash = genesis_hash.decode()
if genesis_hash != self.coin.GENESIS_HASH:
raise self.DBError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
def utxo_write_state(self, batch):
'''Write (UTXO) state to the batch.'''
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())
def clean_db(self): def clean_db(self):
'''Clean out stale DB items. '''Clean out stale DB items.
@ -299,6 +251,93 @@ class DB(util.LoggedClass):
assert isinstance(limit, int) and limit >= 0 assert isinstance(limit, int) and limit >= 0
return limit return limit
# -- Undo information
def min_undo_height(self, max_height):
'''Returns a height from which we should store undo info.'''
return max_height - self.env.reorg_limit + 1
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + pack('>I', height)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.utxo_db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
'''undo_infos is a list of (undo_info, height) pairs.'''
for undo_info, height in undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
def clear_excess_undo_info(self):
'''Clear excess undo info. Only most recent N are kept.'''
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.utxo_db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info('deleted {:,d} stale undo entries'
.format(len(keys)))
# -- UTXO database
def read_utxo_state(self):
state = self.utxo_db.get(b'state')
if not state:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.db_version = max(self.DB_VERSIONS)
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise self.DBError('failed reading state from DB')
self.db_version = state['db_version']
if self.db_version not in self.DB_VERSIONS:
raise self.DBError('your DB version is {} but this software '
'only handles versions {}'
.format(self.db_version, self.DB_VERSIONS))
# backwards compat
genesis_hash = state['genesis']
if isinstance(genesis_hash, bytes):
genesis_hash = genesis_hash.decode()
if genesis_hash != self.coin.GENESIS_HASH:
raise self.DBError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
def write_utxo_state(self, batch):
'''Write (UTXO) state to the batch.'''
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())
def get_balance(self, hashX): def get_balance(self, hashX):
'''Returns the confirmed balance of an address.''' '''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None)) return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None))
@ -322,24 +361,6 @@ class DB(util.LoggedClass):
tx_hash, height = self.fs_tx_hash(tx_num) tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value) yield UTXO(tx_num, tx_pos, tx_hash, height, value)
def db_hashX(self, tx_hash, idx_packed):
'''Return (hashX, tx_num_packed) for the given TXO.
Both are None if not found.'''
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
if hash == tx_hash:
return hashX, tx_num_packed
return None, None
def db_utxo_lookup(self, tx_hash, tx_idx): def db_utxo_lookup(self, tx_hash, tx_idx):
'''Given a prevout return a (hashX, value) pair. '''Given a prevout return a (hashX, value) pair.
@ -347,7 +368,7 @@ class DB(util.LoggedClass):
mempool code. mempool code.
''' '''
idx_packed = pack('<H', tx_idx) idx_packed = pack('<H', tx_idx)
hashX, tx_num_packed = self.db_hashX(tx_hash, idx_packed) hashX, tx_num_packed = self._db_hashX(tx_hash, idx_packed)
if not hashX: if not hashX:
# This can happen when the daemon is a block ahead of us # This can happen when the daemon is a block ahead of us
# and has mempool txs spending outputs from that new block # and has mempool txs spending outputs from that new block
@ -363,42 +384,23 @@ class DB(util.LoggedClass):
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
return hashX, value return hashX, value
# -- Undo information def _db_hashX(self, tx_hash, idx_packed):
'''Return (hashX, tx_num_packed) for the given TXO.
def min_undo_height(self, max_height):
'''Returns a height from which we should store undo info.'''
return max_height - self.env.reorg_limit + 1
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + pack('>I', height)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.utxo_db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos): Both are None if not found.'''
'''undo_infos is a list of (undo_info, height) pairs.''' # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
for undo_info, height in undo_infos: # Value: hashX
batch_put(self.undo_key(height), b''.join(undo_info)) prefix = b'h' + tx_hash[:4] + idx_packed
def clear_excess_undo_info(self): # Find which entry, if any, the TX_HASH matches.
'''Clear excess undo info. Only most recent N are kept.''' for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
prefix = b'U' tx_num_packed = db_key[-4:]
min_height = self.min_undo_height(self.db_height) tx_num, = unpack('<I', tx_num_packed)
keys = [] hash, height = self.fs_tx_hash(tx_num)
for key, hist in self.utxo_db.iterator(prefix=prefix): if hash == tx_hash:
height, = unpack('>I', key[-4:]) return hashX, tx_num_packed
if height >= min_height:
break
keys.append(key)
if keys: return None, None
with self.utxo_db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info('deleted {:,d} stale undo entries'
.format(len(keys)))
# -- History database # -- History database
@ -553,7 +555,7 @@ class DB(util.LoggedClass):
if cursor == 65536: if cursor == 65536:
self.utxo_flush_count = self.flush_count self.utxo_flush_count = self.flush_count
with self.utxo_db.write_batch() as batch: with self.utxo_db.write_batch() as batch:
self.utxo_write_state(batch) self.write_utxo_state(batch)
def _compact_hashX(self, hashX, hist_map, hist_list, def _compact_hashX(self, hashX, hist_map, hist_list,
write_items, keys_to_delete): write_items, keys_to_delete):

Loading…
Cancel
Save