diff --git a/compact_history.py b/compact_history.py index 9186c64..0a4574b 100755 --- a/compact_history.py +++ b/compact_history.py @@ -48,7 +48,7 @@ async def compact_history(): environ['DAEMON_URL'] = '' # Avoid Env erroring out env = Env() db = DB(env) - await db.open_for_sync() + await db.open_for_compacting() assert not db.first_sync history = db.history diff --git a/contrib/query.py b/contrib/query.py index 60f1696..56dbda8 100755 --- a/contrib/query.py +++ b/contrib/query.py @@ -62,7 +62,7 @@ async def query(args): db = DB(env) coin = env.coin - await db._open_dbs(False) + await db.open_for_serving() if not args.scripts: await print_stats(db.hist_db, db.utxo_db) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index b157a33..b30a0a4 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -159,7 +159,6 @@ class BlockProcessor(electrumx.server.db.DB): self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) # Meta - self.cache_MB = env.cache_MB self.next_cache_check = 0 self.last_flush = time.time() self.touched = set() @@ -448,8 +447,9 @@ class BlockProcessor(electrumx.server.db.DB): # Flush history if it takes up over 20% of cache memory. # Flush UTXOs once they take up 80% of cache memory. - if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: - return utxo_MB >= self.cache_MB * 4 // 5 + cache_MB = self.env.cache_MB + if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5: + return utxo_MB >= cache_MB * 4 // 5 return None def advance_blocks(self, blocks): @@ -755,18 +755,10 @@ class BlockProcessor(electrumx.server.db.DB): async def _first_open_dbs(self): await self.open_for_sync() - # An incomplete compaction needs to be cancelled otherwise - # restarting it will corrupt the history - self.history.cancel_compaction() - # These are our state as we move ahead of DB state - self.fs_height = self.db_height - self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count self.last_flush_tx_count = self.tx_count - if self.utxo_db.for_sync: - self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB') # --- External API diff --git a/electrumx/server/db.py b/electrumx/server/db.py index f862af3..b705258 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -90,7 +90,7 @@ class DB(object): else: assert self.db_tx_count == 0 - async def _open_dbs(self, for_sync): + async def _open_dbs(self, for_sync, compacting): assert self.utxo_db is None # First UTXO DB @@ -110,12 +110,16 @@ class DB(object): # Then history DB self.utxo_flush_count = self.history.open_db(self.db_class, for_sync, - self.utxo_flush_count) + self.utxo_flush_count, + compacting) self.clear_excess_undo_info() # Read TX counts (requires meta directory) await self._read_tx_counts() + async def open_for_compacting(self): + await self._open_dbs(True, True) + async def open_for_sync(self): '''Open the databases to sync to the daemon. @@ -123,7 +127,7 @@ class DB(object): synchronization. When serving clients we want the open files for serving network connections. ''' - await self._open_dbs(True) + await self._open_dbs(True, False) async def open_for_serving(self): '''Open the databases for serving. If they are already open they are @@ -134,7 +138,7 @@ class DB(object): self.utxo_db.close() self.history.close_db() self.utxo_db = None - await self._open_dbs(False) + await self._open_dbs(False, False) # Header merkle cache @@ -395,6 +399,10 @@ class DB(object): self.wall_time = state['wall_time'] self.first_sync = state['first_sync'] + # These are our state as we move ahead of DB state + self.fs_height = self.db_height + self.fs_tx_count = self.db_tx_count + # Log some stats self.logger.info('DB version: {:d}'.format(self.db_version)) self.logger.info('coin: {}'.format(self.coin.NAME)) @@ -402,6 +410,8 @@ class DB(object): self.logger.info('height: {:,d}'.format(self.db_height)) self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip))) self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) + if self.utxo_db.for_sync: + self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') if self.first_sync: self.logger.info('sync time so far: {}' .format(util.formatted_time(self.wall_time))) diff --git a/electrumx/server/history.py b/electrumx/server/history.py index b78e757..eaab4af 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -32,10 +32,14 @@ class History(object): self.unflushed_count = 0 self.db = None - def open_db(self, db_class, for_sync, utxo_flush_count): + def open_db(self, db_class, for_sync, utxo_flush_count, compacting): self.db = db_class('hist', for_sync) self.read_state() self.clear_excess(utxo_flush_count) + # An incomplete compaction needs to be cancelled otherwise + # restarting it will corrupt the history + if not compacting: + self._cancel_compaction() return self.flush_count def close_db(self): @@ -314,7 +318,7 @@ class History(object): 100 * cursor / 65536)) return write_size - def cancel_compaction(self): + def _cancel_compaction(self): if self.comp_cursor != -1: self.logger.warning('cancelling in-progress history compaction') self.comp_flush_count = -1