@@ -7,14 +7,13 @@
 '''Mempool handling.'''
 
-import asyncio
 import itertools
 import time
 from abc import ABC, abstractmethod
 from collections import defaultdict
 
 import attr
-from aiorpcx import TaskGroup, run_in_thread
+from aiorpcx import TaskGroup, run_in_thread, sleep
 
 from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
 from electrumx.lib.util import class_logger, chunks
 
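
This hunk drops `asyncio` entirely and takes `sleep` from aiorpcx alongside the other task primitives, keeping the module on a single concurrency toolkit. The call sites are drop-in compatible; a minimal sketch of the substitution, assuming aiorpcx is installed:

```python
# Minimal sketch, not part of the patch: aiorpcx.sleep is awaited exactly
# like asyncio.sleep, so call sites only change which name they reference.
import asyncio
from aiorpcx import sleep

async def main():
    await asyncio.sleep(0.1)   # before this patch
    await sleep(0.1)           # after this patch

asyncio.run(main())
```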

@@ -102,11 +101,19 @@ class MemPool(object):
         self.hashXs = defaultdict(set)  # None can be a key
         self.cached_compact_histogram = []
 
-    async def _log_stats(self):
+    async def _logging(self, synchronized_event):
         '''Print regular logs of mempool stats.'''
+        self.logger.info('beginning processing of daemon mempool. '
+                         'This can take some time...')
+        start = time.time()
+        await synchronized_event.wait()
+        elapsed = time.time() - start
+        self.logger.info(f'synced in {elapsed:.2f}s')
         while True:
             self.logger.info(f'{len(self.txs):,d} txs '
                              f'touching {len(self.hashXs):,d} addresses')
-            await asyncio.sleep(120)
+            await sleep(120)
+            await synchronized_event.wait()
 
     def _update_histogram(self):
         # Build a histogram by fee rate
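
The new `_logging` folds the one-off sync timing into the stats loop and gates every report on `synchronized_event`, so stats are only printed against a freshly synchronized mempool. A stdlib-only sketch of the pattern (names and intervals illustrative):

```python
import asyncio
import time

async def logging_loop(synchronized: asyncio.Event) -> None:
    # Block until the first synchronization, timing how long it took.
    start = time.time()
    await synchronized.wait()
    print(f'synced in {time.time() - start:.2f}s')
    while True:
        print('periodic stats...')      # tx/address counts in the real code
        await asyncio.sleep(1)          # 120s in the real code
        await synchronized.wait()       # re-gate on the next sync
```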

@@ -180,8 +187,8 @@ class MemPool(object):
 
     async def _refresh_hashes(self, synchronized_event):
         '''Refresh our view of the daemon's mempool.'''
-        sleep = 5
-        histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // sleep
+        secs = 5
+        histogram_refresh = self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS // secs
         for loop_count in itertools.count():
             height = self.api.cached_height()
             hex_hashes = await self.api.mempool_hashes()
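
The `sleep` → `secs` rename is forced by the import change above: a local integer named `sleep` would now shadow the aiorpcx coroutine inside this function. A minimal reproduction of the bug the rename avoids (hypothetical function, not in the patch):

```python
from aiorpcx import sleep

async def broken_refresh_loop():
    sleep = 5          # local int shadows the imported coroutine...
    await sleep(5)     # ...so this raises TypeError: 'int' object
                       # is not callable
```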

@@ -190,11 +197,12 @@
             hashes = set(hex_str_to_hash(hh) for hh in hex_hashes)
             touched = await self._process_mempool(hashes)
             synchronized_event.set()
+            synchronized_event.clear()
             await self.api.on_mempool(touched, height)
             # Thread mempool histogram refreshes - they can be expensive
             if loop_count % histogram_refresh == 0:
                 await run_in_thread(self._update_histogram)
-            await asyncio.sleep(sleep)
+            await sleep(secs)
 
     async def _process_mempool(self, all_hashes):
         # Re-sync with the new set of hashes
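
The added `clear()` right after `set()` turns the event into a per-refresh pulse: `set()` releases every coroutine currently blocked in `wait()` (asyncio resolves their futures at that moment), and `clear()` immediately re-arms the event so later `wait()` calls block until the next refresh completes. A small stdlib demonstration:

```python
import asyncio

async def waiter(event: asyncio.Event) -> None:
    await event.wait()
    print('woke on this refresh')

async def main() -> None:
    event = asyncio.Event()
    task = asyncio.ensure_future(waiter(event))
    await asyncio.sleep(0)   # let the waiter reach event.wait()
    event.set()              # release current waiters...
    event.clear()            # ...and re-arm for the next iteration
    await task               # the waiter still wakes up

asyncio.run(main())
```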

@@ -227,9 +235,6 @@
             tx_map.update(deferred)
             utxo_map.update(unspent)
 
-        # Handle the stragglers
-        if len(tx_map) >= 10:
-            self.logger.info(f'{len(tx_map)} stragglers')
         prior_count = 0
         # FIXME: this is not particularly efficient
         while tx_map and len(tx_map) != prior_count:
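
With the straggler counting log removed, what remains is a fixed-point loop: retry the deferred transactions until a full pass accepts nothing new. A hypothetical sketch of that shape (`try_accept` stands in for the module's real acceptance step):

```python
def drain_stragglers(tx_map, try_accept):
    # Retry until a pass makes no progress; anything left depends on
    # transactions the daemon has not given us yet.
    prior_count = 0
    while tx_map and len(tx_map) != prior_count:
        prior_count = len(tx_map)
        tx_map = {h: tx for h, tx in tx_map.items() if not try_accept(tx)}
    return tx_map
```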

@@ -286,19 +291,10 @@
     #
 
     async def keep_synchronized(self, synchronized_event):
-        '''Starts the mempool synchronizer.
-
-        Waits for an initial synchronization before returning.
-        '''
-        self.logger.info('beginning processing of daemon mempool. '
-                         'This can take some time...')
-        async with TaskGroup() as group:
+        '''Keep the mempool synchronized with the daemon.'''
+        async with TaskGroup(wait=any) as group:
             await group.spawn(self._refresh_hashes(synchronized_event))
-            start = time.time()
-            await synchronized_event.wait()
-            elapsed = time.time() - start
-            self.logger.info(f'synced in {elapsed:.2f}s')
-            await group.spawn(self._log_stats())
+            await group.spawn(self._logging(synchronized_event))
 
     async def balance_delta(self, hashX):
         '''Return the unconfirmed amount in the mempool for hashX.
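
`TaskGroup(wait=any)` makes the group's join return as soon as either spawned task finishes, cancelling the sibling, so a dead `_refresh_hashes` no longer leaves `_logging` running against a stale mempool. A sketch of the semantics under that assumption (the worker coroutines are hypothetical):

```python
import asyncio
from aiorpcx import TaskGroup

async def short_task():
    await asyncio.sleep(0.1)    # finishes first

async def long_task():
    await asyncio.sleep(60)     # cancelled when the group exits

async def main():
    # wait=any: the block exits once any task completes, cancelling the rest.
    async with TaskGroup(wait=any) as group:
        await group.spawn(short_task())
        await group.spawn(long_task())
    print('group exited after the first task finished')

asyncio.run(main())
```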