diff --git a/server/block_processor.py b/server/block_processor.py index 8954238..6252e1a 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -163,6 +163,7 @@ class BlockProcessor(server.db.DB): # Caches of unflushed items. self.headers = [] self.tx_hashes = [] + self.undo_infos = [] self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 @@ -338,6 +339,7 @@ class BlockProcessor(server.db.DB): '''Asserts state is fully flushed.''' assert self.tx_count == self.fs_tx_count == self.db_tx_count assert self.height == self.fs_height == self.db_height + assert not self.undo_infos assert not self.history assert not self.utxo_cache assert not self.db_deletes @@ -484,14 +486,16 @@ class BlockProcessor(server.db.DB): It is already verified they correctly connect onto our tip. ''' block_txs = self.coin.block_txs - daemon_height = self.daemon.cached_height() + min_height = self.min_undo_height(self.daemon.cached_height()) + height = self.height for block in blocks: - self.height += 1 - undo_info = self.advance_txs(block_txs(block, self.height)) - if daemon_height - self.height <= self.env.reorg_limit: - self.write_undo_info(self.height, b''.join(undo_info)) + height += 1 + undo_info = self.advance_txs(block_txs(block, height)) + if height >= min_height: + self.undo_infos.append((undo_info, height)) + self.height = height self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) @@ -724,11 +728,13 @@ class BlockProcessor(server.db.DB): delete_count = len(self.db_deletes) // 2 utxo_cache_len = len(self.utxo_cache) + # Spends batch_delete = batch.delete for key in sorted(self.db_deletes): batch_delete(key) self.db_deletes = [] + # New UTXOs batch_put = batch.put for cache_key, cache_value in self.utxo_cache.items(): # suffix = tx_idx + tx_num @@ -736,6 +742,11 @@ class BlockProcessor(server.db.DB): suffix = cache_key[-2:] + cache_value[-12:-8] batch_put(b'h' + cache_key[:4] + suffix, hashX) batch_put(b'u' + hashX + suffix, cache_value[-8:]) + self.utxo_cache = {} + + # New undo information + self.flush_undo_infos(batch_put, self.undo_infos) + self.undo_infos = [] if self.utxo_db.for_sync: self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO ' @@ -745,7 +756,6 @@ class BlockProcessor(server.db.DB): utxo_cache_len, delete_count, time.time() - flush_start)) - self.utxo_cache = {} self.utxo_flush_count = self.flush_count self.db_tx_count = self.tx_count self.db_height = self.height diff --git a/server/db.py b/server/db.py index c124594..0f920d5 100644 --- a/server/db.py +++ b/server/db.py @@ -182,36 +182,7 @@ class DB(util.LoggedClass): raise self.DBError('DB corrupt: flush_count < utxo_flush_count') if self.flush_count > self.utxo_flush_count: self.clear_excess_history(self.utxo_flush_count) - - # Remove stale undo information - prefix = b'U' - cutoff = self.db_height - self.env.reorg_limit - keys = [] - for key, hist in self.utxo_db.iterator(prefix=prefix): - height, = unpack('>I', key[-4:]) - if height > cutoff: - break - keys.append(key) - if keys: - self.logger.info('deleting {:,d} stale undo entries' - .format(len(keys))) - - with self.utxo_db.write_batch() as batch: - for key in keys: - batch.delete(key) - self.write_state(batch) - - def undo_key(self, height): - '''DB key for undo information at the given height.''' - return b'U' + pack('>I', height) - - def write_undo_info(self, height, undo_info): - '''Write out undo information for the current height.''' - self.utxo_db.put(self.undo_key(height), undo_info) - - def read_undo_info(self, height): - '''Read undo information from a file for the current height.''' - return self.utxo_db.get(self.undo_key(height)) + self.clear_excess_undo_info() def open_file(self, filename, create=False): '''Open the file name. Return its handle.''' @@ -356,6 +327,43 @@ class DB(util.LoggedClass): value, = unpack('I', height) + + def read_undo_info(self, height): + '''Read undo information from a file for the current height.''' + return self.utxo_db.get(self.undo_key(height)) + + def flush_undo_infos(self, batch_put, undo_infos): + '''undo_infos is a list of (undo_info, height) pairs.''' + for undo_info, height in undo_infos: + batch_put(self.undo_key(height), b''.join(undo_info)) + + def clear_excess_undo_info(self): + '''Clear excess undo info. Only most recent N are kept.''' + prefix = b'U' + min_height = self.min_undo_height(self.db_height) + keys = [] + for key, hist in self.utxo_db.iterator(prefix=prefix): + height, = unpack('>I', key[-4:]) + if height >= min_height: + break + keys.append(key) + + if keys: + with self.utxo_db.write_batch() as batch: + for key in keys: + batch.delete(key) + self.logger.info('deleted {:,d} stale undo entries' + .format(len(keys))) + # -- History database def clear_excess_history(self, flush_count):