Browse Source

Rework main block processor loop

It's less awkward and more explicit.
This brings back the efficiency lost in the 0.9.x series.
It also removes the special case hack: both when syncing and
caught up, block processing is done in the executor.

Fixes #58
master
Neil Booth 8 years ago
parent
commit
39af7a7463
  1. 166
      server/block_processor.py
  2. 6
      server/protocol.py

166
server/block_processor.py

@@ -26,81 +26,95 @@ import server.db
class Prefetcher(LoggedClass):
'''Prefetches blocks (in the forward direction only).'''
def __init__(self, tasks, daemon, height):
def __init__(self, coin, daemon, height):
super().__init__()
self.tasks = tasks
self.coin = coin
self.daemon = daemon
self.semaphore = asyncio.Semaphore()
self.caught_up = False
# Access to fetched_height should be protected by the semaphore
self.fetched_height = height
# A list of (blocks, size) pairs. Earliest last.
self.cache = []
self.semaphore = asyncio.Semaphore()
self.refill_event = asyncio.Event()
# A cache queue of (blocks, size) pairs. The target cache
# size has little effect on sync time.
self.cache = asyncio.Queue()
self.cache_size = 0
# Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024
self.min_cache_size = 10 * 1024 * 1024
# This makes the first fetch be 10 blocks
self.ave_size = self.target_cache_size // 10
self.ave_size = self.min_cache_size // 10
async def clear(self, height):
'''Clear prefetched blocks and restart from the given height.
Used in blockchain reorganisations. This coroutine can be
called asynchronously to the _prefetch coroutine so we must
synchronize.
synchronize with a semaphore.
Set height to -1 when shutting down to place a sentinel on the
queue to tell the block processor to shut down too.
'''
with await self.semaphore:
while not self.tasks.empty():
self.tasks.get_nowait()
self.cache = []
while not self.cache.empty():
self.cache.get_nowait()
self.cache_size = 0
if height == -1:
self.cache.put_nowait((None, 0))
else:
self.refill_event.set()
self.fetched_height = height
self.logger.info('reset to height'.format(height))
def get_blocks(self):
'''Return the next list of blocks from our prefetch cache.'''
# Cache might be empty after a clear()
if self.cache:
blocks, size = self.cache.pop()
async def get_blocks(self):
'''Return the next list of blocks from our prefetch cache.
A return value of None indicates to shut down. Once caught up
an entry is queued every few seconds synchronized with mempool
refreshes to indicate a new mempool is available. Of course
the list of blocks in such a case will normally be empty.'''
blocks, size = await self.cache.get()
self.cache_size -= size
if self.cache_size < self.min_cache_size:
self.refill_event.set()
return blocks
return []
async def main_loop(self):
'''Loop forever polling for more blocks.'''
daemon_height = await self.daemon.height()
if daemon_height > self.fetched_height:
log_msg = 'catching up to daemon height {:,d}...'
else:
if self.fetched_height >= daemon_height:
log_msg = 'caught up to daemon height {:,d}'
else:
log_msg = 'catching up to daemon height {:,d}...'
self.logger.info(log_msg.format(daemon_height))
while True:
try:
secs = 0
if self.cache_size < self.target_cache_size:
if not await self._prefetch():
self.caught_up = True
secs = 5
self.tasks.put_nowait(None)
await asyncio.sleep(secs)
with await self.semaphore:
await self._prefetch_blocks()
await self.refill_event.wait()
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
break
await self.clear(-1)
return
async def _prefetch_blocks(self):
'''Prefetch some blocks and put them on the queue.
async def _prefetch(self):
'''Prefetch blocks unless the prefetch queue is full.'''
# Refresh the mempool after updating the daemon height, if and
# only if we've caught up
Repeats until the queue is full or caught up. If caught up,
sleep for a period of time before returning.
'''
daemon_height = await self.daemon.height(mempool=self.caught_up)
cache_room = self.target_cache_size // self.ave_size
with await self.semaphore:
while self.cache_size < self.min_cache_size:
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
# Constrain fetch count to between 0 and 2500 regardless.
cache_room = self.min_cache_size // self.ave_size
count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0))
count = min(2500, max(count, 0))
if not count:
return 0
self.cache.put_nowait(([], 0))
self.caught_up = True
await asyncio.sleep(5)
return
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
@@ -108,20 +122,24 @@ class Prefetcher(LoggedClass):
self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1, hex_hashes[-1]))
blocks = await self.daemon.raw_blocks(hex_hashes)
assert count == len(blocks)
size = sum(len(block) for block in blocks)
# Strip the unspendable genesis coinbase
if first == 0:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
# Update our recent average block size estimate
size = sum(len(block) for block in blocks)
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
self.cache.insert(0, (blocks, size))
self.cache.put_nowait((blocks, size))
self.cache_size += size
self.fetched_height += len(blocks)
self.fetched_height += count
return count
self.refill_event.clear()
class ChainError(Exception):
@@ -141,9 +159,6 @@ class BlockProcessor(server.db.DB):
def __init__(self, env):
super().__init__(env)
# The block processor reads its tasks from this queue
self.tasks = asyncio.Queue()
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
@@ -152,9 +167,7 @@ class BlockProcessor(server.db.DB):
self.tx_count = self.db_tx_count
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self._shutdown = False
self.event = asyncio.Event()
self.caught_up_event = asyncio.Event()
# Meta
self.utxo_MB = env.utxo_MB
@@ -164,7 +177,7 @@ class BlockProcessor(server.db.DB):
# Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height)
self.prefetcher = Prefetcher(self.coin, self.daemon, self.height)
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
@@ -194,52 +207,32 @@ class BlockProcessor(server.db.DB):
await self.handle_chain_reorg(set(), self.env.force_reorg)
while True:
task = await self.tasks.get()
if self._shutdown:
break
blocks = self.prefetcher.get_blocks()
blocks = await self.prefetcher.get_blocks()
if blocks:
start = time.time()
await self.advance_blocks(blocks, touched)
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
elif not self.caught_up:
self.caught_up = True
self.first_caught_up()
elif blocks is None:
break # Shutdown
else:
self.caught_up()
self.logger.info('flushing state to DB for a clean shutdown...')
self.flush(True)
self.logger.info('shut down complete')
def shutdown(self):
'''Call to shut down the block processor.'''
self.logger.info('flushing state to DB for clean shutdown...')
self._shutdown = True
self.tasks.put_nowait(None)
self.logger.info('shutdown complete')
async def advance_blocks(self, blocks, touched):
'''Strip the unspendable genesis coinbase.'''
if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
def do_it():
'''Process the list of blocks passed. Detects and handles reorgs.'''
def job():
for block in blocks:
if self._shutdown:
break
self.advance_block(block, touched)
start = time.time()
loop = asyncio.get_event_loop()
try:
if self.caught_up:
await loop.run_in_executor(None, do_it)
else:
do_it()
await loop.run_in_executor(None, job)
except ChainReorg:
await self.handle_chain_reorg(touched)
if self.caught_up:
if self.caught_up_event.is_set():
# Flush everything as queries are performed on the DB and
# not in-memory.
await asyncio.sleep(0)
@@ -248,8 +241,15 @@ class BlockProcessor(server.db.DB):
self.check_cache_size()
self.next_cache_check = time.time() + 60
def first_caught_up(self):
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
def caught_up(self):
'''Called when first caught up after starting.'''
if not self.caught_up_event.is_set():
self.flush(True)
if self.first_sync:
self.logger.info('{} synced to height {:,d}'
@@ -257,7 +257,7 @@ class BlockProcessor(server.db.DB):
self.first_sync = False
self.flush_state(self.db)
self.reopen_db(False)
self.event.set()
self.caught_up_event.set()
async def handle_chain_reorg(self, touched, count=None):
'''Handle a chain reorganisation.

6
server/protocol.py

@@ -183,8 +183,8 @@ class ServerManager(util.LoggedClass):
# shutdown() assumes bp.main_loop() is first
add_future(self.bp.main_loop(self.mempool.touched))
add_future(self.bp.prefetcher.main_loop())
add_future(self.irc.start(self.bp.event))
add_future(self.start_servers(self.bp.event))
add_future(self.irc.start(self.bp.caught_up_event))
add_future(self.start_servers(self.bp.caught_up_event))
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
add_future(self.notify())
@@ -307,12 +307,12 @@ class ServerManager(util.LoggedClass):
'''Call to shutdown everything. Returns when done.'''
self.state = self.SHUTTING_DOWN
self.close_servers(list(self.servers.keys()))
self.bp.shutdown()
# Don't cancel the block processor main loop - let it close itself
for future in self.futures[1:]:
future.cancel()
if self.sessions:
await self.close_sessions()
await self.futures[0]
async def close_sessions(self, secs=30):
self.logger.info('cleanly closing client sessions, please wait...')

Loading…
Cancel
Save