diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 78e0b13..cbf3a3a 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -9,31 +9,21 @@ import asyncio import pylru -from electrumx.server.mempool import MemPool - class ChainState(object): '''Used as an interface by servers to request information about blocks, transaction history, UTXOs and the mempool. ''' - def __init__(self, env, tasks, notifications): + def __init__(self, env, tasks, daemon, bp, notifications): self._env = env self._tasks = tasks - self._daemon = env.coin.DAEMON(env) - BlockProcessor = env.coin.BLOCK_PROCESSOR - self._bp = BlockProcessor(env, tasks, self._daemon, notifications) - self._mempool = MemPool(env.coin, tasks, self._daemon, self, - notifications) + self._daemon = daemon + self._bp = bp self._history_cache = pylru.lrucache(256) # External interface pass-throughs for session.py self.force_chain_reorg = self._bp.force_chain_reorg - self.mempool_fee_histogram = self._mempool.get_fee_histogram - self.mempool_get_utxos = self._mempool.get_utxos - self.mempool_potential_spends = self._mempool.potential_spends - self.mempool_transactions = self._mempool.transactions - self.mempool_value = self._mempool.value self.tx_branch_and_root = self._bp.merkle.branch_and_root self.read_headers = self._bp.read_headers # Cache maintenance @@ -105,7 +95,3 @@ class ChainState(object): async def shutdown(self): '''Shut down the block processor to flush chain state to disk.''' await self._bp.shutdown() - - async def wait_for_mempool(self): - await self._bp.catch_up_to_daemon() - await self._mempool.start_and_wait_for_sync() diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 7c766ef..9144664 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -11,6 +11,7 @@ import electrumx from electrumx.lib.server_base import ServerBase from electrumx.lib.util import version_string from electrumx.server.chain_state import ChainState +from electrumx.server.mempool import MemPool from electrumx.server.peers import PeerManager from electrumx.server.session import SessionManager @@ -94,18 +95,25 @@ class Controller(ServerBase): self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks') notifications = Notifications() - self.chain_state = ChainState(env, self.tasks, notifications) + daemon = env.coin.DAEMON(env) + BlockProcessor = env.coin.BLOCK_PROCESSOR + self.bp = BlockProcessor(env, self.tasks, daemon, notifications) + self.mempool = MemPool(env.coin, self.tasks, daemon, notifications, + self.bp.db_utxo_lookup) + self.chain_state = ChainState(env, self.tasks, daemon, self.bp, + notifications) self.peer_mgr = PeerManager(env, self.tasks, self.chain_state) self.session_mgr = SessionManager(env, self.tasks, self.chain_state, - self.peer_mgr, notifications, - self.shutdown_event) + self.mempool, self.peer_mgr, + notifications, self.shutdown_event) async def start_servers(self): '''Start the RPC server and wait for the mempool to synchronize. Then start the peer manager and serving external clients. ''' self.session_mgr.start_rpc_server() - await self.chain_state.wait_for_mempool() + await self.bp.catch_up_to_daemon() + await self.mempool.start_and_wait_for_sync() self.peer_mgr.start_peer_discovery() self.session_mgr.start_serving() diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index d03b897..e08b283 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -36,22 +36,15 @@ class MemPool(object): self.coin = coin self.utxo_lookup = utxo_lookup self.tasks = tasks + self.daemon = daemon self.notifications = notifications self.txs = {} self.hashXs = defaultdict(set) # None can be a key self.fee_histogram = defaultdict(int) - self.compact_fee_histogram = [] + self.cached_compact_histogram = [] self.histogram_time = 0 self.next_log = 0 - async def start_and_wait_for_sync(self): - '''Creates the mempool synchronization task, and waits for it to - first synchronize before returning.''' - self.logger.info('beginning processing of daemon mempool. ' - 'This can take some time...') - await self._synchronize(True) - self.tasks.create_task(self._synchronize_forever()) - async def _synchronize_forever(self): while True: await asyncio.sleep(5) @@ -63,7 +56,7 @@ class MemPool(object): height = self.daemon.cached_height() while True: hashes = await self.daemon.mempool_hashes() - later_height = await daemon.height() + later_height = await self.daemon.height() if height == later_height: return set(hashes), height height = later_height @@ -104,7 +97,7 @@ class MemPool(object): if unfetched: count = min(len(unfetched), fetch_size) hex_hashes = [unfetched.pop() for n in range(count)] - unprocessed.update(await self.fetch_raw_txs(hex_hashes)) + unprocessed.update(await self._fetch_raw_txs(hex_hashes)) if unprocessed: await process_some(unprocessed, touched) @@ -168,7 +161,7 @@ class MemPool(object): pending = [] result, deferred = await self.tasks.run_in_thread( - self.process_raw_txs, raw_txs, deferred) + self._process_raw_txs, raw_txs, deferred) pending.extend(deferred) hashXs = self.hashXs @@ -185,7 +178,7 @@ class MemPool(object): return process - async def fetch_raw_txs(self, hex_hashes): + async def _fetch_raw_txs(self, hex_hashes): '''Fetch a list of mempool transactions.''' raw_txs = await self.daemon.getrawtransactions(hex_hashes) @@ -193,7 +186,7 @@ class MemPool(object): # evicted or they got in a block. return {hh: raw for hh, raw in zip(hex_hashes, raw_txs) if raw} - def process_raw_txs(self, raw_tx_map, pending): + def _process_raw_txs(self, raw_tx_map, pending): '''Process the dictionary of raw transactions and return a dictionary of updates to apply to self.txs. @@ -264,7 +257,7 @@ class MemPool(object): return result, deferred - async def raw_transactions(self, hashX): + async def _raw_transactions(self, hashX): '''Returns an iterable of (hex_hash, raw_tx) pairs for all transactions in the mempool that touch hashX. @@ -278,14 +271,85 @@ class MemPool(object): raw_txs = await self.daemon.getrawtransactions(hex_hashes) return zip(hex_hashes, raw_txs) - async def transactions(self, hashX): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hashX. + def _calc_compact_histogram(self): + # For efficiency, get_fees returns a compact histogram with + # variable bin size. The compact histogram is an array of + # (fee, vsize) values. vsize_n is the cumulative virtual size + # of mempool transactions with a fee rate in the interval + # [fee_(n-1), fee_n)], and fee_(n-1) > fee_n. Fee intervals + # are chosen so as to create tranches that contain at least + # 100kb of transactions + out = [] + size = 0 + r = 0 + binsize = 100000 + for fee, s in sorted(self.fee_histogram.items(), reverse=True): + size += s + if size + r > binsize: + out.append((fee, size)) + r += size - binsize + size = 0 + binsize *= 1.1 + return out + + # External interface + async def start_and_wait_for_sync(self): + '''Starts the mempool synchronizer. + + Waits for an initial synchronization before returning. + ''' + self.logger.info('beginning processing of daemon mempool. ' + 'This can take some time...') + await self._synchronize(True) + self.tasks.create_task(self._synchronize_forever()) + + async def balance_delta(self, hashX): + '''Return the unconfirmed amount in the mempool for hashX. + + Can be positive or negative. + ''' + value = 0 + # hashXs is a defaultdict + if hashX in self.hashXs: + for hex_hash in self.hashXs[hashX]: + txin_pairs, txout_pairs, tx_fee, tx_size = self.txs[hex_hash] + value -= sum(v for h168, v in txin_pairs if h168 == hashX) + value += sum(v for h168, v in txout_pairs if h168 == hashX) + return value + + async def compact_fee_histogram(self): + '''Return a compact fee histogram of the current mempool.''' + now = time.time() + if now > self.histogram_time: + self.histogram_time = now + 30 + self.cached_compact_histogram = self._calc_compact_histogram() + return self.cached_compact_histogram + + async def potential_spends(self, hashX): + '''Return a set of (prev_hash, prev_idx) pairs from mempool + transactions that touch hashX. + + None, some or all of these may be spends of the hashX. + ''' + deserializer = self.coin.DESERIALIZER + pairs = await self._raw_transactions(hashX) + result = set() + for hex_hash, raw_tx in pairs: + if not raw_tx: + continue + tx = deserializer(raw_tx).read_tx() + for txin in tx.inputs: + result.add((txin.prev_hash, txin.prev_idx)) + return result + + async def transaction_summaries(self, hashX): + '''Return a list of (tx_hex_hash, tx_fee, unconfirmed) tuples for + mempool entries for the hashX. unconfirmed is True if any txin is unconfirmed. ''' deserializer = self.coin.DESERIALIZER - pairs = await self.raw_transactions(hashX) + pairs = await self._raw_transactions(hashX) result = [] for hex_hash, raw_tx in pairs: item = self.txs.get(hex_hash) @@ -298,7 +362,7 @@ class MemPool(object): result.append((hex_hash, tx_fee, unconfirmed)) return result - def get_utxos(self, hashX): + async def unordered_UTXOs(self, hashX): '''Return an unordered list of UTXO named tuples from mempool transactions that pay to hashX. @@ -318,63 +382,3 @@ class MemPool(object): utxos.append(UTXO(-1, pos, hex_str_to_hash(hex_hash), 0, value)) return utxos - - async def potential_spends(self, hashX): - '''Return a set of (prev_hash, prev_idx) pairs from mempool - transactions that touch hashX. - - None, some or all of these may be spends of the hashX. - ''' - deserializer = self.coin.DESERIALIZER - pairs = await self.raw_transactions(hashX) - result = set() - for hex_hash, raw_tx in pairs: - if not raw_tx: - continue - tx = deserializer(raw_tx).read_tx() - for txin in tx.inputs: - result.add((txin.prev_hash, txin.prev_idx)) - return result - - def value(self, hashX): - '''Return the unconfirmed amount in the mempool for hashX. - - Can be positive or negative. - ''' - value = 0 - # hashXs is a defaultdict - if hashX in self.hashXs: - for hex_hash in self.hashXs[hashX]: - txin_pairs, txout_pairs, tx_fee, tx_size = self.txs[hex_hash] - value -= sum(v for h168, v in txin_pairs if h168 == hashX) - value += sum(v for h168, v in txout_pairs if h168 == hashX) - return value - - def get_fee_histogram(self): - now = time.time() - if now > self.histogram_time + 30: - self.update_compact_histogram() - self.histogram_time = now - return self.compact_fee_histogram - - def update_compact_histogram(self): - # For efficiency, get_fees returns a compact histogram with - # variable bin size. The compact histogram is an array of - # (fee, vsize) values. vsize_n is the cumulative virtual size - # of mempool transactions with a fee rate in the interval - # [fee_(n-1), fee_n)], and fee_(n-1) > fee_n. Fee intervals - # are chosen so as to create tranches that contain at least - # 100kb of transactions - items = list(reversed(sorted(self.fee_histogram.items()))) - out = [] - size = 0 - r = 0 - binsize = 100000 - for fee, s in items: - size += s - if size + r > binsize: - out.append((fee, size)) - r += size - binsize - size = 0 - binsize *= 1.1 - self.compact_fee_histogram = out diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 73c5355..8e7f958 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -97,12 +97,13 @@ class SessionManager(object): CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) - def __init__(self, env, tasks, chain_state, peer_mgr, notifications, - shutdown_event): + def __init__(self, env, tasks, chain_state, mempool, peer_mgr, + notifications, shutdown_event): env.max_send = max(350000, env.max_send) self.env = env self.tasks = tasks self.chain_state = chain_state + self.mempool = mempool self.peer_mgr = peer_mgr self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) @@ -139,7 +140,7 @@ class SessionManager(object): else: protocol_class = self.env.coin.SESSIONCLS protocol_factory = partial(protocol_class, self, self.chain_state, - self.peer_mgr, kind) + self.mempool, self.peer_mgr, kind) server = loop.create_server(protocol_factory, *args, **kw_args) host, port = args[:2] @@ -476,11 +477,12 @@ class SessionBase(ServerSession): MAX_CHUNK_SIZE = 2016 session_counter = itertools.count() - def __init__(self, session_mgr, chain_state, peer_mgr, kind): + def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind): super().__init__(rpc_protocol=JSONRPCAutoDetect) self.logger = util.class_logger(__name__, self.__class__.__name__) self.session_mgr = session_mgr self.chain_state = chain_state + self.mempool = mempool self.peer_mgr = peer_mgr self.kind = kind # 'RPC', 'TCP' etc. self.env = session_mgr.env @@ -727,7 +729,7 @@ class ElectrumX(SessionBase): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 history = await self.chain_state.get_history(hashX) - mempool = await self.chain_state.mempool_transactions(hashX) + mempool = await self.mempool.transaction_summaries(hashX) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) for tx_hash, height in history) @@ -750,8 +752,8 @@ class ElectrumX(SessionBase): effects.''' utxos = await self.chain_state.get_utxos(hashX) utxos = sorted(utxos) - utxos.extend(self.chain_state.mempool_get_utxos(hashX)) - spends = await self.chain_state.mempool_potential_spends(hashX) + utxos.extend(await self.mempool.unordered_UTXOs(hashX)) + spends = await self.mempool.potential_spends(hashX) return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos, @@ -807,7 +809,7 @@ class ElectrumX(SessionBase): async def get_balance(self, hashX): utxos = await self.chain_state.get_utxos(hashX) confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = self.chain_state.mempool_value(hashX) + unconfirmed = await self.mempool.balance_delta(hashX) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} async def scripthash_get_balance(self, scripthash): @@ -818,7 +820,7 @@ class ElectrumX(SessionBase): async def unconfirmed_history(self, hashX): # Note unconfirmed history is unordered in electrum-server # Height is -1 if unconfirmed txins, otherwise 0 - mempool = await self.chain_state.mempool_transactions(hashX) + mempool = await self.mempool.transaction_summaries(hashX) return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} for tx_hash, fee, unconfirmed in mempool] @@ -972,10 +974,6 @@ class ElectrumX(SessionBase): return banner - def mempool_get_fee_histogram(self): - '''Memory pool fee histogram.''' - return self.chain_state.mempool_fee_histogram() - async def relayfee(self): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' @@ -1150,7 +1148,8 @@ class ElectrumX(SessionBase): if ptuple >= (1, 2): # New handler as of 1.2 handlers.update({ - 'mempool.get_fee_histogram': self.mempool_get_fee_histogram, + 'mempool.get_fee_histogram': + self.mempool.compact_fee_histogram, 'blockchain.block.headers': self.block_headers_12, 'server.ping': self.ping, })