diff --git a/docs/RPC-INTERFACE.rst b/docs/RPC-INTERFACE.rst index 195fbf0..5705134 100644 --- a/docs/RPC-INTERFACE.rst +++ b/docs/RPC-INTERFACE.rst @@ -142,3 +142,8 @@ The following commands are available: Returns a list of peer electrum servers. This command takes no arguments. Currently this is data gleaned from an IRC session. + +* **reorg** + + Force a block chain reorg. This command takes an optional + argument - the number of blocks to reorg - that defaults to 3. diff --git a/server/block_processor.py b/server/block_processor.py index a6b3afe..8954238 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -25,85 +25,65 @@ import server.db class Prefetcher(LoggedClass): '''Prefetches blocks (in the forward direction only).''' - def __init__(self, coin, daemon, height): + def __init__(self, bp): super().__init__() - self.coin = coin - self.daemon = daemon + self.bp = bp self.caught_up = False # Access to fetched_height should be protected by the semaphore - self.fetched_height = height + self.fetched_height = None self.semaphore = asyncio.Semaphore() self.refill_event = asyncio.Event() - # A cache queue of (blocks, size) pairs. The target cache - # size has little effect on sync time. - self.cache = asyncio.Queue() + # The prefetched block cache size. The min cache size has + # little effect on sync time. self.cache_size = 0 self.min_cache_size = 10 * 1024 * 1024 # This makes the first fetch be 10 blocks self.ave_size = self.min_cache_size // 10 - async def clear(self, height): - '''Clear prefetched blocks and restart from the given height. - - Used in blockchain reorganisations. This coroutine can be - called asynchronously to the _prefetch coroutine so we must - synchronize with a semaphore. - - Set height to -1 when shutting down to place a sentinel on the - queue to tell the block processor to shut down too. - ''' - with await self.semaphore: - while not self.cache.empty(): - self.cache.get_nowait() - self.cache_size = 0 - if height == -1: - self.cache.put_nowait((None, 0)) - else: - self.refill_event.set() - self.fetched_height = height - self.logger.info('reset to height {:,d}'.format(height)) - - async def get_blocks(self): - '''Return the next list of blocks from our prefetch cache. - - A return value of None indicates to shut down. Once caught up - an entry is queued every few seconds synchronized with mempool - refreshes to indicate a new mempool is available. Of course - the list of blocks in such a case will normally be empty.''' - blocks, size = await self.cache.get() - self.cache_size -= size - if self.cache_size < self.min_cache_size: - self.refill_event.set() - return blocks - - async def main_loop(self, caught_up_event): + async def main_loop(self): '''Loop forever polling for more blocks.''' - daemon_height = await self.daemon.height() - if self.fetched_height >= daemon_height: - log_msg = 'caught up to daemon height {:,d}' - else: - log_msg = 'catching up to daemon height {:,d}...' - self.logger.info(log_msg.format(daemon_height)) - while True: try: # Sleep a while if there is nothing to prefetch - if not await self._prefetch_blocks(caught_up_event.is_set()): - await asyncio.sleep(5) await self.refill_event.wait() + if not await self._prefetch_blocks(): + await asyncio.sleep(5) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) - except asyncio.CancelledError: - await self.clear(-1) - return - async def _prefetch_blocks(self, mempool): + def processing_blocks(self, blocks): + '''Called by block processor when it is processing queued blocks.''' + self.cache_size -= sum(len(block) for block in blocks) + if self.cache_size < self.min_cache_size: + self.refill_event.set() + + async def reset_height(self): + '''Reset to prefetch blocks from the block processor's height. + + Used in blockchain reorganisations. This coroutine can be + called asynchronously to the _prefetch coroutine so we must + synchronize with a semaphore.''' + with await self.semaphore: + self.fetched_height = self.bp.height + self.refill_event.set() + + daemon_height = await self.bp.daemon.height() + behind = daemon_height - self.bp.height + if behind > 0: + self.logger.info('catching up to daemon height {:,d} ' + '({:,d} blocks behind)' + .format(daemon_height, behind)) + else: + self.logger.info('caught up to daemon height {:,d}' + .format(daemon_height)) + + async def _prefetch_blocks(self): '''Prefetch some blocks and put them on the queue. - Repeats until the queue is full or caught up. If caught up, - sleep for a period of time before returning. + Repeats until the queue is full or caught up. ''' - daemon_height = await self.daemon.height(mempool) + daemon = self.bp.daemon + daemon_height = await daemon.height(self.bp.caught_up_event.is_set()) with await self.semaphore: while self.cache_size < self.min_cache_size: # Try and catch up all blocks but limit to room in cache. @@ -112,22 +92,23 @@ class Prefetcher(LoggedClass): count = min(daemon_height - self.fetched_height, cache_room) count = min(2500, max(count, 0)) if not count: - self.cache.put_nowait(([], 0)) - self.caught_up = True + if not self.caught_up: + self.caught_up = True + self.bp.on_prefetcher_first_caught_up() return False first = self.fetched_height + 1 - hex_hashes = await self.daemon.block_hex_hashes(first, count) + hex_hashes = await daemon.block_hex_hashes(first, count) if self.caught_up: self.logger.info('new block height {:,d} hash {}' .format(first + count-1, hex_hashes[-1])) - blocks = await self.daemon.raw_blocks(hex_hashes) + blocks = await daemon.raw_blocks(hex_hashes) assert count == len(blocks) # Special handling for genesis block if first == 0: - blocks[0] = self.coin.genesis_block(blocks[0]) + blocks[0] = self.bp.coin.genesis_block(blocks[0]) self.logger.info('verified genesis block with hash {}' .format(hex_hashes[0])) @@ -138,19 +119,17 @@ class Prefetcher(LoggedClass): else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 - self.cache.put_nowait((blocks, size)) + self.bp.on_prefetched_blocks(blocks, first) self.cache_size += size self.fetched_height += count self.refill_event.clear() return True + class ChainError(Exception): '''Raised on error processing blocks.''' -class ChainReorg(Exception): - '''Raised on a blockchain reorganisation.''' - class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. @@ -171,82 +150,106 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up_event = asyncio.Event() + self.task_queue = asyncio.Queue() + self.stop = False # Meta self.cache_MB = env.cache_MB self.next_cache_check = 0 - - # Headers and tx_hashes have one entry per block - self.history = defaultdict(partial(array.array, 'I')) - self.history_size = 0 - self.prefetcher = Prefetcher(self.coin, self.daemon, self.height) - self.last_flush = time.time() self.last_flush_tx_count = self.tx_count + self.touched = set() - # Caches of unflushed items + # Caches of unflushed items. self.headers = [] self.tx_hashes = [] + self.history = defaultdict(partial(array.array, 'I')) + self.history_size = 0 # UTXO cache self.utxo_cache = {} self.db_deletes = [] - # Log state + self.prefetcher = Prefetcher(self) + if self.utxo_db.for_sync: self.logger.info('flushing DB cache at {:,d} MB' .format(self.cache_MB)) - async def executor(self, func, *args, **kwargs): - '''Run func taking args in the executor.''' - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, partial(func, *args, **kwargs)) + def add_task(self, task): + '''Add the task to our task queue.''' + self.task_queue.put_nowait(task) - def caught_up(self): - '''Called when first caught up after starting.''' - if not self.caught_up_event.is_set(): - self.first_sync = False - self.flush(True) - if self.utxo_db.for_sync: - self.logger.info('{} synced to height {:,d}' - .format(VERSION, self.height)) - self.open_dbs() - self.caught_up_event.set() + def on_prefetched_blocks(self, blocks, first): + '''Called by the prefetcher when it has prefetched some blocks.''' + self.add_task(partial(self.check_and_advance_blocks, blocks, first)) - async def main_loop(self, touched): + def on_prefetcher_first_caught_up(self): + '''Called by the prefetcher when it first catches up.''' + self.add_task(self.first_caught_up) + + def on_shutdown(self): + '''Called by the controller to shut processing down.''' + async def do_nothing(): + pass + self.stop = True + self.add_task(do_nothing) # Ensure something is on the queue + + async def main_loop(self): '''Main loop for block processing.''' + await self.prefetcher.reset_height() - while True: - blocks = await self.prefetcher.get_blocks() - if blocks: - start = time.time() - await self.check_and_advance_blocks(blocks, touched) - if not self.first_sync: - s = '' if len(blocks) == 1 else 's' - self.logger.info('processed {:,d} block{} in {:.1f}s' - .format(len(blocks), s, - time.time() - start)) - elif blocks is None: - break # Shutdown - else: - self.caught_up() + while not self.stop: + task = await self.task_queue.get() + await task() self.logger.info('flushing state to DB for a clean shutdown...') - self.flush(True) + await self.executor(self.flush, True) self.logger.info('shutdown complete') - async def check_and_advance_blocks(self, blocks, touched): + async def executor(self, func, *args, **kwargs): + '''Run func taking args in the executor.''' + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, partial(func, *args, **kwargs)) + + async def first_caught_up(self): + '''Called when first caught up to daemon after starting.''' + # Flush everything with updated first_sync->False state. + self.first_sync = False + await self.executor(self.flush, True) + if self.utxo_db.for_sync: + self.logger.info('{} synced to height {:,d}' + .format(VERSION, self.height)) + self.open_dbs() + self.caught_up_event.set() + + async def check_and_advance_blocks(self, blocks, first): '''Process the list of blocks passed. Detects and handles reorgs.''' - first = self.height + 1 + self.prefetcher.processing_blocks(blocks) + if first != self.height + 1: + # If we prefetched two sets of blocks and the first caused + # a reorg this will happen when we try to process the + # second. It should be very rare. + self.logger.warning('ignoring {:,d} blocks starting height {:,d}, ' + 'expected {:,d}' + .format(len(blocks), first, self.height + 1)) + return + headers = [self.coin.block_header(block, first + n) for n, block in enumerate(blocks)] hprevs = [self.coin.header_prevhash(h) for h in headers] chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] if hprevs == chain: - await self.executor(self.advance_blocks, blocks, headers, touched) + start = time.time() + await self.executor(self.advance_blocks, blocks, headers) + if not self.first_sync: + s = '' if len(blocks) == 1 else 's' + self.logger.info('processed {:,d} block{} in {:.1f}s' + .format(len(blocks), s, + time.time() - start)) elif hprevs[0] != chain[0]: - await self.handle_chain_reorg(touched) + await self.reorg_chain() else: # It is probably possible but extremely rare that what # bitcoind returns doesn't form a chain because it @@ -255,43 +258,27 @@ class BlockProcessor(server.db.DB): # just to reset the prefetcher and try again. self.logger.warning('daemon blocks do not form a chain; ' 'resetting the prefetcher') - await self.prefetcher.clear(self.height) + await self.prefetcher.reset_height() - def advance_blocks(self, blocks, headers, touched): - '''Synchronously advance the blocks. + def force_chain_reorg(self, count): + '''Force a reorg of the given number of blocks. - It is already verified they correctly connect onto our tip. + Returns True if a reorg is queued, false if not caught up. ''' - block_txs = self.coin.block_txs - daemon_height = self.daemon.cached_height() - - for block in blocks: - self.height += 1 - txs = block_txs(block, self.height) - self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) - undo_info = self.advance_txs(txs, touched) - if daemon_height - self.height <= self.env.reorg_limit: - self.write_undo_info(self.height, b''.join(undo_info)) - - self.headers.extend(headers) - self.tip = self.coin.header_hash(headers[-1]) - - # If caught up, flush everything as client queries are - # performed on the DB. if self.caught_up_event.is_set(): - self.flush(True) - else: - touched.clear() - if time.time() > self.next_cache_check: - self.check_cache_size() - self.next_cache_check = time.time() + 30 + self.add_task(partial(self.reorg_chain, count=count)) + return True + return False - async def handle_chain_reorg(self, touched, count=None): + async def reorg_chain(self, count=None): '''Handle a chain reorganisation. Count is the number of blocks to simulate a reorg, or None for a real reorg.''' - self.logger.info('chain reorg detected') + if count is None: + self.logger.info('chain reorg detected') + else: + self.logger.info('faking a reorg of {:,d} blocks'.format(count)) await self.executor(self.flush, True) hashes = await self.reorg_hashes(count) @@ -299,8 +286,8 @@ class BlockProcessor(server.db.DB): hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) - await self.executor(self.backup_blocks, blocks, touched) - await self.prefetcher.clear(self.height) + await self.executor(self.backup_blocks, blocks) + await self.prefetcher.reset_height() async def reorg_hashes(self, count): '''Return the list of hashes to back up beacuse of a reorg. @@ -431,7 +418,7 @@ class BlockProcessor(server.db.DB): self.tx_hashes = [] self.headers = [] - def backup_flush(self, hashXs): + def backup_flush(self): '''Like flush() but when backing up. All UTXOs are flushed. hashXs - sequence of hashXs which were touched by backing @@ -449,10 +436,12 @@ class BlockProcessor(server.db.DB): assert not self.headers assert not self.tx_hashes - # Backup history - nremoves = self.backup_history(hashXs) - self.logger.info('backing up removed {:,d} history entries from ' - '{:,d} addresses'.format(nremoves, len(hashXs))) + # Backup history. self.touched can include other addresses + # which is harmless, but remove None. + self.touched.discard(None) + nremoves = self.backup_history(self.touched) + self.logger.info('backing up removed {:,d} history entries' + .format(nremoves)) with self.utxo_db.write_batch() as batch: # Flush state last as it reads the wall time. @@ -489,10 +478,38 @@ class BlockProcessor(server.db.DB): if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: self.flush(utxo_MB >= self.cache_MB * 4 // 5) - def advance_txs(self, txs, touched): - undo_info = [] + def advance_blocks(self, blocks, headers): + '''Synchronously advance the blocks. + + It is already verified they correctly connect onto our tip. + ''' + block_txs = self.coin.block_txs + daemon_height = self.daemon.cached_height() + + for block in blocks: + self.height += 1 + undo_info = self.advance_txs(block_txs(block, self.height)) + if daemon_height - self.height <= self.env.reorg_limit: + self.write_undo_info(self.height, b''.join(undo_info)) + + self.headers.extend(headers) + self.tip = self.coin.header_hash(headers[-1]) + + # If caught up, flush everything as client queries are + # performed on the DB. + if self.caught_up_event.is_set(): + self.flush(True) + else: + self.touched.clear() + if time.time() > self.next_cache_check: + self.check_cache_size() + self.next_cache_check = time.time() + 30 + + def advance_txs(self, txs): + self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) # Use local vars for speed in the loops + undo_info = [] history = self.history history_size = self.history_size tx_num = self.tx_count @@ -501,6 +518,7 @@ class BlockProcessor(server.db.DB): put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info_append = undo_info.append + touched = self.touched for tx, tx_hash in txs: hashXs = set() @@ -535,36 +553,33 @@ class BlockProcessor(server.db.DB): return undo_info - def backup_blocks(self, blocks, touched): + def backup_blocks(self, blocks): '''Backup the blocks and flush. The blocks should be in order of decreasing height, starting at. self.height. A flush is performed once the blocks are backed up. ''' self.assert_flushed() + assert self.height >= len(blocks) + coin = self.coin for block in blocks: - txs = self.coin.block_txs(block, self.height) - header_hash = self.coin.header_hash(header) + # Check and update self.tip + header = coin.block_header(block, self.height) + header_hash = coin.header_hash(header) if header_hash != self.tip: raise ChainError('backup block {} is not tip {} at height {:,d}' .format(hash_to_str(header_hash), hash_to_str(self.tip), self.height)) - - self.backup_txs(txs, touched) - self.tip = self.coin.header_prevhash(header) - assert self.height >= 0 + self.tip = coin.header_prevhash(header) + self.backup_txs(coin.block_txs(block, self.height)) self.height -= 1 self.tx_counts.pop() self.logger.info('backed up to height {:,d}'.format(self.height)) + self.backup_flush() - # touched includes those passed into this function. That likely - # has additional addresses which is harmless. Remove None. - touched.discard(None) - self.backup_flush(touched) - - def backup_txs(self, txs, touched): + def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order undo_info = self.read_undo_info(self.height) @@ -578,6 +593,7 @@ class BlockProcessor(server.db.DB): put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo script_hashX = self.coin.hashX_from_script + touched = self.touched undo_entry_len = 12 + self.coin.HASHX_LEN for tx, tx_hash in reversed(txs): diff --git a/server/controller.py b/server/controller.py index 1afe5eb..d1c4462 100644 --- a/server/controller.py +++ b/server/controller.py @@ -49,7 +49,7 @@ class Controller(util.LoggedClass): self.loop = asyncio.get_event_loop() self.start = time.time() self.bp = BlockProcessor(env) - self.mempool = MemPool(self.bp.daemon, env.coin, self.bp) + self.mempool = MemPool(self.bp) self.irc = IRC(env) self.env = env self.servers = {} @@ -172,8 +172,8 @@ class Controller(util.LoggedClass): self.futures.append(asyncio.ensure_future(coro)) # shutdown() assumes bp.main_loop() is first - add_future(self.bp.main_loop(self.mempool.touched)) - add_future(self.bp.prefetcher.main_loop(self.bp.caught_up_event)) + add_future(self.bp.main_loop()) + add_future(self.bp.prefetcher.main_loop()) add_future(self.irc.start(self.bp.caught_up_event)) add_future(self.start_servers(self.bp.caught_up_event)) add_future(self.mempool.main_loop()) @@ -187,8 +187,8 @@ class Controller(util.LoggedClass): await future # Note: future is not one of self.futures except asyncio.CancelledError: break + await self.shutdown() - await asyncio.sleep(1) def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' @@ -309,6 +309,7 @@ class Controller(util.LoggedClass): async def shutdown(self): '''Call to shutdown everything. Returns when done.''' self.state = self.SHUTTING_DOWN + self.bp.on_shutdown() self.close_servers(list(self.servers.keys())) # Don't cancel the block processor main loop - let it close itself for future in self.futures[1:]: @@ -559,3 +560,11 @@ class Controller(util.LoggedClass): async def rpc_peers(self, params): return self.irc.peers + + async def rpc_reorg(self, params): + '''Force a reorg of the given number of blocks, 3 by default.''' + count = 3 + if params: + count = JSONRPC.params_to_non_negative_integer(params) + if not self.bp.force_chain_reorg(count): + raise JSONRPC.RPCError('still catching up with daemon') diff --git a/server/mempool.py b/server/mempool.py index e61bf0b..0a0a952 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -31,12 +31,12 @@ class MemPool(util.LoggedClass): A pair is a (hashX, value) tuple. tx hashes are hex strings. ''' - def __init__(self, daemon, coin, db): + def __init__(self, bp): super().__init__() - self.daemon = daemon - self.coin = coin - self.db = db - self.touched = set() + self.daemon = bp.daemon + self.coin = bp.coin + self.db = bp + self.touched = bp.touched self.touched_event = asyncio.Event() self.prioritized = set() self.stop = False diff --git a/server/session.py b/server/session.py index 60b7a45..2f5d30a 100644 --- a/server/session.py +++ b/server/session.py @@ -496,7 +496,7 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) - cmds = 'disconnect getinfo groups log peers sessions'.split() + cmds = 'disconnect getinfo groups log peers reorg sessions'.split() self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) for cmd in cmds} self.client = 'RPC'