From 5c63cd40dea23b844da77e9a813dd0cef7ae0dec Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sat, 22 Oct 2016 10:49:56 +0900
Subject: [PATCH] Implement reorg logic

---
 query.py                  |  20 ++
 server/block_processor.py | 372 +++++++++++++++++++++++++++++---------
 server/cache.py           | 108 ++++-------
 server/env.py             |   1 +
 4 files changed, 344 insertions(+), 157 deletions(-)

diff --git a/query.py b/query.py
index 60f3380..57941fa 100755
--- a/query.py
+++ b/query.py
@@ -11,11 +11,31 @@ from server.block_processor import BlockProcessor
 from lib.hash import hash_to_str
 
 
+def count_entries(db):
+    utxos = 0
+    for key in db.iterator(prefix=b'u', include_value=False):
+        utxos += 1
+    print("UTXO count:", utxos)
+
+    hash168 = 0
+    for key in db.iterator(prefix=b'h', include_value=False):
+        hash168 += 1
+    print("Hash168 count:", hash168)
+
+    hist = 0
+    for key in db.iterator(prefix=b'H', include_value=False):
+        hist += 1
+    print("History addresses:", hist)
+
+
 def main():
     env = Env()
     coin = env.coin
     os.chdir(env.db_dir)
     bp = BlockProcessor(env, None)
+    if len(sys.argv) == 1:
+        count_entries(bp.db)
+        return
     argc = 1
     try:
         limit = int(sys.argv[argc])
diff --git a/server/block_processor.py b/server/block_processor.py
index d2852d5..325e754 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -6,15 +6,17 @@ import ast
 import asyncio
 import struct
 import time
+from bisect import bisect_left
 from collections import defaultdict, namedtuple
 from functools import partial
 
 import plyvel
 
-from server.cache import FSCache, UTXOCache
+from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
 from server.daemon import DaemonError
 from lib.hash import hash_to_str
-from lib.util import LoggedClass
+from lib.script import ScriptPubKey
+from lib.util import chunks, LoggedClass
 
 
 def formatted_time(t):
@@ -124,24 +126,28 @@ class BlockProcessor(LoggedClass):
         self.next_cache_check = 0
         self.last_flush = time.time()
         self.coin = env.coin
+        self.caught_up = False
+        self.reorg_limit = env.reorg_limit
 
         # Chain state (initialize to genesis in case of new DB)
         self.db_height = -1
         self.db_tx_count = 0
+        self.db_tip = b'\0' * 32
         self.flush_count = 0
         self.utxo_flush_count = 0
         self.wall_time = 0
-        self.tip = b'\0' * 32
 
         # Open DB and metadata files.  Record some of its state.
         self.db = self.open_db(self.coin)
         self.tx_count = self.db_tx_count
         self.height = self.db_height
+        self.tip = self.db_tip
 
         # Caches to be flushed later.  Headers and tx_hashes have one
         # entry per block
         self.history = defaultdict(partial(array.array, 'I'))
         self.history_size = 0
+        self.backup_hash168s = set()
         self.utxo_cache = UTXOCache(self, self.db, self.coin)
         self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
         self.prefetcher = Prefetcher(daemon, self.height)
@@ -157,13 +163,20 @@ class BlockProcessor(LoggedClass):
                          self.tx_count, self.flush_count,
                          self.utxo_flush_count,
                          formatted_time(self.wall_time)))
+        self.logger.info('reorg limit of {:,d} blocks'
+                         .format(self.reorg_limit))
         self.logger.info('flushing UTXO cache at {:,d} MB'
                          .format(self.utxo_MB))
         self.logger.info('flushing history cache at {:,d} MB'
                          .format(self.hist_MB))
 
-    def coros(self):
-        return [self.start(), self.prefetcher.start()]
+        self.clean_db()
+
+    def coros(self, force_backup=False):
+        if force_backup:
+            return [self.force_chain_reorg(True), self.prefetcher.start()]
+        else:
+            return [self.start(), self.prefetcher.start()]
 
     async def start(self):
         '''External entry point for block processing.
@@ -178,30 +191,49 @@ class BlockProcessor(LoggedClass):
 
     async def advance_blocks(self):
         '''Loop forever processing blocks in the forward direction.'''
-        caught_up = False
         while True:
             blocks = await self.prefetcher.get_blocks()
             for block in blocks:
                 if not self.advance_block(block):
                     await self.handle_chain_reorg()
-                    caught_up = False
+                    self.caught_up = False
                     break
                 await asyncio.sleep(0)  # Yield
 
-            if not caught_up and self.height == self.daemon.cached_height():
-                caught_up = True
+            if self.height != self.daemon.cached_height():
+                continue
+
+            if not self.caught_up:
+                self.caught_up = True
                 self.logger.info('caught up to height {:,d}'
                                  .format(self.height))
 
-    async def handle_chain_reorg(self):
-        hashes = await self.reorg_hashes(self)
-        hex_hashes = [hash_to_str(hash) for hash in hashes]
-        blocks = await self.daemon.raw_blocks(hex_hashes)
-        for block in reversed(blocks):
-            self.backup_block(block)
-        await self.prefetcher.clear()
+            # Flush everything when in the caught-up state, as queries
+            # are performed on the DB, not in-memory
+            self.flush(True)
 
-    async def reorg_hashes(self):
+    async def force_chain_reorg(self, to_genesis):
+        try:
+            await self.handle_chain_reorg(to_genesis)
+        finally:
+            self.flush(True)
+
+    async def handle_chain_reorg(self, to_genesis=False):
+        # First get all state on disk
+        self.logger.info('chain reorg detected')
+        self.flush(True)
+        self.logger.info('finding common height...')
+        hashes = await self.reorg_hashes(to_genesis)
+        # Reverse and convert to hex strings.
+        hashes = [hash_to_str(hash) for hash in reversed(hashes)]
+        for hex_hashes in chunks(hashes, 50):
+            blocks = await self.daemon.raw_blocks(hex_hashes)
+            self.backup_blocks(blocks)
+        self.logger.info('backed up to height {:,d}'.format(self.height))
+        await self.prefetcher.clear(self.height)
+        self.logger.info('prefetcher reset')
+
+    async def reorg_hashes(self, to_genesis):
         '''Return the list of hashes to back up because of a reorg.
 
         The hashes are returned in order of increasing height.'''
@@ -211,27 +243,26 @@ class BlockProcessor(LoggedClass):
                     return n
             return -1
 
-        self.logger.info('chain reorg detected; finding common height...')
-
         start = self.height - 1
         count = 1
-        while True:
+        while start > 0:
+            self.logger.info('start: {:,d} count: {:,d}'.format(start, count))
             hashes = self.fs_cache.block_hashes(start, count)
+            hex_hashes = [hash_to_str(hash) for hash in hashes]
             d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
-            d_hashes = [bytes.fromhex(hex_hash) for hex_hash in d_hex_hashes]
-            n = match_pos(hashes, d_hashes)
-            if n >= 0:
+            n = match_pos(hex_hashes, d_hex_hashes)
+            if n >= 0 and not to_genesis:
+                start += n + 1
                 break
-            assert start > 0
             count = min(count * 2, start)
             start -= count
 
         # Hashes differ from height 'start'
-        start += n + 1
         count = (self.height - start) + 1
 
-        self.logger.info('chain was reorganised for {:,d} blocks starting '
-                         'at height {:,d}', start, count)
+        self.logger.info('chain was reorganised for {:,d} blocks from '
+                         'height {:,d} to height {:,d}'
+                         .format(count, start, start + count - 1))
 
         return self.fs_cache.block_hashes(start, count)
@@ -244,11 +275,9 @@ class BlockProcessor(LoggedClass):
             db = plyvel.DB(db_name, create_if_missing=True,
                            error_if_exists=True, compression=None)
             self.logger.info('created new database {}'.format(db_name))
-            self.flush_state(db)
         else:
             self.logger.info('successfully opened database {}'.format(db_name))
             self.read_state(db)
-            self.delete_excess_history(db)
 
         return db
@@ -261,37 +290,58 @@ class BlockProcessor(LoggedClass):
                                  self.coin.GENESIS_HASH))
         self.db_height = state['height']
         self.db_tx_count = state['tx_count']
-        self.tip = state['tip']
+        self.db_tip = state['tip']
         self.flush_count = state['flush_count']
         self.utxo_flush_count = state['utxo_flush_count']
         self.wall_time = state['wall_time']
 
-    def delete_excess_history(self, db):
-        '''Clear history flushed since the most recent UTXO flush.'''
-        utxo_flush_count = self.utxo_flush_count
-        diff = self.flush_count - utxo_flush_count
-        if diff == 0:
-            return
-        if diff < 0:
+    def clean_db(self):
+        '''Clean out stale DB items.
+
+        Stale DB items are excess history flushed since the most
+        recent UTXO flush (only happens on unclean shutdown), and aged
+        undo information.
+        '''
+        if self.flush_count < self.utxo_flush_count:
             raise ChainError('DB corrupt: flush_count < utxo_flush_count')
+        with self.db.write_batch(transaction=True) as batch:
+            if self.flush_count > self.utxo_flush_count:
+                self.logger.info('DB shut down uncleanly.  Scanning for '
+                                 'excess history flushes...')
+                self.remove_excess_history(batch)
+                self.utxo_flush_count = self.flush_count
+            self.remove_stale_undo_items(batch)
+            self.flush_state(batch)
 
-        self.logger.info('DB not shut down cleanly.  Scanning for most '
-                         'recent {:,d} history flushes'.format(diff))
-
+    def remove_excess_history(self, batch):
         prefix = b'H'
         unpack = struct.unpack
         keys = []
-        for key, hist in db.iterator(prefix=prefix):
+        for key, hist in self.db.iterator(prefix=prefix):
             flush_id, = unpack('>H', key[-2:])
             if flush_id > self.utxo_flush_count:
                 keys.append(key)
 
-        self.logger.info('deleting {:,d} history entries'.format(len(keys)))
-        with db.write_batch(transaction=True) as batch:
-            for key in keys:
-                db.delete(key)
-            self.utxo_flush_count = self.flush_count
-            self.flush_state(batch)
-        self.logger.info('deletion complete')
+        self.logger.info('deleting {:,d} history entries'
+                         .format(len(keys)))
+        for key in keys:
+            batch.delete(key)
+
+    def remove_stale_undo_items(self, batch):
+        prefix = b'U'
+        unpack = struct.unpack
+        cutoff = self.db_height - self.reorg_limit
+        keys = []
+        for key, hist in self.db.iterator(prefix=prefix):
+            height, = unpack('>I', key[-4:])
+            if height > cutoff:
+                break
+            keys.append(key)
+
+        self.logger.info('deleting {:,d} stale undo entries'
+                         .format(len(keys)))
+        for key in keys:
+            batch.delete(key)
 
     def flush_state(self, batch):
         '''Flush chain state to the batch.'''
@@ -302,7 +352,7 @@ class BlockProcessor(LoggedClass):
             'genesis': self.coin.GENESIS_HASH,
             'height': self.db_height,
             'tx_count': self.db_tx_count,
-            'tip': self.tip,
+            'tip': self.db_tip,
             'flush_count': self.flush_count,
             'utxo_flush_count': self.utxo_flush_count,
             'wall_time': self.wall_time,
@@ -317,63 +367,83 @@ class BlockProcessor(LoggedClass):
         self.utxo_flush_count = self.flush_count
         self.db_tx_count = self.tx_count
         self.db_height = self.height
+        self.db_tip = self.tip
+
+    def assert_flushed(self):
+        '''Asserts state is fully flushed.'''
+        assert self.tx_count == self.db_tx_count
+        assert not self.history
+        assert not self.utxo_cache.cache
+        assert not self.utxo_cache.db_cache
+        assert not self.backup_hash168s
 
     def flush(self, flush_utxos=False):
         '''Flush out cached state.
 
         History is always flushed.  UTXOs are flushed if flush_utxos.'''
+        if self.height == self.db_height:
+            self.logger.info('nothing to flush')
+            self.assert_flushed()
+            return
+
         flush_start = time.time()
         last_flush = self.last_flush
+        tx_diff = self.tx_count - self.db_tx_count
 
         # Write out the files to the FS before flushing to the DB.  If
         # the DB transaction fails, the files being too long doesn't
         # matter.  But if writing the files fails we do not want to
         # have updated the DB.
-        tx_diff = self.fs_cache.flush(self.height, self.tx_count)
+        if self.height > self.db_height:
+            self.fs_cache.flush(self.height, self.tx_count)
 
         with self.db.write_batch(transaction=True) as batch:
             # History first - fast and frees memory.  Flush state last
             # as it reads the wall time.
-            self.flush_history(batch)
+            if self.height > self.db_height:
+                self.flush_history(batch)
+            else:
+                self.backup_history(batch)
             if flush_utxos:
                 self.flush_utxos(batch)
             self.flush_state(batch)
             self.logger.info('committing transaction...')
 
         # Update and put the wall time again - otherwise we drop the
-        # time it took leveldb to commit the batch
+        # time it took to commit the batch
         self.flush_state(self.db)
 
         flush_time = int(self.last_flush - flush_start)
-        self.logger.info('flush #{:,d} to height {:,d} took {:,d}s'
-                         .format(self.flush_count, self.height, flush_time))
-
-        # Log handy stats
-        daemon_height = self.daemon.cached_height()
-        txs_per_sec = int(self.tx_count / self.wall_time)
-        this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
-        if self.height > self.coin.TX_COUNT_HEIGHT:
-            tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
-        else:
-            tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
-                      * self.coin.TX_PER_BLOCK
-                      + (self.coin.TX_COUNT - self.tx_count))
-
-        self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, '
-                         'since last flush: {:,d}'
-                         .format(self.tx_count, txs_per_sec, this_txs_per_sec))
-        self.logger.info('sync time: {} ETA: {}'
-                         .format(formatted_time(self.wall_time),
-                                 formatted_time(tx_est / this_txs_per_sec)))
+        self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s'
+                         .format(self.flush_count, self.height, self.tx_count,
                                  flush_time))
+
+        # Catch-up stats
+        if not self.caught_up and tx_diff > 0:
+            daemon_height = self.daemon.cached_height()
+            txs_per_sec = int(self.tx_count / self.wall_time)
+            this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
+            if self.height > self.coin.TX_COUNT_HEIGHT:
+                tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
+            else:
+                tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
+                          * self.coin.TX_PER_BLOCK
+                          + (self.coin.TX_COUNT - self.tx_count))
+
+            self.logger.info('tx/sec since genesis: {:,d}, '
+                             'since last flush: {:,d}'
+                             .format(txs_per_sec, this_txs_per_sec))
+            self.logger.info('sync time: {} ETA: {}'
+                             .format(formatted_time(self.wall_time),
+                                     formatted_time(tx_est / this_txs_per_sec)))
 
     def flush_history(self, batch):
         self.logger.info('flushing history')
-
-        # Drop any None entry
-        self.history.pop(None, None)
+        assert not self.backup_hash168s
 
         self.flush_count += 1
         flush_id = struct.pack('>H', self.flush_count)
+
         for hash168, hist in self.history.items():
             key = b'H' + hash168 + flush_id
             batch.put(key, hist.tobytes())
@@ -384,6 +454,39 @@ class BlockProcessor(LoggedClass):
         self.history = defaultdict(partial(array.array, 'I'))
         self.history_size = 0
 
+    def backup_history(self, batch):
+        self.logger.info('backing up history to height {:,d} tx_count {:,d}'
+                         .format(self.height, self.tx_count))
+
+        # Drop any NO_CACHE entry
+        self.backup_hash168s.discard(NO_CACHE_ENTRY)
+        assert not self.history
+
+        nremoves = 0
+        for hash168 in sorted(self.backup_hash168s):
+            prefix = b'H' + hash168
+            deletes = []
+            puts = {}
+            for key, hist in self.db.iterator(reverse=True, prefix=prefix):
+                a = array.array('I')
+                a.frombytes(hist)
+                # Remove all history entries >= self.tx_count
+                idx = bisect_left(a, self.tx_count)
+                nremoves += len(a) - idx
+                if idx > 0:
+                    puts[key] = a[:idx].tobytes()
+                    break
+                deletes.append(key)
+
+            for key in deletes:
+                batch.delete(key)
+            for key, value in puts.items():
+                batch.put(key, value)
+
+        self.logger.info('removed {:,d} history entries from {:,d} addresses'
+                         .format(nremoves, len(self.backup_hash168s)))
+        self.backup_hash168s = set()
+
     def cache_sizes(self):
         '''Returns the approximate size of the cache, in MB.'''
         # Good average estimates based on traversal of subobjects and
@@ -400,15 +503,27 @@ class BlockProcessor(LoggedClass):
         self.logger.info('cache stats at height {:,d}  daemon height: {:,d}'
                          .format(self.height, self.daemon.cached_height()))
         self.logger.info('  entries: UTXO: {:,d}  DB: {:,d} '
-                         'hist addrs: {:,d}  hist size: {:,d}'
+                         'hist size: {:,d}  hist addrs: {:,d}'
                          .format(len(self.utxo_cache.cache),
                                  len(self.utxo_cache.db_cache),
-                                 len(self.history),
-                                 self.history_size))
+                                 self.history_size,
+                                 len(self.history)))
         self.logger.info('  size: {:,d}MB  (UTXOs {:,d}MB hist {:,d}MB)'
                          .format(utxo_MB + hist_MB, utxo_MB, hist_MB))
         return utxo_MB, hist_MB
 
+    def undo_key(self, height):
+        '''DB key for undo information at the given height.'''
+        return b'U' + struct.pack('>I', height)
+
+    def write_undo_info(self, height, undo_info):
+        '''Write out undo information for the given height.'''
+        self.db.put(self.undo_key(height), undo_info)
+
+    def read_undo_info(self, height):
+        '''Read undo information for the given height from the DB.'''
+        return self.db.get(self.undo_key(height))
+
     def advance_block(self, block):
         # We must update the fs_cache before calling advance_txs() as
         # the UTXO cache uses the fs_cache via get_tx_hash() to
@@ -421,7 +536,9 @@ class BlockProcessor(LoggedClass):
 
         self.tip = header_hash
         self.height += 1
-        self.advance_txs(tx_hashes, txs)
+        undo_info = self.advance_txs(tx_hashes, txs)
+        if self.daemon.cached_height() - self.height <= self.reorg_limit:
+            self.write_undo_info(self.height, b''.join(undo_info))
 
         # Check if we're getting full and time to flush?
         now = time.time()
@@ -434,28 +551,105 @@ class BlockProcessor(LoggedClass):
 
         return True
 
     def advance_txs(self, tx_hashes, txs):
-        cache = self.utxo_cache
+        put_utxo = self.utxo_cache.put
+        spend_utxo = self.utxo_cache.spend
+        undo_info = []
+
+        # Use local vars for speed in the loops
+        history = self.history
         tx_num = self.tx_count
+        coin = self.coin
+        parse_script = ScriptPubKey.from_script
+        pack = struct.pack
 
-        for tx_hash, tx in zip(tx_hashes, txs):
-            # Add the outputs as new UTXOs; spend the inputs
-            hash168s = cache.add_many(tx_hash, tx_num, tx.outputs)
+        for tx, tx_hash in zip(txs, tx_hashes):
+            hash168s = set()
+            tx_numb = pack('<I', tx_num)
[...]
+        assert self.height >= 0
+        # Just update in-memory.  It doesn't matter if disk files are
+        # too long, they will be overwritten when advancing.
+        self.height -= 1
+        self.tx_counts.pop()
 
     def flush(self, new_height, new_tx_count):
         '''Flush the things stored on the filesystem.
@@ -326,9 +299,10 @@ class FSCache(LoggedClass):
 
         txs_done = cur_tx_count - prior_tx_count
         assert self.height + blocks_done == new_height
-        assert cur_tx_count == new_tx_count
         assert len(self.tx_hashes) == blocks_done
         assert len(self.tx_counts) == new_height + 1
+        assert cur_tx_count == new_tx_count, \
+            'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count)
 
         # First the headers
         headers = b''.join(self.headers)
@@ -364,8 +338,6 @@ class FSCache(LoggedClass):
         self.headers = []
         self.height += blocks_done
 
-        return txs_done
-
     def read_headers(self, height, count):
         read_count = min(count, self.height + 1 - height)
 
diff --git a/server/env.py b/server/env.py
index 5a73bc9..0bee0ad 100644
--- a/server/env.py
+++ b/server/env.py
@@ -25,6 +25,7 @@ class Env(LoggedClass):
         self.tcp_port = self.integer('TCP_PORT', None)
         self.ssl_port = self.integer('SSL_PORT', None)
         self.rpc_port = self.integer('RPC_PORT', 8000)
+        self.reorg_limit = self.integer('REORG_LIMIT', 200)
         self.daemon_url = self.build_daemon_url()
         self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
         self.banner_file = self.default('BANNER_FILE', None)
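
A note on the undo scheme this patch adds, as an illustrative sketch rather than part of the commit: undo records are keyed b'U' plus the block height packed big-endian ('>I'), so a plain prefix iteration visits them in increasing height order, which is what lets remove_stale_undo_items() stop at the first key above the cutoff. The sketch below reproduces that encode/decode and pruning logic with an ordinary dict standing in for the plyvel DB; the helper names (undo_key, undo_height, prune_undo) are hypothetical.

    import struct

    def undo_key(height):
        # Same layout as the patch: b'U' then 4-byte big-endian height,
        # so lexicographic key order matches numeric height order.
        return b'U' + struct.pack('>I', height)

    def undo_height(key):
        height, = struct.unpack('>I', key[-4:])
        return height

    def prune_undo(db, db_height, reorg_limit):
        # Delete undo entries at heights <= db_height - reorg_limit.
        # Mirrors remove_stale_undo_items(): keys sort by height, so the
        # scan can stop at the first entry above the cutoff.
        cutoff = db_height - reorg_limit
        stale = []
        for key in sorted(db):  # plyvel already iterates in key order
            if undo_height(key) > cutoff:
                break
            stale.append(key)
        for key in stale:
            del db[key]
        return len(stale)

    # Toy run: undo info for heights 1-10, reorg limit 3, tip at 10.
    db = {undo_key(h): b'undo' for h in range(1, 11)}
    print(prune_undo(db, db_height=10, reorg_limit=3))   # 7
    print(sorted(undo_height(k) for k in db))            # [8, 9, 10]

The big-endian packing matters: with little-endian heights the byte-wise key order would not match numeric order, and the early break (and the age-based pruning generally) would be wrong.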
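History rows behave similarly during a backup. They are keyed b'H' + hash168 + a 2-byte flush id, and each value is an ascending array of 32-bit tx numbers, so backup_history() can cut everything at or above the restored tx_count with bisect_left. A minimal sketch of that trim for a single row under the same value layout (the function name trim_history_row is hypothetical):

    import array
    from bisect import bisect_left

    def trim_history_row(value, tx_count):
        # Returns (new_value, removed).  A new_value of None means the
        # whole row is deleted, mirroring backup_history's delete/put split.
        a = array.array('I')
        a.frombytes(value)
        idx = bisect_left(a, tx_count)   # first entry >= tx_count
        removed = len(a) - idx
        if idx == 0:
            return None, removed
        return a[:idx].tobytes(), removed

    row = array.array('I', [5, 9, 12, 20]).tobytes()
    new_value, removed = trim_history_row(row, 10)
    print(removed)                                # 2
    print(array.array('I', new_value).tolist())   # [5, 9]

In the patch this runs newest-row-first per address (iterator(reverse=True)), deleting whole rows until it reaches one that can merely be truncated; older flushes can only contain smaller tx numbers, so the loop can stop there.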