diff --git a/server/block_processor.py b/server/block_processor.py index 4b9fcda..157d226 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -29,6 +29,7 @@ class Prefetcher(LoggedClass): def __init__(self, bp): super().__init__() self.bp = bp + self.caught_up = False # Access to fetched_height should be protected by the semaphore self.fetched_height = None self.semaphore = asyncio.Semaphore() @@ -83,14 +84,7 @@ class Prefetcher(LoggedClass): Repeats until the queue is full or caught up. ''' daemon = self.bp.daemon - # If caught up, refresh the mempool before the current height - caught_up = self.bp.caught_up_event.is_set() - if caught_up: - mempool = await daemon.mempool_hashes() - else: - mempool = [] - - daemon_height = await daemon.height() + daemon_height = await daemon.height(self.bp.caught_up_event.is_set()) with await self.semaphore: while self.cache_size < self.min_cache_size: # Try and catch up all blocks but limit to room in cache. @@ -100,15 +94,14 @@ class Prefetcher(LoggedClass): count = min(daemon_height - self.fetched_height, cache_room) count = min(500, max(count, 0)) if not count: - if caught_up: - self.bp.set_mempool_hashes(mempool) - else: + if not self.caught_up: + self.caught_up = True self.bp.on_prefetcher_first_caught_up() return False first = self.fetched_height + 1 hex_hashes = await daemon.block_hex_hashes(first, count) - if caught_up: + if self.caught_up: self.logger.info('new block height {:,d} hash {}' .format(first + count-1, hex_hashes[-1])) blocks = await daemon.raw_blocks(hex_hashes) @@ -128,7 +121,7 @@ class Prefetcher(LoggedClass): else: self.ave_size = (size + (10 - count) * self.ave_size) // 10 - self.bp.on_prefetched_blocks(blocks, first, mempool) + self.bp.on_prefetched_blocks(blocks, first) self.cache_size += size self.fetched_height += count @@ -195,10 +188,9 @@ class BlockProcessor(server.db.DB): '''Add the task to our task queue.''' self.task_queue.put_nowait(task) - def on_prefetched_blocks(self, blocks, first, mempool): + def on_prefetched_blocks(self, blocks, first): '''Called by the prefetcher when it has prefetched some blocks.''' - self.add_task(partial(self.check_and_advance_blocks, blocks, first, - mempool)) + self.add_task(partial(self.check_and_advance_blocks, blocks, first)) def on_prefetcher_first_caught_up(self): '''Called by the prefetcher when it first catches up.''' @@ -233,10 +225,7 @@ class BlockProcessor(server.db.DB): self.open_dbs() self.caught_up_event.set() - def set_mempool_hashes(self, mempool): - self.controller.mempool.set_hashes(mempool) - - async def check_and_advance_blocks(self, blocks, first, mempool): + async def check_and_advance_blocks(self, blocks, first): '''Process the list of blocks passed. Detects and handles reorgs.''' self.prefetcher.processing_blocks(blocks) if first != self.height + 1: @@ -262,7 +251,6 @@ class BlockProcessor(server.db.DB): self.logger.info('processed {:,d} block{} in {:.1f}s' .format(len(blocks), s, time.time() - start)) - self.set_mempool_hashes(mempool) elif hprevs[0] != chain[0]: await self.reorg_chain() else: diff --git a/server/daemon.py b/server/daemon.py index 23cebbf..5e9b311 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -38,6 +38,8 @@ class Daemon(util.LoggedClass): super().__init__() self.set_urls(urls) self._height = None + self._mempool_hashes = set() + self.mempool_refresh_event = asyncio.Event() # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=10) @@ -208,7 +210,7 @@ class Daemon(util.LoggedClass): return [bytes.fromhex(block) for block in blocks] async def mempool_hashes(self): - '''Return a list of the daemon's mempool hashes.''' + '''Update our record of the daemon's mempool hashes.''' return await self._send_single('getrawmempool') async def estimatefee(self, params): @@ -243,11 +245,18 @@ class Daemon(util.LoggedClass): '''Broadcast a transaction to the network.''' return await self._send_single('sendrawtransaction', params) - async def height(self): + async def height(self, mempool=False): '''Query the daemon for its current height.''' self._height = await self._send_single('getblockcount') + if mempool: + self._mempool_hashes = set(await self.mempool_hashes()) + self.mempool_refresh_event.set() return self._height + def cached_mempool_hashes(self): + '''Return the cached mempool hashes.''' + return self._mempool_hashes + def cached_height(self): '''Return the cached daemon height. diff --git a/server/mempool.py b/server/mempool.py index 075c29d..0a6c27b 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -37,8 +37,6 @@ class MemPool(util.LoggedClass): self.controller = controller self.coin = bp.coin self.db = bp - self.hashes = set() - self.mempool_refresh_event = asyncio.Event() self.touched = bp.touched self.touched_event = asyncio.Event() self.prioritized = set() @@ -51,11 +49,6 @@ class MemPool(util.LoggedClass): initial mempool sync.''' self.prioritized.add(tx_hash) - def set_hashes(self, hashes): - '''Save the list of mempool hashes.''' - self.hashes = set(hashes) - self.mempool_refresh_event.set() - def resync_daemon_hashes(self, unprocessed, unfetched): '''Re-sync self.txs with the list of hashes in the daemon's mempool. @@ -66,7 +59,8 @@ class MemPool(util.LoggedClass): hashXs = self.hashXs touched = self.touched - gone = set(txs).difference(self.hashes) + hashes = self.daemon.cached_mempool_hashes() + gone = set(txs).difference(hashes) for hex_hash in gone: unfetched.discard(hex_hash) unprocessed.pop(hex_hash, None) @@ -81,7 +75,7 @@ class MemPool(util.LoggedClass): del hashXs[hashX] touched.update(tx_hashXs) - new = self.hashes.difference(txs) + new = hashes.difference(txs) unfetched.update(new) for hex_hash in new: txs[hex_hash] = None @@ -98,14 +92,15 @@ class MemPool(util.LoggedClass): fetch_size = 800 process_some = self.async_process_some(unfetched, fetch_size // 2) - await self.mempool_refresh_event.wait() + await self.daemon.mempool_refresh_event.wait() self.logger.info('beginning processing of daemon mempool. ' 'This can take some time...') next_log = 0 loops = -1 # Zero during initial catchup while True: - if self.touched: + # Avoid double notifications if processing a block + if self.touched and not self.processing_new_block(): self.touched_event.set() # Log progress / state @@ -125,10 +120,10 @@ class MemPool(util.LoggedClass): try: if not todo: self.prioritized.clear() - await self.mempool_refresh_event.wait() + await self.daemon.mempool_refresh_event.wait() self.resync_daemon_hashes(unprocessed, unfetched) - self.mempool_refresh_event.clear() + self.daemon.mempool_refresh_event.clear() if unfetched: count = min(len(unfetched), fetch_size) @@ -182,6 +177,10 @@ class MemPool(util.LoggedClass): return process + def processing_new_block(self): + '''Return True if we're processing a new block.''' + return self.daemon.cached_height() > self.db.db_height + async def fetch_raw_txs(self, hex_hashes): '''Fetch a list of mempool transactions.''' raw_txs = await self.daemon.getrawtransactions(hex_hashes)