diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py
index a18708d..ac55d23 100644
--- a/electrumx/server/block_processor.py
+++ b/electrumx/server/block_processor.py
@@ -23,12 +23,17 @@
 from electrumx.lib.util import chunks, formatted_time, class_logger
 import electrumx.server.db
 
+RAW_BLOCKS, PREFETCHER_CAUGHT_UP, REORG_CHAIN = range(3)
+
+
 class Prefetcher(object):
     '''Prefetches blocks (in the forward direction only).'''
 
-    def __init__(self, bp):
+    def __init__(self, daemon, coin, queue):
         self.logger = class_logger(__name__, self.__class__.__name__)
-        self.bp = bp
+        self.daemon = daemon
+        self.coin = coin
+        self.queue = queue
         self.caught_up = False
         # Access to fetched_height should be protected by the semaphore
         self.fetched_height = None
@@ -58,18 +63,19 @@ class Prefetcher(object):
         if self.cache_size < self.min_cache_size:
             self.refill_event.set()
 
-    async def reset_height(self):
+    async def reset_height(self, height):
         '''Reset to prefetch blocks from the block processor's height.
 
         Used in blockchain reorganisations.  This coroutine can be
-        called asynchronously to the _prefetch coroutine so we must
-        synchronize with a semaphore.'''
+        called asynchronously to the _prefetch_blocks coroutine so we
+        must synchronize with a semaphore.
+        '''
         async with self.semaphore:
-            self.fetched_height = self.bp.height
+            self.fetched_height = height
             self.refill_event.set()
 
-        daemon_height = await self.bp.daemon.height()
-        behind = daemon_height - self.bp.height
+        daemon_height = await self.daemon.height()
+        behind = daemon_height - height
         if behind > 0:
             self.logger.info('catching up to daemon height {:,d} '
                              '({:,d} blocks behind)'
@@ -83,8 +89,8 @@ class Prefetcher(object):
 
         Repeats until the queue is full or caught up.
         '''
-        daemon = self.bp.daemon
-        daemon_height = await daemon.height(self.bp._caught_up_event.is_set())
+        daemon = self.daemon
+        daemon_height = await daemon.height()
         async with self.semaphore:
             while self.cache_size < self.min_cache_size:
                 # Try and catch up all blocks but limit to room in cache.
@@ -96,7 +102,7 @@ class Prefetcher(object):
                 if not count:
                     if not self.caught_up:
                         self.caught_up = True
-                        self.bp.on_prefetcher_first_caught_up()
+                        await self.queue.put((PREFETCHER_CAUGHT_UP, ))
                     return False
 
                 first = self.fetched_height + 1
@@ -110,7 +116,7 @@ class Prefetcher(object):
 
                 # Special handling for genesis block
                 if first == 0:
-                    blocks[0] = self.bp.coin.genesis_block(blocks[0])
+                    blocks[0] = self.coin.genesis_block(blocks[0])
                     self.logger.info('verified genesis block with hash {}'
                                      .format(hex_hashes[0]))
 
@@ -121,7 +127,7 @@ class Prefetcher(object):
                 else:
                     self.ave_size = (size + (10 - count) * self.ave_size) // 10
 
-                self.bp.on_prefetched_blocks(blocks, first)
+                await self.queue.put((RAW_BLOCKS, blocks, first))
                 self.cache_size += size
                 self.fetched_height += count
 
@@ -152,9 +158,10 @@ class BlockProcessor(electrumx.server.db.DB):
         self.tasks = tasks
         self.daemon = daemon
 
+        # Work queue
+        self.queue = asyncio.Queue()
         self._caught_up_event = asyncio.Event()
-        self.task_queue = asyncio.Queue()
-        self.prefetcher = Prefetcher(self)
+        self.prefetcher = Prefetcher(daemon, env.coin, self.queue)
 
         # Meta
         self.cache_MB = env.cache_MB
@@ -185,17 +192,6 @@ class BlockProcessor(electrumx.server.db.DB):
         '''Add the task to our task queue.'''
         self.task_queue.put_nowait(task)
 
-    def on_prefetched_blocks(self, blocks, first):
-        '''Called by the prefetcher when it has prefetched some blocks.'''
-        self.add_task(partial(self.check_and_advance_blocks, blocks, first))
-
-    def on_prefetcher_first_caught_up(self):
-        '''Called by the prefetcher when it first catches up.'''
-        # Process after prior tasks (blocks) are completed.
-        async def set_event():
-            self._caught_up_event.set()
-        self.add_task(set_event)
-
     def add_new_block_callback(self, callback):
         '''Add a function called when a new block is found.
 
@@ -247,7 +243,7 @@ class BlockProcessor(electrumx.server.db.DB):
             # just to reset the prefetcher and try again.
             self.logger.warning('daemon blocks do not form a chain; '
                                 'resetting the prefetcher')
-            await self.prefetcher.reset_height()
+            await self.prefetcher.reset_height(self.height)
 
     async def reorg_chain(self, count=None):
         '''Handle a chain reorganisation.
@@ -280,7 +276,7 @@ class BlockProcessor(electrumx.server.db.DB):
             last -= len(raw_blocks)
         # Truncate header_mc: header count is 1 more than the height
         self.header_mc.truncate(self.height + 1)
-        await self.prefetcher.reset_height()
+        await self.prefetcher.reset_height(self.height)
 
     async def reorg_hashes(self, count):
         '''Return a pair (start, hashes) of blocks to back up during a
@@ -760,8 +756,15 @@
     async def _process_queue(self):
         '''Loop forever processing enqueued work.'''
         while True:
-            task = await self.task_queue.get()
-            await task()
+            work, *args = await self.queue.get()
+            if work == RAW_BLOCKS:
+                raw_blocks, first = args
+                await self.check_and_advance_blocks(raw_blocks, first)
+            elif work == PREFETCHER_CAUGHT_UP:
+                self._caught_up_event.set()
+            elif work == REORG_CHAIN:
+                count, = args
+                await self.reorg_chain(count)
 
     def _on_dbs_opened(self):
         # An incomplete compaction needs to be cancelled otherwise
@@ -790,7 +793,7 @@
         self._on_dbs_opened()
         # Get the prefetcher running
         self.tasks.create_task(self.prefetcher.main_loop())
-        await self.prefetcher.reset_height()
+        await self.prefetcher.reset_height(self.height)
         # Start our loop that processes blocks as they are fetched
         self.worker_task = self.tasks.create_task(self._process_queue())
         # Wait until caught up
@@ -816,7 +819,7 @@
         Returns True if a reorg is queued, false if not caught up.
         '''
         if self._caught_up_event.is_set():
-            self.add_task(partial(self.reorg_chain, count=count))
+            self.queue.put_nowait((REORG_CHAIN, count))
             return True
         return False
 
diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py
index 0a3f6b7..7722848 100644
--- a/electrumx/server/chain_state.py
+++ b/electrumx/server/chain_state.py
@@ -6,6 +6,7 @@
 # and warranty status of this software.
 
+import asyncio
 import pylru
 
 from electrumx.server.mempool import MemPool
 
@@ -108,5 +109,9 @@ class ChainState(object):
 
     async def wait_for_mempool(self):
         await self.bp.catch_up_to_daemon()
-        self.tasks.create_task(self.mempool.main_loop())
-        await self.mempool.synchronized_event.wait()
+        # Tell the daemon to fetch the mempool going forwards, trigger
+        # an initial fetch, and wait for the mempool to synchronize
+        mempool_refresh_event = asyncio.Event()
+        self.daemon._mempool_refresh_event = mempool_refresh_event
+        self.tasks.create_task(self.daemon.height())
+        await self.mempool.start_and_wait(mempool_refresh_event)
diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py
index 35c322b..8f4ccaa 100644
--- a/electrumx/server/daemon.py
+++ b/electrumx/server/daemon.py
@@ -41,7 +41,7 @@ class Daemon(object):
         self.set_urls(env.coin.daemon_urls(env.daemon_url))
         self._height = None
         self._mempool_hashes = set()
-        self.mempool_refresh_event = asyncio.Event()
+        self._mempool_refresh_event = None
         # Limit concurrent RPC calls to this number.
         # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
         self.workqueue_semaphore = asyncio.Semaphore(value=10)
@@ -281,12 +281,12 @@ class Daemon(object):
         '''Broadcast a transaction to the network.'''
         return await self._send_single('sendrawtransaction', params)
 
-    async def height(self, mempool=False):
+    async def height(self):
         '''Query the daemon for its current height.'''
         self._height = await self._send_single('getblockcount')
-        if mempool:
+        if self._mempool_refresh_event:
             self._mempool_hashes = set(await self.mempool_hashes())
-            self.mempool_refresh_event.set()
+            self._mempool_refresh_event.set()
         return self._height
 
     def cached_mempool_hashes(self):
diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py
index d75fb01..b2b12cb 100644
--- a/electrumx/server/mempool.py
+++ b/electrumx/server/mempool.py
@@ -41,12 +41,21 @@ class MemPool(object):
         self.stop = False
         self.txs = {}
         self.hashXs = defaultdict(set)  # None can be a key
-        self.synchronized_event = asyncio.Event()
         self.fee_histogram = defaultdict(int)
         self.compact_fee_histogram = []
         self.histogram_time = 0
         add_new_block_callback(self.on_new_block)
 
+    async def start_and_wait(self, mempool_refresh_event):
+        '''Creates the mempool synchronization task, and waits for it to
+        first synchronize before returning.'''
+        self.logger.info('beginning processing of daemon mempool. '
+                         'This can take some time...')
+        synchronized = asyncio.Event()
+        self.tasks.create_task(self._synchronize(
+            mempool_refresh_event, synchronized))
+        await synchronized.wait()
+
     def _resync_daemon_hashes(self, unprocessed, unfetched):
         '''Re-sync self.txs with the list of hashes in the daemon's mempool.
 
@@ -83,21 +92,17 @@ class MemPool(object):
             for hex_hash in new:
                 txs[hex_hash] = None
 
-    async def main_loop(self):
+    async def _synchronize(self, mempool_refresh_event, synchronized):
         '''Asynchronously maintain mempool status with daemon.
 
-        Processes the mempool each time the daemon's mempool refresh
-        event is signalled.
+        Processes the mempool each time the mempool refresh event is
+        signalled.
         '''
         unprocessed = {}
         unfetched = set()
         txs = self.txs
         fetch_size = 800
         process_some = self._async_process_some(fetch_size // 2)
-
-        self.logger.info('beginning processing of daemon mempool. '
-                         'This can take some time...')
-        await self.chain_state.mempool_refresh_event.wait()
         next_log = 0
         loops = -1  # Zero during initial catchup
 
@@ -116,7 +121,7 @@
             if not todo:
                 loops += 1
                 if loops > 0:
-                    self.synchronized_event.set()
+                    synchronized.set()
                 now = time.time()
                 if now >= next_log and loops:
                     self.logger.info('{:,d} txs touching {:,d} addresses'
@@ -125,10 +130,10 @@
 
             try:
                 if not todo:
-                    await self.chain_state.mempool_refresh_event.wait()
+                    await mempool_refresh_event.wait()
 
                 self._resync_daemon_hashes(unprocessed, unfetched)
-                self.chain_state.mempool_refresh_event.clear()
+                mempool_refresh_event.clear()
 
                 if unfetched:
                     count = min(len(unfetched), fetch_size)
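
The following standalone sketch illustrates the pattern this diff moves to: producers (the Prefetcher, and force_chain_reorg) put tagged tuples on a shared asyncio.Queue, and a single consumer loop modelled on BlockProcessor._process_queue unpacks the tag and dispatches on it. The toy_prefetcher and toy_process_queue names and the printed actions are illustrative stand-ins, not ElectrumX code.

import asyncio

# Work item tags, mirroring the module-level constants added in the diff.
RAW_BLOCKS, PREFETCHER_CAUGHT_UP, REORG_CHAIN = range(3)


async def toy_prefetcher(queue):
    '''Stands in for Prefetcher: pushes tagged work onto the shared queue
    instead of calling back into the block processor.'''
    await queue.put((RAW_BLOCKS, [b'block1', b'block2'], 1))
    await queue.put((PREFETCHER_CAUGHT_UP, ))


async def toy_process_queue(queue, caught_up_event):
    '''Mirrors BlockProcessor._process_queue: unpack the tag and dispatch.'''
    while True:
        work, *args = await queue.get()
        if work == RAW_BLOCKS:
            raw_blocks, first = args
            print('advancing {:,d} blocks from height {:,d}'
                  .format(len(raw_blocks), first))
        elif work == PREFETCHER_CAUGHT_UP:
            caught_up_event.set()
        elif work == REORG_CHAIN:
            count, = args
            print('reorg of {:,d} blocks requested'.format(count))


async def main():
    queue = asyncio.Queue()
    caught_up_event = asyncio.Event()
    worker = asyncio.create_task(toy_process_queue(queue, caught_up_event))
    await toy_prefetcher(queue)
    await caught_up_event.wait()        # set via the queue, preserving order
    queue.put_nowait((REORG_CHAIN, 2))  # how force_chain_reorg enqueues work
    await asyncio.sleep(0)              # let the worker drain the queue
    worker.cancel()


asyncio.run(main())

Delivering PREFETCHER_CAUGHT_UP through the queue rather than setting the event directly preserves ordering: the event is only set once every previously queued batch of raw blocks has been processed, which is what the removed on_prefetcher_first_caught_up callback achieved by enqueuing a nested set_event task.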