Browse Source

Write out undo info with the UTXO flushes

Cleaner and slightly more efficient.
Closes #101
master
Neil Booth 8 years ago
parent
commit
5784412393
  1. 22
      server/block_processor.py
  2. 68
      server/db.py

22
server/block_processor.py

@ -163,6 +163,7 @@ class BlockProcessor(server.db.DB):
# Caches of unflushed items. # Caches of unflushed items.
self.headers = [] self.headers = []
self.tx_hashes = [] self.tx_hashes = []
self.undo_infos = []
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0 self.history_size = 0
@ -338,6 +339,7 @@ class BlockProcessor(server.db.DB):
'''Asserts state is fully flushed.''' '''Asserts state is fully flushed.'''
assert self.tx_count == self.fs_tx_count == self.db_tx_count assert self.tx_count == self.fs_tx_count == self.db_tx_count
assert self.height == self.fs_height == self.db_height assert self.height == self.fs_height == self.db_height
assert not self.undo_infos
assert not self.history assert not self.history
assert not self.utxo_cache assert not self.utxo_cache
assert not self.db_deletes assert not self.db_deletes
@ -484,14 +486,16 @@ class BlockProcessor(server.db.DB):
It is already verified they correctly connect onto our tip. It is already verified they correctly connect onto our tip.
''' '''
block_txs = self.coin.block_txs block_txs = self.coin.block_txs
daemon_height = self.daemon.cached_height() min_height = self.min_undo_height(self.daemon.cached_height())
height = self.height
for block in blocks: for block in blocks:
self.height += 1 height += 1
undo_info = self.advance_txs(block_txs(block, self.height)) undo_info = self.advance_txs(block_txs(block, height))
if daemon_height - self.height <= self.env.reorg_limit: if height >= min_height:
self.write_undo_info(self.height, b''.join(undo_info)) self.undo_infos.append((undo_info, height))
self.height = height
self.headers.extend(headers) self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1]) self.tip = self.coin.header_hash(headers[-1])
@ -724,11 +728,13 @@ class BlockProcessor(server.db.DB):
delete_count = len(self.db_deletes) // 2 delete_count = len(self.db_deletes) // 2
utxo_cache_len = len(self.utxo_cache) utxo_cache_len = len(self.utxo_cache)
# Spends
batch_delete = batch.delete batch_delete = batch.delete
for key in sorted(self.db_deletes): for key in sorted(self.db_deletes):
batch_delete(key) batch_delete(key)
self.db_deletes = [] self.db_deletes = []
# New UTXOs
batch_put = batch.put batch_put = batch.put
for cache_key, cache_value in self.utxo_cache.items(): for cache_key, cache_value in self.utxo_cache.items():
# suffix = tx_idx + tx_num # suffix = tx_idx + tx_num
@ -736,6 +742,11 @@ class BlockProcessor(server.db.DB):
suffix = cache_key[-2:] + cache_value[-12:-8] suffix = cache_key[-2:] + cache_value[-12:-8]
batch_put(b'h' + cache_key[:4] + suffix, hashX) batch_put(b'h' + cache_key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, cache_value[-8:]) batch_put(b'u' + hashX + suffix, cache_value[-8:])
self.utxo_cache = {}
# New undo information
self.flush_undo_infos(batch_put, self.undo_infos)
self.undo_infos = []
if self.utxo_db.for_sync: if self.utxo_db.for_sync:
self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO ' self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO '
@ -745,7 +756,6 @@ class BlockProcessor(server.db.DB):
utxo_cache_len, delete_count, utxo_cache_len, delete_count,
time.time() - flush_start)) time.time() - flush_start))
self.utxo_cache = {}
self.utxo_flush_count = self.flush_count self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count self.db_tx_count = self.tx_count
self.db_height = self.height self.db_height = self.height

68
server/db.py

@ -182,36 +182,7 @@ class DB(util.LoggedClass):
raise self.DBError('DB corrupt: flush_count < utxo_flush_count') raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
if self.flush_count > self.utxo_flush_count: if self.flush_count > self.utxo_flush_count:
self.clear_excess_history(self.utxo_flush_count) self.clear_excess_history(self.utxo_flush_count)
self.clear_excess_undo_info()
# Remove stale undo information
prefix = b'U'
cutoff = self.db_height - self.env.reorg_limit
keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
if keys:
self.logger.info('deleting {:,d} stale undo entries'
.format(len(keys)))
with self.utxo_db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.write_state(batch)
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + pack('>I', height)
def write_undo_info(self, height, undo_info):
'''Write out undo information for the current height.'''
self.utxo_db.put(self.undo_key(height), undo_info)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.utxo_db.get(self.undo_key(height))
def open_file(self, filename, create=False): def open_file(self, filename, create=False):
'''Open the file name. Return its handle.''' '''Open the file name. Return its handle.'''
@ -356,6 +327,43 @@ class DB(util.LoggedClass):
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
return hashX, value return hashX, value
# -- Undo information
def min_undo_height(self, max_height):
'''Returns a height from which we should store undo info.'''
return max_height - self.env.reorg_limit + 1
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + pack('>I', height)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.utxo_db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
'''undo_infos is a list of (undo_info, height) pairs.'''
for undo_info, height in undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
def clear_excess_undo_info(self):
'''Clear excess undo info. Only most recent N are kept.'''
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.utxo_db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info('deleted {:,d} stale undo entries'
.format(len(keys)))
# -- History database # -- History database
def clear_excess_history(self, flush_count): def clear_excess_history(self, flush_count):

Loading…
Cancel
Save