Browse Source

Rework prefetch logic

This also fixes a recent reorg bug...
patch-2
Neil Booth 7 years ago
parent
commit
3fbd4992ce
  1. 95
      electrumx/server/block_processor.py

95
electrumx/server/block_processor.py

@ -23,17 +23,15 @@ from electrumx.lib.util import chunks, formatted_time, class_logger
import electrumx.server.db
RAW_BLOCKS, PREFETCHER_CAUGHT_UP, REORG_CHAIN = range(3)
class Prefetcher(object):
'''Prefetches blocks (in the forward direction only).'''
def __init__(self, daemon, coin, queue):
def __init__(self, daemon, coin, blocks_event):
self.logger = class_logger(__name__, self.__class__.__name__)
self.daemon = daemon
self.coin = coin
self.queue = queue
self.blocks_event = blocks_event
self.blocks = []
self.caught_up = False
# Access to fetched_height should be protected by the semaphore
self.fetched_height = None
@ -57,11 +55,13 @@ class Prefetcher(object):
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
def processing_blocks(self, blocks):
def get_prefetched_blocks(self):
'''Called by block processor when it is processing queued blocks.'''
self.cache_size -= sum(len(block) for block in blocks)
if self.cache_size < self.min_cache_size:
self.refill_event.set()
blocks = self.blocks
self.blocks = []
self.cache_size = 0
self.refill_event.set()
return blocks
async def reset_height(self, height):
'''Reset to prefetch blocks from the block processor's height.
@ -71,6 +71,8 @@ class Prefetcher(object):
must synchronize with a semaphore.
'''
async with self.semaphore:
self.blocks.clear()
self.cache_size = 0
self.fetched_height = height
self.refill_event.set()
@ -100,9 +102,7 @@ class Prefetcher(object):
count = min(daemon_height - self.fetched_height, cache_room)
count = min(500, max(count, 0))
if not count:
if not self.caught_up:
self.caught_up = True
await self.queue.put((PREFETCHER_CAUGHT_UP, ))
self.caught_up = True
return False
first = self.fetched_height + 1
@ -127,9 +127,10 @@ class Prefetcher(object):
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
await self.queue.put((RAW_BLOCKS, blocks, first))
self.blocks.extend(blocks)
self.cache_size += size
self.fetched_height += count
self.blocks_event.set()
self.refill_event.clear()
return True
@ -159,16 +160,16 @@ class BlockProcessor(electrumx.server.db.DB):
self.daemon = daemon
self.notifications = notifications
# Work queue
self.queue = asyncio.Queue()
self._caught_up_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.queue)
self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
# Meta
self.cache_MB = env.cache_MB
self.next_cache_check = 0
self.last_flush = time.time()
self.touched = set()
self.reorg_count = 0
# Header merkle cache
self.merkle = Merkle()
@ -197,20 +198,11 @@ class BlockProcessor(electrumx.server.db.DB):
'''
self.callbacks.append(callback)
async def check_and_advance_blocks(self, raw_blocks, first):
async def check_and_advance_blocks(self, raw_blocks):
'''Process the list of raw blocks passed. Detects and handles
reorgs.
'''
self.prefetcher.processing_blocks(raw_blocks)
if first != self.height + 1:
# If we prefetched two sets of blocks and the first caused
# a reorg this will happen when we try to process the
# second. It should be very rare.
self.logger.warning('ignoring {:,d} blocks starting height {:,d}, '
'expected {:,d}'.format(len(raw_blocks), first,
self.height + 1))
return
first = self.height + 1
blocks = [self.coin.block(raw_block, first + n)
for n, raw_block in enumerate(raw_blocks)]
headers = [block.header for block in blocks]
@ -748,20 +740,28 @@ class BlockProcessor(electrumx.server.db.DB):
self.db_height = self.height
self.db_tip = self.tip
async def _process_queue(self):
'''Loop forever processing enqueued work.'''
async def _process_blocks(self):
'''Loop forever processing blocks as they arrive.'''
while True:
work, *args = await self.queue.get()
if work == RAW_BLOCKS:
raw_blocks, first = args
await self.check_and_advance_blocks(raw_blocks, first)
elif work == PREFETCHER_CAUGHT_UP:
self._caught_up_event.set()
# Initialise the notification framework
await self.notifications.on_block(set(), self.height)
elif work == REORG_CHAIN:
count, = args
await self.reorg_chain(count)
if self.height == self.daemon.cached_height():
if not self._caught_up_event.is_set():
self.logger.info(f'caught up to height {self.height}')
self._caught_up_event.set()
# Flush everything but with first_sync->False state.
first_sync = self.first_sync
self.first_sync = False
self.flush(True)
if first_sync:
self.logger.info(f'{electrumx.version} synced to '
f'height {self.height:,d}')
await self.blocks_event.wait()
self.blocks_event.clear()
if self.reorg_count:
await self.reorg_chain(self.reorg_count)
self.reorg_count = 0
else:
blocks = self.prefetcher.get_prefetched_blocks()
await self.check_and_advance_blocks(blocks)
def _on_dbs_opened(self):
# An incomplete compaction needs to be cancelled otherwise
@ -792,16 +792,11 @@ class BlockProcessor(electrumx.server.db.DB):
self.tasks.create_task(self.prefetcher.main_loop())
await self.prefetcher.reset_height(self.height)
# Start our loop that processes blocks as they are fetched
self.worker_task = self.tasks.create_task(self._process_queue())
self.worker_task = self.tasks.create_task(self._process_blocks())
# Wait until caught up
await self._caught_up_event.wait()
# Flush everything but with first_sync->False state.
first_sync = self.first_sync
self.first_sync = False
self.flush(True)
if first_sync:
self.logger.info(f'{electrumx.version} synced to '
f'height {self.height:,d}')
# Initialise the notification framework
await self.notifications.on_block(set(), self.height)
# Reopen for serving
await self.open_for_serving()
@ -816,7 +811,9 @@ class BlockProcessor(electrumx.server.db.DB):
Returns True if a reorg is queued, false if not caught up.
'''
if self._caught_up_event.is_set():
self.queue.put_nowait((REORG_CHAIN, count))
if count > 0:
self.reorg_count = count
self.blocks_event.set()
return True
return False

Loading…
Cancel
Save