diff --git a/docs/protocol-methods.rst b/docs/protocol-methods.rst index bbfda31..a37e19f 100644 --- a/docs/protocol-methods.rst +++ b/docs/protocol-methods.rst @@ -50,7 +50,7 @@ Return the block header at the given height. **Example Result** -With *cp_height* zero: +With *height* 5 and *cp_height* 0 on the Bitcoin Cash chain: :: @@ -58,7 +58,7 @@ With *cp_height* zero: .. _cp_height example: -With *cp_height* 8 on the Bitcoin Cash chain:: +With *cp_height* 8:: { "branch": [ diff --git a/electrumx/lib/merkle.py b/electrumx/lib/merkle.py index d8e5971..439e0a7 100644 --- a/electrumx/lib/merkle.py +++ b/electrumx/lib/merkle.py @@ -28,6 +28,8 @@ from math import ceil, log +from aiorpcx import Event + from electrumx.lib.hash import double_sha256 @@ -168,6 +170,7 @@ class MerkleCache(object): self.source_func = source_func self.length = 0 self.depth_higher = 0 + self.initialized = Event() def _segment_length(self): return 1 << self.depth_higher @@ -210,6 +213,7 @@ class MerkleCache(object): self.length = length self.depth_higher = self.merkle.tree_depth(length) // 2 self.level = self._level(await self.source_func(0, length)) + self.initialized.set() def truncate(self, length): '''Truncate the cache so it covers no more than length underlying @@ -238,6 +242,7 @@ class MerkleCache(object): raise ValueError('length must be positive') if index >= length: raise ValueError('index must be less than length') + await self.initialized.wait() await self._extend_to(length) leaf_start = self._leaf_start(index) count = min(self._segment_length(), length - leaf_start) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 18b697a..b7d0f6f 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -20,7 +20,6 @@ from aiorpcx import TaskGroup, run_in_thread import electrumx from electrumx.server.daemon import DaemonError from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN -from electrumx.lib.merkle import Merkle, MerkleCache from electrumx.lib.util import chunks, formatted_time, class_logger import electrumx.server.db @@ -166,10 +165,6 @@ class BlockProcessor(electrumx.server.db.DB): self.touched = set() self.reorg_count = 0 - # Header merkle cache - self.merkle = Merkle() - self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - # Caches of unflushed items. self.headers = [] self.tx_hashes = [] @@ -268,8 +263,6 @@ class BlockProcessor(electrumx.server.db.DB): await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) await self.backup_flush() last -= len(raw_blocks) - # Truncate header_mc: header count is 1 more than the height. - self.header_mc.truncate(self.height + 1) await self.prefetcher.reset_height(self.height) async def reorg_hashes(self, count): @@ -429,9 +422,7 @@ class BlockProcessor(electrumx.server.db.DB): ''' flush_start = time.time() - # Backup FS (just move the pointers back) - self.fs_height = self.height - self.fs_tx_count = self.tx_count + self.backup_fs(self.height, self.tx_count) # Backup history. self.touched can include other addresses # which is harmless, but remove None. @@ -776,10 +767,6 @@ class BlockProcessor(electrumx.server.db.DB): await self.notifications.on_block(set(), self.height) # Reopen for serving await self.open_for_serving() - # Populate the header merkle cache - length = max(1, self.height - self.env.reorg_limit) - await self.header_mc.initialize(length) - self.logger.info('populated header merkle cache') async def _first_open_dbs(self): await self.open_for_sync() diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 18ee7d7..8e33830 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -25,6 +25,7 @@ class ChainState(object): self.read_headers = self._bp.read_headers self.all_utxos = self._bp.all_utxos self.limited_history = self._bp.limited_history + self.header_branch_and_root = self._bp.header_branch_and_root async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -43,9 +44,6 @@ class ChainState(object): 'db_height': self.db_height(), } - async def header_branch_and_root(self, length, height): - return self._bp.header_mc.branch_and_root(length, height) - async def raw_header(self, height): '''Return the binary header at the given height.''' header, n = await self.read_headers(height, 1) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 6424d0e..a43ee95 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -108,6 +108,7 @@ class Controller(ServerBase): await group.spawn(session_mgr.serve(serve_externally_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) await caught_up_event.wait() + await group.spawn(bp.populate_header_merkle_cache()) await group.spawn(mempool.keep_synchronized(synchronized_event)) await synchronized_event.wait() serve_externally_event.set() diff --git a/electrumx/server/db.py b/electrumx/server/db.py index a6177a3..b23f87c 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -12,6 +12,7 @@ import array import ast import os +import time from bisect import bisect_right from collections import namedtuple from glob import glob @@ -21,6 +22,7 @@ from aiorpcx import run_in_thread import electrumx.lib.util as util from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN +from electrumx.lib.merkle import Merkle, MerkleCache from electrumx.server.storage import db_class from electrumx.server.history import History @@ -63,6 +65,10 @@ class DB(object): self.logger.info(f'using {self.env.db_engine} for DB backend') + # Header merkle cache + self.merkle = Merkle() + self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) + self.headers_file = util.LogicalFile('meta/headers', 2, 16000000) self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000) self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000) @@ -130,6 +136,19 @@ class DB(object): self.utxo_db = None await self._open_dbs(False) + # Header merkle cache + + async def populate_header_merkle_cache(self): + self.logger.info('populating header merkle cache...') + length = max(1, self.height - self.env.reorg_limit) + start = time.time() + await self.header_mc.initialize(length) + elapsed = time.time() - start + self.logger.info(f'header merkle cache populated in {elapsed:.1f}s') + + async def header_branch_and_root(self, length, height): + return await self.header_mc.branch_and_root(length, height) + def fs_update_header_offsets(self, offset_start, height_start, headers): if self.coin.STATIC_BLOCK_HEADERS: return @@ -152,6 +171,13 @@ class DB(object): return self.dynamic_header_offset(height + 1)\ - self.dynamic_header_offset(height) + def backup_fs(self, height, tx_count): + '''Back up during a reorg. This just updates our pointers.''' + self.fs_height = height + self.fs_tx_count = tx_count + # Truncate header_mc: header count is 1 more than the height. + self.header_mc.truncate(height + 1) + def fs_update(self, fs_height, headers, block_tx_hashes): '''Write headers, the tx_count array and block tx hashes to disk.