Browse Source

Put flushing-to-DB in a thread

- flush() and backup_flush() are now async
patch-2
Neil Booth 7 years ago
parent
commit
955a8e927d
  1. 85
      electrumx/server/block_processor.py

85
electrumx/server/block_processor.py

@ -56,7 +56,7 @@ class Prefetcher(object):
if not await self._prefetch_blocks(): if not await self._prefetch_blocks():
await asyncio.sleep(5) await asyncio.sleep(5)
except DaemonError as e: except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e)) self.logger.info(f'ignoring daemon error: {e}')
def get_prefetched_blocks(self): def get_prefetched_blocks(self):
'''Called by block processor when it is processing queued blocks.''' '''Called by block processor when it is processing queued blocks.'''
@ -183,10 +183,26 @@ class BlockProcessor(electrumx.server.db.DB):
# is consistent with self.height # is consistent with self.height
self.state_lock = asyncio.Lock() self.state_lock = asyncio.Lock()
async def run_in_thread_shielded(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
# completes the data will be flushed and then we shut down.
# Take the state lock to be certain in-memory state is
# consistent and not being updated elsewhere.
async with self.state_lock: async with self.state_lock:
return await asyncio.shield(run_in_thread(func, *args)) return await asyncio.shield(run_in_thread(func, *args))
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.time() > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.time() + 30
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
'''Process the list of raw blocks passed. Detects and handles '''Process the list of raw blocks passed. Detects and handles
reorgs. reorgs.
@ -201,7 +217,14 @@ class BlockProcessor(electrumx.server.db.DB):
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
if hprevs == chain: if hprevs == chain:
await self.run_in_thread_shielded(self.advance_blocks, blocks) start = time.time()
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
await self._maybe_flush()
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
if self._caught_up_event.is_set(): if self._caught_up_event.is_set():
await self.notifications.on_block(self.touched, self.height) await self.notifications.on_block(self.touched, self.height)
self.touched = set() self.touched = set()
@ -226,7 +249,7 @@ class BlockProcessor(electrumx.server.db.DB):
self.logger.info('chain reorg detected') self.logger.info('chain reorg detected')
else: else:
self.logger.info(f'faking a reorg of {count:,d} blocks') self.logger.info(f'faking a reorg of {count:,d} blocks')
await run_in_thread(self.flush, True) await self.flush(True)
async def get_raw_blocks(last_height, hex_hashes): async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1) heights = range(last_height, last_height - len(hex_hashes), -1)
@ -242,7 +265,8 @@ class BlockProcessor(electrumx.server.db.DB):
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50): for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes) raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.run_in_thread_shielded(self.backup_blocks, raw_blocks) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.backup_flush()
last -= len(raw_blocks) last -= len(raw_blocks)
# Truncate header_mc: header count is 1 more than the height. # Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(self.height + 1) self.header_mc.truncate(self.height + 1)
@ -312,14 +336,16 @@ class BlockProcessor(electrumx.server.db.DB):
assert not self.db_deletes assert not self.db_deletes
self.history.assert_flushed() self.history.assert_flushed()
def flush(self, flush_utxos=False): async def flush(self, flush_utxos):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height: if self.height == self.db_height:
self.assert_flushed() self.assert_flushed()
return else:
await self.run_in_thread_with_lock(self._flush_body, flush_utxos)
def _flush_body(self, flush_utxos):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
flush_start = time.time() flush_start = time.time()
last_flush = self.last_flush last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count tx_diff = self.tx_count - self.last_flush_tx_count
@ -387,23 +413,25 @@ class BlockProcessor(electrumx.server.db.DB):
self.tx_hashes = [] self.tx_hashes = []
self.headers = [] self.headers = []
def backup_flush(self): async def backup_flush(self):
assert self.height < self.db_height
assert not self.headers
assert not self.tx_hashes
self.history.assert_flushed()
await self.run_in_thread_with_lock(self._backup_flush_body)
def _backup_flush_body(self):
'''Like flush() but when backing up. All UTXOs are flushed. '''Like flush() but when backing up. All UTXOs are flushed.
hashXs - sequence of hashXs which were touched by backing hashXs - sequence of hashXs which were touched by backing
up. Searched for history entries to remove after the backup up. Searched for history entries to remove after the backup
height. height.
''' '''
assert self.height < self.db_height
self.history.assert_flushed()
flush_start = time.time() flush_start = time.time()
# Backup FS (just move the pointers back) # Backup FS (just move the pointers back)
self.fs_height = self.height self.fs_height = self.height
self.fs_tx_count = self.tx_count self.fs_tx_count = self.tx_count
assert not self.headers
assert not self.tx_hashes
# Backup history. self.touched can include other addresses # Backup history. self.touched can include other addresses
# which is harmless, but remove None. # which is harmless, but remove None.
@ -445,14 +473,14 @@ class BlockProcessor(electrumx.server.db.DB):
# Flush history if it takes up over 20% of cache memory. # Flush history if it takes up over 20% of cache memory.
# Flush UTXOs once they take up 80% of cache memory. # Flush UTXOs once they take up 80% of cache memory.
if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
self.flush(utxo_MB >= self.cache_MB * 4 // 5) return utxo_MB >= self.cache_MB * 4 // 5
return None
def advance_blocks(self, blocks): def advance_blocks(self, blocks):
'''Synchronously advance the blocks. '''Synchronously advance the blocks.
It is already verified they correctly connect onto our tip. It is already verified they correctly connect onto our tip.
''' '''
start = time.time()
min_height = self.min_undo_height(self.daemon.cached_height()) min_height = self.min_undo_height(self.daemon.cached_height())
height = self.height height = self.height
@ -468,21 +496,6 @@ class BlockProcessor(electrumx.server.db.DB):
self.headers.extend(headers) self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1]) self.tip = self.coin.header_hash(headers[-1])
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
self.flush(True)
else:
if time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 30
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
def advance_txs(self, txs): def advance_txs(self, txs):
self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
@ -555,7 +568,6 @@ class BlockProcessor(electrumx.server.db.DB):
self.tx_counts.pop() self.tx_counts.pop()
self.logger.info('backed up to height {:,d}'.format(self.height)) self.logger.info('backed up to height {:,d}'.format(self.height))
self.backup_flush()
def backup_txs(self, txs): def backup_txs(self, txs):
# Prevout values, in order down the block (coinbase first if present) # Prevout values, in order down the block (coinbase first if present)
@ -756,7 +768,7 @@ class BlockProcessor(electrumx.server.db.DB):
# Flush everything but with first_sync->False state. # Flush everything but with first_sync->False state.
first_sync = self.first_sync first_sync = self.first_sync
self.first_sync = False self.first_sync = False
self.flush(True) await self.flush(True)
if first_sync: if first_sync:
self.logger.info(f'{electrumx.version} synced to ' self.logger.info(f'{electrumx.version} synced to '
f'height {self.height:,d}') f'height {self.height:,d}')
@ -808,10 +820,9 @@ class BlockProcessor(electrumx.server.db.DB):
await group.spawn(self.prefetcher.main_loop(self.height)) await group.spawn(self.prefetcher.main_loop(self.height))
await group.spawn(self._process_prefetched_blocks()) await group.spawn(self._process_prefetched_blocks())
finally: finally:
async with self.state_lock:
# Shut down block processing # Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...') self.logger.info('flushing to DB for a clean shutdown...')
self.flush(True) await self.flush(True)
def force_chain_reorg(self, count): def force_chain_reorg(self, count):
'''Force a reorg of the given number of blocks. '''Force a reorg of the given number of blocks.

Loading…
Cancel
Save