Browse Source

Tweak notification handling

Fixes #614, which could happen if a block was found immediately at
startup
patch-2
Neil Booth 6 years ago
parent
commit
cdb9034571
  1. 2
      electrumx/server/block_processor.py
  2. 37
      electrumx/server/controller.py
  3. 14
      electrumx/server/session.py

2
electrumx/server/block_processor.py

@ -626,8 +626,6 @@ class BlockProcessor(object):
if first_sync: if first_sync:
self.logger.info(f'{electrumx.version} synced to ' self.logger.info(f'{electrumx.version} synced to '
f'height {self.height:,d}') f'height {self.height:,d}')
# Initialise the notification framework
await self.notifications.on_block(set(), self.height)
# Reopen for serving # Reopen for serving
await self.db.open_for_serving() await self.db.open_for_serving()

37
electrumx/server/controller.py

@ -33,8 +33,7 @@ class Notifications(object):
def __init__(self): def __init__(self):
self._touched_mp = {} self._touched_mp = {}
self._touched_bp = {} self._touched_bp = {}
self._highest_block = 0 self._highest_block = -1
self._notify_funcs = []
async def _maybe_notify(self): async def _maybe_notify(self):
tmp, tbp = self._touched_mp, self._touched_bp tmp, tbp = self._touched_mp, self._touched_bp
@ -54,11 +53,15 @@ class Notifications(object):
del tmp[old] del tmp[old]
for old in [h for h in tbp if h <= height]: for old in [h for h in tbp if h <= height]:
del tbp[old] del tbp[old]
for notify_func in self._notify_funcs: await self.notify(height, touched)
await notify_func(height, touched)
def add_callback(self, notify_func): async def notify(self, height, touched):
self._notify_funcs.append(notify_func) pass
async def start(self, height, notify_func):
self._highest_block = height
self.notify = notify_func
await self.notify(height, set())
async def on_mempool(self, touched, height): async def on_mempool(self, touched, height):
self._touched_mp[height] = touched self._touched_mp[height] = touched
@ -99,18 +102,17 @@ class Controller(ServerBase):
db = DB(env) db = DB(env)
bp = BlockProcessor(env, db, daemon, notifications) bp = BlockProcessor(env, db, daemon, notifications)
# Set ourselves up to implement the MemPoolAPI # Set notifications up to implement the MemPoolAPI
self.height = daemon.height notifications.height = daemon.height
self.cached_height = daemon.cached_height notifications.cached_height = daemon.cached_height
self.mempool_hashes = daemon.mempool_hashes notifications.mempool_hashes = daemon.mempool_hashes
self.raw_transactions = daemon.getrawtransactions notifications.raw_transactions = daemon.getrawtransactions
self.lookup_utxos = db.lookup_utxos notifications.lookup_utxos = db.lookup_utxos
self.on_mempool = notifications.on_mempool MemPoolAPI.register(Notifications)
MemPoolAPI.register(Controller) mempool = MemPool(env.coin, notifications)
mempool = MemPool(env.coin, self)
session_mgr = SessionManager(env, db, bp, daemon, mempool, session_mgr = SessionManager(env, db, bp, daemon, mempool,
notifications, shutdown_event) shutdown_event)
# Test daemon authentication, and also ensure it has a cached # Test daemon authentication, and also ensure it has a cached
# height. Do this before entering the task group. # height. Do this before entering the task group.
@ -120,7 +122,8 @@ class Controller(ServerBase):
serve_externally_event = Event() serve_externally_event = Event()
synchronized_event = Event() synchronized_event = Event()
async with TaskGroup() as group: async with TaskGroup() as group:
await group.spawn(session_mgr.serve(serve_externally_event)) await group.spawn(session_mgr.serve(notifications,
serve_externally_event))
await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event))
await caught_up_event.wait() await caught_up_event.wait()
await group.spawn(db.populate_header_merkle_cache()) await group.spawn(db.populate_header_merkle_cache())

14
electrumx/server/session.py

@ -108,8 +108,7 @@ class SessionGroup(object):
class SessionManager(object): class SessionManager(object):
'''Holds global state about all sessions.''' '''Holds global state about all sessions.'''
def __init__(self, env, db, bp, daemon, mempool, notifications, def __init__(self, env, db, bp, daemon, mempool, shutdown_event):
shutdown_event):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
self.db = db self.db = db
@ -136,8 +135,6 @@ class SessionManager(object):
# Event triggered when electrumx is listening for incoming requests. # Event triggered when electrumx is listening for incoming requests.
self.server_listening = Event() self.server_listening = Event()
self.session_event = Event() self.session_event = Event()
# Tell sessions about subscription changes
notifications.add_callback(self._notify_sessions)
# Set up the RPC request handlers # Set up the RPC request handlers
cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
@ -346,6 +343,8 @@ class SessionManager(object):
'''Refresh the cached header subscription responses to be for height, '''Refresh the cached header subscription responses to be for height,
and record that as notified_height. and record that as notified_height.
''' '''
# Paranoia: a reorg could race and leave db_height lower
height = min(height, self.db.db_height)
electrum, raw = await self._electrum_and_raw_headers(height) electrum, raw = await self._electrum_and_raw_headers(height)
self.hsub_results = (electrum, {'hex': raw.hex(), 'height': height}) self.hsub_results = (electrum, {'hex': raw.hex(), 'height': height})
self.notified_height = height self.notified_height = height
@ -477,7 +476,7 @@ class SessionManager(object):
# --- External Interface # --- External Interface
async def serve(self, event): async def serve(self, notifications, event):
'''Start the RPC server if enabled. When the event is triggered, '''Start the RPC server if enabled. When the event is triggered,
start TCP and SSL servers.''' start TCP and SSL servers.'''
try: try:
@ -499,6 +498,8 @@ class SessionManager(object):
if self.env.drop_client is not None: if self.env.drop_client is not None:
self.logger.info('drop clients matching: {}' self.logger.info('drop clients matching: {}'
.format(self.env.drop_client.pattern)) .format(self.env.drop_client.pattern))
# Start notifications; initialize hsub_results
await notifications.start(self.db.db_height, self._notify_sessions)
await self._start_external_servers() await self._start_external_servers()
# Peer discovery should start after the external servers # Peer discovery should start after the external servers
# because we connect to ourself # because we connect to ourself
@ -559,8 +560,7 @@ class SessionManager(object):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
height_changed = height != self.notified_height height_changed = height != self.notified_height
if height_changed: if height_changed:
# Paranoia: a reorg could race and leave db_height lower await self._refresh_hsub_results(height)
await self._refresh_hsub_results(min(height, self.db.db_height))
# Invalidate our history cache for touched hashXs # Invalidate our history cache for touched hashXs
hc = self.history_cache hc = self.history_cache
for hashX in set(hc).intersection(touched): for hashX in set(hc).intersection(touched):

Loading…
Cancel
Save