Browse Source

Clean up client notifications

- mempool informed of new block; it notifies controller synchronously
- controller notifies sessions synchronously
- sessions are notified of new height synchronously.  Any address touch
notifications are returned to the controller and scheduled
asynchronously.

Also, remove a redundant notification of height on initial header
subscriptions - the subscription response gives the current height;
we also used to send a notification as we didn't update our idea
of notified height.
patch-1
Neil Booth 7 years ago
parent
commit
cb33dd115f
  1. 3
      server/block_processor.py
  2. 38
      server/controller.py
  3. 17
      server/mempool.py
  4. 88
      server/session.py

3
server/block_processor.py

@ -253,6 +253,8 @@ class BlockProcessor(server.db.DB):
self.logger.info('processed {:,d} block{} in {:.1f}s'
.format(len(blocks), s,
time.time() - start))
self.controller.mempool.on_new_block(self.touched)
self.touched.clear()
elif hprevs[0] != chain[0]:
await self.reorg_chain()
else:
@ -511,7 +513,6 @@ class BlockProcessor(server.db.DB):
if self.caught_up_event.is_set():
self.flush(True)
else:
self.touched.clear()
if time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 30

38
server/controller.py

@ -214,7 +214,6 @@ class Controller(ServerBase):
self.ensure_future(self.log_start_external_servers())
self.ensure_future(self.housekeeping())
self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.notify())
def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).'''
@ -272,27 +271,24 @@ class Controller(ServerBase):
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', host, env.ssl_port, ssl=sslc)
async def notify(self):
def notify_sessions(self, touched):
'''Notify sessions about height changes and touched addresses.'''
while True:
await self.mempool.touched_event.wait()
touched = self.mempool.touched.copy()
self.mempool.touched.clear()
self.mempool.touched_event.clear()
# Invalidate caches
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
if self.bp.db_height != self.cache_height:
self.cache_height = self.bp.db_height
self.header_cache.clear()
# Make a copy; self.sessions can change whilst await-ing
sessions = [s for s in self.sessions
if isinstance(s, self.coin.SESSIONCLS)]
for session in sessions:
await session.notify(self.bp.db_height, touched)
# Invalidate caches
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
height = self.bp.db_height
if height != self.cache_height:
self.cache_height = height
self.header_cache.clear()
# Height notifications are synchronous. Those sessions with
# touched addresses are scheduled for asynchronous completion
for session in self.sessions:
session_touched = session.notify(height, touched)
if session_touched is not None:
self.ensure_future(session.notify_async(session_touched))
def notify_peers(self, updates):
'''Notify of peer updates.'''

17
server/mempool.py

@ -37,8 +37,7 @@ class MemPool(util.LoggedClass):
self.controller = controller
self.coin = bp.coin
self.db = bp
self.touched = bp.touched
self.touched_event = asyncio.Event()
self.touched = set()
self.prioritized = set()
self.stop = False
self.txs = {}
@ -101,7 +100,8 @@ class MemPool(util.LoggedClass):
while True:
# Avoid double notifications if processing a block
if self.touched and not self.processing_new_block():
self.touched_event.set()
self.controller.notify_sessions(self.touched)
self.touched.clear()
# Log progress / state
todo = len(unfetched) + len(unprocessed)
@ -177,6 +177,17 @@ class MemPool(util.LoggedClass):
return process
def on_new_block(self, touched):
'''Called after processing one or more new blocks.
Touched is a set of hashXs touched by the transactions in the
block. Caller must be aware it is modified by this function.
'''
# Minor race condition here with mempool processor thread
touched.update(self.touched)
self.touched.clear()
self.controller.notify_sessions(touched)
def processing_new_block(self):
'''Return True if we're processing a new block.'''
return self.daemon.cached_height() > self.db.db_height

88
server/session.py

@ -119,64 +119,70 @@ class ElectrumX(SessionBase):
def sub_count(self):
return len(self.hashX_subs)
async def notify(self, height, touched):
'''Notify the client about changes in height and touched addresses.
Cache is a shared cache for this update.
'''
pairs = []
changed = []
async def notify_async(self, our_touched):
changed = {}
matches = touched.intersection(self.hashX_subs)
for hashX in matches:
for hashX in our_touched:
alias = self.hashX_subs[hashX]
status = await self.address_status(hashX)
changed.append((alias, status))
changed[alias] = status
if height != self.notified_height:
self.notified_height = height
if self.subscribe_headers:
args = (self.controller.electrum_header(height), )
pairs.append(('blockchain.headers.subscribe', args))
# Check mempool hashXs - the status is a function of the
# confirmed state of other transactions
for hashX, old_status in self.mempool_statuses.items():
status = await self.address_status(hashX)
if status != old_status:
alias = self.hashX_subs[hashX]
changed[alias] = status
if self.subscribe_height:
pairs.append(('blockchain.numblocks.subscribe', (height, )))
# Check mempool hashXs - the status is a function of the
# confirmed state of other transactions
for hashX in set(self.mempool_statuses).difference(matches):
old_status = self.mempool_statuses[hashX]
status = await self.address_status(hashX)
if status != old_status:
alias = self.hashX_subs[hashX]
changed.append((alias, status))
for alias_status in changed:
if len(alias_status[0]) == 64:
for alias, status in changed.items():
if len(alias) == 64:
method = 'blockchain.scripthash.subscribe'
else:
method = 'blockchain.address.subscribe'
pairs.append((method, alias_status))
self.send_notification(method, (alias, status))
if changed:
es = '' if len(changed) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(changed), es))
def notify(self, height, touched):
'''Notify the client about changes to touched addresses (from mempool
updates or new blocks) and height.
if pairs:
self.send_notifications(pairs)
if changed:
es = '' if len(changed) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(changed), es))
Return the set of addresses the session needs to be
asyncronously notified about. This can be empty if there are
possible mempool status updates.
Returns None if nothing needs to be notified asynchronously.
'''
height_changed = height != self.notified_height
if height_changed:
self.notified_height = height
if self.subscribe_headers:
args = (self.controller.electrum_header(height), )
self.send_notification('blockchain.headers.subscribe', args)
if self.subscribe_height:
args = (height, )
self.send_notification('blockchain.numblocks.subscribe', args)
our_touched = touched.intersection(self.hashX_subs)
if our_touched or (height_changed and self.mempool_statuses):
return our_touched
return None
def height(self):
'''Return the current flushed database height.'''
return self.bp.db_height
def current_electrum_header(self):
'''Used as response to a headers subscription request.'''
return self.controller.electrum_header(self.height())
def headers_subscribe(self):
'''Subscribe to get headers of new blocks.'''
self.subscribe_headers = True
return self.current_electrum_header()
height = self.height()
self.notified_height = height
return self.controller.electrum_header(height)
def numblocks_subscribe(self):
'''Subscribe to get height of new blocks.'''

Loading…
Cancel
Save