diff --git a/server/block_processor.py b/server/block_processor.py index 207a905..e53305d 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -10,7 +10,6 @@ import array import asyncio -import os from struct import pack, unpack import time from bisect import bisect_left @@ -22,7 +21,6 @@ from server.version import VERSION from lib.hash import hash_to_str from lib.util import chunks, formatted_time, LoggedClass import server.db -from server.storage import open_db class ChainError(Exception): @@ -152,7 +150,6 @@ class BlockProcessor(server.db.DB): self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 - self.reorg_limit = env.reorg_limit # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) @@ -171,14 +168,11 @@ class BlockProcessor(server.db.DB): self.db_deletes = [] # Log state - self.logger.info('reorg limit is {:,d} blocks' - .format(self.reorg_limit)) if self.first_sync: self.logger.info('flushing UTXO cache at {:,d} MB' .format(self.utxo_MB)) self.logger.info('flushing history cache at {:,d} MB' .format(self.hist_MB)) - self.clean_db() async def main_loop(self): '''Main loop for block processing.''' @@ -295,52 +289,6 @@ class BlockProcessor(server.db.DB): return self.fs_block_hashes(start, count) - def clean_db(self): - '''Clean out stale DB items. - - Stale DB items are excess history flushed since the most - recent UTXO flush (only happens on unclean shutdown), and aged - undo information. - ''' - if self.flush_count < self.utxo_flush_count: - raise ChainError('DB corrupt: flush_count < utxo_flush_count') - with self.db.write_batch() as batch: - if self.flush_count > self.utxo_flush_count: - self.logger.info('DB shut down uncleanly. Scanning for ' - 'excess history flushes...') - self.remove_excess_history(batch) - self.utxo_flush_count = self.flush_count - self.remove_stale_undo_items(batch) - self.flush_state(batch) - - def remove_excess_history(self, batch): - prefix = b'H' - keys = [] - for key, hist in self.db.iterator(prefix=prefix): - flush_id, = unpack('>H', key[-2:]) - if flush_id > self.utxo_flush_count: - keys.append(key) - - self.logger.info('deleting {:,d} history entries' - .format(len(keys))) - for key in keys: - batch.delete(key) - - def remove_stale_undo_items(self, batch): - prefix = b'U' - cutoff = self.db_height - self.reorg_limit - keys = [] - for key, hist in self.db.iterator(prefix=prefix): - height, = unpack('>I', key[-4:]) - if height > cutoff: - break - keys.append(key) - - self.logger.info('deleting {:,d} stale undo entries' - .format(len(keys))) - for key in keys: - batch.delete(key) - def flush_state(self, batch): '''Flush chain state to the batch.''' now = time.time() @@ -537,7 +485,7 @@ class BlockProcessor(server.db.DB): self.tip = self.coin.header_hash(header) self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) - if self.daemon.cached_height() - self.height <= self.reorg_limit: + if self.daemon.cached_height() - self.height <= self.env.reorg_limit: self.write_undo_info(self.height, b''.join(undo_info)) def advance_txs(self, tx_hashes, txs, touched): diff --git a/server/db.py b/server/db.py index 376cb0f..bdf45ee 100644 --- a/server/db.py +++ b/server/db.py @@ -47,6 +47,8 @@ class DB(LoggedClass): self.logger.info('switching current directory to {}' .format(env.db_dir)) os.chdir(env.db_dir) + self.logger.info('reorg limit is {:,d} blocks' + .format(self.env.reorg_limit)) # Open DB and metadata files. Record some of its state. db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) @@ -73,6 +75,7 @@ class DB(LoggedClass): assert self.db_tx_count == self.tx_counts[-1] else: assert self.db_tx_count == 0 + self.clean_db() def read_state(self): if self.db.is_new: @@ -117,6 +120,8 @@ class DB(LoggedClass): if self.first_sync: self.logger.info('sync time so far: {}' .format(formatted_time(self.wall_time))) + if self.flush_count < self.utxo_flush_count: + raise self.DBError('DB corrupt: flush_count < utxo_flush_count') def write_state(self, batch): '''Write chain state to the batch.''' @@ -133,6 +138,56 @@ class DB(LoggedClass): } batch.put(b'state', repr(state).encode()) + def clean_db(self): + '''Clean out stale DB items. + + Stale DB items are excess history flushed since the most + recent UTXO flush (only happens on unclean shutdown), and aged + undo information. + ''' + if self.flush_count > self.utxo_flush_count: + self.utxo_flush_count = self.flush_count + self.logger.info('DB shut down uncleanly. Scanning for ' + 'excess history flushes...') + history_keys = self.excess_history_keys() + self.logger.info('deleting {:,d} history entries' + .format(len(history_keys))) + else: + history_keys = [] + + undo_keys = self.stale_undo_keys() + if undo_keys: + self.logger.info('deleting {:,d} stale undo entries' + .format(len(undo_keys))) + + with self.db.write_batch() as batch: + batch_delete = batch.delete + for key in history_keys: + batch_delete(key) + for key in undo_keys: + batch_delete(key) + self.write_state(batch) + + def excess_history_keys(self): + prefix = b'H' + keys = [] + for key, hist in self.db.iterator(prefix=prefix): + flush_id, = unpack('>H', key[-2:]) + if flush_id > self.utxo_flush_count: + keys.append(key) + return keys + + def stale_undo_keys(self): + prefix = b'U' + cutoff = self.db_height - self.env.reorg_limit + keys = [] + for key, hist in self.db.iterator(prefix=prefix): + height, = unpack('>I', key[-4:]) + if height > cutoff: + break + keys.append(key) + return keys + def open_file(self, filename, create=False): '''Open the file name. Return its handle.''' try: