
Clean up the process of opening the DBs, make asynchronous

patch-2
Neil Booth committed 7 years ago
commit bd636a75ad

4 changed files:

  compact_history.py                     7 lines changed
  electrumx/server/block_processor.py    108 lines changed
  electrumx/server/chain_state.py        3 lines changed
  electrumx/server/db.py                 111 lines changed
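
Taken together, the commit moves DB opening out of DB.__init__ (previously a synchronous open_dbs() call made during construction) into explicit coroutines: open_for_sync() before indexing and open_for_serving() once caught up, with the new BlockProcessor.catch_up_to_daemon() driving the whole sequence. A rough sketch of the resulting startup flow, assuming the caller owns the event loop (simplified outline, not verbatim code from the commit):

    # Sketch: how a caller now drives startup (cf. ChainState.wait_for_mempool)
    async def start(bp, mempool, tasks):
        await bp.catch_up_to_daemon()           # opens DBs for sync, indexes to
                                                # the daemon tip, reopens to serve
        tasks.create_task(mempool.main_loop())  # mempool runs against synced DBs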

compact_history.py (7 lines changed)

@@ -31,6 +31,7 @@ running the compaction to completion, it will not benefit and
 subsequent compactions will restart from the beginning.
 '''
+import asyncio
 import logging
 import sys
 import traceback
@@ -40,13 +41,14 @@ from electrumx import Env
 from electrumx.server.db import DB


-def compact_history():
+async def compact_history():
     if sys.version_info < (3, 6):
         raise RuntimeError('Python >= 3.6 is required to run ElectrumX')
     environ['DAEMON_URL'] = ''   # Avoid Env erroring out
     env = Env()
     db = DB(env)
+    await db.open_for_sync()
     assert not db.first_sync

     history = db.history
@@ -66,8 +68,9 @@ def compact_history():
 def main():
     logging.basicConfig(level=logging.INFO)
     logging.info('Starting history compaction...')
+    loop = asyncio.get_event_loop()
     try:
-        compact_history()
+        loop.run_until_complete(compact_history())
     except Exception:
         traceback.print_exc()
         logging.critical('History compaction terminated abnormally')
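
Since compact_history() is now a coroutine, main() must drive it with an event loop rather than call it directly. A minimal standalone sketch of the same pattern (the job body is a placeholder; on Python 3.7+ asyncio.run() would serve, but the patch keeps the 3.6-compatible form):

    import asyncio

    async def job():                     # placeholder for compact_history()
        await asyncio.sleep(0)           # the real coroutine awaits db.open_for_sync()

    def main():
        loop = asyncio.get_event_loop()  # 3.6-compatible, as in the patch
        loop.run_until_complete(job())   # blocks until the coroutine finishes

    main()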

electrumx/server/block_processor.py (108 lines changed)

@@ -84,7 +84,7 @@ class Prefetcher(object):
         Repeats until the queue is full or caught up.
         '''
         daemon = self.bp.daemon
-        daemon_height = await daemon.height(self.bp.caught_up_event.is_set())
+        daemon_height = await daemon.height(self.bp._caught_up_event.is_set())
         async with self.semaphore:
             while self.cache_size < self.min_cache_size:
                 # Try and catch up all blocks but limit to room in cache.
@@ -149,28 +149,16 @@ class BlockProcessor(electrumx.server.db.DB):
     def __init__(self, env, tasks, daemon):
         super().__init__(env)

-        # An incomplete compaction needs to be cancelled otherwise
-        # restarting it will corrupt the history
-        self.history.cancel_compaction()
-
         self.tasks = tasks
         self.daemon = daemon
-        # These are our state as we move ahead of DB state
-        self.fs_height = self.db_height
-        self.fs_tx_count = self.db_tx_count
-        self.height = self.db_height
-        self.tip = self.db_tip
-        self.tx_count = self.db_tx_count
-        self.caught_up_event = asyncio.Event()
+        self._caught_up_event = asyncio.Event()
        self.task_queue = asyncio.Queue()

         # Meta
         self.cache_MB = env.cache_MB
         self.next_cache_check = 0
         self.last_flush = time.time()
-        self.last_flush_tx_count = self.tx_count
         self.touched = set()
         self.callbacks = []
@@ -189,10 +177,6 @@ class BlockProcessor(electrumx.server.db.DB):
         self.prefetcher = Prefetcher(self)

-        if self.utxo_db.for_sync:
-            self.logger.info('flushing DB cache at {:,d} MB'
-                             .format(self.cache_MB))
-
     def add_task(self, task):
         '''Add the task to our task queue.'''
         self.task_queue.put_nowait(task)
@@ -203,7 +187,10 @@ class BlockProcessor(electrumx.server.db.DB):
     def on_prefetcher_first_caught_up(self):
         '''Called by the prefetcher when it first catches up.'''
-        self.add_task(self.first_caught_up)
+        # Process after prior tasks (blocks) are completed.
+        async def set_event():
+            self._caught_up_event.set()
+        self.add_task(set_event)

     def add_new_block_callback(self, callback):
         '''Add a function called when a new block is found.
@@ -214,15 +201,6 @@ class BlockProcessor(electrumx.server.db.DB):
         '''
         self.callbacks.append(callback)

-    async def main_loop(self):
-        '''Main loop for block processing.'''
-        self.tasks.create_task(self.prefetcher.main_loop())
-        await self.prefetcher.reset_height()
-        while True:
-            task = await self.task_queue.get()
-            await task()
-
     def shutdown(self, executor):
         '''Shutdown cleanly and flush to disk.'''
         # First stut down the executor; it may be processing a block.
@@ -232,23 +210,6 @@ class BlockProcessor(electrumx.server.db.DB):
             self.logger.info('flushing state to DB for a clean shutdown...')
             self.flush(True)

-    async def first_caught_up(self):
-        '''Called when first caught up to daemon after starting.'''
-        # Flush everything with updated first_sync->False state.
-        self.first_sync = False
-        await self.tasks.run_in_thread(self.flush, True)
-        if self.utxo_db.for_sync:
-            self.logger.info(f'{electrumx.version} synced to '
-                             f'height {self.height:,d}')
-            self.open_dbs()
-        self.logger.info(f'caught up to height {self.height:,d}')
-        length = max(1, self.height - self.env.reorg_limit)
-        self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
-        self.logger.info('populated header merkle cache')
-        # Reorgs use header_mc so safest to set this after initializing it
-        self.caught_up_event.set()
-
     async def check_and_advance_blocks(self, raw_blocks, first):
         '''Process the list of raw blocks passed.  Detects and handles
         reorgs.
@@ -297,7 +258,7 @@ class BlockProcessor(electrumx.server.db.DB):
         Returns True if a reorg is queued, false if not caught up.
         '''
-        if self.caught_up_event.is_set():
+        if self._caught_up_event.is_set():
             self.add_task(partial(self.reorg_chain, count=count))
             return True
         return False
@@ -550,7 +511,7 @@ class BlockProcessor(electrumx.server.db.DB):
         # If caught up, flush everything as client queries are
         # performed on the DB.
-        if self.caught_up_event.is_set():
+        if self._caught_up_event.is_set():
             self.flush(True)
         else:
             if time.time() > self.next_cache_check:
@@ -808,3 +769,56 @@ class BlockProcessor(electrumx.server.db.DB):
         self.db_tx_count = self.tx_count
         self.db_height = self.height
         self.db_tip = self.tip
+
+    async def _process_blocks_forever(self):
+        '''Loop forever processing blocks.'''
+        while True:
+            task = await self.task_queue.get()
+            await task()
+
+    def _on_dbs_opened(self):
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        self.history.cancel_compaction()
+        # These are our state as we move ahead of DB state
+        self.fs_height = self.db_height
+        self.fs_tx_count = self.db_tx_count
+        self.height = self.db_height
+        self.tip = self.db_tip
+        self.tx_count = self.db_tx_count
+        self.last_flush_tx_count = self.tx_count
+        if self.utxo_db.for_sync:
+            self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB')
+
+    # --- External API
+
+    async def catch_up_to_daemon(self):
+        '''Process and index blocks until we catch up with the daemon.
+
+        Returns once caught up.  Future blocks continue to be
+        processed in a separate task.
+        '''
+        # Open the databases first.
+        await self.open_for_sync()
+        self._on_dbs_opened()
+        # Get the prefetcher running
+        self.tasks.create_task(self.prefetcher.main_loop())
+        await self.prefetcher.reset_height()
+        # Start our loop that processes blocks as they are fetched
+        self.tasks.create_task(self._process_blocks_forever())
+        # Wait until caught up
+        await self._caught_up_event.wait()
+        # Flush everything but with first_sync->False state.
+        first_sync = self.first_sync
+        self.first_sync = False
+        await self.tasks.run_in_thread(self.flush, True)
+        if first_sync:
+            self.logger.info(f'{electrumx.version} synced to '
+                             f'height {self.height:,d}')
+        # Reopen for serving
+        await self.open_for_serving()
+        # Populate the header merkle cache
+        length = max(1, self.height - self.env.reorg_limit)
+        self.header_mc = MerkleCache(self.merkle, HeaderSource(self), length)
+        self.logger.info('populated header merkle cache')
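
The replacement for first_caught_up() leans on asyncio.Queue being FIFO: on_prefetcher_first_caught_up() enqueues set_event behind whatever block tasks are already queued, so _caught_up_event fires only after those blocks have been processed, and catch_up_to_daemon() resumes at exactly that point. A self-contained sketch of the ordering trick (illustrative names, not ElectrumX code):

    import asyncio

    async def demo():
        queue = asyncio.Queue()
        caught_up = asyncio.Event()

        async def worker():                   # cf. _process_blocks_forever
            while True:
                task = await queue.get()
                await task()

        async def process(n):
            print('processed block', n)

        async def set_event():                # cf. on_prefetcher_first_caught_up
            caught_up.set()

        for n in range(3):
            queue.put_nowait(lambda n=n: process(n))
        queue.put_nowait(set_event)           # FIFO: runs after the blocks above

        worker_task = asyncio.ensure_future(worker())
        await caught_up.wait()                # cf. catch_up_to_daemon
        worker_task.cancel()

    asyncio.get_event_loop().run_until_complete(demo())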

electrumx/server/chain_state.py (3 lines changed)

@@ -107,7 +107,6 @@ class ChainState(object):
         self.tasks.loop.call_soon(self.shutdown_event.set)

     async def wait_for_mempool(self):
-        self.tasks.create_task(self.bp.main_loop())
-        await self.bp.caught_up_event.wait()
+        await self.bp.catch_up_to_daemon()
         self.tasks.create_task(self.mempool.main_loop())
         await self.mempool.synchronized_event.wait()

electrumx/server/db.py (111 lines changed)

@@ -54,19 +54,16 @@ class DB(object):
             self.header_offset = self.dynamic_header_offset
             self.header_len = self.dynamic_header_len

-        self.logger.info('switching current directory to {}'
-                         .format(env.db_dir))
+        self.logger.info(f'switching current directory to {env.db_dir}')
         os.chdir(env.db_dir)

         self.db_class = db_class(self.env.db_engine)
-        self.logger.info('using {} for DB backend'.format(self.env.db_engine))
         self.history = History()
         self.utxo_db = None
-        self.open_dbs()
-        self.logger.info('reorg limit is {:,d} blocks'
-                         .format(self.env.reorg_limit))
+        self.tx_counts = None
+        self.logger.info(f'using {self.env.db_engine} for DB backend')
+        self.logger.info(f'reorg limit is {self.env.reorg_limit:,d} blocks')

         self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
         self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
@@ -74,10 +71,10 @@ class DB(object):
         if not self.coin.STATIC_BLOCK_HEADERS:
             self.headers_offsets_file = util.LogicalFile(
                 'meta/headers_offsets', 2, 16000000)
-            # Write the offset of the genesis block
-            if self.headers_offsets_file.read(0, 8) != b'\x00' * 8:
-                self.headers_offsets_file.write(0, b'\x00' * 8)

+    async def _read_tx_counts(self):
+        if self.tx_counts is not None:
+            return
         # tx_counts[N] has the cumulative number of txs at the end of
         # height N.  So tx_counts[0] is 1 - the genesis coinbase
         size = (self.db_height + 1) * 4
@@ -89,56 +86,51 @@ class DB(object):
         else:
             assert self.db_tx_count == 0

-    def open_dbs(self):
-        '''Open the databases.  If already open they are closed and re-opened.
+    async def _open_dbs(self, for_sync):
+        assert self.utxo_db is None
+
+        # First UTXO DB
+        self.utxo_db = self.db_class('utxo', for_sync)
+        if self.utxo_db.is_new:
+            self.logger.info('created new database')
+            self.logger.info('creating metadata directory')
+            os.mkdir('meta')
+            with util.open_file('COIN', create=True) as f:
+                f.write(f'ElectrumX databases and metadata for '
+                        f'{self.coin.NAME} {self.coin.NET}')
+            if not self.coin.STATIC_BLOCK_HEADERS:
+                self.headers_offsets_file.write(0, bytes(8))
+        else:
+            self.logger.info(f'opened UTXO DB (for sync: {for_sync})')
+        self.read_utxo_state()
+
+        # Then history DB
+        self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
+                                                     self.utxo_flush_count)
+        self.clear_excess_undo_info()
+
+        # Read TX counts (requires meta directory)
+        await self._read_tx_counts()
+
+    async def open_for_sync(self):
+        '''Open the databases to sync to the daemon.

         When syncing we want to reserve a lot of open files for the
         synchronization.  When serving clients we want the open files for
         serving network connections.
         '''
-        def log_reason(message, is_for_sync):
-            reason = 'sync' if is_for_sync else 'serving'
-            self.logger.info('{} for {}'.format(message, reason))
-
-        # Assume we're serving until we find out otherwise
-        for for_sync in [False, True]:
-            if self.utxo_db:
-                if self.utxo_db.for_sync == for_sync:
-                    return
-                log_reason('closing DB to re-open', for_sync)
-                self.utxo_db.close()
-                self.history.close_db()
-
-            # Open DB and metadata files.  Record some of its state.
-            self.utxo_db = self.db_class('utxo', for_sync)
-            if self.utxo_db.is_new:
-                self.logger.info('created new database')
-                self.logger.info('creating metadata directory')
-                os.mkdir('meta')
-                with util.open_file('COIN', create=True) as f:
-                    f.write('ElectrumX databases and metadata for {} {}'
-                            .format(self.coin.NAME, self.coin.NET).encode())
-            else:
-                log_reason('opened DB', self.utxo_db.for_sync)
-
-            self.read_utxo_state()
-            if self.first_sync == self.utxo_db.for_sync:
-                break
-
-        # Open history DB, clear excess history
-        self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
-                                                     self.utxo_flush_count)
-        self.clear_excess_undo_info()
+        await self._open_dbs(True)

-        self.logger.info('DB version: {:d}'.format(self.db_version))
-        self.logger.info('coin: {}'.format(self.coin.NAME))
-        self.logger.info('network: {}'.format(self.coin.NET))
-        self.logger.info('height: {:,d}'.format(self.db_height))
-        self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
-        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-        if self.first_sync:
-            self.logger.info('sync time so far: {}'
-                             .format(util.formatted_time(self.wall_time)))
+    async def open_for_serving(self):
+        '''Open the databases for serving.  If they are already open they are
+        closed first.
+        '''
+        if self.utxo_db:
+            self.logger.info('closing DBs to re-open for serving')
+            self.utxo_db.close()
+            self.history.close_db()
+            self.utxo_db = None
+        await self._open_dbs(False)

     def fs_update_header_offsets(self, offset_start, height_start, headers):
         if self.coin.STATIC_BLOCK_HEADERS:
@@ -355,6 +347,17 @@ class DB(object):
             self.wall_time = state['wall_time']
             self.first_sync = state['first_sync']

+        # Log some stats
+        self.logger.info('DB version: {:d}'.format(self.db_version))
+        self.logger.info('coin: {}'.format(self.coin.NAME))
+        self.logger.info('network: {}'.format(self.coin.NET))
+        self.logger.info('height: {:,d}'.format(self.db_height))
+        self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
+        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        if self.first_sync:
+            self.logger.info('sync time so far: {}'
+                             .format(util.formatted_time(self.wall_time)))
+
     def write_utxo_state(self, batch):
         '''Write (UTXO) state to the batch.'''
         state = {
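
The for_sync flag threaded through _open_dbs() down to db_class is what distinguishes the two modes; per the open_for_sync docstring, syncing wants many open files reserved for the DB while serving wants them free for client connections. A hedged illustration of how a backend wrapper might consume the flag (class name and numbers are invented for this sketch; ElectrumX's real backends are defined elsewhere):

    class BackendStub:
        '''Illustrative stand-in for the db_class('utxo', for_sync) wrapper.'''
        def __init__(self, name, for_sync):
            self.name = name
            self.for_sync = for_sync
            self.is_new = False          # real backends detect a fresh DB dir
            # Invented numbers: give the heavy sync workload more file handles,
            # and keep descriptors free for client sockets when serving.
            self.max_open_files = 1024 if for_sync else 256

        def close(self):
            pass                         # real backends close the DB handle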
