diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 5040aeb..3781988 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -129,8 +129,8 @@ class SessionManager(object): self.state = self.CATCHING_UP self.txs_sent = 0 self.start_time = time.time() - self._history_cache = pylru.lrucache(256) - self._hc_height = 0 + self.history_cache = pylru.lrucache(256) + self.notified_height = None # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 # Masternode stuff only for such coins @@ -332,6 +332,19 @@ class SessionManager(object): ]) return result + async def _electrum_and_raw_headers(self, height): + raw_header = await self.raw_header(height) + electrum_header = self.env.coin.electrum_header(raw_header, height) + return electrum_header, raw_header + + async def _refresh_hsub_results(self, height): + '''Refresh the cached header subscription responses to be for height, + and record that as notified_height. + ''' + electrum, raw = await self._electrum_and_raw_headers(height) + self.hsub_results = (electrum, {'hex': raw.hex(), 'height': height}) + self.notified_height = height + # --- LocalRPC command handlers async def rpc_add_peer(self, real_name): @@ -518,8 +531,8 @@ class SessionManager(object): async def electrum_header(self, height): '''Return the deserialized header at the given height.''' - raw_header = await self.raw_header(height) - return self.env.coin.electrum_header(raw_header, height) + electrum_header, _ = await self._electrum_and_raw_headers(height) + return electrum_header async def broadcast_transaction(self, raw_tx): hex_hash = await self.daemon.sendrawtransaction([raw_tx]) @@ -528,7 +541,7 @@ class SessionManager(object): async def limited_history(self, hashX): '''A caching layer.''' - hc = self._history_cache + hc = self.history_cache if hashX not in hc: # History DoS limit. Each element of history is about 99 # bytes when encoded as JSON. This limits resource usage @@ -540,16 +553,18 @@ class SessionManager(object): async def _notify_sessions(self, height, touched): '''Notify sessions about height changes and touched addresses.''' - # Invalidate our history cache for touched hashXs - if height != self._hc_height: - self._hc_height = height - hc = self._history_cache + height_changed = height != self.notified_height + if height_changed: + # Paranoia: a reorg could race and leave db_height lower + await self._refresh_hsub_results(min(height, self.db.db_height)) + # Invalidate our history cache for touched hashXs + hc = self.history_cache for hashX in set(hc).intersection(touched): del hc[hashX] async with TaskGroup() as group: for session in self.sessions: - await group.spawn(session.notify(height, touched)) + await group.spawn(session.notify(touched, height_changed)) def add_session(self, session): self.sessions.add(session) @@ -610,7 +625,7 @@ class SessionBase(ServerSession): self._receive_message_orig = self.connection.receive_message self.connection.receive_message = self.receive_message - async def notify(self, height, touched): + async def notify(self, touched, height_changed): pass def peer_address_str(self, *, for_log=True): @@ -695,7 +710,6 @@ class ElectrumX(SessionBase): super().__init__(*args, **kwargs) self.subscribe_headers = False self.subscribe_headers_raw = False - self.notified_height = None self.connection.max_response_size = self.env.max_send self.max_subs = self.env.max_session_subs self.hashX_subs = {} @@ -736,73 +750,55 @@ class ElectrumX(SessionBase): def sub_count(self): return len(self.hashX_subs) - async def notify_touched(self, our_touched): - changed = {} - - for hashX in our_touched: - alias = self.hashX_subs[hashX] - status = await self.address_status(hashX) - changed[alias] = status - - # Check mempool hashXs - the status is a function of the - # confirmed state of other transactions. Note: we cannot - # iterate over mempool_statuses as it changes size. - for hashX in tuple(self.mempool_statuses): - # Items can be evicted whilst await-ing below; False - # ensures such hashXs are notified - old_status = self.mempool_statuses.get(hashX, False) - status = await self.address_status(hashX) - if status != old_status: - alias = self.hashX_subs[hashX] - changed[alias] = status - - for alias, status in changed.items(): - if len(alias) == 64: - method = 'blockchain.scripthash.subscribe' - else: - method = 'blockchain.address.subscribe' - await self.send_notification(method, (alias, status)) - - if changed: - es = '' if len(changed) == 1 else 'es' - self.logger.info('notified of {:,d} address{}' - .format(len(changed), es)) - - async def notify(self, height, touched): + async def notify(self, touched, height_changed): '''Notify the client about changes to touched addresses (from mempool updates or new blocks) and height. - - Return the set of addresses the session needs to be - asyncronously notified about. This can be empty if there are - possible mempool status updates. - - Returns None if nothing needs to be notified asynchronously. ''' - height_changed = height != self.notified_height - if height_changed: - self.notified_height = height - if self.subscribe_headers: - args = (await self.subscribe_headers_result(height), ) - await self.send_notification('blockchain.headers.subscribe', - args) + if height_changed and self.subscribe_headers: + args = (await self.subscribe_headers_result(), ) + await self.send_notification('blockchain.headers.subscribe', args) touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): - await self.notify_touched(touched) + changed = {} - async def subscribe_headers_result(self, height): - '''The result of a header subscription for the given height.''' - if self.subscribe_headers_raw: - raw_header = await self.session_mgr.raw_header(height) - return {'hex': raw_header.hex(), 'height': height} - return await self.session_mgr.electrum_header(height) + for hashX in touched: + alias = self.hashX_subs[hashX] + status = await self.address_status(hashX) + changed[alias] = status + + # Check mempool hashXs - the status is a function of the + # confirmed state of other transactions. Note: we cannot + # iterate over mempool_statuses as it changes size. + for hashX in tuple(self.mempool_statuses): + # Items can be evicted whilst await-ing status; False + # ensures such hashXs are notified + old_status = self.mempool_statuses.get(hashX, False) + status = await self.address_status(hashX) + if status != old_status: + alias = self.hashX_subs[hashX] + changed[alias] = status + + for alias, status in changed.items(): + if len(alias) == 64: + method = 'blockchain.scripthash.subscribe' + else: + method = 'blockchain.address.subscribe' + await self.send_notification(method, (alias, status)) + + if changed: + es = '' if len(changed) == 1 else 'es' + self.logger.info(f'notified of {len(changed):,d} address{es}') + + async def subscribe_headers_result(self): + '''The result of a header subscription or notification.''' + return self.session_mgr.hsub_results[self.subscribe_headers_raw] async def _headers_subscribe(self, raw): '''Subscribe to get headers of new blocks.''' - self.subscribe_headers = True self.subscribe_headers_raw = assert_boolean(raw) - self.notified_height = self.db.db_height - return await self.subscribe_headers_result(self.notified_height) + self.subscribe_headers = True + return await self.subscribe_headers_result() async def headers_subscribe(self): '''Subscribe to get raw headers of new blocks.''' @@ -1313,9 +1309,9 @@ class DashElectrumX(ElectrumX): 'masternode.list': self.masternode_list }) - async def notify(self, height, touched): + async def notify(self, touched, height_changed): '''Notify the client about changes in masternode list.''' - await super().notify(height, touched) + await super().notify(touched, height_changed) for mn in self.mns: status = await self.daemon_request('masternode_list', ['status', mn])