Browse Source

Synchronize daemon height and mempool fetching

Clean up and simplify touched handling and its event, which is
now controlled and owned by the mempool.
The daemon object owns the set of current mempool hashes.
Clean up and simplify the mempool main loop.

Fixes #70.
master
Neil Booth 8 years ago
parent
commit
5fe49bb261
  1. 6
      RELEASE-NOTES
  2. 21
      server/block_processor.py
  3. 9
      server/daemon.py
  4. 148
      server/mempool.py
  5. 19
      server/protocol.py
  6. 2
      server/version.py

6
RELEASE-NOTES

@ -1,3 +1,9 @@
version 0.9.8
-------------
- clean up mempool handling; notify of addresses only once when a new block
comes in. Fixes issue 70.
version 0.9.7
-------------

21
server/block_processor.py

@ -85,9 +85,13 @@ class Prefetcher(LoggedClass):
async def _prefetch(self):
'''Prefetch blocks unless the prefetch queue is full.'''
# Refresh the mempool after updating the daemon height, if and
# only if we've caught up
daemon_height = await self.daemon.height()
cache_room = self.target_cache_size // self.ave_size
if self.caught_up:
await self.daemon.refresh_mempool_hashes()
cache_room = self.target_cache_size // self.ave_size
with await self.semaphore:
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
@ -132,7 +136,7 @@ class BlockProcessor(server.db.DB):
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, touched, touched_event):
def __init__(self, env):
super().__init__(env)
# The block processor reads its tasks from this queue
@ -149,8 +153,6 @@ class BlockProcessor(server.db.DB):
self.caught_up = False
self._shutdown = False
self.event = asyncio.Event()
self.touched = touched
self.touched_event = touched_event
# Meta
self.utxo_MB = env.utxo_MB
@ -180,7 +182,7 @@ class BlockProcessor(server.db.DB):
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
async def main_loop(self):
async def main_loop(self, touched):
'''Main loop for block processing.'''
# Simulate a reorg if requested
@ -195,7 +197,7 @@ class BlockProcessor(server.db.DB):
break
blocks = self.prefetcher.get_blocks()
if blocks:
await self.advance_blocks(blocks)
await self.advance_blocks(blocks, touched)
elif not self.caught_up:
self.caught_up = True
self.first_caught_up()
@ -209,7 +211,7 @@ class BlockProcessor(server.db.DB):
self._shutdown = True
self.tasks.put_nowait(None)
async def advance_blocks(self, blocks):
async def advance_blocks(self, blocks, touched):
'''Strip the unspendable genesis coinbase.'''
if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
@ -218,7 +220,7 @@ class BlockProcessor(server.db.DB):
for block in blocks:
if self._shutdown:
break
self.advance_block(block, self.touched)
self.advance_block(block, touched)
loop = asyncio.get_event_loop()
try:
@ -227,14 +229,13 @@ class BlockProcessor(server.db.DB):
else:
do_it()
except ChainReorg:
await self.handle_chain_reorg(self.touched)
await self.handle_chain_reorg(touched)
if self.caught_up:
# Flush everything as queries are performed on the DB and
# not in-memory.
await asyncio.sleep(0)
self.flush(True)
self.touched_event.set()
elif time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 60

9
server/daemon.py

@ -36,6 +36,8 @@ class Daemon(util.LoggedClass):
self.urls = urls
self.url_index = 0
self._height = None
self.mempool_hashes = set()
self.mempool_refresh_event = asyncio.Event()
# Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
self.workqueue_semaphore = asyncio.Semaphore(value=10)
@ -148,9 +150,10 @@ class Daemon(util.LoggedClass):
# Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks]
async def mempool_hashes(self):
'''Return the hashes of the txs in the daemon's mempool.'''
return await self._send_single('getrawmempool')
async def refresh_mempool_hashes(self):
'''Update our record of the daemon's mempool hashes.'''
self.mempool_hashes = set(await self._send_single('getrawmempool'))
self.mempool_refresh_event.set()
async def estimatefee(self, params):
'''Return the fee estimate for the given parameters.'''

148
server/mempool.py

@ -33,92 +33,31 @@ class MemPool(util.LoggedClass):
A pair is a (hash168, value) tuple. tx hashes are hex strings.
'''
def __init__(self, daemon, coin, db, touched, touched_event):
def __init__(self, daemon, coin, db):
super().__init__()
self.daemon = daemon
self.coin = coin
self.db = db
self.touched = touched
self.touched_event = touched_event
self.touched = set()
self.touched_event = asyncio.Event()
self.stop = False
self.txs = {}
self.hash168s = defaultdict(set) # None can be a key
async def main_loop(self, caught_up):
'''Asynchronously maintain mempool status with daemon.
Waits until the caught up event is signalled.'''
await caught_up.wait()
self.logger.info('beginning processing of daemon mempool. '
'This can take some time...')
try:
await self.fetch_and_process()
except asyncio.CancelledError:
# This aids clean shutdowns
self.stop = True
async def fetch_and_process(self):
'''The inner loop unprotected by try / except.'''
unfetched = set()
unprocessed = {}
log_every = 150
log_secs = 0
fetch_size = 400
process_some = self.async_process_some(unfetched, fetch_size // 2)
next_refresh = 0
# The list of mempool hashes is fetched no more frequently
# than this number of seconds
refresh_secs = 5
while True:
try:
now = time.time()
if now >= next_refresh:
await self.new_hashes(unprocessed, unfetched)
next_refresh = now + refresh_secs
log_secs -= refresh_secs
# Fetch some txs if unfetched ones remain
if unfetched:
count = min(len(unfetched), fetch_size)
hex_hashes = [unfetched.pop() for n in range(count)]
unprocessed.update(await self.fetch_raw_txs(hex_hashes))
# Process some txs if unprocessed ones remain
if unprocessed:
await process_some(unprocessed)
def resync_daemon_hashes(self, unprocessed, unfetched):
'''Re-sync self.txs with the list of hashes in the daemon's mempool.
# Avoid double notifications if processing a block
if self.touched and not self.processing_new_block():
self.touched_event.set()
if not unprocessed:
if log_secs <= 0:
log_secs = log_every
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(self.txs),
len(self.hash168s)))
await asyncio.sleep(1)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
def processing_new_block(self):
'''Return True if we're processing a new block.'''
return self.daemon.cached_height() > self.db.db_height
async def new_hashes(self, unprocessed, unfetched):
'''Get the current list of hashes in the daemon's mempool.
Remove ones that have disappeared from self.txs and unprocessed.
Additionally, remove gone hashes from unprocessed and
unfetched. Add new ones to unprocessed.
'''
txs = self.txs
hash168s = self.hash168s
touched = self.touched
hashes = set(await self.daemon.mempool_hashes())
new = hashes.difference(txs)
hashes = self.daemon.mempool_hashes
gone = set(txs).difference(hashes)
for hex_hash in gone:
unfetched.discard(hex_hash)
unprocessed.pop(hex_hash, None)
item = txs.pop(hex_hash)
if item:
@ -131,18 +70,71 @@ class MemPool(util.LoggedClass):
del hash168s[hash168]
touched.update(tx_hash168s)
new = hashes.difference(txs)
unfetched.update(new)
for hex_hash in new:
txs[hex_hash] = None
async def main_loop(self):
'''Asynchronously maintain mempool status with daemon.
Processes the mempool each time the daemon's mempool refresh
event is signalled.
'''
unprocessed = {}
unfetched = set()
txs = self.txs
fetch_size = 400
process_some = self.async_process_some(unfetched, fetch_size // 2)
await self.daemon.mempool_refresh_event.wait()
self.logger.info ('beginning processing of daemon mempool. '
'This can take some time...')
next_log = time.time() + 0.1
while True:
try:
todo = len(unfetched) + len(unprocessed)
if todo:
pct = (len(txs) - todo) * 100 // len(txs) if txs else 0
self.logger.info('catchup {:d}% complete ({:,d} txs left)'
.format(pct, todo))
else:
now = time.time()
if now >= next_log:
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(txs), len(self.hash168s)))
next_log = now + 150
await self.daemon.mempool_refresh_event.wait()
self.resync_daemon_hashes(unprocessed, unfetched)
self.daemon.mempool_refresh_event.clear()
if unfetched:
count = min(len(unfetched), fetch_size)
hex_hashes = [unfetched.pop() for n in range(count)]
unprocessed.update(await self.fetch_raw_txs(hex_hashes))
if unprocessed:
await process_some(unprocessed)
# Avoid double notifications if processing a block
if self.touched and not self.processing_new_block():
self.touched_event.set()
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
# This aids clean shutdowns
self.stop = True
break
def async_process_some(self, unfetched, limit):
loop = asyncio.get_event_loop()
pending = []
txs = self.txs
first = True
async def process(unprocessed):
nonlocal first, pending
nonlocal pending
raw_txs = {}
while unprocessed and len(raw_txs) < limit:
@ -169,16 +161,12 @@ class MemPool(util.LoggedClass):
touched.add(hash168)
hash168s[hash168].add(hex_hash)
to_do = len(unfetched) + len(unprocessed)
if to_do and txs:
percent = max(0, len(txs) - to_do) * 100 // len(txs)
self.logger.info('catchup {:d}% complete'.format(percent))
elif first:
first = False
self.logger.info('caught up')
return process
def processing_new_block(self):
'''Return True if we're processing a new block.'''
return self.daemon.cached_height() > self.db.db_height
async def fetch_raw_txs(self, hex_hashes):
'''Fetch a list of mempool transactions.'''
raw_txs = await self.daemon.getrawtransactions(hex_hashes)

19
server/protocol.py

@ -53,12 +53,9 @@ class ServerManager(util.LoggedClass):
def __init__(self, env):
super().__init__()
self.loop = asyncio.get_event_loop()
self.touched = set()
self.touched_event = asyncio.Event()
self.start = time.time()
self.bp = BlockProcessor(env, self.touched, self.touched_event)
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp,
self.touched, self.touched_event)
self.bp = BlockProcessor(env)
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp)
self.irc = IRC(env)
self.env = env
self.servers = {}
@ -178,11 +175,11 @@ class ServerManager(util.LoggedClass):
self.futures.append(asyncio.ensure_future(coro))
# shutdown() assumes bp.main_loop() is first
add_future(self.bp.main_loop())
add_future(self.bp.main_loop(self.mempool.touched))
add_future(self.bp.prefetcher.main_loop())
add_future(self.irc.start(self.bp.event))
add_future(self.start_servers(self.bp.event))
add_future(self.mempool.main_loop(self.bp.event))
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
add_future(self.notify())
for n in range(4):
@ -245,10 +242,10 @@ class ServerManager(util.LoggedClass):
async def notify(self):
'''Notify sessions about height changes and touched addresses.'''
while True:
await self.touched_event.wait()
touched = self.touched.copy()
self.touched.clear()
self.touched_event.clear()
await self.mempool.touched_event.wait()
touched = self.mempool.touched.copy()
self.mempool.touched.clear()
self.mempool.touched_event.clear()
# Invalidate caches
hc = self.history_cache

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.9.7"
VERSION = "ElectrumX 0.9.8"

Loading…
Cancel
Save