Browse Source

Update BlockProcessor for server changes

master
Neil Booth 8 years ago
parent
commit
51accf7dfe
  1. 117
      server/block_processor.py

117
server/block_processor.py

@ -149,7 +149,6 @@ class BlockProcessor(LoggedClass):
'''on_update is awaitable, and called only when caught up with the '''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated. daemon and a new block arrives or the mempool is updated.
''' '''
super().__init__() super().__init__()
self.daemon = daemon self.daemon = daemon
@ -160,7 +159,6 @@ class BlockProcessor(LoggedClass):
self.hist_MB = env.hist_MB self.hist_MB = env.hist_MB
self.next_cache_check = 0 self.next_cache_check = 0
self.coin = env.coin self.coin = env.coin
self.have_caught_up = False
self.reorg_limit = env.reorg_limit self.reorg_limit = env.reorg_limit
# Chain state (initialize to genesis in case of new DB) # Chain state (initialize to genesis in case of new DB)
@ -182,7 +180,6 @@ class BlockProcessor(LoggedClass):
# entry per block # entry per block
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0 self.history_size = 0
self.backup_hash168s = set()
self.utxo_cache = UTXOCache(self, self.db, self.coin) self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height) self.prefetcher = Prefetcher(daemon, self.height)
@ -190,8 +187,9 @@ class BlockProcessor(LoggedClass):
self.last_flush = time.time() self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count self.last_flush_tx_count = self.tx_count
# Redirected member func # Redirected member funcs
self.get_tx_hash = self.fs_cache.get_tx_hash self.get_tx_hash = self.fs_cache.get_tx_hash
self.read_headers = self.fs_cache.read_headers
# Log state # Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
@ -216,44 +214,45 @@ class BlockProcessor(LoggedClass):
else: else:
return [self.start(), self.prefetcher.start()] return [self.start(), self.prefetcher.start()]
async def caught_up(self):
'''Call when we catch up to the daemon's height.'''
# Flush everything when in caught-up state as queries
# are performed on DB and not in-memory.
self.flush(True)
if not self.have_caught_up:
self.have_caught_up = True
self.logger.info('caught up to height {:,d}'.format(self.height))
if self.on_update:
await self.on_update(self.height, set())
async def start(self): async def start(self):
'''External entry point for block processing. '''External entry point for block processing.
A simple wrapper that safely flushes the DB on clean Safely flushes the DB on clean shutdown.
shutdown.
''' '''
try: try:
# If we're caught up so the start servers immediately while True:
if self.height == await self.daemon.height(): await self._wait_for_update()
await self.caught_up() await asyncio.sleep(0) # Yield
await self.wait_for_blocks()
finally: finally:
self.flush(True) self.flush(True)
async def wait_for_blocks(self): async def _wait_for_update(self):
'''Loop forever processing blocks in the forward direction.''' '''Wait for the prefetcher to deliver blocks or a mempool update.
while True:
Blocks are only processed in the forward direction. The
prefetcher only provides a non-None mempool when caught up.
'''
all_touched = []
blocks, mempool = await self.prefetcher.get_blocks() blocks, mempool = await self.prefetcher.get_blocks()
for block in blocks: for block in blocks:
if not self.advance_block(block): touched = self.advance_block(block)
await self.handle_chain_reorg() if touched is None:
self.have_caught_up = False all_touched.append(await self.handle_chain_reorg())
mempool = None
break break
all_touched.append(touched)
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
if self.height == self.daemon.cached_height(): if mempool is not None:
await self.caught_up() # Caught up to daemon height. Flush everything as queries
# are performed on the DB and not in-memory.
self.flush(True)
if self.first_sync:
self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height))
if self.on_update:
await self.on_update(self.height, set.union(*all_touched))
async def force_chain_reorg(self, to_genesis): async def force_chain_reorg(self, to_genesis):
try: try:
@ -266,16 +265,21 @@ class BlockProcessor(LoggedClass):
self.logger.info('chain reorg detected') self.logger.info('chain reorg detected')
self.flush(True) self.flush(True)
self.logger.info('finding common height...') self.logger.info('finding common height...')
touched = set()
hashes = await self.reorg_hashes(to_genesis) hashes = await self.reorg_hashes(to_genesis)
# Reverse and convert to hex strings. # Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)] hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50): for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes) blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks) touched.update(self.backup_blocks(blocks))
self.logger.info('backed up to height {:,d}'.format(self.height)) self.logger.info('backed up to height {:,d}'.format(self.height))
await self.prefetcher.clear(self.height) await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset') self.logger.info('prefetcher reset')
return touched
async def reorg_hashes(self, to_genesis): async def reorg_hashes(self, to_genesis):
'''Return the list of hashes to back up beacuse of a reorg. '''Return the list of hashes to back up beacuse of a reorg.
@ -395,8 +399,6 @@ class BlockProcessor(LoggedClass):
def flush_state(self, batch): def flush_state(self, batch):
'''Flush chain state to the batch.''' '''Flush chain state to the batch.'''
if self.have_caught_up:
self.first_sync = False
now = time.time() now = time.time()
self.wall_time += now - self.last_flush self.wall_time += now - self.last_flush
self.last_flush = now self.last_flush = now
@ -429,14 +431,13 @@ class BlockProcessor(LoggedClass):
assert not self.history assert not self.history
assert not self.utxo_cache.cache assert not self.utxo_cache.cache
assert not self.utxo_cache.db_cache assert not self.utxo_cache.db_cache
assert not self.backup_hash168s
def flush(self, flush_utxos=False): def flush(self, flush_utxos=False, flush_history=None):
'''Flush out cached state. '''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.''' History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height: if self.height == self.db_height:
self.logger.info('nothing to flush') assert flush_history is None
self.assert_flushed() self.assert_flushed()
return return
@ -450,15 +451,14 @@ class BlockProcessor(LoggedClass):
# matter. But if writing the files fails we do not want to # matter. But if writing the files fails we do not want to
# have updated the DB. # have updated the DB.
if self.height > self.db_height: if self.height > self.db_height:
assert flush_history is None
flush_history = self.flush_history
self.fs_cache.flush(self.height, self.tx_count) self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
# History first - fast and frees memory. Flush state last # History first - fast and frees memory. Flush state last
# as it reads the wall time. # as it reads the wall time.
if self.height > self.db_height: flush_history(batch)
self.flush_history(batch)
else:
self.backup_history(batch)
if flush_utxos: if flush_utxos:
self.flush_utxos(batch) self.flush_utxos(batch)
self.flush_state(batch) self.flush_state(batch)
@ -494,7 +494,6 @@ class BlockProcessor(LoggedClass):
def flush_history(self, batch): def flush_history(self, batch):
self.logger.info('flushing history') self.logger.info('flushing history')
assert not self.backup_hash168s
self.flush_count += 1 self.flush_count += 1
flush_id = struct.pack('>H', self.flush_count) flush_id = struct.pack('>H', self.flush_count)
@ -509,16 +508,16 @@ class BlockProcessor(LoggedClass):
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0 self.history_size = 0
def backup_history(self, batch): def backup_history(self, batch, hash168s):
self.logger.info('backing up history to height {:,d} tx_count {:,d}' self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count)) .format(self.height, self.tx_count))
# Drop any NO_CACHE entry # Drop any NO_CACHE entry
self.backup_hash168s.discard(NO_CACHE_ENTRY) hash168s.discard(NO_CACHE_ENTRY)
assert not self.history assert not self.history
nremoves = 0 nremoves = 0
for hash168 in sorted(self.backup_hash168s): for hash168 in sorted(hash168s):
prefix = b'H' + hash168 prefix = b'H' + hash168
deletes = [] deletes = []
puts = {} puts = {}
@ -539,8 +538,7 @@ class BlockProcessor(LoggedClass):
batch.put(key, value) batch.put(key, value)
self.logger.info('removed {:,d} history entries from {:,d} addresses' self.logger.info('removed {:,d} history entries from {:,d} addresses'
.format(nremoves, len(self.backup_hash168s))) .format(nremoves, len(hash168s)))
self.backup_hash168s = set()
def cache_sizes(self): def cache_sizes(self):
'''Returns the approximate size of the cache, in MB.''' '''Returns the approximate size of the cache, in MB.'''
@ -587,11 +585,12 @@ class BlockProcessor(LoggedClass):
self.fs_cache.advance_block(header, tx_hashes, txs) self.fs_cache.advance_block(header, tx_hashes, txs)
prev_hash, header_hash = self.coin.header_hashes(header) prev_hash, header_hash = self.coin.header_hashes(header)
if prev_hash != self.tip: if prev_hash != self.tip:
return False return None
touched = set()
self.tip = header_hash self.tip = header_hash
self.height += 1 self.height += 1
undo_info = self.advance_txs(tx_hashes, txs) undo_info = self.advance_txs(tx_hashes, txs, touched)
if self.daemon.cached_height() - self.height <= self.reorg_limit: if self.daemon.cached_height() - self.height <= self.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info)) self.write_undo_info(self.height, b''.join(undo_info))
@ -603,9 +602,9 @@ class BlockProcessor(LoggedClass):
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(utxo_MB >= self.utxo_MB) self.flush(utxo_MB >= self.utxo_MB)
return True return touched
def advance_txs(self, tx_hashes, txs): def advance_txs(self, tx_hashes, txs, touched):
put_utxo = self.utxo_cache.put put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend spend_utxo = self.utxo_cache.spend
undo_info = [] undo_info = []
@ -642,6 +641,7 @@ class BlockProcessor(LoggedClass):
for hash168 in hash168s: for hash168 in hash168s:
history[hash168].append(tx_num) history[hash168].append(tx_num)
self.history_size += len(hash168s) self.history_size += len(hash168s)
touched.update(hash168s)
tx_num += 1 tx_num += 1
self.tx_count = tx_num self.tx_count = tx_num
@ -657,6 +657,7 @@ class BlockProcessor(LoggedClass):
self.logger.info('backing up {:,d} blocks'.format(len(blocks))) self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed() self.assert_flushed()
touched = set()
for block in blocks: for block in blocks:
header, tx_hashes, txs = self.coin.read_block(block) header, tx_hashes, txs = self.coin.read_block(block)
prev_hash, header_hash = self.coin.header_hashes(header) prev_hash, header_hash = self.coin.header_hashes(header)
@ -665,15 +666,18 @@ class BlockProcessor(LoggedClass):
.format(hash_to_str(header_hash), .format(hash_to_str(header_hash),
hash_to_str(self.tip), self.height)) hash_to_str(self.tip), self.height))
self.backup_txs(tx_hashes, txs) self.backup_txs(tx_hashes, txs, touched)
self.fs_cache.backup_block() self.fs_cache.backup_block()
self.tip = prev_hash self.tip = prev_hash
self.height -= 1 self.height -= 1
self.logger.info('backed up to height {:,d}'.format(self.height)) self.logger.info('backed up to height {:,d}'.format(self.height))
self.flush(True)
def backup_txs(self, tx_hashes, txs): flush_history = partial(self.backup_history, hash168s=touched)
self.flush(True, flush_history=flush_history)
return touched
def backup_txs(self, tx_hashes, txs, touched):
# Prevout values, in order down the block (coinbase first if present) # Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order # undo_info is in reverse block order
undo_info = self.read_undo_info(self.height) undo_info = self.read_undo_info(self.height)
@ -683,7 +687,6 @@ class BlockProcessor(LoggedClass):
pack = struct.pack pack = struct.pack
put_utxo = self.utxo_cache.put put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend spend_utxo = self.utxo_cache.spend
hash168s = self.backup_hash168s
rtxs = reversed(txs) rtxs = reversed(txs)
rtx_hashes = reversed(tx_hashes) rtx_hashes = reversed(tx_hashes)
@ -692,7 +695,7 @@ class BlockProcessor(LoggedClass):
# Spend the outputs # Spend the outputs
for idx, txout in enumerate(tx.outputs): for idx, txout in enumerate(tx.outputs):
cache_value = spend_utxo(tx_hash, idx) cache_value = spend_utxo(tx_hash, idx)
hash168s.add(cache_value[:21]) touched.add(cache_value[:21])
# Restore the inputs # Restore the inputs
if not tx.is_coinbase: if not tx.is_coinbase:
@ -701,7 +704,7 @@ class BlockProcessor(LoggedClass):
undo_item = undo_info[n:n + 33] undo_item = undo_info[n:n + 33]
put_utxo(txin.prev_hash + pack('<H', txin.prev_idx), put_utxo(txin.prev_hash + pack('<H', txin.prev_idx),
undo_item) undo_item)
hash168s.add(undo_item[:21]) touched.add(undo_item[:21])
assert n == 0 assert n == 0
self.tx_count -= len(txs) self.tx_count -= len(txs)
@ -760,7 +763,3 @@ class BlockProcessor(LoggedClass):
'''Returns all the UTXOs for an address sorted by height and '''Returns all the UTXOs for an address sorted by height and
position in the block.''' position in the block.'''
return sorted(self.get_utxos(hash168, limit=None)) return sorted(self.get_utxos(hash168, limit=None))
def get_current_header(self):
'''Returns the current header as a dictionary.'''
return self.fs_cache.encode_header(self.height)

Loading…
Cancel
Save