From bffdfcc47f79457fb466b1cac34fa42ee58736d8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 14 Dec 2016 19:31:08 +0900 Subject: [PATCH 1/4] Prepare 0.9.7 --- RELEASE-NOTES | 7 +++++++ server/version.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 2dea713..1cff1f0 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,10 @@ +version 0.9.7 +------------- + +- history and UTXO requests are now processed by the executor, i.e., + properly asynchronously. This was the last of the potential latency + bottlenecks. + version 0.9.6 ------------- diff --git a/server/version.py b/server/version.py index 9dc1bda..a590391 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.9.6" +VERSION = "ElectrumX 0.9.7" From 1b2a0bd761da0cc7659c863df3089bfaee159bce Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 12:07:19 +0900 Subject: [PATCH 2/4] Append first. --- server/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 4ae820f..95f8922 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -589,9 +589,9 @@ class Session(JSONRPC): def enqueue_request(self, request): '''Add a request to the session's list.''' - if not self.requests: - self.manager.enqueue_session(self) self.requests.append(request) + if len(self.requests) == 1: + self.manager.enqueue_session(self) async def serve_requests(self): '''Serve requests in batches.''' From e88ea91e89fd4cc8523c189702ae3c3af41a6784 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 14 Dec 2016 21:38:37 +0900 Subject: [PATCH 3/4] Defer address notifications whilst processing a block Partial fix of #70 --- server/mempool.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/mempool.py b/server/mempool.py index 36fa664..35cf702 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -88,7 +88,8 @@ class MemPool(util.LoggedClass): if unprocessed: await process_some(unprocessed) - if self.touched: + # Avoid double notifications if processing a block + if self.touched and not self.processing_new_block(): self.touched_event.set() if not unprocessed: @@ -101,6 +102,10 @@ class MemPool(util.LoggedClass): except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) + def processing_new_block(self): + '''Return True if we're processing a new block.''' + return self.daemon.cached_height() > self.db.db_height + async def new_hashes(self, unprocessed, unfetched): '''Get the current list of hashes in the daemon's mempool. From 5fe49bb2612d4fade963e59bbda160910ad8604b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 15 Dec 2016 09:57:03 +0900 Subject: [PATCH 4/4] Synchronize daemon height and mempool fetching Cleanup and simplify touched handling and its event, which is now controlled and owned by the mempool. The daemon object owns the set of current mempool hashes. Clean up and simplify the mempool main loop. Fixes #70. --- RELEASE-NOTES | 6 ++ server/block_processor.py | 21 +++--- server/daemon.py | 9 ++- server/mempool.py | 148 ++++++++++++++++++-------------------- server/protocol.py | 19 +++-- server/version.py | 2 +- 6 files changed, 100 insertions(+), 105 deletions(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 1cff1f0..859abc7 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.9.8 +------------- + +- cleanup up mempool handling, notify of addresses only once when a new block + comes in. Fixes issue 70. + version 0.9.7 ------------- diff --git a/server/block_processor.py b/server/block_processor.py index a6b6cab..7d364a4 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -85,9 +85,13 @@ class Prefetcher(LoggedClass): async def _prefetch(self): '''Prefetch blocks unless the prefetch queue is full.''' + # Refresh the mempool after updating the daemon height, if and + # only if we've caught up daemon_height = await self.daemon.height() - cache_room = self.target_cache_size // self.ave_size + if self.caught_up: + await self.daemon.refresh_mempool_hashes() + cache_room = self.target_cache_size // self.ave_size with await self.semaphore: # Try and catch up all blocks but limit to room in cache. # Constrain count to between 0 and 4000 regardless @@ -132,7 +136,7 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, touched, touched_event): + def __init__(self, env): super().__init__(env) # The block processor reads its tasks from this queue @@ -149,8 +153,6 @@ class BlockProcessor(server.db.DB): self.caught_up = False self._shutdown = False self.event = asyncio.Event() - self.touched = touched - self.touched_event = touched_event # Meta self.utxo_MB = env.utxo_MB @@ -180,7 +182,7 @@ class BlockProcessor(server.db.DB): self.logger.info('flushing history cache at {:,d} MB' .format(self.hist_MB)) - async def main_loop(self): + async def main_loop(self, touched): '''Main loop for block processing.''' # Simulate a reorg if requested @@ -195,7 +197,7 @@ class BlockProcessor(server.db.DB): break blocks = self.prefetcher.get_blocks() if blocks: - await self.advance_blocks(blocks) + await self.advance_blocks(blocks, touched) elif not self.caught_up: self.caught_up = True self.first_caught_up() @@ -209,7 +211,7 @@ class BlockProcessor(server.db.DB): self._shutdown = True self.tasks.put_nowait(None) - async def advance_blocks(self, blocks): + async def advance_blocks(self, blocks, touched): '''Strip the unspendable genesis coinbase.''' if self.height == -1: blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) @@ -218,7 +220,7 @@ class BlockProcessor(server.db.DB): for block in blocks: if self._shutdown: break - self.advance_block(block, self.touched) + self.advance_block(block, touched) loop = asyncio.get_event_loop() try: @@ -227,14 +229,13 @@ class BlockProcessor(server.db.DB): else: do_it() except ChainReorg: - await self.handle_chain_reorg(self.touched) + await self.handle_chain_reorg(touched) if self.caught_up: # Flush everything as queries are performed on the DB and # not in-memory. await asyncio.sleep(0) self.flush(True) - self.touched_event.set() elif time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 diff --git a/server/daemon.py b/server/daemon.py index cee7717..42897b3 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -36,6 +36,8 @@ class Daemon(util.LoggedClass): self.urls = urls self.url_index = 0 self._height = None + self.mempool_hashes = set() + self.mempool_refresh_event = asyncio.Event() # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=10) @@ -148,9 +150,10 @@ class Daemon(util.LoggedClass): # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] - async def mempool_hashes(self): - '''Return the hashes of the txs in the daemon's mempool.''' - return await self._send_single('getrawmempool') + async def refresh_mempool_hashes(self): + '''Update our record of the daemon's mempool hashes.''' + self.mempool_hashes = set(await self._send_single('getrawmempool')) + self.mempool_refresh_event.set() async def estimatefee(self, params): '''Return the fee estimate for the given parameters.''' diff --git a/server/mempool.py b/server/mempool.py index 35cf702..0984154 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -33,92 +33,31 @@ class MemPool(util.LoggedClass): A pair is a (hash168, value) tuple. tx hashes are hex strings. ''' - def __init__(self, daemon, coin, db, touched, touched_event): + def __init__(self, daemon, coin, db): super().__init__() self.daemon = daemon self.coin = coin self.db = db - self.touched = touched - self.touched_event = touched_event + self.touched = set() + self.touched_event = asyncio.Event() self.stop = False self.txs = {} self.hash168s = defaultdict(set) # None can be a key - async def main_loop(self, caught_up): - '''Asynchronously maintain mempool status with daemon. - - Waits until the caught up event is signalled.''' - await caught_up.wait() - self.logger.info('beginning processing of daemon mempool. ' - 'This can take some time...') - try: - await self.fetch_and_process() - except asyncio.CancelledError: - # This aids clean shutdowns - self.stop = True - - async def fetch_and_process(self): - '''The inner loop unprotected by try / except.''' - unfetched = set() - unprocessed = {} - log_every = 150 - log_secs = 0 - fetch_size = 400 - process_some = self.async_process_some(unfetched, fetch_size // 2) - next_refresh = 0 - # The list of mempool hashes is fetched no more frequently - # than this number of seconds - refresh_secs = 5 - - while True: - try: - now = time.time() - if now >= next_refresh: - await self.new_hashes(unprocessed, unfetched) - next_refresh = now + refresh_secs - log_secs -= refresh_secs - - # Fetch some txs if unfetched ones remain - if unfetched: - count = min(len(unfetched), fetch_size) - hex_hashes = [unfetched.pop() for n in range(count)] - unprocessed.update(await self.fetch_raw_txs(hex_hashes)) - - # Process some txs if unprocessed ones remain - if unprocessed: - await process_some(unprocessed) + def resync_daemon_hashes(self, unprocessed, unfetched): + '''Re-sync self.txs with the list of hashes in the daemon's mempool. - # Avoid double notifications if processing a block - if self.touched and not self.processing_new_block(): - self.touched_event.set() - - if not unprocessed: - if log_secs <= 0: - log_secs = log_every - self.logger.info('{:,d} txs touching {:,d} addresses' - .format(len(self.txs), - len(self.hash168s))) - await asyncio.sleep(1) - except DaemonError as e: - self.logger.info('ignoring daemon error: {}'.format(e)) - - def processing_new_block(self): - '''Return True if we're processing a new block.''' - return self.daemon.cached_height() > self.db.db_height - - async def new_hashes(self, unprocessed, unfetched): - '''Get the current list of hashes in the daemon's mempool. - - Remove ones that have disappeared from self.txs and unprocessed. + Additionally, remove gone hashes from unprocessed and + unfetched. Add new ones to unprocessed. ''' txs = self.txs hash168s = self.hash168s touched = self.touched - hashes = set(await self.daemon.mempool_hashes()) - new = hashes.difference(txs) + hashes = self.daemon.mempool_hashes gone = set(txs).difference(hashes) for hex_hash in gone: + unfetched.discard(hex_hash) unprocessed.pop(hex_hash, None) item = txs.pop(hex_hash) if item: @@ -131,18 +70,71 @@ class MemPool(util.LoggedClass): del hash168s[hash168] touched.update(tx_hash168s) + new = hashes.difference(txs) unfetched.update(new) for hex_hash in new: txs[hex_hash] = None + async def main_loop(self): + '''Asynchronously maintain mempool status with daemon. + + Processes the mempool each time the daemon's mempool refresh + event is signalled. + ''' + unprocessed = {} + unfetched = set() + txs = self.txs + fetch_size = 400 + process_some = self.async_process_some(unfetched, fetch_size // 2) + + await self.daemon.mempool_refresh_event.wait() + self.logger.info ('beginning processing of daemon mempool. ' + 'This can take some time...') + next_log = time.time() + 0.1 + + while True: + try: + todo = len(unfetched) + len(unprocessed) + if todo: + pct = (len(txs) - todo) * 100 // len(txs) if txs else 0 + self.logger.info('catchup {:d}% complete ({:,d} txs left)' + .format(pct, todo)) + else: + now = time.time() + if now >= next_log: + self.logger.info('{:,d} txs touching {:,d} addresses' + .format(len(txs), len(self.hash168s))) + next_log = now + 150 + await self.daemon.mempool_refresh_event.wait() + + self.resync_daemon_hashes(unprocessed, unfetched) + self.daemon.mempool_refresh_event.clear() + + if unfetched: + count = min(len(unfetched), fetch_size) + hex_hashes = [unfetched.pop() for n in range(count)] + unprocessed.update(await self.fetch_raw_txs(hex_hashes)) + + if unprocessed: + await process_some(unprocessed) + + # Avoid double notifications if processing a block + if self.touched and not self.processing_new_block(): + self.touched_event.set() + except DaemonError as e: + self.logger.info('ignoring daemon error: {}'.format(e)) + except asyncio.CancelledError: + # This aids clean shutdowns + self.stop = True + break + def async_process_some(self, unfetched, limit): loop = asyncio.get_event_loop() pending = [] txs = self.txs - first = True async def process(unprocessed): - nonlocal first, pending + nonlocal pending raw_txs = {} while unprocessed and len(raw_txs) < limit: @@ -169,16 +161,12 @@ class MemPool(util.LoggedClass): touched.add(hash168) hash168s[hash168].add(hex_hash) - to_do = len(unfetched) + len(unprocessed) - if to_do and txs: - percent = max(0, len(txs) - to_do) * 100 // len(txs) - self.logger.info('catchup {:d}% complete'.format(percent)) - elif first: - first = False - self.logger.info('caught up') - return process + def processing_new_block(self): + '''Return True if we're processing a new block.''' + return self.daemon.cached_height() > self.db.db_height + async def fetch_raw_txs(self, hex_hashes): '''Fetch a list of mempool transactions.''' raw_txs = await self.daemon.getrawtransactions(hex_hashes) diff --git a/server/protocol.py b/server/protocol.py index 95f8922..9f054b5 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -53,12 +53,9 @@ class ServerManager(util.LoggedClass): def __init__(self, env): super().__init__() self.loop = asyncio.get_event_loop() - self.touched = set() - self.touched_event = asyncio.Event() self.start = time.time() - self.bp = BlockProcessor(env, self.touched, self.touched_event) - self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, - self.touched, self.touched_event) + self.bp = BlockProcessor(env) + self.mempool = MemPool(self.bp.daemon, env.coin, self.bp) self.irc = IRC(env) self.env = env self.servers = {} @@ -178,11 +175,11 @@ class ServerManager(util.LoggedClass): self.futures.append(asyncio.ensure_future(coro)) # shutdown() assumes bp.main_loop() is first - add_future(self.bp.main_loop()) + add_future(self.bp.main_loop(self.mempool.touched)) add_future(self.bp.prefetcher.main_loop()) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) - add_future(self.mempool.main_loop(self.bp.event)) + add_future(self.mempool.main_loop()) add_future(self.enqueue_delayed_sessions()) add_future(self.notify()) for n in range(4): @@ -245,10 +242,10 @@ class ServerManager(util.LoggedClass): async def notify(self): '''Notify sessions about height changes and touched addresses.''' while True: - await self.touched_event.wait() - touched = self.touched.copy() - self.touched.clear() - self.touched_event.clear() + await self.mempool.touched_event.wait() + touched = self.mempool.touched.copy() + self.mempool.touched.clear() + self.mempool.touched_event.clear() # Invalidate caches hc = self.history_cache diff --git a/server/version.py b/server/version.py index a590391..55a9e55 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.9.7" +VERSION = "ElectrumX 0.9.8"