
Start work on handling block reorgs

Unfinished
Branch: master
Neil Booth, 8 years ago
commit 6957b59b19

Changed files:
  server/block_processor.py  (137 lines changed)
  server/cache.py             (49 lines changed)
  server/protocol.py           (1 line changed)

server/block_processor.py  (137 lines changed)

@@ -35,8 +35,8 @@ class Prefetcher(LoggedClass):
     def __init__(self, daemon, height):
         super().__init__()
         self.daemon = daemon
+        self.semaphore = asyncio.Semaphore()
         self.queue = asyncio.Queue()
-        self.queue_semaphore = asyncio.Semaphore()
         self.queue_size = 0
         # Target cache size.  Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
@@ -49,12 +49,26 @@ class Prefetcher(LoggedClass):
         self.queue_size -= total_size
         return blocks

+    async def clear(self, height):
+        '''Clear prefetched blocks and restart from the given height.
+
+        Used in blockchain reorganisations.  This coroutine can be
+        called asynchronously to the _prefetch coroutine so we must
+        synchronize.
+        '''
+        with await self.semaphore:
+            while not self.queue.empty():
+                self.queue.get_nowait()
+            self.queue_size = 0
+            self.fetched_height = height
+
     async def start(self):
-        '''Loops forever polling for more blocks.'''
+        '''Loop forever polling for more blocks.'''
         self.logger.info('prefetching blocks...')
         while True:
             while self.queue_size < self.target_cache_size:
                 try:
-                    await self._prefetch()
+                    with await self.semaphore:
+                        await self._prefetch()
                 except DaemonError as e:
                     self.logger.info('ignoring daemon errors: {}'.format(e))
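
The Semaphore here has the default value of 1, so it acts as a mutex between clear() and _prefetch(): a reorg cannot empty the queue while a fetch is mid-flight. Below is a minimal, self-contained sketch of that pattern, not part of the commit; the Fetcher class and item values are invented, and it uses the modern "async with" syntax, of which "with await semaphore" above is the older equivalent.

import asyncio

class Fetcher:
    def __init__(self):
        self.semaphore = asyncio.Semaphore()  # default value 1: a mutex
        self.queue = asyncio.Queue()

    async def fill(self, items):
        # Plays the role of _prefetch(): adds items while holding the lock
        async with self.semaphore:
            for item in items:
                self.queue.put_nowait(item)
                await asyncio.sleep(0)        # yield to other coroutines

    async def clear(self):
        # Plays the role of clear(): cannot interleave with fill()
        async with self.semaphore:
            while not self.queue.empty():
                self.queue.get_nowait()

async def main():
    f = Fetcher()
    await asyncio.gather(f.fill(range(5)), f.clear())
    print(f.queue.qsize())  # 0: clear() ran only after fill() finished

asyncio.run(main())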
@@ -71,11 +85,11 @@ class Prefetcher(LoggedClass):
         max_count = min(daemon_height - self.fetched_height, 4000)
         count = min(max_count, self._prefill_count(self.target_cache_size))
         first = self.fetched_height + 1
-        hashes = await self.daemon.block_hex_hashes(first, count)
-        if not hashes:
+        hex_hashes = await self.daemon.block_hex_hashes(first, count)
+        if not hex_hashes:
             return
-        blocks = await self.daemon.raw_blocks(hashes)
+        blocks = await self.daemon.raw_blocks(hex_hashes)
         sizes = [len(block) for block in blocks]
         total_size = sum(sizes)
         self.queue.put_nowait((blocks, total_size))
@@ -149,34 +163,83 @@ class BlockProcessor(LoggedClass):
         return [self.start(), self.prefetcher.start()]

     async def start(self):
-        '''Loop forever processing blocks in the appropriate direction.'''
+        '''External entry point for block processing.
+
+        A simple wrapper that safely flushes the DB on clean
+        shutdown.
+        '''
         try:
-            while True:
-                blocks = await self.prefetcher.get_blocks()
-                for block in blocks:
-                    self.process_block(block)
-                    # Release asynchronous block fetching
-                    await asyncio.sleep(0)
-
-                if self.height == self.daemon.cached_height():
-                    self.logger.info('caught up to height {:d}'
-                                     .format(self.height))
-                    self.flush(True)
+            await self.advance_blocks()
         finally:
-            if self.daemon.cached_height() is not None:
-                self.flush(True)
+            self.flush(True)
+
+    async def advance_blocks(self):
+        '''Loop forever processing blocks in the forward direction.'''
+        caught_up = False
+        while True:
+            blocks = await self.prefetcher.get_blocks()
+            for block in blocks:
+                if not self.advance_block(block):
+                    await self.handle_chain_reorg()
+                    caught_up = False
+                    break
+                await asyncio.sleep(0)  # Yield
+
+            if not caught_up and self.height == self.daemon.cached_height():
+                caught_up = True
+                self.logger.info('caught up to height {:,d}'
+                                 .format(self.height))
+
+    async def handle_chain_reorg(self):
+        hashes = await self.reorg_hashes()
+        hex_hashes = [hash_to_str(hash) for hash in hashes]
+        blocks = await self.daemon.raw_blocks(hex_hashes)
+        for block in reversed(blocks):
+            self.backup_block(block)
+        await self.prefetcher.clear(self.height)
+
+    async def reorg_hashes(self):
+        '''Return the list of hashes to back up because of a reorg.
+
+        The hashes are returned in order of increasing height.'''
+        def match_pos(hashes1, hashes2):
+            for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
+                if hash1 == hash2:
+                    return n
+            return -1
+
+        self.logger.info('chain reorg detected; finding common height...')
+
+        start = self.height - 1
+        count = 1
+        while True:
+            hashes = self.fs_cache.block_hashes(start, count)
+            d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
+            d_hashes = [bytes.fromhex(hex_hash) for hex_hash in d_hex_hashes]
+            n = match_pos(hashes, d_hashes)
+            if n >= 0:
+                break
+            assert start > 0
+            count = min(count * 2, start)
+            start -= count
+
+        # Hashes differ from height 'start'
+        start += n + 1
+
+        count = (self.height - start) + 1
+        self.logger.info('chain was reorganised for {:,d} blocks starting '
+                         'at height {:,d}'.format(count, start))
+
+        return self.fs_cache.block_hashes(start, count)

     def open_db(self, coin):
-        block_size = 4 * 1024
         db_name = '{}-{}'.format(coin.NAME, coin.NET)
         try:
             db = plyvel.DB(db_name, create_if_missing=False,
-                           error_if_exists=False, compression=None,
-                           block_size = block_size)
+                           error_if_exists=False, compression=None)
         except:
             db = plyvel.DB(db_name, create_if_missing=True,
-                           error_if_exists=True, compression=None,
-                           block_size = block_size)
+                           error_if_exists=True, compression=None)
             self.logger.info('created new database {}'.format(db_name))
             self.flush_state(db)
         else:
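
The open_db() change drops the explicit LevelDB block_size in favour of plyvel's default. The surrounding try/except is an open-or-create idiom: opening with create_if_missing=False raises when the database is absent, which signals first-time startup. A minimal sketch of that idiom, assuming plyvel's documented constructor keywords; the function name and the use of plyvel.Error in place of the bare except are mine:

import plyvel

def open_or_create(db_name):
    '''Open db_name, creating it if absent; report whether it is new.'''
    try:
        db = plyvel.DB(db_name, create_if_missing=False,
                       error_if_exists=False, compression=None)
        return db, False
    except plyvel.Error:
        # Database does not exist: create it and flag it as new
        db = plyvel.DB(db_name, create_if_missing=True,
                       error_if_exists=True, compression=None)
        return db, True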
@@ -343,21 +406,19 @@ class BlockProcessor(LoggedClass):
                          .format(utxo_MB + hist_MB, utxo_MB, hist_MB))
         return utxo_MB, hist_MB

-    def process_block(self, block):
-        # We must update the fs_cache before calling process_tx() as
-        # it uses the fs_cache for tx hash lookup
-        header, tx_hashes, txs = self.fs_cache.process_block(block)
+    def advance_block(self, block):
+        # We must update the fs_cache before calling advance_txs() as
+        # the UTXO cache uses the fs_cache via get_tx_hash() to
+        # resolve compressed key collisions
+        header, tx_hashes, txs = self.coin.read_block(block)
+        self.fs_cache.advance_block(header, tx_hashes, txs)
         prev_hash, header_hash = self.coin.header_hashes(header)
         if prev_hash != self.tip:
-            raise ChainError('trying to build header with prev_hash {} '
-                             'on top of tip with hash {}'
-                             .format(hash_to_str(prev_hash),
-                                     hash_to_str(self.tip)))
+            return False

         self.tip = header_hash
         self.height += 1
-        for tx_hash, tx in zip(tx_hashes, txs):
-            self.process_tx(tx_hash, tx)
+        self.advance_txs(tx_hashes, txs)

         # Check if we're getting full and time to flush?
         now = time.time()
@@ -367,10 +428,13 @@ class BlockProcessor(LoggedClass):
         if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
             self.flush(utxo_MB >= self.utxo_MB)

-    def process_tx(self, tx_hash, tx):
+        return True
+
+    def advance_txs(self, tx_hashes, txs):
         cache = self.utxo_cache
         tx_num = self.tx_count

+        for tx_hash, tx in zip(tx_hashes, txs):
             # Add the outputs as new UTXOs; spend the inputs
             hash168s = cache.add_many(tx_hash, tx_num, tx.outputs)
             if not tx.is_coinbase:
@@ -380,8 +444,15 @@ class BlockProcessor(LoggedClass):
             for hash168 in hash168s:
                 self.history[hash168].append(tx_num)
             self.history_size += len(hash168s)
+            tx_num += 1

-        self.tx_count += 1
+        self.tx_count = tx_num
+
+    def backup_block(self, block):
+        pass
+
+    def undo_txs(self, tx_hashes, txs):
+        pass

     @staticmethod
     def resolve_limit(limit):
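
The new reorg_hashes() searches backwards from the tip in windows of doubling size until the local chain and the daemon's chain share a hash. A self-contained sketch of that search follows; it is not from the commit, and plain lists of small integers stand in for the FS cache, the daemon and real block hashes.

def find_fork_start(ours, theirs):
    '''Return the first height at which `ours` disagrees with `theirs`.

    Searches backwards from the tip in windows of doubling size,
    mirroring reorg_hashes() above.'''
    def match_pos(hashes1, hashes2):
        for n, (h1, h2) in enumerate(zip(hashes1, hashes2)):
            if h1 == h2:
                return n
        return -1

    height = len(ours) - 1
    start, count = height - 1, 1
    while True:
        n = match_pos(ours[start:start + count], theirs[start:start + count])
        if n >= 0:
            return start + n + 1
        assert start > 0
        count = min(count * 2, start)
        start -= count

# Chains agree up to height 5 and diverge from height 6:
ours   = [0, 1, 2, 3, 4, 5, 60, 70, 80]
theirs = [0, 1, 2, 3, 4, 5,  6,  7,  8]
print(find_fork_start(ours, theirs))  # 6

Note that match_pos() returns the first matching position in a window, so when several heights in one window still agree the computed start can undershoot the true divergence point; that is harmless here, as it only means backing up and replaying a few extra blocks.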

server/cache.py  (49 lines changed)

@@ -9,8 +9,8 @@
 from bisect import bisect_right
 from collections import namedtuple

 from lib.script import ScriptPubKey
-from lib.util import LoggedClass
-from lib.hash import hash_to_str
+from lib.util import chunks, LoggedClass
+from lib.hash import double_sha256, hash_to_str

 # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
@@ -285,6 +285,8 @@ class FSCache(LoggedClass):
         self.headers_file = self.open_file('headers', is_new)
         self.txcount_file = self.open_file('txcount', is_new)

+        # tx_counts[N] has the cumulative number of txs at the end of
+        # height N.  So tx_counts[0] is 1 - the genesis coinbase
         self.tx_counts = array.array('I')
         self.txcount_file.seek(0)
         self.tx_counts.fromfile(self.txcount_file, self.height + 1)
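
The new comment documents the invariant that makes height lookups cheap: tx_counts is a cumulative sum, so the height of the block containing any global tx number falls out of a binary search with bisect_right (already imported at the top of this file, and used by the get_tx_hash machinery mentioned in block_processor.py). An illustrative sketch, not from the commit, with made-up numbers:

from array import array
from bisect import bisect_right

# tx_counts[N] = cumulative tx count at the end of height N, so
# tx_counts[0] is 1 - the genesis coinbase.
tx_counts = array('I', [1, 3, 7, 12])

def height_of_tx(tx_num):
    '''Return the height of the block containing global tx number tx_num.'''
    return bisect_right(tx_counts, tx_num)

assert height_of_tx(0) == 0    # the genesis coinbase
assert height_of_tx(3) == 2    # first tx at height 2
assert height_of_tx(11) == 3   # last tx at height 3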
@@ -302,33 +304,33 @@ class FSCache(LoggedClass):
                 return open(filename, 'wb+')
             raise

-        return self.tx_counts[self.height] if self.tx_counts else 0
-
-    def process_block(self, block):
-        '''Process a new block and return (header, tx_hashes, txs)'''
-        triple = header, tx_hashes, txs = self.coin.read_block(block)
+    def advance_block(self, header, tx_hashes, txs):
+        '''Update the FS cache for a new block.'''
+        prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
+
+        assert len(self.tx_counts) == self.height + 1 + len(self.headers)

         # Cache the new header, tx hashes and cumulative tx count
         self.headers.append(header)
         self.tx_hashes.append(tx_hashes)
-        prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
         self.tx_counts.append(prior_tx_count + len(txs))

-        return triple
+    def backup_block(self, block):
+        '''Revert a block and return (header, tx_hashes, txs)'''
+        pass

     def flush(self, new_height, new_tx_count):
-        '''Flush the things stored on the filesystem.'''
+        '''Flush the things stored on the filesystem.
+
+        The arguments are passed for sanity check assertions only.'''
         self.logger.info('flushing to file system')

-        block_count = len(self.headers)
-        assert self.height + block_count == new_height
-        assert len(self.tx_hashes) == block_count
-        assert len(self.tx_counts) == self.height + 1 + block_count
-        assert new_tx_count == self.tx_counts[-1] if self.tx_counts else 0
-        prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0
-        tx_diff = new_tx_count - prior_tx_count
+        blocks_done = len(self.headers)
+        prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0
+        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
+        txs_done = cur_tx_count - prior_tx_count
+
+        assert self.height + blocks_done == new_height
+        assert cur_tx_count == new_tx_count
+        assert len(self.tx_hashes) == blocks_done
+        assert len(self.tx_counts) == new_height + 1

         # First the headers
         headers = b''.join(self.headers)
@@ -345,7 +347,7 @@ class FSCache(LoggedClass):
         # Finally the hashes
         hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
         assert len(hashes) % 32 == 0
-        assert len(hashes) // 32 == tx_diff
+        assert len(hashes) // 32 == txs_done
         cursor = 0
         file_pos = prior_tx_count * 32
         while cursor < len(hashes):
@@ -362,9 +364,9 @@ class FSCache(LoggedClass):
         self.tx_hashes = []
         self.headers = []
-        self.height += block_count
+        self.height += blocks_done

-        return tx_diff
+        return txs_done

     def read_headers(self, height, count):
         read_count = min(count, self.height + 1 - height)
@@ -403,6 +405,11 @@ class FSCache(LoggedClass):
         return tx_hash, height

+    def block_hashes(self, height, count):
+        headers = self.read_headers(height, count)
+        hlen = self.coin.HEADER_LEN
+        return [double_sha256(header) for header in chunks(headers, hlen)]
+
     def encode_header(self, height):
         if height < 0 or height > self.height + len(self.headers):
             raise Exception('no header information for height {:,d}'
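
The new block_hashes() recomputes block hashes from stored headers: it reads count headers starting at height, splits the concatenated bytes into HEADER_LEN-sized pieces and double-SHA256s each. A self-contained sketch of the same computation, not from the commit; it assumes lib.util.chunks yields fixed-size slices and that lib.hash.double_sha256 is SHA-256 applied twice, and it uses dummy header bytes:

import hashlib

def chunks(items, size):
    '''Yield successive size-length slices of items.'''
    for i in range(0, len(items), size):
        yield items[i:i + size]

def double_sha256(x):
    return hashlib.sha256(hashlib.sha256(x).digest()).digest()

HEADER_LEN = 80                   # Bitcoin header size in bytes
headers = bytes(3 * HEADER_LEN)   # three dummy concatenated headers

hashes = [double_sha256(h) for h in chunks(headers, HEADER_LEN)]
assert len(hashes) == 3 and all(len(h) == 32 for h in hashes)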

server/protocol.py  (1 line changed)

@@ -7,7 +7,6 @@
 import json
 import traceback
 from functools import partial

-from lib.hash import hash_to_str
 from lib.util import LoggedClass
 from server.version import VERSION
