
Clarify mempool's exported interface

patch-2
Neil Booth, 7 years ago
parent commit 2c51b127de
  1. electrumx/server/chain_state.py (20 lines changed)
  2. electrumx/server/controller.py (16 lines changed)
  3. electrumx/server/mempool.py (164 lines changed)
  4. electrumx/server/session.py (27 lines changed)

electrumx/server/chain_state.py (20 lines changed)

@@ -9,31 +9,21 @@
import asyncio
import pylru
from electrumx.server.mempool import MemPool
class ChainState(object):
'''Used as an interface by servers to request information about
blocks, transaction history, UTXOs and the mempool.
'''
def __init__(self, env, tasks, notifications):
def __init__(self, env, tasks, daemon, bp, notifications):
self._env = env
self._tasks = tasks
self._daemon = env.coin.DAEMON(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR
self._bp = BlockProcessor(env, tasks, self._daemon, notifications)
self._mempool = MemPool(env.coin, tasks, self._daemon, self,
notifications)
self._daemon = daemon
self._bp = bp
self._history_cache = pylru.lrucache(256)
# External interface pass-throughs for session.py
self.force_chain_reorg = self._bp.force_chain_reorg
self.mempool_fee_histogram = self._mempool.get_fee_histogram
self.mempool_get_utxos = self._mempool.get_utxos
self.mempool_potential_spends = self._mempool.potential_spends
self.mempool_transactions = self._mempool.transactions
self.mempool_value = self._mempool.value
self.tx_branch_and_root = self._bp.merkle.branch_and_root
self.read_headers = self._bp.read_headers
# Cache maintenance
@@ -105,7 +95,3 @@ class ChainState(object):
async def shutdown(self):
'''Shut down the block processor to flush chain state to disk.'''
await self._bp.shutdown()
async def wait_for_mempool(self):
await self._bp.catch_up_to_daemon()
await self._mempool.start_and_wait_for_sync()
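The pass-throughs that remain in ChainState are plain assignments of bound methods from the block processor; the mempool ones are deleted because sessions now hold the mempool object directly. A minimal illustration of the pass-through pattern, using hypothetical stand-in classes rather than ElectrumX code:

    class BlockProcessorStub:
        '''Hypothetical stand-in for the real block processor.'''
        def force_chain_reorg(self, count):
            return f'forcing reorg of {count} blocks'

    class Facade:
        '''Re-exports a backend's bound method as part of its own interface,
        the way ChainState keeps force_chain_reorg and read_headers.'''
        def __init__(self, bp):
            # Assigning the bound method makes it callable on the facade
            # without callers needing to know about the underlying object.
            self.force_chain_reorg = bp.force_chain_reorg

    facade = Facade(BlockProcessorStub())
    print(facade.force_chain_reorg(3))  # forcing reorg of 3 blocks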

electrumx/server/controller.py (16 lines changed)

@@ -11,6 +11,7 @@ import electrumx
from electrumx.lib.server_base import ServerBase
from electrumx.lib.util import version_string
from electrumx.server.chain_state import ChainState
from electrumx.server.mempool import MemPool
from electrumx.server.peers import PeerManager
from electrumx.server.session import SessionManager
@@ -94,18 +95,25 @@ class Controller(ServerBase):
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
notifications = Notifications()
self.chain_state = ChainState(env, self.tasks, notifications)
daemon = env.coin.DAEMON(env)
BlockProcessor = env.coin.BLOCK_PROCESSOR
self.bp = BlockProcessor(env, self.tasks, daemon, notifications)
self.mempool = MemPool(env.coin, self.tasks, daemon, notifications,
self.bp.db_utxo_lookup)
self.chain_state = ChainState(env, self.tasks, daemon, self.bp,
notifications)
self.peer_mgr = PeerManager(env, self.tasks, self.chain_state)
self.session_mgr = SessionManager(env, self.tasks, self.chain_state,
self.peer_mgr, notifications,
self.shutdown_event)
self.mempool, self.peer_mgr,
notifications, self.shutdown_event)
async def start_servers(self):
'''Start the RPC server and wait for the mempool to synchronize. Then
start the peer manager and serving external clients.
'''
self.session_mgr.start_rpc_server()
await self.chain_state.wait_for_mempool()
await self.bp.catch_up_to_daemon()
await self.mempool.start_and_wait_for_sync()
self.peer_mgr.start_peer_discovery()
self.session_mgr.start_serving()
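The controller now owns construction of the daemon, block processor and mempool, and the startup ordering that used to hide behind ChainState.wait_for_mempool() is spelled out: catch the block processor up to the daemon, run the first mempool synchronization, then start peer discovery and client serving. A runnable sketch of that ordering with hypothetical stubs, not the real Controller:

    import asyncio

    class BlockProcessorStub:
        '''Hypothetical stand-in; the real one indexes blocks from the daemon.'''
        async def catch_up_to_daemon(self):
            print('block processor caught up to the daemon tip')

    class MemPoolStub:
        '''Hypothetical stand-in; the real one fetches and processes raw txs.'''
        async def start_and_wait_for_sync(self):
            print('initial mempool synchronization complete')

    async def start_servers(bp, mempool):
        # Mirrors the order in Controller.start_servers: the RPC server is
        # already up; external serving begins only after both awaits return.
        await bp.catch_up_to_daemon()
        await mempool.start_and_wait_for_sync()
        print('starting peer discovery and serving external clients')

    asyncio.run(start_servers(BlockProcessorStub(), MemPoolStub()))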

electrumx/server/mempool.py (164 lines changed)

@@ -36,22 +36,15 @@ class MemPool(object):
self.coin = coin
self.utxo_lookup = utxo_lookup
self.tasks = tasks
self.daemon = daemon
self.notifications = notifications
self.txs = {}
self.hashXs = defaultdict(set) # None can be a key
self.fee_histogram = defaultdict(int)
self.compact_fee_histogram = []
self.cached_compact_histogram = []
self.histogram_time = 0
self.next_log = 0
async def start_and_wait_for_sync(self):
'''Creates the mempool synchronization task, and waits for it to
first synchronize before returning.'''
self.logger.info('beginning processing of daemon mempool. '
'This can take some time...')
await self._synchronize(True)
self.tasks.create_task(self._synchronize_forever())
async def _synchronize_forever(self):
while True:
await asyncio.sleep(5)
@@ -63,7 +56,7 @@ class MemPool(object):
height = self.daemon.cached_height()
while True:
hashes = await self.daemon.mempool_hashes()
later_height = await daemon.height()
later_height = await self.daemon.height()
if height == later_height:
return set(hashes), height
height = later_height
@@ -104,7 +97,7 @@ class MemPool(object):
if unfetched:
count = min(len(unfetched), fetch_size)
hex_hashes = [unfetched.pop() for n in range(count)]
unprocessed.update(await self.fetch_raw_txs(hex_hashes))
unprocessed.update(await self._fetch_raw_txs(hex_hashes))
if unprocessed:
await process_some(unprocessed, touched)
@@ -168,7 +161,7 @@ class MemPool(object):
pending = []
result, deferred = await self.tasks.run_in_thread(
self.process_raw_txs, raw_txs, deferred)
self._process_raw_txs, raw_txs, deferred)
pending.extend(deferred)
hashXs = self.hashXs
@@ -185,7 +178,7 @@ class MemPool(object):
return process
async def fetch_raw_txs(self, hex_hashes):
async def _fetch_raw_txs(self, hex_hashes):
'''Fetch a list of mempool transactions.'''
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
@@ -193,7 +186,7 @@ class MemPool(object):
# evicted or they got in a block.
return {hh: raw for hh, raw in zip(hex_hashes, raw_txs) if raw}
def process_raw_txs(self, raw_tx_map, pending):
def _process_raw_txs(self, raw_tx_map, pending):
'''Process the dictionary of raw transactions and return a dictionary
of updates to apply to self.txs.
@@ -264,7 +257,7 @@ class MemPool(object):
return result, deferred
async def raw_transactions(self, hashX):
async def _raw_transactions(self, hashX):
'''Returns an iterable of (hex_hash, raw_tx) pairs for all
transactions in the mempool that touch hashX.
@@ -278,14 +271,85 @@ class MemPool(object):
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
return zip(hex_hashes, raw_txs)
async def transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hashX.
def _calc_compact_histogram(self):
# For efficiency, get_fees returns a compact histogram with
# variable bin size. The compact histogram is an array of
# (fee, vsize) values. vsize_n is the cumulative virtual size
# of mempool transactions with a fee rate in the interval
# [fee_(n-1), fee_n)], and fee_(n-1) > fee_n. Fee intervals
# are chosen so as to create tranches that contain at least
# 100kb of transactions
out = []
size = 0
r = 0
binsize = 100000
for fee, s in sorted(self.fee_histogram.items(), reverse=True):
size += s
if size + r > binsize:
out.append((fee, size))
r += size - binsize
size = 0
binsize *= 1.1
return out
# External interface
async def start_and_wait_for_sync(self):
'''Starts the mempool synchronizer.
Waits for an initial synchronization before returning.
'''
self.logger.info('beginning processing of daemon mempool. '
'This can take some time...')
await self._synchronize(True)
self.tasks.create_task(self._synchronize_forever())
async def balance_delta(self, hashX):
'''Return the unconfirmed amount in the mempool for hashX.
Can be positive or negative.
'''
value = 0
# hashXs is a defaultdict
if hashX in self.hashXs:
for hex_hash in self.hashXs[hashX]:
txin_pairs, txout_pairs, tx_fee, tx_size = self.txs[hex_hash]
value -= sum(v for h168, v in txin_pairs if h168 == hashX)
value += sum(v for h168, v in txout_pairs if h168 == hashX)
return value
async def compact_fee_histogram(self):
'''Return a compact fee histogram of the current mempool.'''
now = time.time()
if now > self.histogram_time:
self.histogram_time = now + 30
self.cached_compact_histogram = self._calc_compact_histogram()
return self.cached_compact_histogram
async def potential_spends(self, hashX):
'''Return a set of (prev_hash, prev_idx) pairs from mempool
transactions that touch hashX.
None, some or all of these may be spends of the hashX.
'''
deserializer = self.coin.DESERIALIZER
pairs = await self._raw_transactions(hashX)
result = set()
for hex_hash, raw_tx in pairs:
if not raw_tx:
continue
tx = deserializer(raw_tx).read_tx()
for txin in tx.inputs:
result.add((txin.prev_hash, txin.prev_idx))
return result
async def transaction_summaries(self, hashX):
'''Return a list of (tx_hex_hash, tx_fee, unconfirmed) tuples for
mempool entries for the hashX.
unconfirmed is True if any txin is unconfirmed.
'''
deserializer = self.coin.DESERIALIZER
pairs = await self.raw_transactions(hashX)
pairs = await self._raw_transactions(hashX)
result = []
for hex_hash, raw_tx in pairs:
item = self.txs.get(hex_hash)
@@ -298,7 +362,7 @@ class MemPool(object):
result.append((hex_hash, tx_fee, unconfirmed))
return result
def get_utxos(self, hashX):
async def unordered_UTXOs(self, hashX):
'''Return an unordered list of UTXO named tuples from mempool
transactions that pay to hashX.
@@ -318,63 +382,3 @@ class MemPool(object):
utxos.append(UTXO(-1, pos, hex_str_to_hash(hex_hash),
0, value))
return utxos
async def potential_spends(self, hashX):
'''Return a set of (prev_hash, prev_idx) pairs from mempool
transactions that touch hashX.
None, some or all of these may be spends of the hashX.
'''
deserializer = self.coin.DESERIALIZER
pairs = await self.raw_transactions(hashX)
result = set()
for hex_hash, raw_tx in pairs:
if not raw_tx:
continue
tx = deserializer(raw_tx).read_tx()
for txin in tx.inputs:
result.add((txin.prev_hash, txin.prev_idx))
return result
def value(self, hashX):
'''Return the unconfirmed amount in the mempool for hashX.
Can be positive or negative.
'''
value = 0
# hashXs is a defaultdict
if hashX in self.hashXs:
for hex_hash in self.hashXs[hashX]:
txin_pairs, txout_pairs, tx_fee, tx_size = self.txs[hex_hash]
value -= sum(v for h168, v in txin_pairs if h168 == hashX)
value += sum(v for h168, v in txout_pairs if h168 == hashX)
return value
def get_fee_histogram(self):
now = time.time()
if now > self.histogram_time + 30:
self.update_compact_histogram()
self.histogram_time = now
return self.compact_fee_histogram
def update_compact_histogram(self):
# For efficiency, get_fees returns a compact histogram with
# variable bin size. The compact histogram is an array of
# (fee, vsize) values. vsize_n is the cumulative virtual size
# of mempool transactions with a fee rate in the interval
# [fee_(n-1), fee_n)], and fee_(n-1) > fee_n. Fee intervals
# are chosen so as to create tranches that contain at least
# 100kb of transactions
items = list(reversed(sorted(self.fee_histogram.items())))
out = []
size = 0
r = 0
binsize = 100000
for fee, s in items:
size += s
if size + r > binsize:
out.append((fee, size))
r += size - binsize
size = 0
binsize *= 1.1
self.compact_fee_histogram = out
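The binning comment is easier to follow with concrete numbers. The standalone sketch below uses made-up fee data and the same logic as the _calc_compact_histogram method introduced above: fee rates are walked from highest to lowest, a (fee, cumulative vsize) pair is emitted once roughly 100,000 vbytes have accumulated, and each emitted bin widens the next one by 10% so that low-fee tranches are coarser:

    from collections import defaultdict

    # Made-up data: fee rate -> total vsize of mempool txs paying that rate.
    fee_histogram = defaultdict(int)
    fee_histogram.update({1: 400000, 2: 150000, 5: 90000, 10: 60000, 50: 30000})

    def calc_compact_histogram(histogram, binsize=100000):
        out = []
        size = 0
        r = 0
        for fee, s in sorted(histogram.items(), reverse=True):
            size += s
            if size + r > binsize:
                # Emit a bin; carry the overshoot so later bins trigger
                # sooner, and widen the next bin by 10%.
                out.append((fee, size))
                r += size - binsize
                size = 0
                binsize *= 1.1
        return out

    print(calc_compact_histogram(fee_histogram))
    # [(5, 180000), (2, 150000), (1, 400000)] for the sample data above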

electrumx/server/session.py (27 lines changed)

@@ -97,12 +97,13 @@ class SessionManager(object):
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
def __init__(self, env, tasks, chain_state, peer_mgr, notifications,
shutdown_event):
def __init__(self, env, tasks, chain_state, mempool, peer_mgr,
notifications, shutdown_event):
env.max_send = max(350000, env.max_send)
self.env = env
self.tasks = tasks
self.chain_state = chain_state
self.mempool = mempool
self.peer_mgr = peer_mgr
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
@@ -139,7 +140,7 @@ class SessionManager(object):
else:
protocol_class = self.env.coin.SESSIONCLS
protocol_factory = partial(protocol_class, self, self.chain_state,
self.peer_mgr, kind)
self.mempool, self.peer_mgr, kind)
server = loop.create_server(protocol_factory, *args, **kw_args)
host, port = args[:2]
@@ -476,11 +477,12 @@ class SessionBase(ServerSession):
MAX_CHUNK_SIZE = 2016
session_counter = itertools.count()
def __init__(self, session_mgr, chain_state, peer_mgr, kind):
def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind):
super().__init__(rpc_protocol=JSONRPCAutoDetect)
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.session_mgr = session_mgr
self.chain_state = chain_state
self.mempool = mempool
self.peer_mgr = peer_mgr
self.kind = kind # 'RPC', 'TCP' etc.
self.env = session_mgr.env
@@ -727,7 +729,7 @@ class ElectrumX(SessionBase):
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.chain_state.get_history(hashX)
mempool = await self.chain_state.mempool_transactions(hashX)
mempool = await self.mempool.transaction_summaries(hashX)
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
for tx_hash, height in history)
@@ -750,8 +752,8 @@ class ElectrumX(SessionBase):
effects.'''
utxos = await self.chain_state.get_utxos(hashX)
utxos = sorted(utxos)
utxos.extend(self.chain_state.mempool_get_utxos(hashX))
spends = await self.chain_state.mempool_potential_spends(hashX)
utxos.extend(await self.mempool.unordered_UTXOs(hashX))
spends = await self.mempool.potential_spends(hashX)
return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos,
@@ -807,7 +809,7 @@ class ElectrumX(SessionBase):
async def get_balance(self, hashX):
utxos = await self.chain_state.get_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = self.chain_state.mempool_value(hashX)
unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
async def scripthash_get_balance(self, scripthash):
@@ -818,7 +820,7 @@ class ElectrumX(SessionBase):
async def unconfirmed_history(self, hashX):
# Note unconfirmed history is unordered in electrum-server
# Height is -1 if unconfirmed txins, otherwise 0
mempool = await self.chain_state.mempool_transactions(hashX)
mempool = await self.mempool.transaction_summaries(hashX)
return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
for tx_hash, fee, unconfirmed in mempool]
@@ -972,10 +974,6 @@ class ElectrumX(SessionBase):
return banner
def mempool_get_fee_histogram(self):
'''Memory pool fee histogram.'''
return self.chain_state.mempool_fee_histogram()
async def relayfee(self):
'''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.'''
@@ -1150,7 +1148,8 @@ class ElectrumX(SessionBase):
if ptuple >= (1, 2):
# New handler as of 1.2
handlers.update({
'mempool.get_fee_histogram': self.mempool_get_fee_histogram,
'mempool.get_fee_histogram':
self.mempool.compact_fee_histogram,
'blockchain.block.headers': self.block_headers_12,
'server.ping': self.ping,
})
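To summarize the session-side change: the ChainState mempool pass-throughs are replaced by direct, awaitable calls on the mempool object. mempool_transactions becomes transaction_summaries, mempool_value becomes balance_delta, mempool_get_utxos becomes unordered_UTXOs, mempool_potential_spends becomes potential_spends, and mempool_fee_histogram becomes compact_fee_histogram. A small runnable sketch of a handler written against the new interface, with a hypothetical mempool stub and made-up data:

    import asyncio

    class MemPoolStub:
        '''Hypothetical stand-in returning data shaped as in the diff.'''
        async def transaction_summaries(self, hashX):
            # (tx_hex_hash, tx_fee, unconfirmed) per the new docstring
            return [('ab' * 32, 226, True), ('cd' * 32, 450, False)]

    async def unconfirmed_history(mempool, hashX):
        # Mirrors ElectrumX.unconfirmed_history: height is -1 if any txin
        # is unconfirmed, otherwise 0.
        summaries = await mempool.transaction_summaries(hashX)
        return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
                for tx_hash, fee, unconfirmed in summaries]

    print(asyncio.run(unconfirmed_history(MemPoolStub(), b'\x00' * 11)))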
