Browse Source

Save raw blocks to disk for reorg purposes

We used to rely on the daemon being able to return the
orphaned blocks, but some old daemon codebases are buggy
and fail to do so.

Fixes #258 #315 #479
patch-2
Neil Booth 7 years ago
parent
commit
94d1f7a651
  1. 29
      electrumx/server/block_processor.py
  2. 42
      electrumx/server/db.py

29
electrumx/server/block_processor.py

@ -310,23 +310,37 @@ class BlockProcessor(electrumx.server.db.DB):
if count is None:
self.logger.info('chain reorg detected')
else:
self.logger.info('faking a reorg of {:,d} blocks'.format(count))
self.logger.info(f'faking a reorg of {count:,d} blocks')
await self.tasks.run_in_thread(self.flush, True)
hashes = await self.reorg_hashes(count)
async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1)
try:
blocks = [self.read_raw_block(height) for height in heights]
self.logger.info(f'read {len(blocks)} blocks from disk')
return blocks
except Exception:
return await self.daemon.raw_blocks(hex_hashes)
start, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
last = start + count - 1
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
await self.tasks.run_in_thread(self.backup_blocks, blocks)
raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.tasks.run_in_thread(self.backup_blocks, raw_blocks)
last -= len(raw_blocks)
# Truncate header_mc: header count is 1 more than the height
self.header_mc.truncate(self.height + 1)
await self.prefetcher.reset_height()
async def reorg_hashes(self, count):
'''Return the list of hashes to back up because of a reorg.
'''Return a pair (start, hashes) of blocks to back up during a
reorg.
The hashes are returned in order of increasing height.'''
The hashes are returned in order of increasing height. Start
is the height of the first hash.
'''
def diff_pos(hashes1, hashes2):
'''Returns the index of the first difference in the hash lists.
@ -360,7 +374,7 @@ class BlockProcessor(electrumx.server.db.DB):
'heights {:,d}-{:,d}'
.format(count, s, start, start + count - 1))
return self.fs_block_hashes(start, count)
return start, self.fs_block_hashes(start, count)
def flush_state(self, batch):
'''Flush chain state to the batch.'''
@ -527,6 +541,7 @@ class BlockProcessor(electrumx.server.db.DB):
undo_info = self.advance_txs(block.transactions)
if height >= min_height:
self.undo_infos.append((undo_info, height))
self.write_raw_block(block.raw, height)
headers = [block.header for block in blocks]
self.height = height

42
electrumx/server/db.py

@ -12,9 +12,10 @@
import array
import ast
import os
from struct import pack, unpack
from bisect import bisect_right
from collections import namedtuple
from glob import glob
from struct import pack, unpack
import electrumx.lib.util as util
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
@ -265,6 +266,29 @@ class DB(object):
for undo_info, height in undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
def raw_block_prefix(self):
return 'meta/block'
def raw_block_path(self, height):
return f'{self.raw_block_prefix()}{height:d}'
def read_raw_block(self, height):
'''Returns a raw block read from disk. Raises FileNotFoundError
if the block isn't on-disk.'''
with util.open_file(self.raw_block_path(height)) as f:
return f.read(-1)
def write_raw_block(self, block, height):
'''Write a raw block to disk.'''
with util.open_truncate(self.raw_block_path(height)) as f:
f.write(block)
# Delete old blocks to prevent them accumulating
try:
del_height = self.min_undo_height(height) - 1
os.remove(self.raw_block_path(del_height))
except FileNotFoundError:
pass
def clear_excess_undo_info(self):
'''Clear excess undo info. Only most recent N are kept.'''
prefix = b'U'
@ -280,8 +304,20 @@ class DB(object):
with self.utxo_db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info('deleted {:,d} stale undo entries'
.format(len(keys)))
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
# delete old block files
prefix = self.raw_block_prefix()
paths = [path for path in glob(f'{prefix}[0-9]*')
if len(path) > len(prefix)
and int(path[len(prefix):]) < min_height]
if paths:
for path in paths:
try:
os.remove(path)
except FileNotFoundError:
pass
self.logger.info(f'deleted {len(paths):,d} stale block files')
# -- UTXO database

Loading…
Cancel
Save