diff --git a/server/block_processor.py b/server/block_processor.py index 583dad4..57264df 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -148,7 +148,6 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up = False self.event = asyncio.Event() - self.touched = set() # Meta self.utxo_MB = env.utxo_MB @@ -189,7 +188,7 @@ class BlockProcessor(server.db.DB): if self.env.force_reorg > 0: self.logger.info('DEBUG: simulating reorg of {:,d} blocks' .format(self.env.force_reorg)) - await self.handle_chain_reorg(self.env.force_reorg) + await self.handle_chain_reorg(self.env.force_reorg, set()) while True: await self._wait_for_update() @@ -215,22 +214,22 @@ class BlockProcessor(server.db.DB): if self.height == -1: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) + touched = set() try: for block in blocks: - self.advance_block(block, self.caught_up) + self.advance_block(block, touched) await asyncio.sleep(0) # Yield except ChainReorg: - await self.handle_chain_reorg(None) + await self.handle_chain_reorg(None, touched) if self.caught_up: # Flush everything as queries are performed on the DB and # not in-memory. self.flush(True) - self.client.notify(self.touched) + self.client.notify(touched) elif time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 - self.touched = set() def first_caught_up(self): '''Called when first caught up after start, or after a reorg.''' @@ -242,7 +241,7 @@ class BlockProcessor(server.db.DB): self.flush(True) self.event.set() - async def handle_chain_reorg(self, count): + async def handle_chain_reorg(self, count, touched): '''Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for @@ -256,7 +255,7 @@ class BlockProcessor(server.db.DB): hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) - self.backup_blocks(blocks) + self.backup_blocks(blocks, touched) await self.prefetcher.clear(self.height) self.logger.info('prefetcher reset') @@ -291,8 +290,8 @@ class BlockProcessor(server.db.DB): else: start = (self.height - count) + 1 - self.logger.info('chain was reorganised for {:,d} blocks over ' - 'heights {:,d}-{:,d} inclusive' + self.logger.info('chain was reorganised: {:,d} blocks at ' + 'heights {:,d}-{:,d} were replaced' .format(count, start, start + count - 1)) return self.fs_block_hashes(start, count) @@ -563,7 +562,7 @@ class BlockProcessor(server.db.DB): self.tx_hashes.append(tx_hashes) self.tx_counts.append(prior_tx_count + len(txs)) - def advance_block(self, block, update_touched): + def advance_block(self, block, touched): # We must update the FS cache before calling advance_txs() as # the UTXO cache uses the FS cache via get_tx_hash() to # resolve compressed key collisions @@ -571,7 +570,6 @@ class BlockProcessor(server.db.DB): if self.tip != self.coin.header_prevhash(header): raise ChainReorg - touched = set() self.fs_advance_block(header, tx_hashes, txs) self.tip = self.coin.header_hash(header) self.height += 1 @@ -579,9 +577,6 @@ class BlockProcessor(server.db.DB): if self.daemon.cached_height() - self.height <= self.reorg_limit: self.write_undo_info(self.height, b''.join(undo_info)) - if update_touched: - self.touched.update(touched) - def advance_txs(self, tx_hashes, txs, touched): put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo @@ -623,7 +618,7 @@ class BlockProcessor(server.db.DB): return undo_info - def backup_blocks(self, blocks): + def backup_blocks(self, blocks, touched): '''Backup the blocks and flush. The blocks should be in order of decreasing height. @@ -632,7 +627,6 @@ class BlockProcessor(server.db.DB): self.logger.info('backing up {:,d} blocks'.format(len(blocks))) self.assert_flushed() - touched = set() for block in blocks: header, tx_hashes, txs = self.coin.read_block(block) header_hash = self.coin.header_hash(header) @@ -649,7 +643,8 @@ class BlockProcessor(server.db.DB): self.logger.info('backed up to height {:,d}'.format(self.height)) - self.touched.update(touched) + # touched includes those passed into this function. That will + # generally be empty but is harmless if not. flush_history = partial(self.backup_history, hash168s=touched) self.flush(True, flush_history=flush_history)