Browse Source

Separate the block processor from the DB

- BP no longer inherits from the DB, but is passed it
patch-2
Neil Booth 7 years ago
parent
commit
967b2de60d
  1. 61
      electrumx/server/block_processor.py
  2. 20
      electrumx/server/chain_state.py
  3. 10
      electrumx/server/controller.py
  4. 8
      electrumx/server/db.py

61
electrumx/server/block_processor.py

@ -142,21 +142,23 @@ class ChainError(Exception):
'''Raised on error processing blocks.'''
class BlockProcessor(DB):
class BlockProcessor(object):
'''Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing.
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon, notifications):
super().__init__(env)
def __init__(self, env, db, daemon, notifications):
self.env = env
self.db = db
self.daemon = daemon
self.notifications = notifications
self.coin = env.coin
self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
self.logger = class_logger(__name__, self.__class__.__name__)
# Meta
self.next_cache_check = 0
@ -204,7 +206,7 @@ class BlockProcessor(DB):
start = time.time()
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
await self._maybe_flush()
if not self.first_sync:
if not self.db.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
@ -254,7 +256,7 @@ class BlockProcessor(DB):
# harmless, but remove None.
self.touched.discard(None)
await self.run_in_thread_with_lock(
self.flush_backup, self.flush_data(), self.touched)
self.db.flush_backup, self.flush_data(), self.touched)
last -= len(raw_blocks)
await self.prefetcher.reset_height(self.height)
@ -271,7 +273,7 @@ class BlockProcessor(DB):
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, last, await self.fs_block_hashes(start, count)
return start, last, await self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count):
'''Calculate the reorg range'''
@ -289,7 +291,7 @@ class BlockProcessor(DB):
start = self.height - 1
count = 1
while start > 0:
hashes = await self.fs_block_hashes(start, count)
hashes = await self.db.fs_block_hashes(start, count)
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = diff_pos(hex_hashes, d_hex_hashes)
@ -323,7 +325,8 @@ class BlockProcessor(DB):
async def flush(self, flush_utxos):
await self.run_in_thread_with_lock(
self.flush_dbs, self.flush_data(), flush_utxos)
self.db.flush_dbs, self.flush_data(), flush_utxos,
self.estimate_txs_remaining)
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
@ -343,10 +346,10 @@ class BlockProcessor(DB):
one_MB = 1000*1000
utxo_cache_size = len(self.utxo_cache) * 205
db_deletes_size = len(self.db_deletes) * 57
hist_cache_size = self.history.unflushed_memsize()
hist_cache_size = self.db.history.unflushed_memsize()
# Roughly ntxs * 32 + nblocks * 42
tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32
+ (self.height - self.fs_height) * 42)
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+ (self.height - self.db.fs_height) * 42)
utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
hist_MB = (hist_cache_size + tx_hash_size) // one_MB
@ -367,7 +370,7 @@ class BlockProcessor(DB):
It is already verified they correctly connect onto our tip.
'''
min_height = self.min_undo_height(self.daemon.cached_height())
min_height = self.db.min_undo_height(self.daemon.cached_height())
height = self.height
for block in blocks:
@ -375,7 +378,7 @@ class BlockProcessor(DB):
undo_info = self.advance_txs(block.transactions)
if height >= min_height:
self.undo_infos.append((undo_info, height))
self.write_raw_block(block.raw, height)
self.db.write_raw_block(block.raw, height)
headers = [block.header for block in blocks]
self.height = height
@ -422,10 +425,10 @@ class BlockProcessor(DB):
update_touched(hashXs)
tx_num += 1
self.history.add_unflushed(hashXs_by_tx, self.tx_count)
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
self.tx_count = tx_num
self.tx_counts.append(tx_num)
self.db.tx_counts.append(tx_num)
return undo_info
@ -435,7 +438,7 @@ class BlockProcessor(DB):
The blocks should be in order of decreasing height, starting at.
self.height. A flush is performed once the blocks are backed up.
'''
self.assert_flushed(self.flush_data())
self.db.assert_flushed(self.flush_data())
assert self.height >= len(raw_blocks)
coin = self.coin
@ -451,14 +454,14 @@ class BlockProcessor(DB):
self.tip = coin.header_prevhash(block.header)
self.backup_txs(block.transactions)
self.height -= 1
self.tx_counts.pop()
self.db.tx_counts.pop()
self.logger.info('backed up to height {:,d}'.format(self.height))
def backup_txs(self, txs):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
undo_info = self.db.read_undo_info(self.height)
if undo_info is None:
raise ChainError('no undo information found for height {:,d}'
.format(self.height))
@ -566,14 +569,14 @@ class BlockProcessor(DB):
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
candidates = {db_key: hashX for db_key, hashX
in self.utxo_db.iterator(prefix=prefix)}
in self.db.utxo_db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:]
if len(candidates) > 1:
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
hash, height = self.db.fs_tx_hash(tx_num)
if hash != tx_hash:
assert hash is not None # Should always be found
continue
@ -581,7 +584,7 @@ class BlockProcessor(DB):
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.utxo_db.get(udb_key)
utxo_value_packed = self.db.utxo_db.get(udb_key)
if utxo_value_packed:
# Remove both entries for this UTXO
self.db_deletes.append(hdb_key)
@ -610,8 +613,8 @@ class BlockProcessor(DB):
async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
first_sync = self.first_sync
self.first_sync = False
first_sync = self.db.first_sync
self.db.first_sync = False
await self.flush(True)
if first_sync:
self.logger.info(f'{electrumx.version} synced to '
@ -619,13 +622,13 @@ class BlockProcessor(DB):
# Initialise the notification framework
await self.notifications.on_block(set(), self.height)
# Reopen for serving
await self.open_for_serving()
await self.db.open_for_serving()
async def _first_open_dbs(self):
await self.open_for_sync()
self.height = self.db_height
self.tip = self.db_tip
self.tx_count = self.db_tx_count
await self.db.open_for_sync()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
# --- External API

20
electrumx/server/chain_state.py

@ -14,18 +14,18 @@ class ChainState(object):
blocks, transaction history, UTXOs and the mempool.
'''
def __init__(self, env, daemon, bp):
def __init__(self, env, db, daemon, bp):
self._env = env
self._db = db
self._daemon = daemon
self._bp = bp
# External interface pass-throughs for session.py
self.force_chain_reorg = self._bp.force_chain_reorg
self.tx_branch_and_root = self._bp.merkle.branch_and_root
self.read_headers = self._bp.read_headers
self.all_utxos = self._bp.all_utxos
self.limited_history = self._bp.limited_history
self.header_branch_and_root = self._bp.header_branch_and_root
self.force_chain_reorg = bp.force_chain_reorg
self.tx_branch_and_root = db.merkle.branch_and_root
self.read_headers = db.read_headers
self.all_utxos = db.all_utxos
self.limited_history = db.limited_history
self.header_branch_and_root = db.header_branch_and_root
async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx])
@ -34,7 +34,7 @@ class ChainState(object):
return await getattr(self._daemon, method)(*args)
def db_height(self):
return self._bp.db_height
return self._db.db_height
def get_info(self):
'''Chain state info for LocalRPC and logs.'''
@ -57,7 +57,7 @@ class ChainState(object):
async def query(self, args, limit):
coin = self._env.coin
db = self._bp
db = self._db
lines = []
def arg_to_hashX(arg):

10
electrumx/server/controller.py

@ -13,6 +13,7 @@ import electrumx
from electrumx.lib.server_base import ServerBase
from electrumx.lib.util import version_string
from electrumx.server.chain_state import ChainState
from electrumx.server.db import DB
from electrumx.server.mempool import MemPool
from electrumx.server.session import SessionManager
@ -93,10 +94,11 @@ class Controller(ServerBase):
notifications = Notifications()
daemon = env.coin.DAEMON(env)
db = DB(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR
bp = BlockProcessor(env, daemon, notifications)
mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos)
chain_state = ChainState(env, daemon, bp)
bp = BlockProcessor(env, db, daemon, notifications)
mempool = MemPool(env.coin, daemon, notifications, db.lookup_utxos)
chain_state = ChainState(env, db, daemon, bp)
session_mgr = SessionManager(env, chain_state, mempool,
notifications, shutdown_event)
@ -108,7 +110,7 @@ class Controller(ServerBase):
await group.spawn(session_mgr.serve(serve_externally_event))
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
await caught_up_event.wait()
await group.spawn(bp.populate_header_merkle_cache())
await group.spawn(db.populate_header_merkle_cache())
await group.spawn(mempool.keep_synchronized(synchronized_event))
await synchronized_event.wait()
serve_externally_event.set()

8
electrumx/server/db.py

@ -158,7 +158,7 @@ class DB(object):
async def populate_header_merkle_cache(self):
self.logger.info('populating header merkle cache...')
length = max(1, self.height - self.env.reorg_limit)
length = max(1, self.db_height - self.env.reorg_limit)
start = time.time()
await self.header_mc.initialize(length)
elapsed = time.time() - start
@ -172,7 +172,7 @@ class DB(object):
'''Asserts state is fully flushed.'''
assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
assert flush_data.height == self.fs_height == self.db_height
assert flush_data.tip == self.tip
assert flush_data.tip == self.db_tip
assert not flush_data.headers
assert not flush_data.block_tx_hashes
assert not flush_data.adds
@ -180,7 +180,7 @@ class DB(object):
assert not flush_data.undo_infos
self.history.assert_flushed()
def flush_dbs(self, flush_data, flush_utxos):
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
'''Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos.'''
if flush_data.height == self.db_height:
@ -217,7 +217,7 @@ class DB(object):
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = self.estimate_txs_remaining() / tx_per_sec_last
eta = estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '

Loading…
Cancel
Save