# Copyright (c) 2016, Neil Booth # # All rights reserved. # # See the file "LICENCE" for information about the copyright # and warranty status of this software. '''Block prefetcher and chain processor.''' import array import asyncio import itertools import os from struct import pack, unpack import time from bisect import bisect_left from collections import defaultdict from functools import partial from server.daemon import Daemon, DaemonError from server.version import VERSION from lib.hash import hash_to_str from lib.util import chunks, formatted_time, LoggedClass import server.db from server.storage import open_db # Limits single address history to ~ 65536 * HIST_ENTRIES_PER_KEY entries HIST_ENTRIES_PER_KEY = 1024 HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 class ChainError(Exception): pass class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' def __init__(self, daemon, height): super().__init__() self.daemon = daemon self.semaphore = asyncio.Semaphore() self.queue = asyncio.Queue() self.queue_size = 0 self.fetched_height = height self.mempool_hashes = [] # Target cache size. Has little effect on sync time. self.target_cache_size = 10 * 1024 * 1024 # First fetch to be 10 blocks self.ave_size = self.target_cache_size // 10 async def clear(self, height): '''Clear prefetched blocks and restart from the given height. Used in blockchain reorganisations. This coroutine can be called asynchronously to the _prefetch coroutine so we must synchronize. ''' with await self.semaphore: while not self.queue.empty(): self.queue.get_nowait() self.queue_size = 0 self.fetched_height = height async def get_blocks(self): '''Returns a list of prefetched blocks and the mempool.''' blocks, height, size = await self.queue.get() self.queue_size -= size if height == self.daemon.cached_height(): return blocks, self.mempool_hashes else: return blocks, None async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('catching up to daemon height {:,d}...' .format(await self.daemon.height())) while True: try: if await self._caught_up(): await asyncio.sleep(5) else: await asyncio.sleep(0) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) except asyncio.CancelledError: break async def _caught_up(self): '''Poll for new blocks and mempool state. Mempool is only queried if caught up with daemon.''' with await self.semaphore: blocks, size = await self._prefetch() self.fetched_height += len(blocks) caught_up = self.fetched_height == self.daemon.cached_height() if caught_up: self.mempool_hashes = await self.daemon.mempool_hashes() # Wake up block processor if we have something if blocks or caught_up: self.queue.put_nowait((blocks, self.fetched_height, size)) self.queue_size += size return caught_up async def _prefetch(self): '''Prefetch blocks unless the prefetch queue is full.''' if self.queue_size >= self.target_cache_size: return [], 0 caught_up = self.daemon.cached_height() == self.fetched_height daemon_height = await self.daemon.height() cache_room = self.target_cache_size // self.ave_size # Try and catch up all blocks but limit to room in cache. # Constrain count to between 0 and 4000 regardless count = min(daemon_height - self.fetched_height, cache_room) count = min(4000, max(count, 0)) if not count: return [], 0 first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) if caught_up: self.logger.info('new block height {:,d} hash {}' .format(first + count - 1, hex_hashes[-1])) blocks = await self.daemon.raw_blocks(hex_hashes) size = sum(len(block) for block in blocks) # Update our recent average block size estimate if count >= 10: self.ave_size = size // count else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 return blocks, size class ChainReorg(Exception): '''Raised on a blockchain reorganisation.''' class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. Coordinate backing up in case of chain reorganisations. ''' def __init__(self, env): super().__init__(env) # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count self.daemon = Daemon(env.daemon_url, env.debug) self.daemon.debug_set_height(self.height) self.touched = set() self.futures = [] # Meta self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 self.reorg_limit = env.reorg_limit # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 self.prefetcher = Prefetcher(self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count # Caches of unflushed items self.headers = [] self.tx_hashes = [] # UTXO cache self.utxo_cache = {} self.db_deletes = [] # Log state self.logger.info('reorg limit is {:,d} blocks' .format(self.reorg_limit)) if self.first_sync: self.logger.info('flushing UTXO cache at {:,d} MB' .format(self.utxo_MB)) self.logger.info('flushing history cache at {:,d} MB' .format(self.hist_MB)) self.clean_db() async def main_loop(self): '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop())) try: while True: await self._wait_for_update() except asyncio.CancelledError: self.on_cancel() await self.wait_shutdown() def on_cancel(self): '''Called when the main loop is cancelled. Intended to be overridden in derived classes.''' for future in self.futures: future.cancel() self.flush(True) async def wait_shutdown(self): '''Wait for shutdown to complete cleanly, and return.''' await asyncio.sleep(0) async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks or a mempool update. Blocks are only processed in the forward direction. The prefetcher only provides a non-None mempool when caught up. ''' blocks, mempool_hashes = await self.prefetcher.get_blocks() '''Strip the unspendable genesis coinbase.''' if self.height == -1: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) caught_up = mempool_hashes is not None try: for block in blocks: self.advance_block(block, caught_up) if not caught_up and time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 await asyncio.sleep(0) # Yield if caught_up: await self.caught_up(mempool_hashes) self.touched = set() except ChainReorg: await self.handle_chain_reorg() async def caught_up(self, mempool_hashes): '''Called after each deamon poll if caught up.''' # Caught up to daemon height. Flush everything as queries # are performed on the DB and not in-memory. if self.first_sync: self.first_sync = False self.logger.info('{} synced to height {:,d}. DB version:' .format(VERSION, self.height, self.db_version)) self.flush(True) async def handle_chain_reorg(self): # First get all state on disk self.logger.info('chain reorg detected') self.flush(True) self.logger.info('finding common height...') hashes = await self.reorg_hashes() # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) self.backup_blocks(blocks) self.logger.info('backed up to height {:,d}'.format(self.height)) await self.prefetcher.clear(self.height) self.logger.info('prefetcher reset') async def reorg_hashes(self): '''Return the list of hashes to back up beacuse of a reorg. The hashes are returned in order of increasing height.''' def match_pos(hashes1, hashes2): for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)): if hash1 == hash2: return n return -1 start = self.height - 1 count = 1 while start > 0: hashes = self.fs_block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) if n >= 0: start += n + 1 break count = min(count * 2, start) start -= count # Hashes differ from height 'start' count = (self.height - start) + 1 self.logger.info('chain was reorganised for {:,d} blocks from ' 'height {:,d} to height {:,d}' .format(count, start, start + count - 1)) return self.fs_block_hashes(start, count) def clean_db(self): '''Clean out stale DB items. Stale DB items are excess history flushed since the most recent UTXO flush (only happens on unclean shutdown), and aged undo information. ''' if self.flush_count < self.utxo_flush_count: raise ChainError('DB corrupt: flush_count < utxo_flush_count') with self.db.write_batch() as batch: if self.flush_count > self.utxo_flush_count: self.logger.info('DB shut down uncleanly. Scanning for ' 'excess history flushes...') self.remove_excess_history(batch) self.utxo_flush_count = self.flush_count self.remove_stale_undo_items(batch) self.flush_state(batch) def remove_excess_history(self, batch): prefix = b'H' keys = [] for key, hist in self.db.iterator(prefix=prefix): flush_id, = unpack('>H', key[-2:]) if flush_id > self.utxo_flush_count: keys.append(key) self.logger.info('deleting {:,d} history entries' .format(len(keys))) for key in keys: batch.delete(key) def remove_stale_undo_items(self, batch): prefix = b'U' cutoff = self.db_height - self.reorg_limit keys = [] for key, hist in self.db.iterator(prefix=prefix): height, = unpack('>I', key[-4:]) if height > cutoff: break keys.append(key) self.logger.info('deleting {:,d} stale undo entries' .format(len(keys))) for key in keys: batch.delete(key) def flush_state(self, batch): '''Flush chain state to the batch.''' now = time.time() self.wall_time += now - self.last_flush self.last_flush = now self.last_flush_tx_count = self.tx_count self.write_state(batch) def assert_flushed(self): '''Asserts state is fully flushed.''' assert self.tx_count == self.fs_tx_count == self.db_tx_count assert self.height == self.fs_height == self.db_height assert not self.history assert not self.utxo_cache assert not self.db_deletes def flush(self, flush_utxos=False, flush_history=None): '''Flush out cached state. History is always flushed. UTXOs are flushed if flush_utxos.''' if self.height == self.db_height: assert flush_history is None self.assert_flushed() return self.flush_count += 1 flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count if self.height > self.db_height: assert flush_history is None flush_history = self.flush_history with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. flush_history(batch) if flush_utxos: self.flush_utxos(batch) self.flush_state(batch) # Update and put the wall time again - otherwise we drop the # time it took to commit the batch self.flush_state(self.db) self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}' .format(self.flush_count, self.last_flush - flush_start, self.height, self.tx_count)) # Catch-up stats if self.first_sync: daemon_height = self.daemon.cached_height() tx_per_sec = int(self.tx_count / self.wall_time) this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) if self.height > self.coin.TX_COUNT_HEIGHT: tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK else: tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) * self.coin.TX_PER_BLOCK + (self.coin.TX_COUNT - self.tx_count)) # Damp the enthusiasm realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT tx_est *= max(realism, 1.0) self.logger.info('tx/sec since genesis: {:,d}, ' 'since last flush: {:,d}' .format(tx_per_sec, this_tx_per_sec)) self.logger.info('sync time: {} ETA: {}' .format(formatted_time(self.wall_time), formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): fs_start = time.time() self.fs_flush() fs_end = time.time() flush_id = pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id batch.put(key, hist.tobytes()) if self.first_sync: self.logger.info('flushed to FS in {:.1f}s, history in {:.1f}s ' 'for {:,d} addrs' .format(fs_end - fs_start, time.time() - fs_end, len(self.history))) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): '''Flush the things stored on the filesystem.''' blocks_done = len(self.headers) prior_tx_count = (self.tx_counts[self.fs_height] if self.fs_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count assert self.fs_height + blocks_done == self.height assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == self.height + 1 assert cur_tx_count == self.tx_count, \ 'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count) # First the headers headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN self.headers_file.seek((self.fs_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() # Then the tx counts self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) self.txcount_file.flush() # Finally the hashes hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) assert len(hashes) % 32 == 0 assert len(hashes) // 32 == txs_done cursor = 0 file_pos = prior_tx_count * 32 while cursor < len(hashes): file_num, offset = divmod(file_pos, self.tx_hash_file_size) size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) filename = 'hashes{:04d}'.format(file_num) with self.open_file(filename, create=True) as f: f.seek(offset) f.write(hashes[cursor:cursor + size]) cursor += size file_pos += size os.sync() self.fs_height = self.height self.fs_tx_count = self.tx_count self.tx_hashes = [] self.headers = [] def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' .format(self.height, self.tx_count)) assert not self.history nremoves = 0 for hash168 in sorted(hash168s): prefix = b'H' + hash168 deletes = [] puts = {} for key, hist in self.db.iterator(prefix=prefix, reverse=True): a = array.array('I') a.frombytes(hist) # Remove all history entries >= self.tx_count idx = bisect_left(a, self.tx_count) nremoves += len(a) - idx if idx > 0: puts[key] = a[:idx].tobytes() break deletes.append(key) for key in deletes: batch.delete(key) for key, value in puts.items(): batch.put(key, value) self.logger.info('removed {:,d} history entries from {:,d} addresses' .format(nremoves, len(hash168s))) def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and # requesting size from Python (see deep_getsizeof). For # whatever reason Python O/S mem usage is typically +30% or # more, so we scale our already bloated object sizes. one_MB = int(1048576 / 1.3) utxo_cache_size = len(self.utxo_cache) * 187 db_deletes_size = len(self.db_deletes) * 61 hist_cache_size = len(self.history) * 180 + self.history_size * 4 tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB hist_MB = (hist_cache_size + tx_hash_size) // one_MB self.logger.info('our height: {:,d} daemon: {:,d} ' 'UTXOs {:,d}MB hist {:,d}MB' .format(self.height, self.daemon.cached_height(), utxo_MB, hist_MB)) # Flush if a cache is too big if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: self.flush(utxo_MB >= self.utxo_MB) def undo_key(self, height): '''DB key for undo information at the given height.''' return b'U' + pack('>I', height) def write_undo_info(self, height, undo_info): '''Write out undo information for the current height.''' self.db.put(self.undo_key(height), undo_info) def read_undo_info(self, height): '''Read undo information from a file for the current height.''' return self.db.get(self.undo_key(height)) def fs_advance_block(self, header, tx_hashes, txs): '''Update unflushed FS state for a new block.''' prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 # Cache the new header, tx hashes and cumulative tx count self.headers.append(header) self.tx_hashes.append(tx_hashes) self.tx_counts.append(prior_tx_count + len(txs)) def advance_block(self, block, update_touched): # We must update the FS cache before calling advance_txs() as # the UTXO cache uses the FS cache via get_tx_hash() to # resolve compressed key collisions header, tx_hashes, txs = self.coin.read_block(block) prev_hash, header_hash = self.coin.header_hashes(header) if prev_hash != self.tip: raise ChainReorg touched = set() self.fs_advance_block(header, tx_hashes, txs) self.tip = header_hash self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) if self.daemon.cached_height() - self.height <= self.reorg_limit: self.write_undo_info(self.height, b''.join(undo_info)) if update_touched: self.touched.update(touched) def advance_txs(self, tx_hashes, txs, touched): put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info = [] # Use local vars for speed in the loops history = self.history tx_num = self.tx_count script_hash168 = self.coin.hash168_from_script() s_pack = pack for tx, tx_hash in zip(txs, tx_hashes): hash168s = set() tx_numb = s_pack('= 0 self.height -= 1 assert not self.headers assert not self.tx_hashes self.logger.info('backed up to height {:,d}'.format(self.height)) self.touched.update(touched) flush_history = partial(self.backup_history, hash168s=touched) self.flush(True, flush_history=flush_history) def backup_txs(self, tx_hashes, txs, touched): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order undo_info = self.read_undo_info(self.height) n = len(undo_info) # Use local vars for speed in the loops s_pack = pack put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo rtxs = reversed(txs) rtx_hashes = reversed(tx_hashes) for tx_hash, tx in zip(rtx_hashes, rtxs): # Spend the outputs for idx, txout in enumerate(tx.outputs): cache_value = spend_utxo(tx_hash, idx) touched.add(cache_value[:21]) # Restore the inputs if not tx.is_coinbase: for txin in reversed(tx.inputs): n -= 33 undo_item = undo_info[n:n + 33] put_utxo(txin.prev_hash + s_pack(' 1: tx_num, = unpack('= 0 and start + count <= len(self.headers)): raise ChainError('{:,d} headers starting at {:,d} not on disk' .format(count, start)) result += b''.join(self.headers[start: start + count]) return result def get_tx_hash(self, tx_num): '''Returns the tx_hash and height of a tx number.''' tx_hash, tx_height = self.fs_tx_hash(tx_num) # Is this unflushed? if tx_hash is None: tx_hashes = self.tx_hashes[tx_height - (self.fs_height + 1)] tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] return tx_hash, tx_height