# Copyright (c) 2016, Neil Booth # # All rights reserved. # # See the file "LICENCE" for information about the copyright # and warranty status of this software. '''Block prefetcher and chain processor.''' import array import asyncio from struct import pack, unpack import time from bisect import bisect_left from collections import defaultdict from functools import partial from server.daemon import Daemon, DaemonError from server.version import VERSION from lib.hash import hash_to_str from lib.util import chunks, formatted_time, LoggedClass import server.db class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' def __init__(self, coin, daemon, height): super().__init__() self.coin = coin self.daemon = daemon self.caught_up = False # Access to fetched_height should be protected by the semaphore self.fetched_height = height self.semaphore = asyncio.Semaphore() self.refill_event = asyncio.Event() # A cache queue of (blocks, size) pairs. The target cache # size has little effect on sync time. self.cache = asyncio.Queue() self.cache_size = 0 self.min_cache_size = 10 * 1024 * 1024 # This makes the first fetch be 10 blocks self.ave_size = self.min_cache_size // 10 async def clear(self, height): '''Clear prefetched blocks and restart from the given height. Used in blockchain reorganisations. This coroutine can be called asynchronously to the _prefetch coroutine so we must synchronize with a semaphore. Set height to -1 when shutting down to place a sentinel on the queue to tell the block processor to shut down too. ''' with await self.semaphore: while not self.cache.empty(): self.cache.get_nowait() self.cache_size = 0 if height == -1: self.cache.put_nowait((None, 0)) else: self.refill_event.set() self.fetched_height = height self.logger.info('reset to height'.format(height)) async def get_blocks(self): '''Return the next list of blocks from our prefetch cache. A return value of None indicates to shut down. Once caught up an entry is queued every few seconds synchronized with mempool refreshes to indicate a new mempool is available. Of course the list of blocks in such a case will normally be empty.''' blocks, size = await self.cache.get() self.cache_size -= size if self.cache_size < self.min_cache_size: self.refill_event.set() return blocks async def main_loop(self, caught_up_event): '''Loop forever polling for more blocks.''' daemon_height = await self.daemon.height() if self.fetched_height >= daemon_height: log_msg = 'caught up to daemon height {:,d}' else: log_msg = 'catching up to daemon height {:,d}...' self.logger.info(log_msg.format(daemon_height)) while True: try: with await self.semaphore: await self._prefetch_blocks(caught_up_event.is_set()) await self.refill_event.wait() except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) except asyncio.CancelledError: await self.clear(-1) return async def _prefetch_blocks(self, mempool): '''Prefetch some blocks and put them on the queue. Repeats until the queue is full or caught up. If caught up, sleep for a period of time before returning. ''' daemon_height = await self.daemon.height(mempool) while self.cache_size < self.min_cache_size: # Try and catch up all blocks but limit to room in cache. # Constrain fetch count to between 0 and 2500 regardless. cache_room = self.min_cache_size // self.ave_size count = min(daemon_height - self.fetched_height, cache_room) count = min(2500, max(count, 0)) if not count: self.cache.put_nowait(([], 0)) self.caught_up = True await asyncio.sleep(5) return first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) if self.caught_up: self.logger.info('new block height {:,d} hash {}' .format(first + count - 1, hex_hashes[-1])) blocks = await self.daemon.raw_blocks(hex_hashes) assert count == len(blocks) # Strip the unspendable genesis coinbase if first == 0: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) # Update our recent average block size estimate size = sum(len(block) for block in blocks) if count >= 10: self.ave_size = size // count else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 self.cache.put_nowait((blocks, size)) self.cache_size += size self.fetched_height += count self.refill_event.clear() class ChainError(Exception): '''Raised on error processing blocks.''' class ChainReorg(Exception): '''Raised on a blockchain reorganisation.''' class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. Coordinate backing up in case of chain reorganisations. ''' def __init__(self, env): super().__init__(env) # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up_event = asyncio.Event() # Meta self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 self.prefetcher = Prefetcher(self.coin, self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count # Caches of unflushed items self.headers = [] self.tx_hashes = [] # UTXO cache self.utxo_cache = {} self.db_deletes = [] # Log state if self.first_sync: self.logger.info('flushing UTXO cache at {:,d} MB' .format(self.utxo_MB)) self.logger.info('flushing history cache at {:,d} MB' .format(self.hist_MB)) async def main_loop(self, touched): '''Main loop for block processing.''' # Simulate a reorg if requested if self.env.force_reorg > 0: self.logger.info('DEBUG: simulating reorg of {:,d} blocks' .format(self.env.force_reorg)) await self.handle_chain_reorg(set(), self.env.force_reorg) while True: blocks = await self.prefetcher.get_blocks() if blocks: await self.advance_blocks(blocks, touched) elif blocks is None: break # Shutdown else: self.caught_up() self.logger.info('flushing state to DB for a clean shutdown...') self.flush(True) self.logger.info('shutdown complete') async def advance_blocks(self, blocks, touched): '''Process the list of blocks passed. Detects and handles reorgs.''' def job(): for block in blocks: self.advance_block(block, touched) start = time.time() loop = asyncio.get_event_loop() try: await loop.run_in_executor(None, job) except ChainReorg: await self.handle_chain_reorg(touched) if self.caught_up_event.is_set(): # Flush everything as queries are performed on the DB and # not in-memory. self.flush(True) else: touched.clear() if time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 if not self.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s' .format(len(blocks), s, time.time() - start)) def caught_up(self): '''Called when first caught up after starting.''' if not self.caught_up_event.is_set(): self.flush(True) if self.first_sync: self.logger.info('{} synced to height {:,d}' .format(VERSION, self.height)) self.first_sync = False self.flush_state(self.db) self.reopen_db(False) self.caught_up_event.set() async def handle_chain_reorg(self, touched, count=None): '''Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for a real reorg.''' self.logger.info('chain reorg detected') self.flush(True) hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) self.backup_blocks(blocks, touched) await self.prefetcher.clear(self.height) async def reorg_hashes(self, count): '''Return the list of hashes to back up beacuse of a reorg. The hashes are returned in order of increasing height.''' def match_pos(hashes1, hashes2): for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)): if hash1 == hash2: return n return -1 if count is None: # A real reorg start = self.height - 1 count = 1 while start > 0: hashes = self.fs_block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) if n >= 0: start += n + 1 break count = min(count * 2, start) start -= count count = (self.height - start) + 1 else: start = (self.height - count) + 1 self.logger.info('chain was reorganised: {:,d} blocks at ' 'heights {:,d}-{:,d} were replaced' .format(count, start, start + count - 1)) return self.fs_block_hashes(start, count) def flush_state(self, batch): '''Flush chain state to the batch.''' now = time.time() self.wall_time += now - self.last_flush self.last_flush = now self.last_flush_tx_count = self.tx_count self.write_state(batch) def assert_flushed(self): '''Asserts state is fully flushed.''' assert self.tx_count == self.fs_tx_count == self.db_tx_count assert self.height == self.fs_height == self.db_height assert not self.history assert not self.utxo_cache assert not self.db_deletes def flush(self, flush_utxos=False): '''Flush out cached state. History is always flushed. UTXOs are flushed if flush_utxos.''' if self.height == self.db_height: self.assert_flushed() return self.flush_count += 1 flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. self.flush_history(batch) if flush_utxos: self.flush_utxos(batch) self.flush_state(batch) # Update and put the wall time again - otherwise we drop the # time it took to commit the batch self.flush_state(self.db) self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}' .format(self.flush_count, self.last_flush - flush_start, self.height, self.tx_count)) # Catch-up stats if self.first_sync: daemon_height = self.daemon.cached_height() tx_per_sec = int(self.tx_count / self.wall_time) this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) if self.height > self.coin.TX_COUNT_HEIGHT: tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK else: tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) * self.coin.TX_PER_BLOCK + (self.coin.TX_COUNT - self.tx_count)) # Damp the enthusiasm realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT tx_est *= max(realism, 1.0) self.logger.info('tx/sec since genesis: {:,d}, ' 'since last flush: {:,d}' .format(tx_per_sec, this_tx_per_sec)) self.logger.info('sync time: {} ETA: {}' .format(formatted_time(self.wall_time), formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): fs_start = time.time() self.fs_flush() fs_end = time.time() flush_id = pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id batch.put(key, hist.tobytes()) if self.first_sync: self.logger.info('flushed to FS in {:.1f}s, history in {:.1f}s ' 'for {:,d} addrs' .format(fs_end - fs_start, time.time() - fs_end, len(self.history))) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): '''Flush the things stored on the filesystem.''' assert self.fs_height + len(self.headers) == self.height assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0 self.fs_update(self.fs_height, self.headers, self.tx_hashes) self.fs_height = self.height self.fs_tx_count = self.tx_count self.tx_hashes = [] self.headers = [] def backup_flush(self, hash168s): '''Like flush() but when backing up. All UTXOs are flushed. hash168s - sequence of hash168s which were touched by backing up. Searched for history entries to remove after the backup height. ''' assert self.height < self.db_height assert not self.history self.flush_count += 1 flush_start = time.time() with self.db.write_batch() as batch: # Flush state last as it reads the wall time. self.backup_history(batch, hash168s) self.flush_utxos(batch) self.flush_state(batch) # Update and put the wall time again - otherwise we drop the # time it took to commit the batch self.flush_state(self.db) self.logger.info('backup flush #{:,d} took {:.1f}s. ' 'Height {:,d} txs: {:,d}' .format(self.flush_count, self.last_flush - flush_start, self.height, self.tx_count)) def backup_history(self, batch, hash168s): nremoves = 0 for hash168 in sorted(hash168s): prefix = b'H' + hash168 deletes = [] puts = {} for key, hist in self.db.iterator(prefix=prefix, reverse=True): a = array.array('I') a.frombytes(hist) # Remove all history entries >= self.tx_count idx = bisect_left(a, self.tx_count) nremoves += len(a) - idx if idx > 0: puts[key] = a[:idx].tobytes() break deletes.append(key) for key in deletes: batch.delete(key) for key, value in puts.items(): batch.put(key, value) self.fs_height = self.height self.fs_tx_count = self.tx_count assert not self.headers assert not self.tx_hashes self.logger.info('backing up removed {:,d} history entries from ' '{:,d} addresses'.format(nremoves, len(hash168s))) def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and # requesting size from Python (see deep_getsizeof). For # whatever reason Python O/S mem usage is typically +30% or # more, so we scale our already bloated object sizes. one_MB = int(1048576 / 1.3) utxo_cache_size = len(self.utxo_cache) * 187 db_deletes_size = len(self.db_deletes) * 61 hist_cache_size = len(self.history) * 180 + self.history_size * 4 tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB hist_MB = (hist_cache_size + tx_hash_size) // one_MB self.logger.info('our height: {:,d} daemon: {:,d} ' 'UTXOs {:,d}MB hist {:,d}MB' .format(self.height, self.daemon.cached_height(), utxo_MB, hist_MB)) # Flush if a cache is too big if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: self.flush(utxo_MB >= self.utxo_MB) def fs_advance_block(self, header, tx_hashes, txs): '''Update unflushed FS state for a new block.''' prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 # Cache the new header, tx hashes and cumulative tx count self.headers.append(header) self.tx_hashes.append(tx_hashes) self.tx_counts.append(prior_tx_count + len(txs)) def advance_block(self, block, touched): header, tx_hashes, txs = self.coin.read_block(block) if self.tip != self.coin.header_prevhash(header): raise ChainReorg self.fs_advance_block(header, tx_hashes, txs) self.tip = self.coin.header_hash(header) self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) if self.daemon.cached_height() - self.height <= self.env.reorg_limit: self.write_undo_info(self.height, b''.join(undo_info)) def advance_txs(self, tx_hashes, txs, touched): undo_info = [] # Use local vars for speed in the loops history = self.history history_size = self.history_size tx_num = self.tx_count script_hash168 = self.coin.hash168_from_script() s_pack = pack put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info_append = undo_info.append for tx, tx_hash in zip(txs, tx_hashes): hash168s = set() add_hash168 = hash168s.add tx_numb = s_pack('= 0 self.height -= 1 self.tx_counts.pop() self.logger.info('backed up to height {:,d}'.format(self.height)) # touched includes those passed into this function. That likely # has additional addresses which is harmless. Remove None. touched.discard(None) self.backup_flush(touched) def backup_txs(self, tx_hashes, txs, touched): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order undo_info = self.read_undo_info(self.height) if undo_info is None: raise ChainError('no undo information found for height {:,d}' .format(self.height)) n = len(undo_info) # Use local vars for speed in the loops s_pack = pack put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo script_hash168 = self.coin.hash168_from_script() rtxs = reversed(txs) rtx_hashes = reversed(tx_hashes) for tx_hash, tx in zip(rtx_hashes, rtxs): for idx, txout in enumerate(tx.outputs): # Spend the TX outputs. Be careful with unspendable # outputs - we didn't save those in the first place. hash168 = script_hash168(txout.pk_script) if hash168: cache_value = spend_utxo(tx_hash, idx) touched.add(cache_value[:21]) # Restore the inputs if not tx.is_coinbase: for txin in reversed(tx.inputs): n -= 33 undo_item = undo_info[n:n + 33] put_utxo(txin.prev_hash + s_pack(' 1: tx_num, = unpack('