Browse Source

Populate the header merkle cache in a thread

- It can take a while
- Client requests that need it will block until it's done
- It's a function of FS state so move it to the DB
- Tweak docs

Fixes #558
patch-2
Neil Booth 7 years ago
parent
commit
e0ccf0cce3
  1. 4
      docs/protocol-methods.rst
  2. 5
      electrumx/lib/merkle.py
  3. 15
      electrumx/server/block_processor.py
  4. 4
      electrumx/server/chain_state.py
  5. 1
      electrumx/server/controller.py
  6. 26
      electrumx/server/db.py

4
docs/protocol-methods.rst

@ -50,7 +50,7 @@ Return the block header at the given height.
**Example Result**
With *cp_height* zero:
With *height* 5 and *cp_height* 0 on the Bitcoin Cash chain:
::
@ -58,7 +58,7 @@ With *cp_height* zero:
.. _cp_height example:
With *cp_height* 8 on the Bitcoin Cash chain::
With *cp_height* 8::
{
"branch": [

5
electrumx/lib/merkle.py

@ -28,6 +28,8 @@
from math import ceil, log
from aiorpcx import Event
from electrumx.lib.hash import double_sha256
@ -168,6 +170,7 @@ class MerkleCache(object):
self.source_func = source_func
self.length = 0
self.depth_higher = 0
self.initialized = Event()
def _segment_length(self):
return 1 << self.depth_higher
@ -210,6 +213,7 @@ class MerkleCache(object):
self.length = length
self.depth_higher = self.merkle.tree_depth(length) // 2
self.level = self._level(await self.source_func(0, length))
self.initialized.set()
def truncate(self, length):
'''Truncate the cache so it covers no more than length underlying
@ -238,6 +242,7 @@ class MerkleCache(object):
raise ValueError('length must be positive')
if index >= length:
raise ValueError('index must be less than length')
await self.initialized.wait()
await self._extend_to(length)
leaf_start = self._leaf_start(index)
count = min(self._segment_length(), length - leaf_start)

15
electrumx/server/block_processor.py

@ -20,7 +20,6 @@ from aiorpcx import TaskGroup, run_in_thread
import electrumx
from electrumx.server.daemon import DaemonError
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
from electrumx.lib.merkle import Merkle, MerkleCache
from electrumx.lib.util import chunks, formatted_time, class_logger
import electrumx.server.db
@ -166,10 +165,6 @@ class BlockProcessor(electrumx.server.db.DB):
self.touched = set()
self.reorg_count = 0
# Header merkle cache
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
# Caches of unflushed items.
self.headers = []
self.tx_hashes = []
@ -268,8 +263,6 @@ class BlockProcessor(electrumx.server.db.DB):
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.backup_flush()
last -= len(raw_blocks)
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(self.height + 1)
await self.prefetcher.reset_height(self.height)
async def reorg_hashes(self, count):
@ -429,9 +422,7 @@ class BlockProcessor(electrumx.server.db.DB):
'''
flush_start = time.time()
# Backup FS (just move the pointers back)
self.fs_height = self.height
self.fs_tx_count = self.tx_count
self.backup_fs(self.height, self.tx_count)
# Backup history. self.touched can include other addresses
# which is harmless, but remove None.
@ -776,10 +767,6 @@ class BlockProcessor(electrumx.server.db.DB):
await self.notifications.on_block(set(), self.height)
# Reopen for serving
await self.open_for_serving()
# Populate the header merkle cache
length = max(1, self.height - self.env.reorg_limit)
await self.header_mc.initialize(length)
self.logger.info('populated header merkle cache')
async def _first_open_dbs(self):
await self.open_for_sync()

4
electrumx/server/chain_state.py

@ -25,6 +25,7 @@ class ChainState(object):
self.read_headers = self._bp.read_headers
self.all_utxos = self._bp.all_utxos
self.limited_history = self._bp.limited_history
self.header_branch_and_root = self._bp.header_branch_and_root
async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx])
@ -43,9 +44,6 @@ class ChainState(object):
'db_height': self.db_height(),
}
async def header_branch_and_root(self, length, height):
return self._bp.header_mc.branch_and_root(length, height)
async def raw_header(self, height):
'''Return the binary header at the given height.'''
header, n = await self.read_headers(height, 1)

1
electrumx/server/controller.py

@ -108,6 +108,7 @@ class Controller(ServerBase):
await group.spawn(session_mgr.serve(serve_externally_event))
await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
await caught_up_event.wait()
await group.spawn(bp.populate_header_merkle_cache())
await group.spawn(mempool.keep_synchronized(synchronized_event))
await synchronized_event.wait()
serve_externally_event.set()

26
electrumx/server/db.py

@ -12,6 +12,7 @@
import array
import ast
import os
import time
from bisect import bisect_right
from collections import namedtuple
from glob import glob
@ -21,6 +22,7 @@ from aiorpcx import run_in_thread
import electrumx.lib.util as util
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
from electrumx.lib.merkle import Merkle, MerkleCache
from electrumx.server.storage import db_class
from electrumx.server.history import History
@ -63,6 +65,10 @@ class DB(object):
self.logger.info(f'using {self.env.db_engine} for DB backend')
# Header merkle cache
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000)
@ -130,6 +136,19 @@ class DB(object):
self.utxo_db = None
await self._open_dbs(False)
# Header merkle cache
async def populate_header_merkle_cache(self):
self.logger.info('populating header merkle cache...')
length = max(1, self.height - self.env.reorg_limit)
start = time.time()
await self.header_mc.initialize(length)
elapsed = time.time() - start
self.logger.info(f'header merkle cache populated in {elapsed:.1f}s')
async def header_branch_and_root(self, length, height):
return await self.header_mc.branch_and_root(length, height)
def fs_update_header_offsets(self, offset_start, height_start, headers):
if self.coin.STATIC_BLOCK_HEADERS:
return
@ -152,6 +171,13 @@ class DB(object):
return self.dynamic_header_offset(height + 1)\
- self.dynamic_header_offset(height)
def backup_fs(self, height, tx_count):
'''Back up during a reorg. This just updates our pointers.'''
self.fs_height = height
self.fs_tx_count = tx_count
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1)
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.

Loading…
Cancel
Save