From 955a8e927d6c5cfa447d9ddcf2c660a9eae9c190 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 6 Aug 2018 22:23:41 +0900 Subject: [PATCH] Put flushing-to-DB in a thread - flush() and backup_flush() are now async --- electrumx/server/block_processor.py | 89 ++++++++++++++++------------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 08a72a6..18b697a 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -56,7 +56,7 @@ class Prefetcher(object): if not await self._prefetch_blocks(): await asyncio.sleep(5) except DaemonError as e: - self.logger.info('ignoring daemon error: {}'.format(e)) + self.logger.info(f'ignoring daemon error: {e}') def get_prefetched_blocks(self): '''Called by block processor when it is processing queued blocks.''' @@ -183,10 +183,26 @@ class BlockProcessor(electrumx.server.db.DB): # is consistent with self.height self.state_lock = asyncio.Lock() - async def run_in_thread_shielded(self, func, *args): + async def run_in_thread_with_lock(self, func, *args): + # Run in a thread to prevent blocking. Shielded so that + # cancellations from shutdown don't lose work - when the task + # completes the data will be flushed and then we shut down. + # Take the state lock to be certain in-memory state is + # consistent and not being updated elsewhere. async with self.state_lock: return await asyncio.shield(run_in_thread(func, *args)) + async def _maybe_flush(self): + # If caught up, flush everything as client queries are + # performed on the DB. + if self._caught_up_event.is_set(): + await self.flush(True) + elif time.time() > self.next_cache_check: + flush_arg = self.check_cache_size() + if flush_arg is not None: + await self.flush(flush_arg) + self.next_cache_check = time.time() + 30 + async def check_and_advance_blocks(self, raw_blocks): '''Process the list of raw blocks passed. Detects and handles reorgs. @@ -201,7 +217,14 @@ class BlockProcessor(electrumx.server.db.DB): chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] if hprevs == chain: - await self.run_in_thread_shielded(self.advance_blocks, blocks) + start = time.time() + await self.run_in_thread_with_lock(self.advance_blocks, blocks) + await self._maybe_flush() + if not self.first_sync: + s = '' if len(blocks) == 1 else 's' + self.logger.info('processed {:,d} block{} in {:.1f}s' + .format(len(blocks), s, + time.time() - start)) if self._caught_up_event.is_set(): await self.notifications.on_block(self.touched, self.height) self.touched = set() @@ -226,7 +249,7 @@ class BlockProcessor(electrumx.server.db.DB): self.logger.info('chain reorg detected') else: self.logger.info(f'faking a reorg of {count:,d} blocks') - await run_in_thread(self.flush, True) + await self.flush(True) async def get_raw_blocks(last_height, hex_hashes): heights = range(last_height, last_height - len(hex_hashes), -1) @@ -242,7 +265,8 @@ class BlockProcessor(electrumx.server.db.DB): hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.run_in_thread_shielded(self.backup_blocks, raw_blocks) + await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) + await self.backup_flush() last -= len(raw_blocks) # Truncate header_mc: header count is 1 more than the height. self.header_mc.truncate(self.height + 1) @@ -312,14 +336,16 @@ class BlockProcessor(electrumx.server.db.DB): assert not self.db_deletes self.history.assert_flushed() - def flush(self, flush_utxos=False): - '''Flush out cached state. - - History is always flushed. UTXOs are flushed if flush_utxos.''' + async def flush(self, flush_utxos): if self.height == self.db_height: self.assert_flushed() - return + else: + await self.run_in_thread_with_lock(self._flush_body, flush_utxos) + + def _flush_body(self, flush_utxos): + '''Flush out cached state. + History is always flushed. UTXOs are flushed if flush_utxos.''' flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count @@ -387,23 +413,25 @@ class BlockProcessor(electrumx.server.db.DB): self.tx_hashes = [] self.headers = [] - def backup_flush(self): + async def backup_flush(self): + assert self.height < self.db_height + assert not self.headers + assert not self.tx_hashes + self.history.assert_flushed() + await self.run_in_thread_with_lock(self._backup_flush_body) + + def _backup_flush_body(self): '''Like flush() but when backing up. All UTXOs are flushed. hashXs - sequence of hashXs which were touched by backing up. Searched for history entries to remove after the backup height. ''' - assert self.height < self.db_height - self.history.assert_flushed() - flush_start = time.time() # Backup FS (just move the pointers back) self.fs_height = self.height self.fs_tx_count = self.tx_count - assert not self.headers - assert not self.tx_hashes # Backup history. self.touched can include other addresses # which is harmless, but remove None. @@ -445,14 +473,14 @@ class BlockProcessor(electrumx.server.db.DB): # Flush history if it takes up over 20% of cache memory. # Flush UTXOs once they take up 80% of cache memory. if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: - self.flush(utxo_MB >= self.cache_MB * 4 // 5) + return utxo_MB >= self.cache_MB * 4 // 5 + return None def advance_blocks(self, blocks): '''Synchronously advance the blocks. It is already verified they correctly connect onto our tip. ''' - start = time.time() min_height = self.min_undo_height(self.daemon.cached_height()) height = self.height @@ -468,21 +496,6 @@ class BlockProcessor(electrumx.server.db.DB): self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) - # If caught up, flush everything as client queries are - # performed on the DB. - if self._caught_up_event.is_set(): - self.flush(True) - else: - if time.time() > self.next_cache_check: - self.check_cache_size() - self.next_cache_check = time.time() + 30 - - if not self.first_sync: - s = '' if len(blocks) == 1 else 's' - self.logger.info('processed {:,d} block{} in {:.1f}s' - .format(len(blocks), s, - time.time() - start)) - def advance_txs(self, txs): self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) @@ -555,7 +568,6 @@ class BlockProcessor(electrumx.server.db.DB): self.tx_counts.pop() self.logger.info('backed up to height {:,d}'.format(self.height)) - self.backup_flush() def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present) @@ -756,7 +768,7 @@ class BlockProcessor(electrumx.server.db.DB): # Flush everything but with first_sync->False state. first_sync = self.first_sync self.first_sync = False - self.flush(True) + await self.flush(True) if first_sync: self.logger.info(f'{electrumx.version} synced to ' f'height {self.height:,d}') @@ -808,10 +820,9 @@ class BlockProcessor(electrumx.server.db.DB): await group.spawn(self.prefetcher.main_loop(self.height)) await group.spawn(self._process_prefetched_blocks()) finally: - async with self.state_lock: - # Shut down block processing - self.logger.info('flushing to DB for a clean shutdown...') - self.flush(True) + # Shut down block processing + self.logger.info('flushing to DB for a clean shutdown...') + await self.flush(True) def force_chain_reorg(self, count): '''Force a reorg of the given number of blocks.