Browse Source

Cleaner handling of touched addresses

Moved to local vars rather than an instance variable
master
Neil Booth 8 years ago
parent
commit
df3cb8535a
  1. 31
      server/block_processor.py

31
server/block_processor.py

@ -148,7 +148,6 @@ class BlockProcessor(server.db.DB):
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False self.caught_up = False
self.event = asyncio.Event() self.event = asyncio.Event()
self.touched = set()
# Meta # Meta
self.utxo_MB = env.utxo_MB self.utxo_MB = env.utxo_MB
@ -189,7 +188,7 @@ class BlockProcessor(server.db.DB):
if self.env.force_reorg > 0: if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks' self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg)) .format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg) await self.handle_chain_reorg(self.env.force_reorg, set())
while True: while True:
await self._wait_for_update() await self._wait_for_update()
@ -215,22 +214,22 @@ class BlockProcessor(server.db.DB):
if self.height == -1: if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
touched = set()
try: try:
for block in blocks: for block in blocks:
self.advance_block(block, self.caught_up) self.advance_block(block, touched)
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
except ChainReorg: except ChainReorg:
await self.handle_chain_reorg(None) await self.handle_chain_reorg(None, touched)
if self.caught_up: if self.caught_up:
# Flush everything as queries are performed on the DB and # Flush everything as queries are performed on the DB and
# not in-memory. # not in-memory.
self.flush(True) self.flush(True)
self.client.notify(self.touched) self.client.notify(touched)
elif time.time() > self.next_cache_check: elif time.time() > self.next_cache_check:
self.check_cache_size() self.check_cache_size()
self.next_cache_check = time.time() + 60 self.next_cache_check = time.time() + 60
self.touched = set()
def first_caught_up(self): def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.''' '''Called when first caught up after start, or after a reorg.'''
@ -242,7 +241,7 @@ class BlockProcessor(server.db.DB):
self.flush(True) self.flush(True)
self.event.set() self.event.set()
async def handle_chain_reorg(self, count): async def handle_chain_reorg(self, count, touched):
'''Handle a chain reorganisation. '''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for Count is the number of blocks to simulate a reorg, or None for
@ -256,7 +255,7 @@ class BlockProcessor(server.db.DB):
hashes = [hash_to_str(hash) for hash in reversed(hashes)] hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50): for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes) blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks) self.backup_blocks(blocks, touched)
await self.prefetcher.clear(self.height) await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset') self.logger.info('prefetcher reset')
@ -291,8 +290,8 @@ class BlockProcessor(server.db.DB):
else: else:
start = (self.height - count) + 1 start = (self.height - count) + 1
self.logger.info('chain was reorganised for {:,d} blocks over ' self.logger.info('chain was reorganised: {:,d} blocks at '
'heights {:,d}-{:,d} inclusive' 'heights {:,d}-{:,d} were replaced'
.format(count, start, start + count - 1)) .format(count, start, start + count - 1))
return self.fs_block_hashes(start, count) return self.fs_block_hashes(start, count)
@ -563,7 +562,7 @@ class BlockProcessor(server.db.DB):
self.tx_hashes.append(tx_hashes) self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs)) self.tx_counts.append(prior_tx_count + len(txs))
def advance_block(self, block, update_touched): def advance_block(self, block, touched):
# We must update the FS cache before calling advance_txs() as # We must update the FS cache before calling advance_txs() as
# the UTXO cache uses the FS cache via get_tx_hash() to # the UTXO cache uses the FS cache via get_tx_hash() to
# resolve compressed key collisions # resolve compressed key collisions
@ -571,7 +570,6 @@ class BlockProcessor(server.db.DB):
if self.tip != self.coin.header_prevhash(header): if self.tip != self.coin.header_prevhash(header):
raise ChainReorg raise ChainReorg
touched = set()
self.fs_advance_block(header, tx_hashes, txs) self.fs_advance_block(header, tx_hashes, txs)
self.tip = self.coin.header_hash(header) self.tip = self.coin.header_hash(header)
self.height += 1 self.height += 1
@ -579,9 +577,6 @@ class BlockProcessor(server.db.DB):
if self.daemon.cached_height() - self.height <= self.reorg_limit: if self.daemon.cached_height() - self.height <= self.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info)) self.write_undo_info(self.height, b''.join(undo_info))
if update_touched:
self.touched.update(touched)
def advance_txs(self, tx_hashes, txs, touched): def advance_txs(self, tx_hashes, txs, touched):
put_utxo = self.utxo_cache.__setitem__ put_utxo = self.utxo_cache.__setitem__
spend_utxo = self.spend_utxo spend_utxo = self.spend_utxo
@ -623,7 +618,7 @@ class BlockProcessor(server.db.DB):
return undo_info return undo_info
def backup_blocks(self, blocks): def backup_blocks(self, blocks, touched):
'''Backup the blocks and flush. '''Backup the blocks and flush.
The blocks should be in order of decreasing height. The blocks should be in order of decreasing height.
@ -632,7 +627,6 @@ class BlockProcessor(server.db.DB):
self.logger.info('backing up {:,d} blocks'.format(len(blocks))) self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed() self.assert_flushed()
touched = set()
for block in blocks: for block in blocks:
header, tx_hashes, txs = self.coin.read_block(block) header, tx_hashes, txs = self.coin.read_block(block)
header_hash = self.coin.header_hash(header) header_hash = self.coin.header_hash(header)
@ -649,7 +643,8 @@ class BlockProcessor(server.db.DB):
self.logger.info('backed up to height {:,d}'.format(self.height)) self.logger.info('backed up to height {:,d}'.format(self.height))
self.touched.update(touched) # touched includes those passed into this function. That will
# generally be empty but is harmless if not.
flush_history = partial(self.backup_history, hash168s=touched) flush_history = partial(self.backup_history, hash168s=touched)
self.flush(True, flush_history=flush_history) self.flush(True, flush_history=flush_history)

Loading…
Cancel
Save