From bd636a75adda009af5526b501864d5f70c7579f1 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Thu, 19 Jul 2018 14:17:14 +0800
Subject: [PATCH] Clean up the process of opening the DBs, make asynchronous

---
 compact_history.py                  |   7 +-
 electrumx/server/block_processor.py | 108 +++++++++++++++------------
 electrumx/server/chain_state.py     |   3 +-
 electrumx/server/db.py              | 111 ++++++++++++++--------------
 4 files changed, 124 insertions(+), 105 deletions(-)

diff --git a/compact_history.py b/compact_history.py
index 134b454..9186c64 100755
--- a/compact_history.py
+++ b/compact_history.py
@@ -31,6 +31,7 @@ running the compaction to completion, it will not benefit and subsequent
 compactions will restart from the beginning.
 '''

+import asyncio
 import logging
 import sys
 import traceback
@@ -40,13 +41,14 @@ from electrumx import Env
 from electrumx.server.db import DB


-def compact_history():
+async def compact_history():
     if sys.version_info < (3, 6):
         raise RuntimeError('Python >= 3.6 is required to run ElectrumX')

     environ['DAEMON_URL'] = ''  # Avoid Env erroring out
     env = Env()
     db = DB(env)
+    await db.open_for_sync()

     assert not db.first_sync
     history = db.history
@@ -66,8 +68,9 @@ def compact_history():
 def main():
     logging.basicConfig(level=logging.INFO)
     logging.info('Starting history compaction...')
+    loop = asyncio.get_event_loop()
     try:
-        compact_history()
+        loop.run_until_complete(compact_history())
     except Exception:
         traceback.print_exc()
         logging.critical('History compaction terminated abnormally')
diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py
index 8d7e33f..daa0de0 100644
--- a/electrumx/server/block_processor.py
+++ b/electrumx/server/block_processor.py
@@ -84,7 +84,7 @@ class Prefetcher(object):
         Repeats until the queue is full or caught up.
         '''
         daemon = self.bp.daemon
-        daemon_height = await daemon.height(self.bp.caught_up_event.is_set())
+        daemon_height = await daemon.height(self.bp._caught_up_event.is_set())
         async with self.semaphore:
             while self.cache_size < self.min_cache_size:
                 # Try and catch up all blocks but limit to room in cache.
@@ -149,28 +149,16 @@ class BlockProcessor(electrumx.server.db.DB):

     def __init__(self, env, tasks, daemon):
         super().__init__(env)

-        # An incomplete compaction needs to be cancelled otherwise
-        # restarting it will corrupt the history
-        self.history.cancel_compaction()
-
         self.tasks = tasks
         self.daemon = daemon

-        # These are our state as we move ahead of DB state
-        self.fs_height = self.db_height
-        self.fs_tx_count = self.db_tx_count
-        self.height = self.db_height
-        self.tip = self.db_tip
-        self.tx_count = self.db_tx_count
-
-        self.caught_up_event = asyncio.Event()
+        self._caught_up_event = asyncio.Event()
         self.task_queue = asyncio.Queue()

         # Meta
         self.cache_MB = env.cache_MB
         self.next_cache_check = 0
         self.last_flush = time.time()
-        self.last_flush_tx_count = self.tx_count
         self.touched = set()
         self.callbacks = []

@@ -189,10 +177,6 @@ class BlockProcessor(electrumx.server.db.DB):

         self.prefetcher = Prefetcher(self)

-        if self.utxo_db.for_sync:
-            self.logger.info('flushing DB cache at {:,d} MB'
-                             .format(self.cache_MB))
-
     def add_task(self, task):
         '''Add the task to our task queue.'''
         self.task_queue.put_nowait(task)
@@ -203,7 +187,10 @@ class BlockProcessor(electrumx.server.db.DB):

     def on_prefetcher_first_caught_up(self):
         '''Called by the prefetcher when it first catches up.'''
-        self.add_task(self.first_caught_up)
+        # Process after prior tasks (blocks) are completed.
+        async def set_event():
+            self._caught_up_event.set()
+        self.add_task(set_event)

     def add_new_block_callback(self, callback):
         '''Add a function called when a new block is found.
@@ -214,15 +201,6 @@ class BlockProcessor(electrumx.server.db.DB):
         '''
         self.callbacks.append(callback)

-    async def main_loop(self):
-        '''Main loop for block processing.'''
-        self.tasks.create_task(self.prefetcher.main_loop())
-        await self.prefetcher.reset_height()
-
-        while True:
-            task = await self.task_queue.get()
-            await task()
-
     def shutdown(self, executor):
         '''Shutdown cleanly and flush to disk.'''
         # First shut down the executor; it may be processing a block.
         self.logger.info('flushing state to DB for a clean shutdown...')
         self.flush(True)

-    async def first_caught_up(self):
-        '''Called when first caught up to daemon after starting.'''
-        # Flush everything with updated first_sync->False state.
-        self.first_sync = False
-        await self.tasks.run_in_thread(self.flush, True)
-        if self.utxo_db.for_sync:
-            self.logger.info(f'{electrumx.version} synced to '
-                             f'height {self.height:,d}')
-        self.open_dbs()
-        self.logger.info(f'caught up to height {self.height:,d}')
-        length = max(1, self.height - self.env.reorg_limit)
-        self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
-        self.logger.info('populated header merkle cache')
-
-        # Reorgs use header_mc so safest to set this after initializing it
-        self.caught_up_event.set()
-
     async def check_and_advance_blocks(self, raw_blocks, first):
         '''Process the list of raw blocks passed.  Detects and handles
         reorgs.
@@ -297,7 +258,7 @@ class BlockProcessor(electrumx.server.db.DB):

         Returns True if a reorg is queued, false if not caught up.
         '''
-        if self.caught_up_event.is_set():
+        if self._caught_up_event.is_set():
             self.add_task(partial(self.reorg_chain, count=count))
             return True
         return False
@@ -550,7 +511,7 @@ class BlockProcessor(electrumx.server.db.DB):

         # If caught up, flush everything as client queries are
         # performed on the DB.
-        if self.caught_up_event.is_set():
+        if self._caught_up_event.is_set():
             self.flush(True)
         else:
             if time.time() > self.next_cache_check:
@@ -808,3 +769,56 @@ class BlockProcessor(electrumx.server.db.DB):
         self.db_tx_count = self.tx_count
         self.db_height = self.height
         self.db_tip = self.tip
+
+    async def _process_blocks_forever(self):
+        '''Loop forever processing blocks.'''
+        while True:
+            task = await self.task_queue.get()
+            await task()
+
+    def _on_dbs_opened(self):
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        self.history.cancel_compaction()
+        # These are our state as we move ahead of DB state
+        self.fs_height = self.db_height
+        self.fs_tx_count = self.db_tx_count
+        self.height = self.db_height
+        self.tip = self.db_tip
+        self.tx_count = self.db_tx_count
+        self.last_flush_tx_count = self.tx_count
+        if self.utxo_db.for_sync:
+            self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB')
+
+    # --- External API
+
+    async def catch_up_to_daemon(self):
+        '''Process and index blocks until we catch up with the daemon.
+
+        Returns once caught up.  Future blocks continue to be
+        processed in a separate task.
+        '''
+        # Open the databases first.
+        await self.open_for_sync()
+        self._on_dbs_opened()
+        # Get the prefetcher running
+        self.tasks.create_task(self.prefetcher.main_loop())
+        await self.prefetcher.reset_height()
+        # Start our loop that processes blocks as they are fetched
+        self.tasks.create_task(self._process_blocks_forever())
+        # Wait until caught up
+        await self._caught_up_event.wait()
+        # Flush everything but with first_sync->False state.
+        first_sync = self.first_sync
+        self.first_sync = False
+        await self.tasks.run_in_thread(self.flush, True)
+        if first_sync:
+            self.logger.info(f'{electrumx.version} synced to '
+                             f'height {self.height:,d}')
+        # Reopen for serving
+        await self.open_for_serving()
+
+        # Populate the header merkle cache
+        length = max(1, self.height - self.env.reorg_limit)
+        self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
+        self.logger.info('populated header merkle cache')
diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py
index b667c31..d0656ff 100644
--- a/electrumx/server/chain_state.py
+++ b/electrumx/server/chain_state.py
@@ -107,7 +107,6 @@ class ChainState(object):
         self.tasks.loop.call_soon(self.shutdown_event.set)

     async def wait_for_mempool(self):
-        self.tasks.create_task(self.bp.main_loop())
-        await self.bp.caught_up_event.wait()
+        await self.bp.catch_up_to_daemon()
         self.tasks.create_task(self.mempool.main_loop())
         await self.mempool.synchronized_event.wait()
diff --git a/electrumx/server/db.py b/electrumx/server/db.py
index 25be167..1c64469 100644
--- a/electrumx/server/db.py
+++ b/electrumx/server/db.py
@@ -54,19 +54,16 @@ class DB(object):
         self.header_offset = self.dynamic_header_offset
         self.header_len = self.dynamic_header_len

-        self.logger.info('switching current directory to {}'
-                         .format(env.db_dir))
+        self.logger.info(f'switching current directory to {env.db_dir}')
         os.chdir(env.db_dir)

         self.db_class = db_class(self.env.db_engine)
-        self.logger.info('using {} for DB backend'.format(self.env.db_engine))
-
         self.history = History()
         self.utxo_db = None
-        self.open_dbs()
+        self.tx_counts = None

-        self.logger.info('reorg limit is {:,d} blocks'
-                         .format(self.env.reorg_limit))
+        self.logger.info(f'using {self.env.db_engine} for DB backend')
+        self.logger.info(f'reorg limit is {self.env.reorg_limit:,d} blocks')

         self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
         self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
@@ -74,10 +71,10 @@ class DB(object):
         if not self.coin.STATIC_BLOCK_HEADERS:
             self.headers_offsets_file = util.LogicalFile(
                 'meta/headers_offsets', 2, 16000000)
-            # Write the offset of the genesis block
-            if self.headers_offsets_file.read(0, 8) != b'\x00' * 8:
-                self.headers_offsets_file.write(0, b'\x00' * 8)

+    async def _read_tx_counts(self):
+        if self.tx_counts is not None:
+            return
         # tx_counts[N] has the cumulative number of txs at the end of
         # height N.  So tx_counts[0] is 1 - the genesis coinbase
         size = (self.db_height + 1) * 4
         else:
             assert self.db_tx_count == 0

-    def open_dbs(self):
-        '''Open the databases.  If already open they are closed and re-opened.
+    async def _open_dbs(self, for_sync):
+        assert self.utxo_db is None
+
+        # First UTXO DB
+        self.utxo_db = self.db_class('utxo', for_sync)
+        if self.utxo_db.is_new:
+            self.logger.info('created new database')
+            self.logger.info('creating metadata directory')
+            os.mkdir('meta')
+            with util.open_file('COIN', create=True) as f:
+                f.write(f'ElectrumX databases and metadata for '
+                        f'{self.coin.NAME} {self.coin.NET}')
+            if not self.coin.STATIC_BLOCK_HEADERS:
+                self.headers_offsets_file.write(0, bytes(8))
+        else:
+            self.logger.info(f'opened UTXO DB (for sync: {for_sync})')
+        self.read_utxo_state()
+
+        # Then history DB
+        self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
+                                                     self.utxo_flush_count)
+        self.clear_excess_undo_info()
+
+        # Read TX counts (requires meta directory)
+        await self._read_tx_counts()
+
+    async def open_for_sync(self):
+        '''Open the databases to sync to the daemon.

         When syncing we want to reserve a lot of open files for the
         synchronization.  When serving clients we want the open files for
         serving network connections.
         '''
-        def log_reason(message, is_for_sync):
-            reason = 'sync' if is_for_sync else 'serving'
-            self.logger.info('{} for {}'.format(message, reason))
-
-        # Assume we're serving until we find out otherwise
-        for for_sync in [False, True]:
-            if self.utxo_db:
-                if self.utxo_db.for_sync == for_sync:
-                    return
-                log_reason('closing DB to re-open', for_sync)
-                self.utxo_db.close()
-                self.history.close_db()
-
-            # Open DB and metadata files.  Record some of its state.
-            self.utxo_db = self.db_class('utxo', for_sync)
-            if self.utxo_db.is_new:
-                self.logger.info('created new database')
-                self.logger.info('creating metadata directory')
-                os.mkdir('meta')
-                with util.open_file('COIN', create=True) as f:
-                    f.write('ElectrumX databases and metadata for {} {}'
-                            .format(self.coin.NAME, self.coin.NET).encode())
-            else:
-                log_reason('opened DB', self.utxo_db.for_sync)
-
-            self.read_utxo_state()
-            if self.first_sync == self.utxo_db.for_sync:
-                break
-
-        # Open history DB, clear excess history
-        self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
-                                                     self.utxo_flush_count)
-        self.clear_excess_undo_info()
+        await self._open_dbs(True)

-        self.logger.info('DB version: {:d}'.format(self.db_version))
-        self.logger.info('coin: {}'.format(self.coin.NAME))
-        self.logger.info('network: {}'.format(self.coin.NET))
-        self.logger.info('height: {:,d}'.format(self.db_height))
-        self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
-        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-        if self.first_sync:
-            self.logger.info('sync time so far: {}'
-                             .format(util.formatted_time(self.wall_time)))
+    async def open_for_serving(self):
+        '''Open the databases for serving.  If they are already open they are
+        closed first.
+        '''
+        if self.utxo_db:
+            self.logger.info('closing DBs to re-open for serving')
+            self.utxo_db.close()
+            self.history.close_db()
+            self.utxo_db = None
+        await self._open_dbs(False)

     def fs_update_header_offsets(self, offset_start, height_start, headers):
         if self.coin.STATIC_BLOCK_HEADERS:
@@ -355,6 +347,17 @@ class DB(object):
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']

+        # Log some stats
+        self.logger.info('DB version: {:d}'.format(self.db_version))
+        self.logger.info('coin: {}'.format(self.coin.NAME))
+        self.logger.info('network: {}'.format(self.coin.NET))
+        self.logger.info('height: {:,d}'.format(self.db_height))
+        self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
+        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        if self.first_sync:
+            self.logger.info('sync time so far: {}'
+                             .format(util.formatted_time(self.wall_time)))
+
     def write_utxo_state(self, batch):
         '''Write (UTXO) state to the batch.'''
         state = {
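
A note on the caught-up signalling in block_processor.py above: on_prefetcher_first_caught_up() no longer schedules a first_caught_up() coroutine of its own; it queues a tiny set_event task behind whatever block tasks are already sitting in task_queue, so catch_up_to_daemon() only wakes once every block the prefetcher has already fetched is indexed and flushed. The self-contained sketch below (illustrative names only, not ElectrumX code) shows the same ordering trick in isolation:

import asyncio


async def demo():
    # Stand-ins for BlockProcessor.task_queue and _caught_up_event.
    task_queue = asyncio.Queue()
    caught_up_event = asyncio.Event()

    async def process_block(height):
        print(f'processing block {height}')

    async def set_event():
        # Queued last, so it runs only after the earlier block tasks.
        caught_up_event.set()

    async def process_tasks_forever():
        # Mirrors _process_blocks_forever(): run queued tasks serially.
        while True:
            task = await task_queue.get()
            await task()

    # Queue some "blocks", then the event setter, as the patch does.
    for height in range(3):
        task_queue.put_nowait(lambda height=height: process_block(height))
    task_queue.put_nowait(set_event)

    worker = asyncio.ensure_future(process_tasks_forever())
    await caught_up_event.wait()    # returns only after blocks 0-2 are done
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass


asyncio.get_event_loop().run_until_complete(demo())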
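
The open_for_sync()/open_for_serving() split in db.py exists so the same databases can be reopened with a different open-file budget for each phase, as the open_for_sync() docstring says. The patch never shows the db_class internals, so the sketch below is only an assumed illustration of what that knob typically maps to for a LevelDB backend via plyvel; the open_utxo_db() helper and the numbers are invented for the example:

import plyvel


def open_utxo_db(path, for_sync):
    # While syncing, let LevelDB keep many table files open; while serving,
    # keep descriptors free for client network connections instead.
    max_open_files = 1024 if for_sync else 192
    return plyvel.DB(path, create_if_missing=True,
                     max_open_files=max_open_files)


# Reopening for serving means closing the sync-time handle and opening a
# fresh one, which is what open_for_serving() does via _open_dbs(False).
db = open_utxo_db('utxo', for_sync=True)
# ... initial sync happens here ...
db.close()
db = open_utxo_db('utxo', for_sync=False)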