From 94d1f7a651102fa9690cb42ea9f81f0f2a6834a5 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jul 2018 15:49:44 +0800 Subject: [PATCH] Save raw blocks to disk for reorg purposes We used to rely on the daemon being able to return the orphaned blocks, but some old daemon codebases are buggy and fail to do so. Fixes #258 #315 #479 --- electrumx/server/block_processor.py | 29 +++++++++++++++----- electrumx/server/db.py | 42 ++++++++++++++++++++++++++--- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 55c6ef2..8d7e33f 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -310,23 +310,37 @@ class BlockProcessor(electrumx.server.db.DB): if count is None: self.logger.info('chain reorg detected') else: - self.logger.info('faking a reorg of {:,d} blocks'.format(count)) + self.logger.info(f'faking a reorg of {count:,d} blocks') await self.tasks.run_in_thread(self.flush, True) - hashes = await self.reorg_hashes(count) + async def get_raw_blocks(last_height, hex_hashes): + heights = range(last_height, last_height - len(hex_hashes), -1) + try: + blocks = [self.read_raw_block(height) for height in heights] + self.logger.info(f'read {len(blocks)} blocks from disk') + return blocks + except Exception: + return await self.daemon.raw_blocks(hex_hashes) + + start, hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] + last = start + count - 1 for hex_hashes in chunks(hashes, 50): - blocks = await self.daemon.raw_blocks(hex_hashes) - await self.tasks.run_in_thread(self.backup_blocks, blocks) + raw_blocks = await get_raw_blocks(last, hex_hashes) + await self.tasks.run_in_thread(self.backup_blocks, raw_blocks) + last -= len(raw_blocks) # Truncate header_mc: header count is 1 more than the height self.header_mc.truncate(self.height + 1) await self.prefetcher.reset_height() async def reorg_hashes(self, count): - '''Return the list of hashes to back up beacuse of a reorg. + '''Return a pair (start, hashes) of blocks to back up during a + reorg. - The hashes are returned in order of increasing height.''' + The hashes are returned in order of increasing height. Start + is the height of the first hash. + ''' def diff_pos(hashes1, hashes2): '''Returns the index of the first difference in the hash lists. @@ -360,7 +374,7 @@ class BlockProcessor(electrumx.server.db.DB): 'heights {:,d}-{:,d}' .format(count, s, start, start + count - 1)) - return self.fs_block_hashes(start, count) + return start, self.fs_block_hashes(start, count) def flush_state(self, batch): '''Flush chain state to the batch.''' @@ -527,6 +541,7 @@ class BlockProcessor(electrumx.server.db.DB): undo_info = self.advance_txs(block.transactions) if height >= min_height: self.undo_infos.append((undo_info, height)) + self.write_raw_block(block.raw, height) headers = [block.header for block in blocks] self.height = height diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 2dd234a..25be167 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -12,9 +12,10 @@ import array import ast import os -from struct import pack, unpack from bisect import bisect_right from collections import namedtuple +from glob import glob +from struct import pack, unpack import electrumx.lib.util as util from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN @@ -265,6 +266,29 @@ class DB(object): for undo_info, height in undo_infos: batch_put(self.undo_key(height), b''.join(undo_info)) + def raw_block_prefix(self): + return 'meta/block' + + def raw_block_path(self, height): + return f'{self.raw_block_prefix()}{height:d}' + + def read_raw_block(self, height): + '''Returns a raw block read from disk. Raises FileNotFoundError + if the block isn't on-disk.''' + with util.open_file(self.raw_block_path(height)) as f: + return f.read(-1) + + def write_raw_block(self, block, height): + '''Write a raw block to disk.''' + with util.open_truncate(self.raw_block_path(height)) as f: + f.write(block) + # Delete old blocks to prevent them accumulating + try: + del_height = self.min_undo_height(height) - 1 + os.remove(self.raw_block_path(del_height)) + except FileNotFoundError: + pass + def clear_excess_undo_info(self): '''Clear excess undo info. Only most recent N are kept.''' prefix = b'U' @@ -280,8 +304,20 @@ class DB(object): with self.utxo_db.write_batch() as batch: for key in keys: batch.delete(key) - self.logger.info('deleted {:,d} stale undo entries' - .format(len(keys))) + self.logger.info(f'deleted {len(keys):,d} stale undo entries') + + # delete old block files + prefix = self.raw_block_prefix() + paths = [path for path in glob(f'{prefix}[0-9]*') + if len(path) > len(prefix) + and int(path[len(prefix):]) < min_height] + if paths: + for path in paths: + try: + os.remove(path) + except FileNotFoundError: + pass + self.logger.info(f'deleted {len(paths):,d} stale block files') # -- UTXO database