
Merge branch 'separate-db-bp' into devel

patch-2
Neil Booth · 7 years ago
parent commit eeb76b0f3f
8 changed files (lines changed per file):
  1. compact_history.py (2)
  2. contrib/query.py (2)
  3. electrumx/lib/server_base.py (3)
  4. electrumx/server/block_processor.py (294)
  5. electrumx/server/chain_state.py (20)
  6. electrumx/server/controller.py (10)
  7. electrumx/server/db.py (237)
  8. electrumx/server/history.py (20)

compact_history.py (2 lines changed)

@@ -48,7 +48,7 @@ async def compact_history():
     environ['DAEMON_URL'] = ''   # Avoid Env erroring out
     env = Env()
     db = DB(env)
-    await db.open_for_sync()
+    await db.open_for_compacting()
     assert not db.first_sync
     history = db.history
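
Note: the open_for_compacting() entry point used above is one of three open
modes this commit gives DB; all funnel into _open_dbs(for_sync, compacting).
A minimal sketch of the difference (bodies stubbed for illustration; the
real code is in electrumx/server/db.py below):

    class DB(object):
        async def _open_dbs(self, for_sync, compacting):
            ...  # open UTXO and history DBs; cancel compaction unless compacting

        async def open_for_compacting(self):
            await self._open_dbs(True, True)

        async def open_for_sync(self):
            # syncing to the daemon uses bulk-load friendly settings
            await self._open_dbs(True, False)

        async def open_for_serving(self):
            await self._open_dbs(False, False)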

contrib/query.py (2 lines changed)

@@ -62,7 +62,7 @@ async def query(args):
     db = DB(env)
     coin = env.coin
-    await db._open_dbs(False)
+    await db.open_for_serving()
     if not args.scripts:
         await print_stats(db.hist_db, db.utxo_db)

electrumx/lib/server_base.py (3 lines changed)

@@ -100,6 +100,9 @@ class ServerBase(object):
         self.logger.info('shutting down')
         server_task.cancel()
+        # Prevent some silly logs
+        await asyncio.sleep(0.01)
+        self.logger.info('shutdown complete')

     def run(self):
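
Note: a runnable sketch of the shutdown pattern added above; cancelling the
server task and then sleeping briefly lets the cancellation (and any logs it
produces) be processed before completion is reported. Stand-alone example
with stand-in tasks, not ElectrumX code:

    import asyncio

    async def main():
        async def server():
            try:
                await asyncio.sleep(3600)   # stand-in for the real server
            except asyncio.CancelledError:
                print('server task cancelled')
                raise

        server_task = asyncio.ensure_future(server())
        await asyncio.sleep(0)              # let the server task start
        print('shutting down')
        server_task.cancel()
        # Prevent some silly logs
        await asyncio.sleep(0.01)
        print('shutdown complete')

    asyncio.get_event_loop().run_until_complete(main())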

electrumx/server/block_processor.py (294 lines changed)

@@ -20,8 +20,8 @@ from aiorpcx import TaskGroup, run_in_thread
 import electrumx
 from electrumx.server.daemon import DaemonError
 from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
-from electrumx.lib.util import chunks, formatted_time, class_logger
-import electrumx.server.db
+from electrumx.lib.util import chunks, class_logger
+from electrumx.server.db import FlushData


 class Prefetcher(object):
@@ -142,26 +142,26 @@ class ChainError(Exception):
     '''Raised on error processing blocks.'''


-class BlockProcessor(electrumx.server.db.DB):
+class BlockProcessor(object):
     '''Process blocks and update the DB state to match.

     Employ a prefetcher to prefetch blocks in batches for processing.
     Coordinate backing up in case of chain reorganisations.
     '''

-    def __init__(self, env, daemon, notifications):
-        super().__init__(env)
+    def __init__(self, env, db, daemon, notifications):
+        self.env = env
+        self.db = db
         self.daemon = daemon
         self.notifications = notifications

         self.coin = env.coin
         self.blocks_event = asyncio.Event()
         self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
         self.logger = class_logger(__name__, self.__class__.__name__)

         # Meta
-        self.cache_MB = env.cache_MB
         self.next_cache_check = 0
-        self.last_flush = time.time()
         self.touched = set()
         self.reorg_count = 0
@@ -189,17 +189,6 @@ class BlockProcessor(electrumx.server.db.DB):
             return await run_in_thread(func, *args)
         return await asyncio.shield(run_in_thread_locked())

-    async def _maybe_flush(self):
-        # If caught up, flush everything as client queries are
-        # performed on the DB.
-        if self._caught_up_event.is_set():
-            await self.flush(True)
-        elif time.time() > self.next_cache_check:
-            flush_arg = self.check_cache_size()
-            if flush_arg is not None:
-                await self.flush(flush_arg)
-            self.next_cache_check = time.time() + 30
-
     async def check_and_advance_blocks(self, raw_blocks):
         '''Process the list of raw blocks passed. Detects and handles
         reorgs.
@@ -217,7 +206,7 @@ class BlockProcessor(electrumx.server.db.DB):
             start = time.time()
             await self.run_in_thread_with_lock(self.advance_blocks, blocks)
             await self._maybe_flush()
-            if not self.first_sync:
+            if not self.db.first_sync:
                 s = '' if len(blocks) == 1 else 's'
                 self.logger.info('processed {:,d} block{} in {:.1f}s'
                                  .format(len(blocks), s,
@@ -257,13 +246,19 @@ class BlockProcessor(electrumx.server.db.DB):
            except Exception:
                return await self.daemon.raw_blocks(hex_hashes)

+        def flush_backup():
+            # self.touched can include other addresses which is
+            # harmless, but remove None.
+            self.touched.discard(None)
+            self.db.flush_backup(self.flush_data(), self.touched)
+
         start, last, hashes = await self.reorg_hashes(count)
         # Reverse and convert to hex strings.
         hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             raw_blocks = await get_raw_blocks(last, hex_hashes)
             await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
-            await self.backup_flush()
+            await self.run_in_thread_with_lock(flush_backup)
             last -= len(raw_blocks)

         await self.prefetcher.reset_height(self.height)
@@ -280,7 +275,7 @@ class BlockProcessor(electrumx.server.db.DB):
             self.logger.info(f'chain was reorganised replacing {count:,d} '
                              f'block{s} at heights {start:,d}-{last:,d}')

-        return start, last, await self.fs_block_hashes(start, count)
+        return start, last, await self.db.fs_block_hashes(start, count)

     async def calc_reorg_range(self, count):
         '''Calculate the reorg range'''
@@ -298,7 +293,7 @@ class BlockProcessor(electrumx.server.db.DB):
             start = self.height - 1
             count = 1
             while start > 0:
-                hashes = await self.fs_block_hashes(start, count)
+                hashes = await self.db.fs_block_hashes(start, count)
                 hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
                 d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
                 n = diff_pos(hex_hashes, d_hex_hashes)
@@ -314,135 +309,40 @@ class BlockProcessor(electrumx.server.db.DB):
         return start, count

-    def flush_state(self, batch):
-        '''Flush chain state to the batch.'''
-        now = time.time()
-        self.wall_time += now - self.last_flush
-        self.last_flush = now
-        self.last_flush_tx_count = self.tx_count
-        self.write_utxo_state(batch)
-
-    def assert_flushed(self):
-        '''Asserts state is fully flushed.'''
-        assert self.tx_count == self.fs_tx_count == self.db_tx_count
-        assert self.height == self.fs_height == self.db_height
-        assert not self.undo_infos
-        assert not self.utxo_cache
-        assert not self.db_deletes
-        self.history.assert_flushed()
+    def estimate_txs_remaining(self):
+        # Try to estimate how many txs there are to go
+        daemon_height = self.daemon.cached_height()
+        coin = self.coin
+        tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT)
+        # Damp the initial enthusiasm
+        realism = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0)
+        return (tail_count * coin.TX_PER_BLOCK +
+                max(coin.TX_COUNT - self.tx_count, 0)) * realism
+
+    # - Flushing
+    def flush_data(self):
+        '''The data for a flush. The lock must be taken.'''
+        assert self.state_lock.locked()
+        return FlushData(self.height, self.tx_count, self.headers,
+                         self.tx_hashes, self.undo_infos, self.utxo_cache,
+                         self.db_deletes, self.tip)

     async def flush(self, flush_utxos):
-        if self.height == self.db_height:
-            self.assert_flushed()
-        else:
-            await self.run_in_thread_with_lock(self._flush_body, flush_utxos)
-
-    def _flush_body(self, flush_utxos):
-        '''Flush out cached state.
-        History is always flushed. UTXOs are flushed if flush_utxos.'''
-        flush_start = time.time()
-        last_flush = self.last_flush
-        tx_diff = self.tx_count - self.last_flush_tx_count
-
-        # Flush to file system
-        self.fs_flush()
-        fs_end = time.time()
-        if self.utxo_db.for_sync:
-            self.logger.info('flushed to FS in {:.1f}s'
-                             .format(fs_end - flush_start))
-
-        # History next - it's fast and frees memory
-        hashX_count = self.history.flush()
-        if self.utxo_db.for_sync:
-            self.logger.info('flushed history in {:.1f}s for {:,d} addrs'
-                             .format(time.time() - fs_end, hashX_count))
-
-        # Flush state last as it reads the wall time.
-        with self.utxo_db.write_batch() as batch:
-            if flush_utxos:
-                self.flush_utxos(batch)
-            self.flush_state(batch)
-
-        # Update and put the wall time again - otherwise we drop the
-        # time it took to commit the batch
-        self.flush_state(self.utxo_db)
-
-        self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}'
-                         .format(self.history.flush_count,
-                                 self.last_flush - flush_start,
-                                 self.height, self.tx_count))
-
-        # Catch-up stats
-        if self.utxo_db.for_sync:
-            tx_per_sec = int(self.tx_count / self.wall_time)
-            this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
-
-            self.logger.info('tx/sec since genesis: {:,d}, '
-                             'since last flush: {:,d}'
-                             .format(tx_per_sec, this_tx_per_sec))
-
-            daemon_height = self.daemon.cached_height()
-            if self.height > self.coin.TX_COUNT_HEIGHT:
-                tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
-            else:
-                tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
-                          * self.coin.TX_PER_BLOCK
-                          + (self.coin.TX_COUNT - self.tx_count))
-
-            # Damp the enthusiasm
-            realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT
-            tx_est *= max(realism, 1.0)
-
-            self.logger.info('sync time: {} ETA: {}'
-                             .format(formatted_time(self.wall_time),
-                                     formatted_time(tx_est / this_tx_per_sec)))
-
-    def fs_flush(self):
-        '''Flush the things stored on the filesystem.'''
-        assert self.fs_height + len(self.headers) == self.height
-        assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
-
-        self.fs_update(self.fs_height, self.headers, self.tx_hashes)
-        self.fs_height = self.height
-        self.fs_tx_count = self.tx_count
-        self.tx_hashes = []
-        self.headers = []
-
-    async def backup_flush(self):
-        assert self.height < self.db_height
-        assert not self.headers
-        assert not self.tx_hashes
-        self.history.assert_flushed()
-        await self.run_in_thread_with_lock(self._backup_flush_body)
-
-    def _backup_flush_body(self):
-        '''Like flush() but when backing up. All UTXOs are flushed.
-
-        hashXs - sequence of hashXs which were touched by backing
-        up. Searched for history entries to remove after the backup
-        height.
-        '''
-        flush_start = time.time()
-
-        self.backup_fs(self.height, self.tx_count)
-
-        # Backup history. self.touched can include other addresses
-        # which is harmless, but remove None.
-        self.touched.discard(None)
-        nremoves = self.history.backup(self.touched, self.tx_count)
-        self.logger.info('backing up removed {:,d} history entries'
-                         .format(nremoves))
+        def flush():
+            self.db.flush_dbs(self.flush_data(), flush_utxos,
+                              self.estimate_txs_remaining)
+        await self.run_in_thread_with_lock(flush)

-        with self.utxo_db.write_batch() as batch:
-            # Flush state last as it reads the wall time.
-            self.flush_utxos(batch)
-            self.flush_state(batch)
-
-        self.logger.info('backup flush #{:,d} took {:.1f}s. '
-                         'Height {:,d} txs: {:,d}'
-                         .format(self.history.flush_count,
-                                 self.last_flush - flush_start,
-                                 self.height, self.tx_count))
+    async def _maybe_flush(self):
+        # If caught up, flush everything as client queries are
+        # performed on the DB.
+        if self._caught_up_event.is_set():
+            await self.flush(True)
+        elif time.time() > self.next_cache_check:
+            flush_arg = self.check_cache_size()
+            if flush_arg is not None:
+                await self.flush(flush_arg)
+            self.next_cache_check = time.time() + 30

     def check_cache_size(self):
         '''Flush a cache if it gets too big.'''
@@ -451,10 +351,10 @@ class BlockProcessor(electrumx.server.db.DB):
         one_MB = 1000*1000
         utxo_cache_size = len(self.utxo_cache) * 205
         db_deletes_size = len(self.db_deletes) * 57
-        hist_cache_size = self.history.unflushed_memsize()
+        hist_cache_size = self.db.history.unflushed_memsize()
         # Roughly ntxs * 32 + nblocks * 42
-        tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32
-                        + (self.height - self.fs_height) * 42)
+        tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+                        + (self.height - self.db.fs_height) * 42)
         utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
         hist_MB = (hist_cache_size + tx_hash_size) // one_MB
@@ -465,8 +365,9 @@ class BlockProcessor(electrumx.server.db.DB):
         # Flush history if it takes up over 20% of cache memory.
         # Flush UTXOs once they take up 80% of cache memory.
-        if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
-            return utxo_MB >= self.cache_MB * 4 // 5
+        cache_MB = self.env.cache_MB
+        if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5:
+            return utxo_MB >= cache_MB * 4 // 5
         return None

     def advance_blocks(self, blocks):
@@ -474,7 +375,7 @@ class BlockProcessor(electrumx.server.db.DB):

         It is already verified they correctly connect onto our tip.
         '''
-        min_height = self.min_undo_height(self.daemon.cached_height())
+        min_height = self.db.min_undo_height(self.daemon.cached_height())
         height = self.height

         for block in blocks:
@@ -482,7 +383,7 @@ class BlockProcessor(electrumx.server.db.DB):
             undo_info = self.advance_txs(block.transactions)
             if height >= min_height:
                 self.undo_infos.append((undo_info, height))
-                self.write_raw_block(block.raw, height)
+                self.db.write_raw_block(block.raw, height)

         headers = [block.header for block in blocks]
         self.height = height
@@ -529,10 +430,10 @@ class BlockProcessor(electrumx.server.db.DB):
             update_touched(hashXs)
             tx_num += 1

-        self.history.add_unflushed(hashXs_by_tx, self.tx_count)
+        self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)

         self.tx_count = tx_num
-        self.tx_counts.append(tx_num)
+        self.db.tx_counts.append(tx_num)

         return undo_info
@@ -542,7 +443,7 @@ class BlockProcessor(electrumx.server.db.DB):

        The blocks should be in order of decreasing height, starting at.
        self.height. A flush is performed once the blocks are backed up.
        '''
-        self.db.assert_flushed()
+        self.db.assert_flushed(self.flush_data())
         assert self.height >= len(raw_blocks)

         coin = self.coin
@@ -558,14 +459,14 @@ class BlockProcessor(electrumx.server.db.DB):
             self.tip = coin.header_prevhash(block.header)
             self.backup_txs(block.transactions)
             self.height -= 1
-            self.tx_counts.pop()
+            self.db.tx_counts.pop()

         self.logger.info('backed up to height {:,d}'.format(self.height))

     def backup_txs(self, txs):
         # Prevout values, in order down the block (coinbase first if present)
         # undo_info is in reverse block order
-        undo_info = self.read_undo_info(self.height)
+        undo_info = self.db.read_undo_info(self.height)
         if undo_info is None:
             raise ChainError('no undo information found for height {:,d}'
                              .format(self.height))
@@ -673,14 +574,14 @@ class BlockProcessor(electrumx.server.db.DB):
         # Value: hashX
         prefix = b'h' + tx_hash[:4] + idx_packed
         candidates = {db_key: hashX for db_key, hashX
-                      in self.utxo_db.iterator(prefix=prefix)}
+                      in self.db.utxo_db.iterator(prefix=prefix)}

         for hdb_key, hashX in candidates.items():
             tx_num_packed = hdb_key[-4:]

             if len(candidates) > 1:
                 tx_num, = unpack('<I', tx_num_packed)
-                hash, height = self.fs_tx_hash(tx_num)
+                hash, height = self.db.fs_tx_hash(tx_num)
                 if hash != tx_hash:
                     assert hash is not None  # Should always be found
                     continue
@@ -688,7 +589,7 @@ class BlockProcessor(electrumx.server.db.DB):
             # Key: b'u' + address_hashX + tx_idx + tx_num
             # Value: the UTXO value as a 64-bit unsigned integer
             udb_key = b'u' + hashX + hdb_key[-6:]
-            utxo_value_packed = self.utxo_db.get(udb_key)
+            utxo_value_packed = self.db.utxo_db.get(udb_key)
             if utxo_value_packed:
                 # Remove both entries for this UTXO
                 self.db_deletes.append(hdb_key)
@@ -698,48 +599,6 @@ class BlockProcessor(electrumx.server.db.DB):
         raise ChainError('UTXO {} / {:,d} not found in "h" table'
                          .format(hash_to_hex_str(tx_hash), tx_idx))

-    def flush_utxos(self, batch):
-        '''Flush the cached DB writes and UTXO set to the batch.'''
-        # Care is needed because the writes generated by flushing the
-        # UTXO state may have keys in common with our write cache or
-        # may be in the DB already.
-        flush_start = time.time()
-        delete_count = len(self.db_deletes) // 2
-        utxo_cache_len = len(self.utxo_cache)
-
-        # Spends
-        batch_delete = batch.delete
-        for key in sorted(self.db_deletes):
-            batch_delete(key)
-        self.db_deletes = []
-
-        # New UTXOs
-        batch_put = batch.put
-        for cache_key, cache_value in self.utxo_cache.items():
-            # suffix = tx_idx + tx_num
-            hashX = cache_value[:-12]
-            suffix = cache_key[-2:] + cache_value[-12:-8]
-            batch_put(b'h' + cache_key[:4] + suffix, hashX)
-            batch_put(b'u' + hashX + suffix, cache_value[-8:])
-        self.utxo_cache = {}
-
-        # New undo information
-        self.flush_undo_infos(batch_put, self.undo_infos)
-        self.undo_infos = []
-
-        if self.utxo_db.for_sync:
-            self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO '
-                             'adds, {:,d} spends in {:.1f}s, committing...'
-                             .format(self.height - self.db_height,
-                                     self.tx_count - self.db_tx_count,
-                                     utxo_cache_len, delete_count,
-                                     time.time() - flush_start))
-
-        self.utxo_flush_count = self.history.flush_count
-        self.db_tx_count = self.tx_count
-        self.db_height = self.height
-        self.db_tip = self.tip
-
     async def _process_prefetched_blocks(self):
         '''Loop forever processing blocks as they arrive.'''
         while True:
@@ -759,8 +618,8 @@ class BlockProcessor(electrumx.server.db.DB):
     async def _first_caught_up(self):
         self.logger.info(f'caught up to height {self.height}')
         # Flush everything but with first_sync->False state.
-        first_sync = self.first_sync
-        self.first_sync = False
+        first_sync = self.db.first_sync
+        self.db.first_sync = False
         await self.flush(True)
         if first_sync:
             self.logger.info(f'{electrumx.version} synced to '
@@ -768,22 +627,13 @@ class BlockProcessor(electrumx.server.db.DB):
         # Initialise the notification framework
         await self.notifications.on_block(set(), self.height)
         # Reopen for serving
-        await self.open_for_serving()
+        await self.db.open_for_serving()

     async def _first_open_dbs(self):
-        await self.open_for_sync()
-        # An incomplete compaction needs to be cancelled otherwise
-        # restarting it will corrupt the history
-        self.history.cancel_compaction()
-        # These are our state as we move ahead of DB state
-        self.fs_height = self.db_height
-        self.fs_tx_count = self.db_tx_count
-        self.height = self.db_height
-        self.tip = self.db_tip
-        self.tx_count = self.db_tx_count
-        self.last_flush_tx_count = self.tx_count
-        if self.utxo_db.for_sync:
-            self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB')
+        await self.db.open_for_sync()
+        self.height = self.db.db_height
+        self.tip = self.db.db_tip
+        self.tx_count = self.db.db_tx_count

     # --- External API
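
Note: the essential design change in this file is composition over
inheritance; BlockProcessor now receives a DB rather than subclassing it.
An illustrative stub (not the real classes) of the resulting shape:

    class DB(object):
        def __init__(self, env):
            self.first_sync = True
            self.db_height = -1

        def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
            '''Persist a FlushData bundle; stubbed here.'''

    class BlockProcessor(object):
        def __init__(self, env, db, daemon, notifications):
            self.env = env
            self.db = db        # delegate storage; don't inherit it
            self.daemon = daemon
            self.notifications = notifications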

electrumx/server/chain_state.py (20 lines changed)

@@ -14,18 +14,18 @@ class ChainState(object):
     blocks, transaction history, UTXOs and the mempool.
     '''

-    def __init__(self, env, daemon, bp):
+    def __init__(self, env, db, daemon, bp):
         self._env = env
+        self._db = db
         self._daemon = daemon
         self._bp = bp

         # External interface pass-throughs for session.py
-        self.force_chain_reorg = self._bp.force_chain_reorg
-        self.tx_branch_and_root = self._bp.merkle.branch_and_root
-        self.read_headers = self._bp.read_headers
-        self.all_utxos = self._bp.all_utxos
-        self.limited_history = self._bp.limited_history
-        self.header_branch_and_root = self._bp.header_branch_and_root
+        self.force_chain_reorg = bp.force_chain_reorg
+        self.tx_branch_and_root = db.merkle.branch_and_root
+        self.read_headers = db.read_headers
+        self.all_utxos = db.all_utxos
+        self.limited_history = db.limited_history
+        self.header_branch_and_root = db.header_branch_and_root

     async def broadcast_transaction(self, raw_tx):
         return await self._daemon.sendrawtransaction([raw_tx])
@@ -34,7 +34,7 @@ class ChainState(object):
         return await getattr(self._daemon, method)(*args)

     def db_height(self):
-        return self._bp.db_height
+        return self._db.db_height

     def get_info(self):
         '''Chain state info for LocalRPC and logs.'''
@@ -57,7 +57,7 @@ class ChainState(object):
     async def query(self, args, limit):
         coin = self._env.coin
-        db = self._bp
+        db = self._db
         lines = []

         def arg_to_hashX(arg):
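
Note: ChainState re-exports bound methods of its collaborators, so session
code can call, say, read_headers() without knowing whether the DB or the
block processor implements it. A runnable sketch with stand-in classes:

    class DB(object):
        def read_headers(self, start_height, count):
            return b''          # stand-in for the real file read

    class ChainState(object):
        def __init__(self, db):
            self._db = db
            self.read_headers = db.read_headers   # bound-method pass-through

    chain_state = ChainState(DB())
    print(chain_state.read_headers(0, 10))        # -> b''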

electrumx/server/controller.py (10 lines changed)

@@ -13,6 +13,7 @@ import electrumx
 from electrumx.lib.server_base import ServerBase
 from electrumx.lib.util import version_string
 from electrumx.server.chain_state import ChainState
+from electrumx.server.db import DB
 from electrumx.server.mempool import MemPool
 from electrumx.server.session import SessionManager
@@ -93,10 +94,11 @@ class Controller(ServerBase):
         notifications = Notifications()
         daemon = env.coin.DAEMON(env)
+        db = DB(env)
         BlockProcessor = env.coin.BLOCK_PROCESSOR
-        bp = BlockProcessor(env, daemon, notifications)
-        mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos)
-        chain_state = ChainState(env, daemon, bp)
+        bp = BlockProcessor(env, db, daemon, notifications)
+        mempool = MemPool(env.coin, daemon, notifications, db.lookup_utxos)
+        chain_state = ChainState(env, db, daemon, bp)
         session_mgr = SessionManager(env, chain_state, mempool,
                                      notifications, shutdown_event)
@@ -108,7 +110,7 @@ class Controller(ServerBase):
             await group.spawn(session_mgr.serve(serve_externally_event))
             await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
             await caught_up_event.wait()
-            await group.spawn(bp.populate_header_merkle_cache())
+            await group.spawn(db.populate_header_merkle_cache())
             await group.spawn(mempool.keep_synchronized(synchronized_event))
             await synchronized_event.wait()
             serve_externally_event.set()
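
Note: the startup ordering above is event-gated; the block processor sets
caught_up_event once synced, and only then is the DB's header merkle cache
populated. A runnable asyncio-only sketch with stand-in coroutines:

    import asyncio

    async def main():
        caught_up_event = asyncio.Event()

        async def fetch_and_process_blocks():
            # ... sync with the daemon, then:
            caught_up_event.set()

        async def populate_header_merkle_cache():
            print('populating header merkle cache...')

        asyncio.ensure_future(fetch_and_process_blocks())
        await caught_up_event.wait()
        await populate_header_merkle_cache()

    asyncio.get_event_loop().run_until_complete(main())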

electrumx/server/db.py (237 lines changed)

@@ -18,17 +18,30 @@ from collections import namedtuple
 from glob import glob
 from struct import pack, unpack

+import attr
 from aiorpcx import run_in_thread

 import electrumx.lib.util as util
 from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
 from electrumx.lib.merkle import Merkle, MerkleCache
+from electrumx.lib.util import formatted_time
 from electrumx.server.storage import db_class
 from electrumx.server.history import History

 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")


+@attr.s(slots=True)
+class FlushData(object):
+    height = attr.ib()
+    tx_count = attr.ib()
+    headers = attr.ib()
+    block_tx_hashes = attr.ib()
+    # The following are flushed to the UTXO DB if undo_infos is not None
+    undo_infos = attr.ib()
+    adds = attr.ib()
+    deletes = attr.ib()
+    tip = attr.ib()
+
+
 class DB(object):

     '''Simple wrapper of the backend database for querying.
@@ -62,6 +75,7 @@ class DB(object):
         self.history = History()
         self.utxo_db = None
         self.tx_counts = None
+        self.last_flush = time.time()

         self.logger.info(f'using {self.env.db_engine} for DB backend')
@@ -90,7 +104,7 @@ class DB(object):
         else:
             assert self.db_tx_count == 0

-    async def _open_dbs(self, for_sync):
+    async def _open_dbs(self, for_sync, compacting):
         assert self.utxo_db is None

         # First UTXO DB
@@ -110,12 +124,16 @@ class DB(object):
         # Then history DB
         self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
-                                                     self.utxo_flush_count)
+                                                     self.utxo_flush_count,
+                                                     compacting)
         self.clear_excess_undo_info()

         # Read TX counts (requires meta directory)
         await self._read_tx_counts()

+    async def open_for_compacting(self):
+        await self._open_dbs(True, True)
+
     async def open_for_sync(self):
         '''Open the databases to sync to the daemon.
@@ -123,7 +141,7 @@ class DB(object):
         synchronization. When serving clients we want the open files for
         serving network connections.
         '''
-        await self._open_dbs(True)
+        await self._open_dbs(True, False)

     async def open_for_serving(self):
         '''Open the databases for serving. If they are already open they are
@@ -134,13 +152,13 @@ class DB(object):
             self.utxo_db.close()
             self.history.close_db()
             self.utxo_db = None
-        await self._open_dbs(False)
+        await self._open_dbs(False, False)

     # Header merkle cache

     async def populate_header_merkle_cache(self):
         self.logger.info('populating header merkle cache...')
-        length = max(1, self.height - self.env.reorg_limit)
+        length = max(1, self.db_height - self.env.reorg_limit)
         start = time.time()
         await self.header_mc.initialize(length)
         elapsed = time.time() - start
@@ -149,6 +167,178 @@ class DB(object):
     async def header_branch_and_root(self, length, height):
         return await self.header_mc.branch_and_root(length, height)

+    # Flushing
+    def assert_flushed(self, flush_data):
+        '''Asserts state is fully flushed.'''
+        assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
+        assert flush_data.height == self.fs_height == self.db_height
+        assert flush_data.tip == self.db_tip
+        assert not flush_data.headers
+        assert not flush_data.block_tx_hashes
+        assert not flush_data.adds
+        assert not flush_data.deletes
+        assert not flush_data.undo_infos
+        self.history.assert_flushed()
+
+    def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
+        '''Flush out cached state. History is always flushed; UTXOs are
+        flushed if flush_utxos.'''
+        if flush_data.height == self.db_height:
+            self.assert_flushed(flush_data)
+            return
+
+        start_time = time.time()
+        prior_flush = self.last_flush
+        tx_delta = flush_data.tx_count - self.last_flush_tx_count
+
+        # Flush to file system
+        self.flush_fs(flush_data)
+
+        # Then history
+        self.flush_history()
+
+        # Flush state last as it reads the wall time.
+        with self.utxo_db.write_batch() as batch:
+            if flush_utxos:
+                self.flush_utxo_db(batch, flush_data)
+            self.flush_state(batch)
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it took to commit the batch
+        self.flush_state(self.utxo_db)
+
+        elapsed = self.last_flush - start_time
+        self.logger.info(f'flush #{self.history.flush_count:,d} took '
+                         f'{elapsed:.1f}s. Height {flush_data.height:,d} '
+                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
+
+        # Catch-up stats
+        if self.utxo_db.for_sync:
+            flush_interval = self.last_flush - prior_flush
+            tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
+            tx_per_sec_last = 1 + int(tx_delta / flush_interval)
+            eta = estimate_txs_remaining() / tx_per_sec_last
+            self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
+                             f'since last flush: {tx_per_sec_last:,d}')
+            self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
+                             f'ETA: {formatted_time(eta)}')
+
+    def flush_fs(self, flush_data):
+        '''Write headers, tx counts and block tx hashes to the filesystem.
+
+        The first height to write is self.fs_height + 1. The FS
+        metadata is all append-only, so in a crash we just pick up
+        again from the height stored in the DB.
+        '''
+        prior_tx_count = (self.tx_counts[self.fs_height]
+                          if self.fs_height >= 0 else 0)
+        assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
+        assert flush_data.height == self.fs_height + len(flush_data.headers)
+        assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
+                                       else 0)
+        assert len(self.tx_counts) == flush_data.height + 1
+        hashes = b''.join(flush_data.block_tx_hashes)
+        flush_data.block_tx_hashes.clear()
+        assert len(hashes) % 32 == 0
+        assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count
+
+        # Write the headers, tx counts, and tx hashes
+        start_time = time.time()
+        height_start = self.fs_height + 1
+        offset = self.header_offset(height_start)
+        self.headers_file.write(offset, b''.join(flush_data.headers))
+        self.fs_update_header_offsets(offset, height_start, flush_data.headers)
+        flush_data.headers.clear()
+        offset = height_start * self.tx_counts.itemsize
+        self.tx_counts_file.write(offset,
+                                  self.tx_counts[height_start:].tobytes())
+        offset = prior_tx_count * 32
+        self.hashes_file.write(offset, hashes)
+
+        self.fs_height = flush_data.height
+        self.fs_tx_count = flush_data.tx_count
+
+        if self.utxo_db.for_sync:
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
+
+    def flush_history(self):
+        self.history.flush()
+
+    def flush_utxo_db(self, batch, flush_data):
+        '''Flush the cached DB writes and UTXO set to the batch.'''
+        # Care is needed because the writes generated by flushing the
+        # UTXO state may have keys in common with our write cache or
+        # may be in the DB already.
+        start_time = time.time()
+        add_count = len(flush_data.adds)
+        spend_count = len(flush_data.deletes) // 2
+
+        # Spends
+        batch_delete = batch.delete
+        for key in sorted(flush_data.deletes):
+            batch_delete(key)
+        flush_data.deletes.clear()
+
+        # New UTXOs
+        batch_put = batch.put
+        for key, value in flush_data.adds.items():
+            # suffix = tx_idx + tx_num
+            hashX = value[:-12]
+            suffix = key[-2:] + value[-12:-8]
+            batch_put(b'h' + key[:4] + suffix, hashX)
+            batch_put(b'u' + hashX + suffix, value[-8:])
+        flush_data.adds.clear()
+
+        # New undo information
+        self.flush_undo_infos(batch_put, flush_data.undo_infos)
+        flush_data.undo_infos.clear()
+
+        if self.utxo_db.for_sync:
+            block_count = flush_data.height - self.db_height
+            tx_count = flush_data.tx_count - self.db_tx_count
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed {block_count:,d} blocks with '
+                             f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
+                             f'{spend_count:,d} spends in '
+                             f'{elapsed:.1f}s, committing...')
+
+        self.utxo_flush_count = self.history.flush_count
+        self.db_height = flush_data.height
+        self.db_tx_count = flush_data.tx_count
+        self.db_tip = flush_data.tip
+
+    def flush_state(self, batch):
+        '''Flush chain state to the batch.'''
+        now = time.time()
+        self.wall_time += now - self.last_flush
+        self.last_flush = now
+        self.last_flush_tx_count = self.fs_tx_count
+        self.write_utxo_state(batch)
+
+    def flush_backup(self, flush_data, touched):
+        '''Like flush_dbs() but when backing up. All UTXOs are flushed.'''
+        assert not flush_data.headers
+        assert not flush_data.block_tx_hashes
+        assert flush_data.height < self.db_height
+        self.history.assert_flushed()
+
+        start_time = time.time()
+        tx_delta = flush_data.tx_count - self.last_flush_tx_count
+
+        self.backup_fs(flush_data.height, flush_data.tx_count)
+        self.history.backup(touched, flush_data.tx_count)
+        with self.utxo_db.write_batch() as batch:
+            self.flush_utxo_db(batch, flush_data)
+            # Flush state last as it reads the wall time.
+            self.flush_state(batch)
+
+        elapsed = self.last_flush - start_time
+        self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
+                         f'{elapsed:.1f}s. Height {flush_data.height:,d} '
+                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
+
     def fs_update_header_offsets(self, offset_start, height_start, headers):
         if self.coin.STATIC_BLOCK_HEADERS:
             return
@@ -178,36 +368,6 @@ class DB(object):
         # Truncate header_mc: header count is 1 more than the height.
         self.header_mc.truncate(height + 1)

-    def fs_update(self, fs_height, headers, block_tx_hashes):
-        '''Write headers, the tx_count array and block tx hashes to disk.
-
-        Their first height is fs_height. No recorded DB state is
-        updated. These arrays are all append only, so in a crash we
-        just pick up again from the DB height.
-        '''
-        blocks_done = len(headers)
-        height_start = fs_height + 1
-        new_height = fs_height + blocks_done
-        prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
-        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
-        txs_done = cur_tx_count - prior_tx_count
-
-        assert len(block_tx_hashes) == blocks_done
-        assert len(self.tx_counts) == new_height + 1
-        hashes = b''.join(block_tx_hashes)
-        assert len(hashes) % 32 == 0
-        assert len(hashes) // 32 == txs_done
-
-        # Write the headers, tx counts, and tx hashes
-        offset = self.header_offset(height_start)
-        self.headers_file.write(offset, b''.join(headers))
-        self.fs_update_header_offsets(offset, height_start, headers)
-        offset = height_start * self.tx_counts.itemsize
-        self.tx_counts_file.write(offset,
-                                  self.tx_counts[height_start:].tobytes())
-        offset = prior_tx_count * 32
-        self.hashes_file.write(offset, hashes)
-
     async def read_headers(self, start_height, count):
         '''Requires start_height >= 0, count >= 0. Reads as many headers as
         are available starting at start_height up to count. This
@@ -379,6 +539,11 @@ class DB(object):
             self.wall_time = state['wall_time']
             self.first_sync = state['first_sync']

+        # These are our state as we move ahead of DB state
+        self.fs_height = self.db_height
+        self.fs_tx_count = self.db_tx_count
+        self.last_flush_tx_count = self.fs_tx_count
+
         # Log some stats
         self.logger.info('DB version: {:d}'.format(self.db_version))
         self.logger.info('coin: {}'.format(self.coin.NAME))
@@ -386,6 +551,8 @@ class DB(object):
         self.logger.info('height: {:,d}'.format(self.db_height))
         self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
         self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        if self.utxo_db.for_sync:
+            self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(util.formatted_time(self.wall_time)))
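
Note: the slicing in flush_utxo_db() above encodes the UTXO key scheme. A
runnable sketch of how one cached add becomes the two DB entries, assuming
the layouts implied by the diff (key = tx_hash (32) + tx_idx (2); value =
hashX (11, HASHX_LEN) + tx_num (4) + amount (8)):

    from struct import pack

    hashX = bytes(11)                           # stand-in address hash
    key = bytes(32) + pack('<H', 5)             # tx_hash + tx_idx
    value = hashX + pack('<I', 1000) + pack('<Q', 50000)

    suffix = key[-2:] + value[-12:-8]           # tx_idx + tx_num
    hkey = b'h' + key[:4] + suffix              # maps to hashX
    ukey = b'u' + value[:-12] + suffix          # maps to the 8-byte amount
    assert value[:-12] == hashX and value[-8:] == pack('<Q', 50000)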

electrumx/server/history.py (20 lines changed)

@@ -11,6 +11,7 @@
 import array
 import ast
 import bisect
+import time
 from collections import defaultdict
 from functools import partial
 from struct import pack, unpack
@@ -31,10 +32,14 @@ class History(object):
         self.unflushed_count = 0
         self.db = None

-    def open_db(self, db_class, for_sync, utxo_flush_count):
+    def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
         self.db = db_class('hist', for_sync)
         self.read_state()
         self.clear_excess(utxo_flush_count)
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        if not compacting:
+            self._cancel_compaction()
         return self.flush_count

     def close_db(self):
@@ -80,7 +85,7 @@ class History(object):
             if flush_id > utxo_flush_count:
                 keys.append(key)

-        self.logger.info('deleting {:,d} history entries'.format(len(keys)))
+        self.logger.info(f'deleting {len(keys):,d} history entries')

         self.flush_count = utxo_flush_count
         with self.db.write_batch() as batch:
@@ -119,6 +124,7 @@ class History(object):
         assert not self.unflushed

     def flush(self):
+        start_time = time.time()
         self.flush_count += 1
         flush_id = pack('>H', self.flush_count)
         unflushed = self.unflushed
@@ -132,7 +138,11 @@ class History(object):
         count = len(unflushed)
         unflushed.clear()
         self.unflushed_count = 0
-        return count
+
+        if self.db.for_sync:
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed history in {elapsed:.1f}s '
+                             f'for {count:,d} addrs')

     def backup(self, hashXs, tx_count):
         # Not certain this is needed, but it doesn't hurt
@@ -161,7 +171,7 @@ class History(object):
                 batch.put(key, value)
             self.write_state(batch)

-        return nremoves
+        self.logger.info(f'backing up removed {nremoves:,d} history entries')

     def get_txnums(self, hashX, limit=1000):
         '''Generator that returns an unpruned, sorted list of tx_nums in the
@@ -307,7 +317,7 @@ class History(object):
                                  100 * cursor / 65536))
         return write_size

-    def cancel_compaction(self):
+    def _cancel_compaction(self):
         if self.comp_cursor != -1:
             self.logger.warning('cancelling in-progress history compaction')
             self.comp_flush_count = -1
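
Note: the flush-count bookkeeping above is what clear_excess() relies on;
every history key ends in a big-endian 16-bit flush id, so entries written
by flushes newer than the last UTXO flush can be found and deleted after a
crash. A runnable sketch of the key layout assumed from the diff:

    from struct import pack, unpack

    utxo_flush_count = 2
    hashX = bytes(11)
    keys = [hashX + pack('>H', flush_id) for flush_id in (1, 2, 3)]

    excess = [k for k in keys if unpack('>H', k[-2:])[0] > utxo_flush_count]
    print(f'deleting {len(excess):,d} history entries')   # -> 1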
