|
|
@ -148,7 +148,6 @@ class BlockProcessor(server.db.DB): |
|
|
|
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) |
|
|
|
self.caught_up = False |
|
|
|
self.event = asyncio.Event() |
|
|
|
self.touched = set() |
|
|
|
|
|
|
|
# Meta |
|
|
|
self.utxo_MB = env.utxo_MB |
|
|
@ -189,7 +188,7 @@ class BlockProcessor(server.db.DB): |
|
|
|
if self.env.force_reorg > 0: |
|
|
|
self.logger.info('DEBUG: simulating reorg of {:,d} blocks' |
|
|
|
.format(self.env.force_reorg)) |
|
|
|
await self.handle_chain_reorg(self.env.force_reorg) |
|
|
|
await self.handle_chain_reorg(self.env.force_reorg, set()) |
|
|
|
|
|
|
|
while True: |
|
|
|
await self._wait_for_update() |
|
|
@ -215,22 +214,22 @@ class BlockProcessor(server.db.DB): |
|
|
|
if self.height == -1: |
|
|
|
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) |
|
|
|
|
|
|
|
touched = set() |
|
|
|
try: |
|
|
|
for block in blocks: |
|
|
|
self.advance_block(block, self.caught_up) |
|
|
|
self.advance_block(block, touched) |
|
|
|
await asyncio.sleep(0) # Yield |
|
|
|
except ChainReorg: |
|
|
|
await self.handle_chain_reorg(None) |
|
|
|
await self.handle_chain_reorg(None, touched) |
|
|
|
|
|
|
|
if self.caught_up: |
|
|
|
# Flush everything as queries are performed on the DB and |
|
|
|
# not in-memory. |
|
|
|
self.flush(True) |
|
|
|
self.client.notify(self.touched) |
|
|
|
self.client.notify(touched) |
|
|
|
elif time.time() > self.next_cache_check: |
|
|
|
self.check_cache_size() |
|
|
|
self.next_cache_check = time.time() + 60 |
|
|
|
self.touched = set() |
|
|
|
|
|
|
|
def first_caught_up(self): |
|
|
|
'''Called when first caught up after start, or after a reorg.''' |
|
|
@ -242,7 +241,7 @@ class BlockProcessor(server.db.DB): |
|
|
|
self.flush(True) |
|
|
|
self.event.set() |
|
|
|
|
|
|
|
async def handle_chain_reorg(self, count): |
|
|
|
async def handle_chain_reorg(self, count, touched): |
|
|
|
'''Handle a chain reorganisation. |
|
|
|
|
|
|
|
Count is the number of blocks to simulate a reorg, or None for |
|
|
@ -256,7 +255,7 @@ class BlockProcessor(server.db.DB): |
|
|
|
hashes = [hash_to_str(hash) for hash in reversed(hashes)] |
|
|
|
for hex_hashes in chunks(hashes, 50): |
|
|
|
blocks = await self.daemon.raw_blocks(hex_hashes) |
|
|
|
self.backup_blocks(blocks) |
|
|
|
self.backup_blocks(blocks, touched) |
|
|
|
|
|
|
|
await self.prefetcher.clear(self.height) |
|
|
|
self.logger.info('prefetcher reset') |
|
|
@ -291,8 +290,8 @@ class BlockProcessor(server.db.DB): |
|
|
|
else: |
|
|
|
start = (self.height - count) + 1 |
|
|
|
|
|
|
|
self.logger.info('chain was reorganised for {:,d} blocks over ' |
|
|
|
'heights {:,d}-{:,d} inclusive' |
|
|
|
self.logger.info('chain was reorganised: {:,d} blocks at ' |
|
|
|
'heights {:,d}-{:,d} were replaced' |
|
|
|
.format(count, start, start + count - 1)) |
|
|
|
|
|
|
|
return self.fs_block_hashes(start, count) |
|
|
@ -563,7 +562,7 @@ class BlockProcessor(server.db.DB): |
|
|
|
self.tx_hashes.append(tx_hashes) |
|
|
|
self.tx_counts.append(prior_tx_count + len(txs)) |
|
|
|
|
|
|
|
def advance_block(self, block, update_touched): |
|
|
|
def advance_block(self, block, touched): |
|
|
|
# We must update the FS cache before calling advance_txs() as |
|
|
|
# the UTXO cache uses the FS cache via get_tx_hash() to |
|
|
|
# resolve compressed key collisions |
|
|
@ -571,7 +570,6 @@ class BlockProcessor(server.db.DB): |
|
|
|
if self.tip != self.coin.header_prevhash(header): |
|
|
|
raise ChainReorg |
|
|
|
|
|
|
|
touched = set() |
|
|
|
self.fs_advance_block(header, tx_hashes, txs) |
|
|
|
self.tip = self.coin.header_hash(header) |
|
|
|
self.height += 1 |
|
|
@ -579,9 +577,6 @@ class BlockProcessor(server.db.DB): |
|
|
|
if self.daemon.cached_height() - self.height <= self.reorg_limit: |
|
|
|
self.write_undo_info(self.height, b''.join(undo_info)) |
|
|
|
|
|
|
|
if update_touched: |
|
|
|
self.touched.update(touched) |
|
|
|
|
|
|
|
def advance_txs(self, tx_hashes, txs, touched): |
|
|
|
put_utxo = self.utxo_cache.__setitem__ |
|
|
|
spend_utxo = self.spend_utxo |
|
|
@ -623,7 +618,7 @@ class BlockProcessor(server.db.DB): |
|
|
|
|
|
|
|
return undo_info |
|
|
|
|
|
|
|
def backup_blocks(self, blocks): |
|
|
|
def backup_blocks(self, blocks, touched): |
|
|
|
'''Backup the blocks and flush. |
|
|
|
|
|
|
|
The blocks should be in order of decreasing height. |
|
|
@ -632,7 +627,6 @@ class BlockProcessor(server.db.DB): |
|
|
|
self.logger.info('backing up {:,d} blocks'.format(len(blocks))) |
|
|
|
self.assert_flushed() |
|
|
|
|
|
|
|
touched = set() |
|
|
|
for block in blocks: |
|
|
|
header, tx_hashes, txs = self.coin.read_block(block) |
|
|
|
header_hash = self.coin.header_hash(header) |
|
|
@ -649,7 +643,8 @@ class BlockProcessor(server.db.DB): |
|
|
|
|
|
|
|
self.logger.info('backed up to height {:,d}'.format(self.height)) |
|
|
|
|
|
|
|
self.touched.update(touched) |
|
|
|
# touched includes those passed into this function. That will |
|
|
|
# generally be empty but is harmless if not. |
|
|
|
flush_history = partial(self.backup_history, hash168s=touched) |
|
|
|
self.flush(True, flush_history=flush_history) |
|
|
|
|
|
|
|