From f20fe9d7a5ea05d727e4ca08b5b1aa92df516428 Mon Sep 17 00:00:00 2001 From: Neil Booth <kyuupichan@gmail.com> Date: Fri, 10 Aug 2018 21:32:31 +0900 Subject: [PATCH] Tweak mempool logging --- electrumx/server/mempool.py | 40 +++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index 67305a0..67e3826 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -7,14 +7,13 @@ '''Mempool handling.''' -import asyncio import itertools import time from abc import ABC, abstractmethod from collections import defaultdict import attr -from aiorpcx import TaskGroup, run_in_thread +from aiorpcx import TaskGroup, run_in_thread, sleep from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash from electrumx.lib.util import class_logger, chunks @@ -102,11 +101,19 @@ class MemPool(object): self.hashXs = defaultdict(set) # None can be a key self.cached_compact_histogram = [] - async def _log_stats(self): + async def _logging(self, synchronized_event): + '''Print regular logs of mempool stats.''' + self.logger.info('beginning processing of daemon mempool. ' + 'This can take some time...') + start = time.time() + await synchronized_event.wait() + elapsed = time.time() - start + self.logger.info(f'synced in {elapsed:.2f}s') while True: self.logger.info(f'{len(self.txs):,d} txs ' f'touching {len(self.hashXs):,d} addresses') - await asyncio.sleep(120) + await sleep(120) + await synchronized_event.wait() def _update_histogram(self): # Build a histogram by fee rate @@ -180,8 +187,8 @@ class MemPool(object): async def _refresh_hashes(self, synchronized_event): '''Refresh our view of the daemon's mempool.''' - sleep = 5 - histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // sleep + secs = 5 + histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // secs for loop_count in itertools.count(): height = self.api.cached_height() hex_hashes = await self.api.mempool_hashes() @@ -190,11 +197,12 @@ class MemPool(object): hashes = set(hex_str_to_hash(hh) for hh in hex_hashes) touched = await self._process_mempool(hashes) synchronized_event.set() + synchronized_event.clear() await self.api.on_mempool(touched, height) # Thread mempool histogram refreshes - they can be expensive if loop_count % histogram_refresh == 0: await run_in_thread(self._update_histogram) - await asyncio.sleep(sleep) + await sleep(secs) async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes @@ -227,9 +235,6 @@ class MemPool(object): tx_map.update(deferred) utxo_map.update(unspent) - # Handle the stragglers - if len(tx_map) >= 10: - self.logger.info(f'{len(tx_map)} stragglers') prior_count = 0 # FIXME: this is not particularly efficient while tx_map and len(tx_map) != prior_count: @@ -286,19 +291,10 @@ class MemPool(object): # async def keep_synchronized(self, synchronized_event): - '''Starts the mempool synchronizer. - - Waits for an initial synchronization before returning. - ''' - self.logger.info('beginning processing of daemon mempool. ' - 'This can take some time...') - async with TaskGroup() as group: + '''Keep the mempool synchronized with the daemon.''' + async with TaskGroup(wait=any) as group: await group.spawn(self._refresh_hashes(synchronized_event)) - start = time.time() - await synchronized_event.wait() - elapsed = time.time() - start - self.logger.info(f'synced in {elapsed:.2f}s') - await group.spawn(self._log_stats()) + await group.spawn(self._logging(synchronized_event)) async def balance_delta(self, hashX): '''Return the unconfirmed amount in the mempool for hashX.