diff --git a/compact_history.py b/compact_history.py
index 9186c64..0a4574b 100755
--- a/compact_history.py
+++ b/compact_history.py
@@ -48,7 +48,7 @@ async def compact_history():
     environ['DAEMON_URL'] = ''   # Avoid Env erroring out
     env = Env()
     db = DB(env)
-    await db.open_for_sync()
+    await db.open_for_compacting()

     assert not db.first_sync
     history = db.history
diff --git a/contrib/query.py b/contrib/query.py
index 60f1696..56dbda8 100755
--- a/contrib/query.py
+++ b/contrib/query.py
@@ -62,7 +62,7 @@ async def query(args):
     db = DB(env)
     coin = env.coin

-    await db._open_dbs(False)
+    await db.open_for_serving()

     if not args.scripts:
         await print_stats(db.hist_db, db.utxo_db)
diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py
index 06557b5..db7a213 100644
--- a/electrumx/lib/server_base.py
+++ b/electrumx/lib/server_base.py
@@ -100,6 +100,9 @@ class ServerBase(object):
             self.logger.info('shutting down')
             server_task.cancel()

+            # Prevent some silly logs
+            await asyncio.sleep(0.01)
+
         self.logger.info('shutdown complete')

     def run(self):
diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py
index 97f93a0..c9bde57 100644
--- a/electrumx/server/block_processor.py
+++ b/electrumx/server/block_processor.py
@@ -20,8 +20,8 @@ from aiorpcx import TaskGroup, run_in_thread
 import electrumx
 from electrumx.server.daemon import DaemonError
 from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
-from electrumx.lib.util import chunks, formatted_time, class_logger
-import electrumx.server.db
+from electrumx.lib.util import chunks, class_logger
+from electrumx.server.db import FlushData


 class Prefetcher(object):
@@ -142,26 +142,26 @@ class ChainError(Exception):
     '''Raised on error processing blocks.'''


-class BlockProcessor(electrumx.server.db.DB):
+class BlockProcessor(object):
     '''Process blocks and update the DB state to match.

     Employ a prefetcher to prefetch blocks in batches for processing.
     Coordinate backing up in case of chain reorganisations.
     '''

-    def __init__(self, env, daemon, notifications):
-        super().__init__(env)
-
+    def __init__(self, env, db, daemon, notifications):
+        self.env = env
+        self.db = db
         self.daemon = daemon
         self.notifications = notifications

+        self.coin = env.coin
         self.blocks_event = asyncio.Event()
         self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
+        self.logger = class_logger(__name__, self.__class__.__name__)

         # Meta
-        self.cache_MB = env.cache_MB
         self.next_cache_check = 0
-        self.last_flush = time.time()
         self.touched = set()
         self.reorg_count = 0
@@ -189,17 +189,6 @@ class BlockProcessor(electrumx.server.db.DB):
                 return await run_in_thread(func, *args)
         return await asyncio.shield(run_in_thread_locked())

-    async def _maybe_flush(self):
-        # If caught up, flush everything as client queries are
-        # performed on the DB.
-        if self._caught_up_event.is_set():
-            await self.flush(True)
-        elif time.time() > self.next_cache_check:
-            flush_arg = self.check_cache_size()
-            if flush_arg is not None:
-                await self.flush(flush_arg)
-            self.next_cache_check = time.time() + 30
-
     async def check_and_advance_blocks(self, raw_blocks):
         '''Process the list of raw blocks passed.  Detects and handles
         reorgs.
@@ -217,7 +206,7 @@ class BlockProcessor(electrumx.server.db.DB):
             start = time.time()
             await self.run_in_thread_with_lock(self.advance_blocks, blocks)
             await self._maybe_flush()
-            if not self.first_sync:
+            if not self.db.first_sync:
                 s = '' if len(blocks) == 1 else 's'
                 self.logger.info('processed {:,d} block{} in {:.1f}s'
                                  .format(len(blocks), s,
@@ -257,13 +246,19 @@ class BlockProcessor(electrumx.server.db.DB):
             except Exception:
                 return await self.daemon.raw_blocks(hex_hashes)

+        def flush_backup():
+            # self.touched can include other addresses which is
+            # harmless, but remove None.
+            self.touched.discard(None)
+            self.db.flush_backup(self.flush_data(), self.touched)
+
         start, last, hashes = await self.reorg_hashes(count)
         # Reverse and convert to hex strings.
         hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             raw_blocks = await get_raw_blocks(last, hex_hashes)
             await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
-            await self.backup_flush()
+            await self.run_in_thread_with_lock(flush_backup)
             last -= len(raw_blocks)

         await self.prefetcher.reset_height(self.height)
@@ -280,7 +275,7 @@ class BlockProcessor(electrumx.server.db.DB):
         self.logger.info(f'chain was reorganised replacing {count:,d} '
                          f'block{s} at heights {start:,d}-{last:,d}')

-        return start, last, await self.fs_block_hashes(start, count)
+        return start, last, await self.db.fs_block_hashes(start, count)

     async def calc_reorg_range(self, count):
         '''Calculate the reorg range'''
@@ -298,7 +293,7 @@ class BlockProcessor(electrumx.server.db.DB):
             start = self.height - 1
             count = 1
             while start > 0:
-                hashes = await self.fs_block_hashes(start, count)
+                hashes = await self.db.fs_block_hashes(start, count)
                 hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
                 d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
                 n = diff_pos(hex_hashes, d_hex_hashes)
@@ -314,135 +309,40 @@ class BlockProcessor(electrumx.server.db.DB):

         return start, count

-    def flush_state(self, batch):
-        '''Flush chain state to the batch.'''
-        now = time.time()
-        self.wall_time += now - self.last_flush
-        self.last_flush = now
-        self.last_flush_tx_count = self.tx_count
-        self.write_utxo_state(batch)
-
-    def assert_flushed(self):
-        '''Asserts state is fully flushed.'''
-        assert self.tx_count == self.fs_tx_count == self.db_tx_count
-        assert self.height == self.fs_height == self.db_height
-        assert not self.undo_infos
-        assert not self.utxo_cache
-        assert not self.db_deletes
-        self.history.assert_flushed()
+    def estimate_txs_remaining(self):
+        # Try to estimate how many txs there are to go
+        daemon_height = self.daemon.cached_height()
+        coin = self.coin
+        tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT)
+        # Damp the initial enthusiasm
+        realism = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0)
+        return (tail_count * coin.TX_PER_BLOCK +
+                max(coin.TX_COUNT - self.tx_count, 0)) * realism
+
+    # - Flushing
+    def flush_data(self):
+        '''The data for a flush.  The lock must be taken.'''
+        assert self.state_lock.locked()
+        return FlushData(self.height, self.tx_count, self.headers,
+                         self.tx_hashes, self.undo_infos, self.utxo_cache,
+                         self.db_deletes, self.tip)

     async def flush(self, flush_utxos):
-        if self.height == self.db_height:
-            self.assert_flushed()
-        else:
-            await self.run_in_thread_with_lock(self._flush_body, flush_utxos)
-
-    def _flush_body(self, flush_utxos):
-        '''Flush out cached state.
-
-        History is always flushed.  UTXOs are flushed if flush_utxos.'''
-        flush_start = time.time()
-        last_flush = self.last_flush
-        tx_diff = self.tx_count - self.last_flush_tx_count
-
-        # Flush to file system
-        self.fs_flush()
-        fs_end = time.time()
-        if self.utxo_db.for_sync:
-            self.logger.info('flushed to FS in {:.1f}s'
-                             .format(fs_end - flush_start))
-
-        # History next - it's fast and frees memory
-        hashX_count = self.history.flush()
-        if self.utxo_db.for_sync:
-            self.logger.info('flushed history in {:.1f}s for {:,d} addrs'
-                             .format(time.time() - fs_end, hashX_count))
-
-        # Flush state last as it reads the wall time.
-        with self.utxo_db.write_batch() as batch:
-            if flush_utxos:
-                self.flush_utxos(batch)
-            self.flush_state(batch)
-
-        # Update and put the wall time again - otherwise we drop the
-        # time it took to commit the batch
-        self.flush_state(self.utxo_db)
-
-        self.logger.info('flush #{:,d} took {:.1f}s.  Height {:,d} txs: {:,d}'
-                         .format(self.history.flush_count,
-                                 self.last_flush - flush_start,
-                                 self.height, self.tx_count))
-
-        # Catch-up stats
-        if self.utxo_db.for_sync:
-            tx_per_sec = int(self.tx_count / self.wall_time)
-            this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
-            self.logger.info('tx/sec since genesis: {:,d}, '
-                             'since last flush: {:,d}'
-                             .format(tx_per_sec, this_tx_per_sec))
-
-            daemon_height = self.daemon.cached_height()
-            if self.height > self.coin.TX_COUNT_HEIGHT:
-                tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
-            else:
-                tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
-                          * self.coin.TX_PER_BLOCK
-                          + (self.coin.TX_COUNT - self.tx_count))
-
-            # Damp the enthusiasm
-            realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT
-            tx_est *= max(realism, 1.0)
-
-            self.logger.info('sync time: {} ETA: {}'
-                             .format(formatted_time(self.wall_time),
-                                     formatted_time(tx_est / this_tx_per_sec)))
-
-    def fs_flush(self):
-        '''Flush the things stored on the filesystem.'''
-        assert self.fs_height + len(self.headers) == self.height
-        assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
-
-        self.fs_update(self.fs_height, self.headers, self.tx_hashes)
-        self.fs_height = self.height
-        self.fs_tx_count = self.tx_count
-        self.tx_hashes = []
-        self.headers = []
-
-    async def backup_flush(self):
-        assert self.height < self.db_height
-        assert not self.headers
-        assert not self.tx_hashes
-        self.history.assert_flushed()
-        await self.run_in_thread_with_lock(self._backup_flush_body)
-
-    def _backup_flush_body(self):
-        '''Like flush() but when backing up.  All UTXOs are flushed.
-
-        hashXs - sequence of hashXs which were touched by backing
-        up.  Searched for history entries to remove after the backup
-        height.
-        '''
-        flush_start = time.time()
-
-        self.backup_fs(self.height, self.tx_count)
-
-        # Backup history.  self.touched can include other addresses
-        # which is harmless, but remove None.
-        self.touched.discard(None)
-        nremoves = self.history.backup(self.touched, self.tx_count)
-        self.logger.info('backing up removed {:,d} history entries'
-                         .format(nremoves))
+        def flush():
+            self.db.flush_dbs(self.flush_data(), flush_utxos,
+                              self.estimate_txs_remaining)
+        await self.run_in_thread_with_lock(flush)

-        with self.utxo_db.write_batch() as batch:
-            # Flush state last as it reads the wall time.
-            self.flush_utxos(batch)
-            self.flush_state(batch)
-
-        self.logger.info('backup flush #{:,d} took {:.1f}s.  '
-                         'Height {:,d} txs: {:,d}'
-                         .format(self.history.flush_count,
-                                 self.last_flush - flush_start,
-                                 self.height, self.tx_count))
+    async def _maybe_flush(self):
+        # If caught up, flush everything as client queries are
+        # performed on the DB.
+        if self._caught_up_event.is_set():
+            await self.flush(True)
+        elif time.time() > self.next_cache_check:
+            flush_arg = self.check_cache_size()
+            if flush_arg is not None:
+                await self.flush(flush_arg)
+            self.next_cache_check = time.time() + 30

     def check_cache_size(self):
         '''Flush a cache if it gets too big.'''
@@ -451,10 +351,10 @@ class BlockProcessor(electrumx.server.db.DB):
         one_MB = 1000*1000
         utxo_cache_size = len(self.utxo_cache) * 205
         db_deletes_size = len(self.db_deletes) * 57
-        hist_cache_size = self.history.unflushed_memsize()
+        hist_cache_size = self.db.history.unflushed_memsize()
         # Roughly ntxs * 32 + nblocks * 42
-        tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32
-                        + (self.height - self.fs_height) * 42)
+        tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+                        + (self.height - self.db.fs_height) * 42)

         utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
         hist_MB = (hist_cache_size + tx_hash_size) // one_MB
@@ -465,8 +365,9 @@

         # Flush history if it takes up over 20% of cache memory.
         # Flush UTXOs once they take up 80% of cache memory.
-        if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
-            return utxo_MB >= self.cache_MB * 4 // 5
+        cache_MB = self.env.cache_MB
+        if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5:
+            return utxo_MB >= cache_MB * 4 // 5
         return None

     def advance_blocks(self, blocks):
@@ -474,7 +375,7 @@ class BlockProcessor(electrumx.server.db.DB):

         It is already verified they correctly connect onto our tip.
         '''
-        min_height = self.min_undo_height(self.daemon.cached_height())
+        min_height = self.db.min_undo_height(self.daemon.cached_height())
         height = self.height

         for block in blocks:
@@ -482,7 +383,7 @@ class BlockProcessor(electrumx.server.db.DB):
             undo_info = self.advance_txs(block.transactions)
             if height >= min_height:
                 self.undo_infos.append((undo_info, height))
-                self.write_raw_block(block.raw, height)
+                self.db.write_raw_block(block.raw, height)

         headers = [block.header for block in blocks]
         self.height = height
@@ -529,10 +430,10 @@ class BlockProcessor(electrumx.server.db.DB):
             update_touched(hashXs)
             tx_num += 1

-        self.history.add_unflushed(hashXs_by_tx, self.tx_count)
+        self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
         self.tx_count = tx_num
-        self.tx_counts.append(tx_num)
+        self.db.tx_counts.append(tx_num)

         return undo_info
@@ -542,7 +443,7 @@ class BlockProcessor(electrumx.server.db.DB):

         The blocks should be in order of decreasing height, starting at.
         self.height.  A flush is performed once the blocks are backed up.
         '''
-        self.assert_flushed()
+        self.db.assert_flushed(self.flush_data())
         assert self.height >= len(raw_blocks)

         coin = self.coin
@@ -558,14 +459,14 @@ class BlockProcessor(electrumx.server.db.DB):
             self.tip = coin.header_prevhash(block.header)
             self.backup_txs(block.transactions)
             self.height -= 1
-            self.tx_counts.pop()
+            self.db.tx_counts.pop()

         self.logger.info('backed up to height {:,d}'.format(self.height))

     def backup_txs(self, txs):
         # Prevout values, in order down the block (coinbase first if present)
         # undo_info is in reverse block order
-        undo_info = self.read_undo_info(self.height)
+        undo_info = self.db.read_undo_info(self.height)
         if undo_info is None:
             raise ChainError('no undo information found for height {:,d}'
                              .format(self.height))
@@ -673,14 +574,14 @@ class BlockProcessor(electrumx.server.db.DB):
             # Value: hashX
             prefix = b'h' + tx_hash[:4] + idx_packed
             candidates = {db_key: hashX for db_key, hashX
-                          in self.utxo_db.iterator(prefix=prefix)}
+                          in self.db.utxo_db.iterator(prefix=prefix)}

             for hdb_key, hashX in candidates.items():
                 tx_num_packed = hdb_key[-4:]

                 if len(candidates) > 1:
                     tx_num, = unpack('<I', tx_num_packed)
         # Flush everything but with first_sync->False state.
-        first_sync = self.first_sync
-        self.first_sync = False
+        first_sync = self.db.first_sync
+        self.db.first_sync = False
         await self.flush(True)
         if first_sync:
             self.logger.info(f'{electrumx.version} synced to '
@@ -768,22 +627,13 @@ class BlockProcessor(electrumx.server.db.DB):

         # Initialise the notification framework
         await self.notifications.on_block(set(), self.height)
         # Reopen for serving
-        await self.open_for_serving()
+        await self.db.open_for_serving()

     async def _first_open_dbs(self):
-        await self.open_for_sync()
-        # An incomplete compaction needs to be cancelled otherwise
-        # restarting it will corrupt the history
-        self.history.cancel_compaction()
-        # These are our state as we move ahead of DB state
-        self.fs_height = self.db_height
-        self.fs_tx_count = self.db_tx_count
-        self.height = self.db_height
-        self.tip = self.db_tip
-        self.tx_count = self.db_tx_count
-        self.last_flush_tx_count = self.tx_count
-        if self.utxo_db.for_sync:
-            self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB')
+        await self.db.open_for_sync()
+        self.height = self.db.db_height
+        self.tip = self.db.db_tip
+        self.tx_count = self.db.db_tx_count

     # --- External API
diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py
index 8e33830..135d42a 100644
--- a/electrumx/server/chain_state.py
+++ b/electrumx/server/chain_state.py
@@ -14,18 +14,18 @@ class ChainState(object):
     blocks, transaction history, UTXOs and the mempool.
     '''

-    def __init__(self, env, daemon, bp):
+    def __init__(self, env, db, daemon, bp):
         self._env = env
+        self._db = db
         self._daemon = daemon
-        self._bp = bp

         # External interface pass-throughs for session.py
-        self.force_chain_reorg = self._bp.force_chain_reorg
-        self.tx_branch_and_root = self._bp.merkle.branch_and_root
-        self.read_headers = self._bp.read_headers
-        self.all_utxos = self._bp.all_utxos
-        self.limited_history = self._bp.limited_history
-        self.header_branch_and_root = self._bp.header_branch_and_root
+        self.force_chain_reorg = bp.force_chain_reorg
+        self.tx_branch_and_root = db.merkle.branch_and_root
+        self.read_headers = db.read_headers
+        self.all_utxos = db.all_utxos
+        self.limited_history = db.limited_history
+        self.header_branch_and_root = db.header_branch_and_root

     async def broadcast_transaction(self, raw_tx):
         return await self._daemon.sendrawtransaction([raw_tx])
@@ -34,7 +34,7 @@
         return await getattr(self._daemon, method)(*args)

     def db_height(self):
-        return self._bp.db_height
+        return self._db.db_height

     def get_info(self):
         '''Chain state info for LocalRPC and logs.'''
@@ -57,7 +57,7 @@

     async def query(self, args, limit):
         coin = self._env.coin
-        db = self._bp
+        db = self._db
         lines = []

         def arg_to_hashX(arg):
diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py
index e3115c6..665d39c 100644
--- a/electrumx/server/controller.py
+++ b/electrumx/server/controller.py
@@ -13,6 +13,7 @@ import electrumx
 from electrumx.lib.server_base import ServerBase
 from electrumx.lib.util import version_string
 from electrumx.server.chain_state import ChainState
+from electrumx.server.db import DB
 from electrumx.server.mempool import MemPool
 from electrumx.server.session import SessionManager

@@ -93,10 +94,11 @@ class Controller(ServerBase):

         notifications = Notifications()
         daemon = env.coin.DAEMON(env)
+        db = DB(env)
         BlockProcessor = env.coin.BLOCK_PROCESSOR
-        bp = BlockProcessor(env, daemon, notifications)
-        mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos)
-        chain_state = ChainState(env, daemon, bp)
+        bp = BlockProcessor(env, db, daemon, notifications)
+        mempool = MemPool(env.coin, daemon, notifications, db.lookup_utxos)
+        chain_state = ChainState(env, db, daemon, bp)
         session_mgr = SessionManager(env, chain_state, mempool, notifications,
                                      shutdown_event)

@@ -108,7 +110,7 @@ class Controller(ServerBase):
             await group.spawn(session_mgr.serve(serve_externally_event))
             await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
             await caught_up_event.wait()
-            await group.spawn(bp.populate_header_merkle_cache())
+            await group.spawn(db.populate_header_merkle_cache())
             await group.spawn(mempool.keep_synchronized(synchronized_event))
             await synchronized_event.wait()
             serve_externally_event.set()
diff --git a/electrumx/server/db.py b/electrumx/server/db.py
index b23f87c..6e82a52 100644
--- a/electrumx/server/db.py
+++ b/electrumx/server/db.py
@@ -18,17 +18,30 @@ from collections import namedtuple
 from glob import glob
 from struct import pack, unpack

+import attr
 from aiorpcx import run_in_thread

 import electrumx.lib.util as util
 from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
 from electrumx.lib.merkle import Merkle, MerkleCache
+from electrumx.lib.util import formatted_time
 from electrumx.server.storage import db_class
 from electrumx.server.history import History


 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")


+@attr.s(slots=True)
+class FlushData(object):
+    height = attr.ib()
+    tx_count = attr.ib()
+    headers = attr.ib()
+    block_tx_hashes = attr.ib()
+    # The following are flushed to the UTXO DB if undo_infos is not None
+    undo_infos = attr.ib()
+    adds = attr.ib()
+    deletes = attr.ib()
+    tip = attr.ib()


 class DB(object):
     '''Simple wrapper of the backend database for querying.
@@ -62,6 +75,7 @@
         self.history = History()
         self.utxo_db = None
         self.tx_counts = None
+        self.last_flush = time.time()

         self.logger.info(f'using {self.env.db_engine} for DB backend')

@@ -90,7 +104,7 @@
         else:
             assert self.db_tx_count == 0

-    async def _open_dbs(self, for_sync):
+    async def _open_dbs(self, for_sync, compacting):
         assert self.utxo_db is None

         # First UTXO DB
@@ -110,12 +124,16 @@

         # Then history DB
         self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
-                                                     self.utxo_flush_count)
+                                                     self.utxo_flush_count,
+                                                     compacting)
         self.clear_excess_undo_info()

         # Read TX counts (requires meta directory)
         await self._read_tx_counts()

+    async def open_for_compacting(self):
+        await self._open_dbs(True, True)
+
     async def open_for_sync(self):
         '''Open the databases to sync to the daemon.

@@ -123,7 +141,7 @@
         synchronization.  When serving clients we want the open files for
         serving network connections.
         '''
-        await self._open_dbs(True)
+        await self._open_dbs(True, False)

     async def open_for_serving(self):
         '''Open the databases for serving.  If they are already open they are
@@ -134,13 +152,13 @@
             self.utxo_db.close()
             self.history.close_db()
             self.utxo_db = None
-        await self._open_dbs(False)
+        await self._open_dbs(False, False)

     # Header merkle cache

     async def populate_header_merkle_cache(self):
         self.logger.info('populating header merkle cache...')
-        length = max(1, self.height - self.env.reorg_limit)
+        length = max(1, self.db_height - self.env.reorg_limit)
         start = time.time()
         await self.header_mc.initialize(length)
         elapsed = time.time() - start
@@ -149,6 +167,178 @@
     async def header_branch_and_root(self, length, height):
         return await self.header_mc.branch_and_root(length, height)

+    # Flushing
+    def assert_flushed(self, flush_data):
+        '''Asserts state is fully flushed.'''
+        assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
+        assert flush_data.height == self.fs_height == self.db_height
+        assert flush_data.tip == self.db_tip
+        assert not flush_data.headers
+        assert not flush_data.block_tx_hashes
+        assert not flush_data.adds
+        assert not flush_data.deletes
+        assert not flush_data.undo_infos
+        self.history.assert_flushed()
+
+    def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
+        '''Flush out cached state.  History is always flushed; UTXOs are
+        flushed if flush_utxos.'''
+        if flush_data.height == self.db_height:
+            self.assert_flushed(flush_data)
+            return
+
+        start_time = time.time()
+        prior_flush = self.last_flush
+        tx_delta = flush_data.tx_count - self.last_flush_tx_count
+
+        # Flush to file system
+        self.flush_fs(flush_data)
+
+        # Then history
+        self.flush_history()
+
+        # Flush state last as it reads the wall time.
+        with self.utxo_db.write_batch() as batch:
+            if flush_utxos:
+                self.flush_utxo_db(batch, flush_data)
+            self.flush_state(batch)
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it took to commit the batch
+        self.flush_state(self.utxo_db)
+
+        elapsed = self.last_flush - start_time
+        self.logger.info(f'flush #{self.history.flush_count:,d} took '
+                         f'{elapsed:.1f}s.  Height {flush_data.height:,d} '
+                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
+
+        # Catch-up stats
+        if self.utxo_db.for_sync:
+            flush_interval = self.last_flush - prior_flush
+            tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
+            tx_per_sec_last = 1 + int(tx_delta / flush_interval)
+            eta = estimate_txs_remaining() / tx_per_sec_last
+            self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
+                             f'since last flush: {tx_per_sec_last:,d}')
+            self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
+                             f'ETA: {formatted_time(eta)}')
+
+    def flush_fs(self, flush_data):
+        '''Write headers, tx counts and block tx hashes to the filesystem.
+
+        The first height to write is self.fs_height + 1.  The FS
+        metadata is all append-only, so in a crash we just pick up
+        again from the height stored in the DB.
+        '''
+        prior_tx_count = (self.tx_counts[self.fs_height]
+                          if self.fs_height >= 0 else 0)
+        assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
+        assert flush_data.height == self.fs_height + len(flush_data.headers)
+        assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
+                                       else 0)
+        assert len(self.tx_counts) == flush_data.height + 1
+        hashes = b''.join(flush_data.block_tx_hashes)
+        flush_data.block_tx_hashes.clear()
+        assert len(hashes) % 32 == 0
+        assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count
+
+        # Write the headers, tx counts, and tx hashes
+        start_time = time.time()
+        height_start = self.fs_height + 1
+        offset = self.header_offset(height_start)
+        self.headers_file.write(offset, b''.join(flush_data.headers))
+        self.fs_update_header_offsets(offset, height_start, flush_data.headers)
+        flush_data.headers.clear()
+
+        offset = height_start * self.tx_counts.itemsize
+        self.tx_counts_file.write(offset,
+                                  self.tx_counts[height_start:].tobytes())
+        offset = prior_tx_count * 32
+        self.hashes_file.write(offset, hashes)
+
+        self.fs_height = flush_data.height
+        self.fs_tx_count = flush_data.tx_count
+
+        if self.utxo_db.for_sync:
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
+
+    def flush_history(self):
+        self.history.flush()
+
+    def flush_utxo_db(self, batch, flush_data):
+        '''Flush the cached DB writes and UTXO set to the batch.'''
+        # Care is needed because the writes generated by flushing the
+        # UTXO state may have keys in common with our write cache or
+        # may be in the DB already.
+        start_time = time.time()
+        add_count = len(flush_data.adds)
+        spend_count = len(flush_data.deletes) // 2
+
+        # Spends
+        batch_delete = batch.delete
+        for key in sorted(flush_data.deletes):
+            batch_delete(key)
+        flush_data.deletes.clear()
+
+        # New UTXOs
+        batch_put = batch.put
+        for key, value in flush_data.adds.items():
+            # suffix = tx_idx + tx_num
+            hashX = value[:-12]
+            suffix = key[-2:] + value[-12:-8]
+            batch_put(b'h' + key[:4] + suffix, hashX)
+            batch_put(b'u' + hashX + suffix, value[-8:])
+        flush_data.adds.clear()
+
+        # New undo information
+        self.flush_undo_infos(batch_put, flush_data.undo_infos)
+        flush_data.undo_infos.clear()
+
+        if self.utxo_db.for_sync:
+            block_count = flush_data.height - self.db_height
+            tx_count = flush_data.tx_count - self.db_tx_count
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed {block_count:,d} blocks with '
+                             f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
+                             f'{spend_count:,d} spends in '
+                             f'{elapsed:.1f}s, committing...')
+
+        self.utxo_flush_count = self.history.flush_count
+        self.db_height = flush_data.height
+        self.db_tx_count = flush_data.tx_count
+        self.db_tip = flush_data.tip
+
+    def flush_state(self, batch):
+        '''Flush chain state to the batch.'''
+        now = time.time()
+        self.wall_time += now - self.last_flush
+        self.last_flush = now
+        self.last_flush_tx_count = self.fs_tx_count
+        self.write_utxo_state(batch)
+
+    def flush_backup(self, flush_data, touched):
+        '''Like flush_dbs() but when backing up.  All UTXOs are flushed.'''
+        assert not flush_data.headers
+        assert not flush_data.block_tx_hashes
+        assert flush_data.height < self.db_height
+        self.history.assert_flushed()
+
+        start_time = time.time()
+        tx_delta = flush_data.tx_count - self.last_flush_tx_count
+
+        self.backup_fs(flush_data.height, flush_data.tx_count)
+        self.history.backup(touched, flush_data.tx_count)
+        with self.utxo_db.write_batch() as batch:
+            self.flush_utxo_db(batch, flush_data)
+            # Flush state last as it reads the wall time.
+            self.flush_state(batch)
+
+        elapsed = self.last_flush - start_time
+        self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
+                         f'{elapsed:.1f}s.  Height {flush_data.height:,d} '
+                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
+
     def fs_update_header_offsets(self, offset_start, height_start, headers):
         if self.coin.STATIC_BLOCK_HEADERS:
             return
@@ -178,36 +368,6 @@
         # Truncate header_mc: header count is 1 more than the height.
         self.header_mc.truncate(height + 1)

-    def fs_update(self, fs_height, headers, block_tx_hashes):
-        '''Write headers, the tx_count array and block tx hashes to disk.
-
-        Their first height is fs_height.  No recorded DB state is
-        updated.  These arrays are all append only, so in a crash we
-        just pick up again from the DB height.
-        '''
-        blocks_done = len(headers)
-        height_start = fs_height + 1
-        new_height = fs_height + blocks_done
-        prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
-        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
-        txs_done = cur_tx_count - prior_tx_count
-
-        assert len(block_tx_hashes) == blocks_done
-        assert len(self.tx_counts) == new_height + 1
-        hashes = b''.join(block_tx_hashes)
-        assert len(hashes) % 32 == 0
-        assert len(hashes) // 32 == txs_done
-
-        # Write the headers, tx counts, and tx hashes
-        offset = self.header_offset(height_start)
-        self.headers_file.write(offset, b''.join(headers))
-        self.fs_update_header_offsets(offset, height_start, headers)
-        offset = height_start * self.tx_counts.itemsize
-        self.tx_counts_file.write(offset,
-                                  self.tx_counts[height_start:].tobytes())
-        offset = prior_tx_count * 32
-        self.hashes_file.write(offset, hashes)
-
     async def read_headers(self, start_height, count):
         '''Requires start_height >= 0, count >= 0.  Reads as many headers as
         are available starting at start_height up to count.  This
@@ -379,6 +539,11 @@
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']

+        # These are our state as we move ahead of DB state
+        self.fs_height = self.db_height
+        self.fs_tx_count = self.db_tx_count
+        self.last_flush_tx_count = self.fs_tx_count
+
         # Log some stats
         self.logger.info('DB version: {:d}'.format(self.db_version))
         self.logger.info('coin: {}'.format(self.coin.NAME))
@@ -386,6 +551,8 @@
         self.logger.info('height: {:,d}'.format(self.db_height))
         self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
         self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        if self.utxo_db.for_sync:
+            self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(util.formatted_time(self.wall_time)))
diff --git a/electrumx/server/history.py b/electrumx/server/history.py
index b525af8..b42ca6c 100644
--- a/electrumx/server/history.py
+++ b/electrumx/server/history.py
@@ -11,6 +11,7 @@
 import array
 import ast
 import bisect
+import time
 from collections import defaultdict
 from functools import partial
 from struct import pack, unpack
@@ -31,10 +32,14 @@ class History(object):
         self.unflushed_count = 0
         self.db = None

-    def open_db(self, db_class, for_sync, utxo_flush_count):
+    def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
         self.db = db_class('hist', for_sync)
         self.read_state()
         self.clear_excess(utxo_flush_count)
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        if not compacting:
+            self._cancel_compaction()
         return self.flush_count

     def close_db(self):
@@ -80,7 +85,7 @@ class History(object):
             if flush_id > utxo_flush_count:
                 keys.append(key)

-        self.logger.info('deleting {:,d} history entries'.format(len(keys)))
+        self.logger.info(f'deleting {len(keys):,d} history entries')

         self.flush_count = utxo_flush_count
         with self.db.write_batch() as batch:
@@ -119,6 +124,7 @@ class History(object):
         assert not self.unflushed

     def flush(self):
+        start_time = time.time()
         self.flush_count += 1
         flush_id = pack('>H', self.flush_count)
         unflushed = self.unflushed
@@ -132,7 +138,11 @@ class History(object):
         count = len(unflushed)
         unflushed.clear()
         self.unflushed_count = 0
-        return count
+
+        if self.db.for_sync:
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed history in {elapsed:.1f}s '
+                             f'for {count:,d} addrs')

     def backup(self, hashXs, tx_count):
         # Not certain this is needed, but it doesn't hurt
@@ -161,7 +171,7 @@ class History(object):
                 batch.put(key, value)
             self.write_state(batch)

-        return nremoves
+        self.logger.info(f'backing up removed {nremoves:,d} history entries')

     def get_txnums(self, hashX, limit=1000):
         '''Generator that returns an unpruned, sorted list of tx_nums in the
@@ -307,7 +317,7 @@ class History(object):
                                               100 * cursor / 65536))
         return write_size

-    def cancel_compaction(self):
+    def _cancel_compaction(self):
         if self.comp_cursor != -1:
             self.logger.warning('cancelling in-progress history compaction')
             self.comp_flush_count = -1
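Illustrative note (not part of the patch): after this split, BlockProcessor keeps only the in-memory chain state while DB owns every persistence step, and a FlushData snapshot is the hand-off between the two. A minimal sketch of how the new API is driven, using only the names introduced in the diff above; the helper name flush_once is hypothetical:

    # Hypothetical usage sketch -- everything except flush_once comes from the
    # diff above (BlockProcessor.flush_data, BlockProcessor.estimate_txs_remaining,
    # BlockProcessor.run_in_thread_with_lock, DB.flush_dbs).
    async def flush_once(bp, flush_utxos=True):
        def flush():
            # flush_data() snapshots height, tx count, headers, UTXO adds and
            # deletes as a FlushData; flush_dbs() persists it and logs ETA stats.
            bp.db.flush_dbs(bp.flush_data(), flush_utxos,
                            bp.estimate_txs_remaining)
        # run_in_thread_with_lock holds the state lock, which flush_data() asserts.
        await bp.run_in_thread_with_lock(flush)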