From b7572ce9c0c5cc53ab345af1d36aa6eacebbcf0a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 18 Jul 2018 09:12:44 +0800 Subject: [PATCH] More refactoring of controller - remove the header cache - not needed for higher protocol versions - simplify session notification; move to session manager - move history cache to session manager --- electrumx/server/controller.py | 42 ---------------------------------- electrumx/server/mempool.py | 5 ++-- electrumx/server/session.py | 42 ++++++++++++++++++++++++++++++---- 3 files changed, 40 insertions(+), 49 deletions(-) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 310d6fa..16b080a 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -45,9 +45,6 @@ class Controller(ServerBase): self.coin = env.coin self.tasks = TaskSet() - self.history_cache = pylru.lrucache(256) - self.header_cache = pylru.lrucache(8) - self.cache_height = 0 self.cache_mn_height = 0 self.mn_cache = pylru.lrucache(256) env.max_send = max(350000, env.max_send) @@ -127,48 +124,9 @@ class Controller(ServerBase): self.create_task(self.session_mgr.start_serving()) self.create_task(self.session_mgr.housekeeping()) - def notify_sessions(self, touched): - '''Notify sessions about height changes and touched addresses.''' - # Invalidate caches - hc = self.history_cache - for hashX in set(hc).intersection(touched): - del hc[hashX] - - height = self.bp.db_height - if height != self.cache_height: - self.cache_height = height - self.header_cache.clear() - - self.session_mgr.notify(height, touched) - def raw_header(self, height): '''Return the binary header at the given height.''' header, n = self.bp.read_headers(height, 1) if n != 1: raise RPCError(BAD_REQUEST, f'height {height:,d} out of range') return header - - def electrum_header(self, height): - '''Return the deserialized header at the given height.''' - if height not in self.header_cache: - raw_header = self.raw_header(height) - self.header_cache[height] = self.coin.electrum_header(raw_header, - height) - return self.header_cache[height] - - async def get_history(self, hashX): - '''Get history asynchronously to reduce latency.''' - if hashX in self.history_cache: - return self.history_cache[hashX] - - def job(): - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage - # on bloated history requests, and uses a smaller divisor - # so large requests are logged before refusing them. - limit = self.env.max_send // 97 - return list(self.bp.get_history(hashX, limit=limit)) - - history = await self.run_in_executor(job) - self.history_cache[hashX] = history - return history diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index 538fc64..2ff9209 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -36,6 +36,7 @@ class MemPool(object): self.logger = class_logger(__name__, self.__class__.__name__) self.daemon = bp.daemon self.controller = controller + self.notify_sessions = controller.session_mgr.notify_sessions self.coin = bp.coin self.db = bp self.touched = set() @@ -104,7 +105,7 @@ class MemPool(object): while True: # Avoid double notifications if processing a block if self.touched and not self.processing_new_block(): - self.controller.notify_sessions(self.touched) + self.notify_sessions(self.touched) self.touched.clear() # Log progress / state @@ -192,7 +193,7 @@ class MemPool(object): # Minor race condition here with mempool processor thread touched.update(self.touched) self.touched.clear() - self.controller.notify_sessions(touched) + self.notify_sessions(touched) def processing_new_block(self): '''Return True if we're processing a new block.''' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 3267b8b..b3f124a 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -18,6 +18,7 @@ import time from collections import defaultdict from functools import partial +import pylru from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError import electrumx @@ -111,6 +112,7 @@ class SessionManager(object): self.state = self.CATCHING_UP self.txs_sent = 0 self.start_time = time.time() + self.history_cache = pylru.lrucache(256) # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 # Event triggered when electrumx is listening for incoming requests. @@ -400,7 +402,14 @@ class SessionManager(object): '''The number of connections that we've sent something to.''' return len(self.sessions) - def notify(self, height, touched): + def notify_sessions(self, touched): + '''Notify sessions about height changes and touched addresses.''' + height = self.controller.bp.db_height + # Invalidate caches + hc = self.history_cache + for hashX in set(hc).intersection(touched): + del hc[hashX] + # Height notifications are synchronous. Those sessions with # touched addresses are scheduled for asynchronous completion create_task = self.controller.create_task @@ -411,6 +420,24 @@ class SessionManager(object): if session_touched is not None: create_task(session.notify_async(session_touched)) + async def get_history(self, hashX): + '''Get history asynchronously to reduce latency.''' + if hashX in self.history_cache: + return self.history_cache[hashX] + + controller = self.controller + def job(): + # History DoS limit. Each element of history is about 99 + # bytes when encoded as JSON. This limits resource usage + # on bloated history requests, and uses a smaller divisor + # so large requests are logged before refusing them. + limit = self.env.max_send // 97 + return list(controller.bp.get_history(hashX, limit=limit)) + + history = await controller.run_in_executor(job) + self.history_cache[hashX] = history + return history + async def housekeeping(self): '''Regular housekeeping checks.''' n = 0 @@ -672,12 +699,17 @@ class ElectrumX(SessionBase): return value raise RPCError(BAD_REQUEST, f'{value} should be a boolean value') + def electrum_header(self, height): + '''Return the deserialized header at the given height.''' + raw_header = self.controller.raw_header(height) + return self.coin.electrum_header(raw_header, height) + def subscribe_headers_result(self, height): '''The result of a header subscription for the given height.''' if self.subscribe_headers_raw: raw_header = self.controller.raw_header(height) return {'hex': raw_header.hex(), 'height': height} - return self.controller.electrum_header(height) + return self.electrum_header(height) def _headers_subscribe(self, raw): '''Subscribe to get headers of new blocks.''' @@ -714,7 +746,7 @@ class ElectrumX(SessionBase): ''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.controller.get_history(hashX) + history = await self.session_mgr.get_history(hashX) mempool = await self.controller.mempool_transactions(hashX) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) @@ -819,7 +851,7 @@ class ElectrumX(SessionBase): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.controller.get_history(hashX) + history = await self.session_mgr.get_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX) @@ -915,7 +947,7 @@ class ElectrumX(SessionBase): height: the header's height''' height = non_negative_integer(height) - return self.controller.electrum_header(height) + return self.electrum_header(height) def is_tor(self): '''Try to detect if the connection is to a tor hidden service we are