
Add an RPC call to force a reorg at run-time

This required a large rework of the block processor and prefetcher
interaction, and led to various cleanups.

Closes #103
Neil Booth, 8 years ago
commit 2ad64f6243
5 changed files:

1. docs/RPC-INTERFACE.rst (5)
2. server/block_processor.py (334)
3. server/controller.py (17)
4. server/mempool.py (10)
5. server/session.py (2)

docs/RPC-INTERFACE.rst

@@ -142,3 +142,8 @@ The following commands are available:
   Returns a list of peer electrum servers.  This command takes no arguments.
   Currently this is data gleaned from an IRC session.
+
+* **reorg**
+
+  Force a block chain reorg.  This command takes an optional
+  argument - the number of blocks to reorg - that defaults to 3.
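
As a usage sketch (not part of this commit): the new command can be exercised over the local RPC port. This assumes the server's newline-delimited JSON-RPC framing and uses an illustrative host and port; adjust them to your RPC port setting.

    import asyncio
    import json

    async def force_reorg(count=3, host='localhost', port=8000):
        # Assumed framing: one JSON-RPC request per line, matching the
        # LocalRPC session protocol.  Host and port are illustrative.
        reader, writer = await asyncio.open_connection(host, port)
        request = {'id': 0, 'method': 'reorg', 'params': [count]}
        writer.write(json.dumps(request).encode() + b'\n')
        print(json.loads(await reader.readline()))
        writer.close()

    asyncio.get_event_loop().run_until_complete(force_reorg())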

server/block_processor.py

@@ -25,85 +25,65 @@ import server.db

 class Prefetcher(LoggedClass):
     '''Prefetches blocks (in the forward direction only).'''

-    def __init__(self, coin, daemon, height):
+    def __init__(self, bp):
         super().__init__()
-        self.coin = coin
-        self.daemon = daemon
+        self.bp = bp
         self.caught_up = False
         # Access to fetched_height should be protected by the semaphore
-        self.fetched_height = height
+        self.fetched_height = None
         self.semaphore = asyncio.Semaphore()
         self.refill_event = asyncio.Event()
-        # A cache queue of (blocks, size) pairs.  The target cache
-        # size has little effect on sync time.
-        self.cache = asyncio.Queue()
+        # The prefetched block cache size.  The min cache size has
+        # little effect on sync time.
         self.cache_size = 0
         self.min_cache_size = 10 * 1024 * 1024
         # This makes the first fetch be 10 blocks
         self.ave_size = self.min_cache_size // 10

-    async def clear(self, height):
-        '''Clear prefetched blocks and restart from the given height.
-
-        Used in blockchain reorganisations.  This coroutine can be
-        called asynchronously to the _prefetch coroutine so we must
-        synchronize with a semaphore.
-
-        Set height to -1 when shutting down to place a sentinel on the
-        queue to tell the block processor to shut down too.
-        '''
-        with await self.semaphore:
-            while not self.cache.empty():
-                self.cache.get_nowait()
-            self.cache_size = 0
-            if height == -1:
-                self.cache.put_nowait((None, 0))
-            else:
-                self.refill_event.set()
-            self.fetched_height = height
-            self.logger.info('reset to height {:,d}'.format(height))
-
-    async def get_blocks(self):
-        '''Return the next list of blocks from our prefetch cache.
-
-        A return value of None indicates to shut down.  Once caught up
-        an entry is queued every few seconds synchronized with mempool
-        refreshes to indicate a new mempool is available.  Of course
-        the list of blocks in such a case will normally be empty.'''
-        blocks, size = await self.cache.get()
-        self.cache_size -= size
-        if self.cache_size < self.min_cache_size:
-            self.refill_event.set()
-        return blocks
-
-    async def main_loop(self, caught_up_event):
+    async def main_loop(self):
         '''Loop forever polling for more blocks.'''
-        daemon_height = await self.daemon.height()
-        if self.fetched_height >= daemon_height:
-            log_msg = 'caught up to daemon height {:,d}'
-        else:
-            log_msg = 'catching up to daemon height {:,d}...'
-        self.logger.info(log_msg.format(daemon_height))
         while True:
             try:
                 # Sleep a while if there is nothing to prefetch
-                if not await self._prefetch_blocks(caught_up_event.is_set()):
-                    await asyncio.sleep(5)
+                await self.refill_event.wait()
+                if not await self._prefetch_blocks():
+                    await asyncio.sleep(5)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
             except asyncio.CancelledError:
-                await self.clear(-1)
                 return

-    async def _prefetch_blocks(self, mempool):
+    def processing_blocks(self, blocks):
+        '''Called by block processor when it is processing queued blocks.'''
+        self.cache_size -= sum(len(block) for block in blocks)
+        if self.cache_size < self.min_cache_size:
+            self.refill_event.set()
+
+    async def reset_height(self):
+        '''Reset to prefetch blocks from the block processor's height.
+
+        Used in blockchain reorganisations.  This coroutine can be
+        called asynchronously to the _prefetch coroutine so we must
+        synchronize with a semaphore.'''
+        with await self.semaphore:
+            self.fetched_height = self.bp.height
+            self.refill_event.set()
+        daemon_height = await self.bp.daemon.height()
+        behind = daemon_height - self.bp.height
+        if behind > 0:
+            self.logger.info('catching up to daemon height {:,d} '
+                             '({:,d} blocks behind)'
+                             .format(daemon_height, behind))
+        else:
+            self.logger.info('caught up to daemon height {:,d}'
+                             .format(daemon_height))
+
+    async def _prefetch_blocks(self):
         '''Prefetch some blocks and put them on the queue.

-        Repeats until the queue is full or caught up.  If caught up,
-        sleep for a period of time before returning.
+        Repeats until the queue is full or caught up.
         '''
-        daemon_height = await self.daemon.height(mempool)
+        daemon = self.bp.daemon
+        daemon_height = await daemon.height(self.bp.caught_up_event.is_set())
         with await self.semaphore:
             while self.cache_size < self.min_cache_size:
                 # Try and catch up all blocks but limit to room in cache.
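
The refill logic above is a standard asyncio producer gate: whoever consumes cached blocks sets refill_event once the cache drops below the minimum, and the prefetch loop blocks on the event instead of polling. A minimal self-contained sketch of the pattern (names are illustrative, not from this commit):

    import asyncio

    async def main():
        refill_event = asyncio.Event()
        cache = []
        MIN_SIZE = 3

        async def prefetch():
            # Block until a consumer signals the cache is low, then
            # top it up and clear the event.
            await refill_event.wait()
            while len(cache) < MIN_SIZE:
                cache.append('block')
            refill_event.clear()

        def consume():
            if cache:
                cache.pop()
            if len(cache) < MIN_SIZE:
                refill_event.set()  # wake the prefetcher

        consume()         # cache is empty, so this sets the event
        await prefetch()  # refills to MIN_SIZE
        assert len(cache) == MIN_SIZE

    asyncio.get_event_loop().run_until_complete(main())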
@@ -112,22 +92,23 @@ class Prefetcher(LoggedClass):
                 count = min(daemon_height - self.fetched_height, cache_room)
                 count = min(2500, max(count, 0))
                 if not count:
-                    self.cache.put_nowait(([], 0))
-                    self.caught_up = True
+                    if not self.caught_up:
+                        self.caught_up = True
+                        self.bp.on_prefetcher_first_caught_up()
                     return False

                 first = self.fetched_height + 1
-                hex_hashes = await self.daemon.block_hex_hashes(first, count)
+                hex_hashes = await daemon.block_hex_hashes(first, count)
                 if self.caught_up:
                     self.logger.info('new block height {:,d} hash {}'
                                      .format(first + count-1, hex_hashes[-1]))
-                blocks = await self.daemon.raw_blocks(hex_hashes)
+                blocks = await daemon.raw_blocks(hex_hashes)
                 assert count == len(blocks)

                 # Special handling for genesis block
                 if first == 0:
-                    blocks[0] = self.coin.genesis_block(blocks[0])
+                    blocks[0] = self.bp.coin.genesis_block(blocks[0])
                     self.logger.info('verified genesis block with hash {}'
                                      .format(hex_hashes[0]))

@@ -138,19 +119,17 @@ class Prefetcher(LoggedClass):
                 else:
                     self.ave_size = (size + (10 - count) * self.ave_size) // 10

-                self.cache.put_nowait((blocks, size))
+                self.bp.on_prefetched_blocks(blocks, first)
                 self.cache_size += size
                 self.fetched_height += count
+            self.refill_event.clear()
         return True


 class ChainError(Exception):
     '''Raised on error processing blocks.'''


-class ChainReorg(Exception):
-    '''Raised on a blockchain reorganisation.'''


 class BlockProcessor(server.db.DB):
     '''Process blocks and update the DB state to match.
@@ -171,82 +150,106 @@ class BlockProcessor(server.db.DB):
         self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
         self.caught_up_event = asyncio.Event()
+        self.task_queue = asyncio.Queue()
+        self.stop = False

         # Meta
         self.cache_MB = env.cache_MB
         self.next_cache_check = 0

-        # Headers and tx_hashes have one entry per block
-        self.history = defaultdict(partial(array.array, 'I'))
-        self.history_size = 0
-        self.prefetcher = Prefetcher(self.coin, self.daemon, self.height)

         self.last_flush = time.time()
         self.last_flush_tx_count = self.tx_count
+        self.touched = set()

-        # Caches of unflushed items
+        # Caches of unflushed items.
         self.headers = []
         self.tx_hashes = []
+        self.history = defaultdict(partial(array.array, 'I'))
+        self.history_size = 0

         # UTXO cache
         self.utxo_cache = {}
         self.db_deletes = []

-        # Log state
+        self.prefetcher = Prefetcher(self)

         if self.utxo_db.for_sync:
             self.logger.info('flushing DB cache at {:,d} MB'
                              .format(self.cache_MB))

-    async def executor(self, func, *args, **kwargs):
-        '''Run func taking args in the executor.'''
-        loop = asyncio.get_event_loop()
-        await loop.run_in_executor(None, partial(func, *args, **kwargs))
+    def add_task(self, task):
+        '''Add the task to our task queue.'''
+        self.task_queue.put_nowait(task)

-    def caught_up(self):
-        '''Called when first caught up after starting.'''
-        if not self.caught_up_event.is_set():
-            self.first_sync = False
-            self.flush(True)
-            if self.utxo_db.for_sync:
-                self.logger.info('{} synced to height {:,d}'
-                                 .format(VERSION, self.height))
-            self.open_dbs()
-            self.caught_up_event.set()
+    def on_prefetched_blocks(self, blocks, first):
+        '''Called by the prefetcher when it has prefetched some blocks.'''
+        self.add_task(partial(self.check_and_advance_blocks, blocks, first))

-    async def main_loop(self, touched):
+    def on_prefetcher_first_caught_up(self):
+        '''Called by the prefetcher when it first catches up.'''
+        self.add_task(self.first_caught_up)
+
+    def on_shutdown(self):
+        '''Called by the controller to shut processing down.'''
+        async def do_nothing():
+            pass
+        self.stop = True
+        self.add_task(do_nothing)  # Ensure something is on the queue
+
+    async def main_loop(self):
         '''Main loop for block processing.'''
-        while True:
-            blocks = await self.prefetcher.get_blocks()
-            if blocks:
-                start = time.time()
-                await self.check_and_advance_blocks(blocks, touched)
-                if not self.first_sync:
-                    s = '' if len(blocks) == 1 else 's'
-                    self.logger.info('processed {:,d} block{} in {:.1f}s'
-                                     .format(len(blocks), s,
-                                             time.time() - start))
-            elif blocks is None:
-                break  # Shutdown
-            else:
-                self.caught_up()
+        await self.prefetcher.reset_height()
+        while not self.stop:
+            task = await self.task_queue.get()
+            await task()

         self.logger.info('flushing state to DB for a clean shutdown...')
-        self.flush(True)
+        await self.executor(self.flush, True)
         self.logger.info('shutdown complete')

-    async def check_and_advance_blocks(self, blocks, touched):
+    async def executor(self, func, *args, **kwargs):
+        '''Run func taking args in the executor.'''
+        loop = asyncio.get_event_loop()
+        await loop.run_in_executor(None, partial(func, *args, **kwargs))
+
+    async def first_caught_up(self):
+        '''Called when first caught up to daemon after starting.'''
+        # Flush everything with updated first_sync->False state.
+        self.first_sync = False
+        await self.executor(self.flush, True)
+        if self.utxo_db.for_sync:
+            self.logger.info('{} synced to height {:,d}'
+                             .format(VERSION, self.height))
+        self.open_dbs()
+        self.caught_up_event.set()
+
+    async def check_and_advance_blocks(self, blocks, first):
         '''Process the list of blocks passed.  Detects and handles reorgs.'''
-        first = self.height + 1
+        self.prefetcher.processing_blocks(blocks)
+        if first != self.height + 1:
+            # If we prefetched two sets of blocks and the first caused
+            # a reorg this will happen when we try to process the
+            # second.  It should be very rare.
+            self.logger.warning('ignoring {:,d} blocks starting height {:,d}, '
+                                'expected {:,d}'
+                                .format(len(blocks), first, self.height + 1))
+            return
+
         headers = [self.coin.block_header(block, first + n)
                    for n, block in enumerate(blocks)]
         hprevs = [self.coin.header_prevhash(h) for h in headers]
         chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]

         if hprevs == chain:
-            await self.executor(self.advance_blocks, blocks, headers, touched)
+            start = time.time()
+            await self.executor(self.advance_blocks, blocks, headers)
+            if not self.first_sync:
+                s = '' if len(blocks) == 1 else 's'
+                self.logger.info('processed {:,d} block{} in {:.1f}s'
+                                 .format(len(blocks), s,
+                                         time.time() - start))
         elif hprevs[0] != chain[0]:
-            await self.handle_chain_reorg(touched)
+            await self.reorg_chain()
         else:
             # It is probably possible but extremely rare that what
             # bitcoind returns doesn't form a chain because it
@@ -255,43 +258,27 @@ class BlockProcessor(server.db.DB):
             # just to reset the prefetcher and try again.
             self.logger.warning('daemon blocks do not form a chain; '
                                 'resetting the prefetcher')
-            await self.prefetcher.clear(self.height)
+            await self.prefetcher.reset_height()

-    def advance_blocks(self, blocks, headers, touched):
-        '''Synchronously advance the blocks.
+    def force_chain_reorg(self, count):
+        '''Force a reorg of the given number of blocks.

-        It is already verified they correctly connect onto our tip.
+        Returns True if a reorg is queued, false if not caught up.
         '''
-        block_txs = self.coin.block_txs
-        daemon_height = self.daemon.cached_height()
-        for block in blocks:
-            self.height += 1
-            txs = block_txs(block, self.height)
-            self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
-            undo_info = self.advance_txs(txs, touched)
-            if daemon_height - self.height <= self.env.reorg_limit:
-                self.write_undo_info(self.height, b''.join(undo_info))
-
-        self.headers.extend(headers)
-        self.tip = self.coin.header_hash(headers[-1])
-
-        # If caught up, flush everything as client queries are
-        # performed on the DB.
-        if self.caught_up_event.is_set():
-            self.flush(True)
-        else:
-            touched.clear()
-            if time.time() > self.next_cache_check:
-                self.check_cache_size()
-                self.next_cache_check = time.time() + 30
+        if self.caught_up_event.is_set():
+            self.add_task(partial(self.reorg_chain, count=count))
+            return True
+        return False

-    async def handle_chain_reorg(self, touched, count=None):
+    async def reorg_chain(self, count=None):
         '''Handle a chain reorganisation.

         Count is the number of blocks to simulate a reorg, or None for
         a real reorg.'''
-        self.logger.info('chain reorg detected')
+        if count is None:
+            self.logger.info('chain reorg detected')
+        else:
+            self.logger.info('faking a reorg of {:,d} blocks'.format(count))
         await self.executor(self.flush, True)
         hashes = await self.reorg_hashes(count)
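
Returning to check_and_advance_blocks above: the hprevs == chain comparison verifies connectivity in one shot. Each header's prev-hash must equal the hash of its predecessor, with the current tip standing in for the first predecessor's hash. A toy illustration with made-up hash values:

    # Stand-ins for header hashes; the real code derives these via
    # coin.header_hash() and coin.header_prevhash().
    tip = 'h0'
    header_hashes = ['h1', 'h2', 'h3']      # hash of each new header
    header_prevhashes = ['h0', 'h1', 'h2']  # prev-hash field of each header

    chain = [tip] + header_hashes[:-1]
    assert header_prevhashes == chain       # the blocks connect onto our tip
    # A reorg shows up as a mismatch in the first position:
    assert ['hX', 'h1', 'h2'] != chain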
@@ -299,8 +286,8 @@ class BlockProcessor(server.db.DB):
         hashes = [hash_to_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             blocks = await self.daemon.raw_blocks(hex_hashes)
-            await self.executor(self.backup_blocks, blocks, touched)
-        await self.prefetcher.clear(self.height)
+            await self.executor(self.backup_blocks, blocks)
+        await self.prefetcher.reset_height()

     async def reorg_hashes(self, count):
         '''Return the list of hashes to back up because of a reorg.
@@ -431,7 +418,7 @@ class BlockProcessor(server.db.DB):
         self.tx_hashes = []
         self.headers = []

-    def backup_flush(self, hashXs):
+    def backup_flush(self):
         '''Like flush() but when backing up.  All UTXOs are flushed.

-        hashXs - sequence of hashXs which were touched by backing
@@ -449,10 +436,12 @@ class BlockProcessor(server.db.DB):
         assert not self.headers
         assert not self.tx_hashes

-        # Backup history
-        nremoves = self.backup_history(hashXs)
-        self.logger.info('backing up removed {:,d} history entries from '
-                         '{:,d} addresses'.format(nremoves, len(hashXs)))
+        # Backup history.  self.touched can include other addresses
+        # which is harmless, but remove None.
+        self.touched.discard(None)
+        nremoves = self.backup_history(self.touched)
+        self.logger.info('backing up removed {:,d} history entries'
+                         .format(nremoves))

         with self.utxo_db.write_batch() as batch:
             # Flush state last as it reads the wall time.
@@ -489,10 +478,38 @@ class BlockProcessor(server.db.DB):
         if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
             self.flush(utxo_MB >= self.cache_MB * 4 // 5)

-    def advance_txs(self, txs, touched):
-        undo_info = []
+    def advance_blocks(self, blocks, headers):
+        '''Synchronously advance the blocks.
+
+        It is already verified they correctly connect onto our tip.
+        '''
+        block_txs = self.coin.block_txs
+        daemon_height = self.daemon.cached_height()
+        for block in blocks:
+            self.height += 1
+            undo_info = self.advance_txs(block_txs(block, self.height))
+            if daemon_height - self.height <= self.env.reorg_limit:
+                self.write_undo_info(self.height, b''.join(undo_info))
+
+        self.headers.extend(headers)
+        self.tip = self.coin.header_hash(headers[-1])
+
+        # If caught up, flush everything as client queries are
+        # performed on the DB.
+        if self.caught_up_event.is_set():
+            self.flush(True)
+        else:
+            self.touched.clear()
+            if time.time() > self.next_cache_check:
+                self.check_cache_size()
+                self.next_cache_check = time.time() + 30
+
+    def advance_txs(self, txs):
+        self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))

         # Use local vars for speed in the loops
+        undo_info = []
         history = self.history
         history_size = self.history_size
         tx_num = self.tx_count
@@ -501,6 +518,7 @@ class BlockProcessor(server.db.DB):
         put_utxo = self.utxo_cache.__setitem__
         spend_utxo = self.spend_utxo
         undo_info_append = undo_info.append
+        touched = self.touched

         for tx, tx_hash in txs:
             hashXs = set()
@@ -535,36 +553,33 @@ class BlockProcessor(server.db.DB):

         return undo_info

-    def backup_blocks(self, blocks, touched):
+    def backup_blocks(self, blocks):
         '''Backup the blocks and flush.

         The blocks should be in order of decreasing height, starting at
         self.height.  A flush is performed once the blocks are backed up.
         '''
         self.assert_flushed()
+        assert self.height >= len(blocks)
+        coin = self.coin

         for block in blocks:
-            txs = self.coin.block_txs(block, self.height)
-            header_hash = self.coin.header_hash(header)
+            # Check and update self.tip
+            header = coin.block_header(block, self.height)
+            header_hash = coin.header_hash(header)
             if header_hash != self.tip:
                 raise ChainError('backup block {} is not tip {} at height {:,d}'
                                  .format(hash_to_str(header_hash),
                                          hash_to_str(self.tip), self.height))
-            self.backup_txs(txs, touched)
-            self.tip = self.coin.header_prevhash(header)
-            assert self.height >= 0
+            self.tip = coin.header_prevhash(header)
+            self.backup_txs(coin.block_txs(block, self.height))
             self.height -= 1
             self.tx_counts.pop()

         self.logger.info('backed up to height {:,d}'.format(self.height))
+        self.backup_flush()

-        # touched includes those passed into this function.  That likely
-        # has additional addresses which is harmless.  Remove None.
-        touched.discard(None)
-        self.backup_flush(touched)

-    def backup_txs(self, txs, touched):
+    def backup_txs(self, txs):
         # Prevout values, in order down the block (coinbase first if present)
         # undo_info is in reverse block order
         undo_info = self.read_undo_info(self.height)
@@ -578,6 +593,7 @@ class BlockProcessor(server.db.DB):
         put_utxo = self.utxo_cache.__setitem__
         spend_utxo = self.spend_utxo
         script_hashX = self.coin.hashX_from_script
+        touched = self.touched
         undo_entry_len = 12 + self.coin.HASHX_LEN

         for tx, tx_hash in reversed(txs):
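
Taken together, the block_processor.py changes replace the old pull loop over get_blocks() with a push-style task queue: callers enqueue zero-argument coroutine functions built with functools.partial, and main_loop awaits them one at a time, so block advances, forced reorgs and shutdown are serialized on a single loop. A minimal sketch of the pattern with hypothetical task names:

    import asyncio
    from functools import partial

    class Processor:
        def __init__(self):
            self.task_queue = asyncio.Queue()
            self.stop = False

        def add_task(self, task):
            '''task is a zero-argument callable returning a coroutine.'''
            self.task_queue.put_nowait(task)

        async def process_blocks(self, blocks, first):
            print('processing {:,d} blocks from height {:,d}'
                  .format(len(blocks), first))

        async def shut_down(self):
            self.stop = True

        async def main_loop(self):
            while not self.stop:
                task = await self.task_queue.get()
                await task()

    proc = Processor()
    proc.add_task(partial(proc.process_blocks, [b'raw block'], 1))
    proc.add_task(proc.shut_down)
    asyncio.get_event_loop().run_until_complete(proc.main_loop())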

server/controller.py

@@ -49,7 +49,7 @@ class Controller(util.LoggedClass):
         self.loop = asyncio.get_event_loop()
         self.start = time.time()
         self.bp = BlockProcessor(env)
-        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp)
+        self.mempool = MemPool(self.bp)
         self.irc = IRC(env)
         self.env = env
         self.servers = {}
@@ -172,8 +172,8 @@ class Controller(util.LoggedClass):
             self.futures.append(asyncio.ensure_future(coro))

         # shutdown() assumes bp.main_loop() is first
-        add_future(self.bp.main_loop(self.mempool.touched))
-        add_future(self.bp.prefetcher.main_loop(self.bp.caught_up_event))
+        add_future(self.bp.main_loop())
+        add_future(self.bp.prefetcher.main_loop())
         add_future(self.irc.start(self.bp.caught_up_event))
         add_future(self.start_servers(self.bp.caught_up_event))
         add_future(self.mempool.main_loop())
@@ -187,8 +187,8 @@ class Controller(util.LoggedClass):
                 await future  # Note: future is not one of self.futures
             except asyncio.CancelledError:
                 break

         await self.shutdown()
+        await asyncio.sleep(1)

     def close_servers(self, kinds):
         '''Close the servers of the given kinds (TCP etc.).'''
@@ -309,6 +309,7 @@ class Controller(util.LoggedClass):
     async def shutdown(self):
         '''Call to shutdown everything.  Returns when done.'''
         self.state = self.SHUTTING_DOWN
+        self.bp.on_shutdown()
         self.close_servers(list(self.servers.keys()))
         # Don't cancel the block processor main loop - let it close itself
         for future in self.futures[1:]:
@@ -559,3 +560,11 @@ class Controller(util.LoggedClass):

     async def rpc_peers(self, params):
         return self.irc.peers
+
+    async def rpc_reorg(self, params):
+        '''Force a reorg of the given number of blocks, 3 by default.'''
+        count = 3
+        if params:
+            count = JSONRPC.params_to_non_negative_integer(params)
+        if not self.bp.force_chain_reorg(count):
+            raise JSONRPC.RPCError('still catching up with daemon')

server/mempool.py

@@ -31,12 +31,12 @@ class MemPool(util.LoggedClass):
     A pair is a (hashX, value) tuple.  tx hashes are hex strings.
     '''

-    def __init__(self, daemon, coin, db):
+    def __init__(self, bp):
         super().__init__()
-        self.daemon = daemon
-        self.coin = coin
-        self.db = db
-        self.touched = set()
+        self.daemon = bp.daemon
+        self.coin = bp.coin
+        self.db = bp
+        self.touched = bp.touched
         self.touched_event = asyncio.Event()
         self.prioritized = set()
         self.stop = False

server/session.py

@@ -496,7 +496,7 @@ class LocalRPC(Session):

     def __init__(self, *args):
         super().__init__(*args)
-        cmds = 'disconnect getinfo groups log peers sessions'.split()
+        cmds = 'disconnect getinfo groups log peers reorg sessions'.split()
         self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
                          for cmd in cmds}
         self.client = 'RPC'
