
Don't shut down block processor by cancellation

The block processor needs to be able to close cleanly, and not
mid-block.  In order to be able to yield whilst processing blocks,
we cannot forcefully close its coroutine with a cancellation.
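As a rough illustration of the pattern this commit moves to (the class and method names below are simplified stand-ins, not the actual ElectrumX code): the processor drains a task queue and checks a shutdown flag between tasks, so callers request a stop by setting the flag and enqueueing a wake-up task rather than cancelling the coroutine.

    import asyncio

    # Task kinds placed on the queue, mirroring BLOCKS / CAUGHT_UP in the diff.
    BLOCKS, CAUGHT_UP = range(2)

    class Processor:
        '''Sketch only: drain a task queue and exit between tasks,
        never by being cancelled mid-task.'''

        def __init__(self):
            self.tasks = asyncio.Queue()
            self._shutdown = False
            self.caught_up = False

        async def main_loop(self):
            while True:
                task = await self.tasks.get()
                if self._shutdown:
                    break                        # stop only at a task boundary
                if task == BLOCKS:
                    await self.advance_blocks()
                else:
                    self.caught_up = True
            self.flush()                         # final clean flush

        def shutdown(self):
            '''Request a clean stop; callers never cancel main_loop.'''
            self._shutdown = True
            self.tasks.put_nowait(BLOCKS)        # wake main_loop if it is idle

        async def advance_blocks(self):
            await asyncio.sleep(0)               # stand-in for real block work

        def flush(self):
            pass                                 # stand-in for the DB flush

A caller would schedule main_loop() with asyncio.ensure_future() and later call shutdown(); the future then completes on its own once the current task finishes.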
Branch: master
Neil Booth, 8 years ago
commit 6aef79461f
  1. server/block_processor.py (162)
  2. server/db.py (2)
  3. server/protocol.py (6)

server/block_processor.py (162)

@@ -23,6 +23,10 @@ from lib.util import chunks, formatted_time, LoggedClass
import server.db
# Tasks placed on task queue
BLOCKS, CAUGHT_UP = range(2)
class ChainError(Exception):
pass
@@ -30,17 +34,19 @@ class ChainError(Exception):
class Prefetcher(LoggedClass):
'''Prefetches blocks (in the forward direction only).'''
def __init__(self, daemon, height):
def __init__(self, tasks, daemon, height):
super().__init__()
self.tasks = tasks
self.daemon = daemon
self.semaphore = asyncio.Semaphore()
self.queue = asyncio.Queue()
self.queue_size = 0
self.caught_up = False
self.fetched_height = height
# A list of (blocks, size) pairs. Earliest last.
self.cache = []
self.cache_size = 0
# Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024
# First fetch to be 10 blocks
# This makes the first fetch be 10 blocks
self.ave_size = self.target_cache_size // 10
async def clear(self, height):
@@ -53,19 +59,19 @@ class Prefetcher(LoggedClass):
with await self.semaphore:
while not self.queue.empty():
self.queue.get_nowait()
self.queue_size = 0
self.cache = []
self.cache_size = 0
self.fetched_height = height
self.caught_up = False
self.logger.info('reset to height {:,d}'.format(height))
async def get_blocks(self):
'''Blocking function that returns prefetched blocks.
The returned result is empty just once - when the prefetcher
has caught up with the daemon.
'''
blocks, size = await self.queue.get()
self.queue_size -= size
return blocks
def get_blocks(self):
'''Return the next list of blocks from our prefetch cache.'''
# Cache might be empty after a clear()
if self.cache:
blocks, size = self.cache.pop()
self.cache_size -= size
return blocks
return []
async def main_loop(self):
'''Loop forever polling for more blocks.'''
@@ -73,9 +79,15 @@ class Prefetcher(LoggedClass):
.format(await self.daemon.height()))
while True:
try:
with await self.semaphore:
await self._prefetch()
await asyncio.sleep(5 if self.caught_up else 0)
secs = 0
if self.cache_size < self.target_cache_size:
if await self._prefetch():
self.tasks.put_nowait(BLOCKS)
else:
self.tasks.put_nowait(CAUGHT_UP)
self.caught_up = True
secs = 5
await asyncio.sleep(secs)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
@@ -83,40 +95,37 @@ class Prefetcher(LoggedClass):
async def _prefetch(self):
'''Prefetch blocks unless the prefetch queue is full.'''
if self.queue_size >= self.target_cache_size:
return
daemon_height = await self.daemon.height()
cache_room = self.target_cache_size // self.ave_size
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0))
if not count:
# Indicate when we have caught up for the first time only
if not self.caught_up:
self.caught_up = True
self.queue.put_nowait(([], 0))
return
with await self.semaphore:
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0))
if not count:
return 0
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if self.caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1, hex_hashes[-1]))
blocks = await self.daemon.raw_blocks(hex_hashes)
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if self.caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1, hex_hashes[-1]))
blocks = await self.daemon.raw_blocks(hex_hashes)
size = sum(len(block) for block in blocks)
# Update our recent average block size estimate
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
size = sum(len(block) for block in blocks)
# Update our recent average block size estimate
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
self.fetched_height += len(blocks)
self.queue.put_nowait((blocks, size))
self.queue_size += size
self.cache.insert(0, (blocks, size))
self.cache_size += size
self.fetched_height += len(blocks)
return count
class ChainReorg(Exception):
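A minimal sketch of the producer side under the same simplified names (fetch_blocks is an assumed stand-in for the daemon calls): the prefetcher keeps a local cache and signals the shared tasks queue with BLOCKS or CAUGHT_UP, so the consumer never awaits the prefetcher directly.

    import asyncio

    BLOCKS, CAUGHT_UP = range(2)    # same task kinds as the sketch above

    class Prefetcher:
        '''Sketch only: fill a local cache and notify the shared tasks queue
        instead of having the block processor block on a queue of ours.'''

        def __init__(self, tasks, fetch_blocks):
            self.tasks = tasks                # shared asyncio.Queue of task kinds
            self.fetch_blocks = fetch_blocks  # coroutine returning a list of raw blocks
            self.cache = []                   # block batches, earliest last
            self.caught_up = False

        def get_blocks(self):
            '''Non-blocking: next cached batch, or [] after a clear().'''
            return self.cache.pop() if self.cache else []

        async def main_loop(self):
            while True:
                blocks = await self.fetch_blocks()
                if blocks:
                    self.cache.insert(0, blocks)    # keep earliest batch last
                    self.tasks.put_nowait(BLOCKS)
                    await asyncio.sleep(0)
                else:
                    self.caught_up = True
                    self.tasks.put_nowait(CAUGHT_UP)
                    await asyncio.sleep(5)          # poll again once caught up

The real Prefetcher above also tracks cache_size against target_cache_size and throttles on it; the sketch omits that size accounting.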
@@ -135,6 +144,9 @@ class BlockProcessor(server.db.DB):
self.client = client
# The block processor reads its tasks from this queue
self.tasks = asyncio.Queue()
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
@@ -144,6 +156,7 @@ class BlockProcessor(server.db.DB):
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self._shutdown = False
self.event = asyncio.Event()
# Meta
@@ -154,7 +167,7 @@ class BlockProcessor(server.db.DB):
# Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.prefetcher = Prefetcher(self.daemon, self.height)
self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height)
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
@@ -176,32 +189,37 @@ class BlockProcessor(server.db.DB):
async def main_loop(self):
'''Main loop for block processing.'''
try:
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg, set())
while True:
await self._wait_for_update()
except asyncio.CancelledError:
pass
async def shutdown(self):
'''Shut down the DB cleanly.'''
self.logger.info('flushing state to DB for clean shutdown...')
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg, set())
while True:
task = await self.tasks.get()
if self._shutdown:
break
if task == BLOCKS:
await self.advance_blocks()
else:
assert task == CAUGHT_UP
if not self.caught_up:
self.caught_up = True
self.first_caught_up()
self.flush(True)
async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks.
def shutdown(self):
'''Call to shut down the block processor.'''
self.logger.info('flushing state to DB for clean shutdown...')
self._shutdown = True
# Ensure we don't sit waiting for a task
self.tasks.put_nowait(BLOCKS)
Blocks are only processed in the forward direction.
'''
blocks = await self.prefetcher.get_blocks()
if not blocks:
self.first_caught_up()
return
async def advance_blocks(self):
'''Take blocks from the prefetcher and process them.'''
blocks = self.prefetcher.get_blocks()
'''Strip the unspendable genesis coinbase.'''
if self.height == -1:
@@ -226,7 +244,6 @@ class BlockProcessor(server.db.DB):
def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.'''
self.caught_up = True
if self.first_sync:
self.first_sync = False
self.logger.info('{} synced to height {:,d}'
@@ -253,7 +270,6 @@ class BlockProcessor(server.db.DB):
self.backup_blocks(blocks, touched)
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
async def reorg_hashes(self, count):
'''Return the list of hashes to back up because of a reorg.

server/db.py (2)

@@ -270,8 +270,6 @@ class DB(LoggedClass):
cursor += size
file_pos += size
os.sync()
def read_headers(self, start, count):
'''Requires count >= 0.'''
# Read some from disk

server/protocol.py (6)

@@ -254,6 +254,7 @@ class ServerManager(util.LoggedClass):
def add_future(coro):
self.futures.append(asyncio.ensure_future(coro))
# shutdown() assumes bp.main_loop() is first
add_future(self.bp.main_loop())
add_future(self.bp.prefetcher.main_loop())
add_future(self.mempool.main_loop(self.bp.event))
@@ -316,7 +317,9 @@ class ServerManager(util.LoggedClass):
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
for future in self.futures:
self.bp.shutdown()
# Don't cancel the block processor main loop - let it close itself
for future in self.futures[1:]:
future.cancel()
for server in self.servers:
server.close()
@@ -326,7 +329,6 @@ class ServerManager(util.LoggedClass):
await asyncio.sleep(0)
if self.sessions:
await self.close_sessions()
await self.bp.shutdown()
async def close_sessions(self, secs=60):
self.logger.info('cleanly closing client sessions, please wait...')
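Putting the protocol.py hunks together, a hedged sketch of the shutdown ordering they rely on (the manager attributes are taken from the diff; the gather call is just to keep the sketch short, whereas the real code polls until the futures finish): the block processor's future is deliberately first in self.futures, so everything else is cancelled while the block processor is asked to stop itself.

    import asyncio

    async def shutdown(manager):
        '''Sketch of the ordering only: cooperative stop for the block
        processor, cancellation for everything else.'''
        manager.bp.shutdown()                  # set the flag and wake main_loop
        for future in manager.futures[1:]:     # futures[0] is bp.main_loop()
            future.cancel()
        for server in manager.servers:
            server.close()
        # Wait for all futures, including the block processor's clean exit.
        await asyncio.gather(*manager.futures, return_exceptions=True)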
