Browse Source

Session manager coordinates header subscriptions

- caches both raw and deserialized headers
- session manager holds the notified height, not each session
patch-2
Neil Booth 7 years ago
parent
commit
5524bd3daf
  1. 136
      electrumx/server/session.py

136
electrumx/server/session.py

@ -129,8 +129,8 @@ class SessionManager(object):
self.state = self.CATCHING_UP self.state = self.CATCHING_UP
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self._history_cache = pylru.lrucache(256) self.history_cache = pylru.lrucache(256)
self._hc_height = 0 self.notified_height = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
# Masternode stuff only for such coins # Masternode stuff only for such coins
@ -332,6 +332,19 @@ class SessionManager(object):
]) ])
return result return result
async def _electrum_and_raw_headers(self, height):
raw_header = await self.raw_header(height)
electrum_header = self.env.coin.electrum_header(raw_header, height)
return electrum_header, raw_header
async def _refresh_hsub_results(self, height):
'''Refresh the cached header subscription responses to be for height,
and record that as notified_height.
'''
electrum, raw = await self._electrum_and_raw_headers(height)
self.hsub_results = (electrum, {'hex': raw.hex(), 'height': height})
self.notified_height = height
# --- LocalRPC command handlers # --- LocalRPC command handlers
async def rpc_add_peer(self, real_name): async def rpc_add_peer(self, real_name):
@ -518,8 +531,8 @@ class SessionManager(object):
async def electrum_header(self, height): async def electrum_header(self, height):
'''Return the deserialized header at the given height.''' '''Return the deserialized header at the given height.'''
raw_header = await self.raw_header(height) electrum_header, _ = await self._electrum_and_raw_headers(height)
return self.env.coin.electrum_header(raw_header, height) return electrum_header
async def broadcast_transaction(self, raw_tx): async def broadcast_transaction(self, raw_tx):
hex_hash = await self.daemon.sendrawtransaction([raw_tx]) hex_hash = await self.daemon.sendrawtransaction([raw_tx])
@ -528,7 +541,7 @@ class SessionManager(object):
async def limited_history(self, hashX): async def limited_history(self, hashX):
'''A caching layer.''' '''A caching layer.'''
hc = self._history_cache hc = self.history_cache
if hashX not in hc: if hashX not in hc:
# History DoS limit. Each element of history is about 99 # History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage # bytes when encoded as JSON. This limits resource usage
@ -540,16 +553,18 @@ class SessionManager(object):
async def _notify_sessions(self, height, touched): async def _notify_sessions(self, height, touched):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
# Invalidate our history cache for touched hashXs height_changed = height != self.notified_height
if height != self._hc_height: if height_changed:
self._hc_height = height # Paranoia: a reorg could race and leave db_height lower
hc = self._history_cache await self._refresh_hsub_results(min(height, self.db.db_height))
# Invalidate our history cache for touched hashXs
hc = self.history_cache
for hashX in set(hc).intersection(touched): for hashX in set(hc).intersection(touched):
del hc[hashX] del hc[hashX]
async with TaskGroup() as group: async with TaskGroup() as group:
for session in self.sessions: for session in self.sessions:
await group.spawn(session.notify(height, touched)) await group.spawn(session.notify(touched, height_changed))
def add_session(self, session): def add_session(self, session):
self.sessions.add(session) self.sessions.add(session)
@ -610,7 +625,7 @@ class SessionBase(ServerSession):
self._receive_message_orig = self.connection.receive_message self._receive_message_orig = self.connection.receive_message
self.connection.receive_message = self.receive_message self.connection.receive_message = self.receive_message
async def notify(self, height, touched): async def notify(self, touched, height_changed):
pass pass
def peer_address_str(self, *, for_log=True): def peer_address_str(self, *, for_log=True):
@ -695,7 +710,6 @@ class ElectrumX(SessionBase):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_headers_raw = False self.subscribe_headers_raw = False
self.notified_height = None
self.connection.max_response_size = self.env.max_send self.connection.max_response_size = self.env.max_send
self.max_subs = self.env.max_session_subs self.max_subs = self.env.max_session_subs
self.hashX_subs = {} self.hashX_subs = {}
@ -736,73 +750,55 @@ class ElectrumX(SessionBase):
def sub_count(self): def sub_count(self):
return len(self.hashX_subs) return len(self.hashX_subs)
async def notify_touched(self, our_touched): async def notify(self, touched, height_changed):
changed = {}
for hashX in our_touched:
alias = self.hashX_subs[hashX]
status = await self.address_status(hashX)
changed[alias] = status
# Check mempool hashXs - the status is a function of the
# confirmed state of other transactions. Note: we cannot
# iterate over mempool_statuses as it changes size.
for hashX in tuple(self.mempool_statuses):
# Items can be evicted whilst await-ing below; False
# ensures such hashXs are notified
old_status = self.mempool_statuses.get(hashX, False)
status = await self.address_status(hashX)
if status != old_status:
alias = self.hashX_subs[hashX]
changed[alias] = status
for alias, status in changed.items():
if len(alias) == 64:
method = 'blockchain.scripthash.subscribe'
else:
method = 'blockchain.address.subscribe'
await self.send_notification(method, (alias, status))
if changed:
es = '' if len(changed) == 1 else 'es'
self.logger.info('notified of {:,d} address{}'
.format(len(changed), es))
async def notify(self, height, touched):
'''Notify the client about changes to touched addresses (from mempool '''Notify the client about changes to touched addresses (from mempool
updates or new blocks) and height. updates or new blocks) and height.
Return the set of addresses the session needs to be
asyncronously notified about. This can be empty if there are
possible mempool status updates.
Returns None if nothing needs to be notified asynchronously.
''' '''
height_changed = height != self.notified_height if height_changed and self.subscribe_headers:
if height_changed: args = (await self.subscribe_headers_result(), )
self.notified_height = height await self.send_notification('blockchain.headers.subscribe', args)
if self.subscribe_headers:
args = (await self.subscribe_headers_result(height), )
await self.send_notification('blockchain.headers.subscribe',
args)
touched = touched.intersection(self.hashX_subs) touched = touched.intersection(self.hashX_subs)
if touched or (height_changed and self.mempool_statuses): if touched or (height_changed and self.mempool_statuses):
await self.notify_touched(touched) changed = {}
async def subscribe_headers_result(self, height): for hashX in touched:
'''The result of a header subscription for the given height.''' alias = self.hashX_subs[hashX]
if self.subscribe_headers_raw: status = await self.address_status(hashX)
raw_header = await self.session_mgr.raw_header(height) changed[alias] = status
return {'hex': raw_header.hex(), 'height': height}
return await self.session_mgr.electrum_header(height) # Check mempool hashXs - the status is a function of the
# confirmed state of other transactions. Note: we cannot
# iterate over mempool_statuses as it changes size.
for hashX in tuple(self.mempool_statuses):
# Items can be evicted whilst await-ing status; False
# ensures such hashXs are notified
old_status = self.mempool_statuses.get(hashX, False)
status = await self.address_status(hashX)
if status != old_status:
alias = self.hashX_subs[hashX]
changed[alias] = status
for alias, status in changed.items():
if len(alias) == 64:
method = 'blockchain.scripthash.subscribe'
else:
method = 'blockchain.address.subscribe'
await self.send_notification(method, (alias, status))
if changed:
es = '' if len(changed) == 1 else 'es'
self.logger.info(f'notified of {len(changed):,d} address{es}')
async def subscribe_headers_result(self):
'''The result of a header subscription or notification.'''
return self.session_mgr.hsub_results[self.subscribe_headers_raw]
async def _headers_subscribe(self, raw): async def _headers_subscribe(self, raw):
'''Subscribe to get headers of new blocks.''' '''Subscribe to get headers of new blocks.'''
self.subscribe_headers = True
self.subscribe_headers_raw = assert_boolean(raw) self.subscribe_headers_raw = assert_boolean(raw)
self.notified_height = self.db.db_height self.subscribe_headers = True
return await self.subscribe_headers_result(self.notified_height) return await self.subscribe_headers_result()
async def headers_subscribe(self): async def headers_subscribe(self):
'''Subscribe to get raw headers of new blocks.''' '''Subscribe to get raw headers of new blocks.'''
@ -1313,9 +1309,9 @@ class DashElectrumX(ElectrumX):
'masternode.list': self.masternode_list 'masternode.list': self.masternode_list
}) })
async def notify(self, height, touched): async def notify(self, touched, height_changed):
'''Notify the client about changes in masternode list.''' '''Notify the client about changes in masternode list.'''
await super().notify(height, touched) await super().notify(touched, height_changed)
for mn in self.mns: for mn in self.mns:
status = await self.daemon_request('masternode_list', status = await self.daemon_request('masternode_list',
['status', mn]) ['status', mn])

Loading…
Cancel
Save