diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index c37eec2..de79440 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -626,8 +626,6 @@ class BlockProcessor(object): if first_sync: self.logger.info(f'{electrumx.version} synced to ' f'height {self.height:,d}') - # Initialise the notification framework - await self.notifications.on_block(set(), self.height) # Reopen for serving await self.db.open_for_serving() diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 41b8055..9f2006d 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -33,8 +33,7 @@ class Notifications(object): def __init__(self): self._touched_mp = {} self._touched_bp = {} - self._highest_block = 0 - self._notify_funcs = [] + self._highest_block = -1 async def _maybe_notify(self): tmp, tbp = self._touched_mp, self._touched_bp @@ -54,11 +53,15 @@ class Notifications(object): del tmp[old] for old in [h for h in tbp if h <= height]: del tbp[old] - for notify_func in self._notify_funcs: - await notify_func(height, touched) + await self.notify(height, touched) - def add_callback(self, notify_func): - self._notify_funcs.append(notify_func) + async def notify(self, height, touched): + pass + + async def start(self, height, notify_func): + self._highest_block = height + self.notify = notify_func + await self.notify(height, set()) async def on_mempool(self, touched, height): self._touched_mp[height] = touched @@ -99,18 +102,17 @@ class Controller(ServerBase): db = DB(env) bp = BlockProcessor(env, db, daemon, notifications) - # Set ourselves up to implement the MemPoolAPI - self.height = daemon.height - self.cached_height = daemon.cached_height - self.mempool_hashes = daemon.mempool_hashes - self.raw_transactions = daemon.getrawtransactions - self.lookup_utxos = db.lookup_utxos - self.on_mempool = notifications.on_mempool - MemPoolAPI.register(Controller) - mempool = MemPool(env.coin, self) + # Set notifications up to implement the MemPoolAPI + notifications.height = daemon.height + notifications.cached_height = daemon.cached_height + notifications.mempool_hashes = daemon.mempool_hashes + notifications.raw_transactions = daemon.getrawtransactions + notifications.lookup_utxos = db.lookup_utxos + MemPoolAPI.register(Notifications) + mempool = MemPool(env.coin, notifications) session_mgr = SessionManager(env, db, bp, daemon, mempool, - notifications, shutdown_event) + shutdown_event) # Test daemon authentication, and also ensure it has a cached # height. Do this before entering the task group. @@ -120,7 +122,8 @@ class Controller(ServerBase): serve_externally_event = Event() synchronized_event = Event() async with TaskGroup() as group: - await group.spawn(session_mgr.serve(serve_externally_event)) + await group.spawn(session_mgr.serve(notifications, + serve_externally_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) await caught_up_event.wait() await group.spawn(db.populate_header_merkle_cache()) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index a43843e..f7ade96 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -108,8 +108,7 @@ class SessionGroup(object): class SessionManager(object): '''Holds global state about all sessions.''' - def __init__(self, env, db, bp, daemon, mempool, notifications, - shutdown_event): + def __init__(self, env, db, bp, daemon, mempool, shutdown_event): env.max_send = max(350000, env.max_send) self.env = env self.db = db @@ -136,8 +135,6 @@ class SessionManager(object): # Event triggered when electrumx is listening for incoming requests. self.server_listening = Event() self.session_event = Event() - # Tell sessions about subscription changes - notifications.add_callback(self._notify_sessions) # Set up the RPC request handlers cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' @@ -346,6 +343,8 @@ class SessionManager(object): '''Refresh the cached header subscription responses to be for height, and record that as notified_height. ''' + # Paranoia: a reorg could race and leave db_height lower + height = min(height, self.db.db_height) electrum, raw = await self._electrum_and_raw_headers(height) self.hsub_results = (electrum, {'hex': raw.hex(), 'height': height}) self.notified_height = height @@ -477,7 +476,7 @@ class SessionManager(object): # --- External Interface - async def serve(self, event): + async def serve(self, notifications, event): '''Start the RPC server if enabled. When the event is triggered, start TCP and SSL servers.''' try: @@ -499,6 +498,8 @@ class SessionManager(object): if self.env.drop_client is not None: self.logger.info('drop clients matching: {}' .format(self.env.drop_client.pattern)) + # Start notifications; initialize hsub_results + await notifications.start(self.db.db_height, self._notify_sessions) await self._start_external_servers() # Peer discovery should start after the external servers # because we connect to ourself @@ -559,8 +560,7 @@ class SessionManager(object): '''Notify sessions about height changes and touched addresses.''' height_changed = height != self.notified_height if height_changed: - # Paranoia: a reorg could race and leave db_height lower - await self._refresh_hsub_results(min(height, self.db.db_height)) + await self._refresh_hsub_results(height) # Invalidate our history cache for touched hashXs hc = self.history_cache for hashX in set(hc).intersection(touched):