From d7af868ed8c4dd94e332d3af9dc0cd15c479f306 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Mon, 21 Feb 2022 20:09:26 +0100 Subject: [PATCH] network: test if interface is alive before iface.taskgroup.spawn closes https://github.com/spesmilo/electrum/issues/7677 ``` E/n | network | taskgroup died. Traceback (most recent call last): File "/opt/electrum/electrum/network.py", line 1204, in main [await group.spawn(job) for job in self._jobs] File "/home/voegtlin/.local/lib/python3.8/site-packages/aiorpcx/curio.py", line 297, in __aexit__ await self.join() File "/opt/electrum/electrum/util.py", line 1255, in join task.result() File "/opt/electrum/electrum/network.py", line 1277, in _maintain_sessions await maintain_main_interface() File "/opt/electrum/electrum/network.py", line 1268, in maintain_main_interface await self._ensure_there_is_a_main_interface() File "/opt/electrum/electrum/network.py", line 1245, in _ensure_there_is_a_main_interface await self._switch_to_random_interface() File "/opt/electrum/electrum/network.py", line 648, in _switch_to_random_interface await self.switch_to_interface(random.choice(servers)) File "/opt/electrum/electrum/network.py", line 714, in switch_to_interface await i.taskgroup.spawn(self._request_server_info(i)) File "/home/voegtlin/.local/lib/python3.8/site-packages/aiorpcx/curio.py", line 204, in spawn self._add_task(task) File "/home/voegtlin/.local/lib/python3.8/site-packages/aiorpcx/curio.py", line 150, in _add_task raise RuntimeError('task group terminated') RuntimeError: task group terminated ``` I believe the "suppress spurious cancellations" block was added as SilentTaskGroup raised CancelledError instead of RuntimeError for this scenario. --- electrum/interface.py | 5 ++++- electrum/network.py | 27 +++++++++++---------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/electrum/interface.py b/electrum/interface.py index f45ec070e..d27a99efa 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -489,7 +489,7 @@ class Interface(Logger): self.logger.warning(f"disconnecting due to {repr(e)}") self.logger.debug(f"(disconnect) trace for {repr(e)}", exc_info=True) finally: - self.got_disconnected.set() + self.got_disconnected.set() # set this ASAP, ideally before any awaits await self.network.connection_down(self) # if was not 'ready' yet, schedule waiting coroutines: self.ready.cancel() @@ -534,6 +534,9 @@ class Interface(Logger): self.ready.set_result(1) + def is_connected_and_ready(self) -> bool: + return self.ready.done() and not self.got_disconnected.is_set() + async def _save_certificate(self) -> None: if not os.path.exists(self.cert_path): # we may need to retry this a few times, in case the handshake hasn't completed diff --git a/electrum/network.py b/electrum/network.py index 4d7235eb8..044ea425c 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -437,7 +437,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): def is_connected(self): interface = self.interface - return interface is not None and interface.ready.done() + return interface is not None and interface.is_connected_and_ready() def is_connecting(self): return self.connection_status == 'connecting' @@ -707,8 +707,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): i = self.interfaces[server] if old_interface != i: + if not i.is_connected_and_ready(): + return self.logger.info(f"switching to {server}") - assert i.ready.done(), "interface we are switching to is not ready yet" blockchain_updated = i.blockchain != self.blockchain() self.interface = i await i.taskgroup.spawn(self._request_server_info(i)) @@ -1195,7 +1196,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): await self.taskgroup.spawn(self._run_new_interface(self.default_server)) async def main(): - self.logger.info("starting taskgroup.") + self.logger.info(f"starting taskgroup ({hex(id(taskgroup))}).") try: # note: if a task finishes with CancelledError, that # will NOT raise, and the group will keep the other tasks running @@ -1203,9 +1204,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): await group.spawn(self._maintain_sessions()) [await group.spawn(job) for job in self._jobs] except Exception as e: - self.logger.exception("taskgroup died.") + self.logger.exception(f"taskgroup died ({hex(id(taskgroup))}).") finally: - self.logger.info("taskgroup stopped.") + self.logger.info(f"taskgroup stopped ({hex(id(taskgroup))}).") asyncio.run_coroutine_threadsafe(main(), self.asyncio_loop) util.trigger_callback('network_updated') @@ -1238,13 +1239,13 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): util.trigger_callback('network_updated') async def _ensure_there_is_a_main_interface(self): - if self.is_connected(): + if self.interface: return # if auto_connect is set, try a different server if self.auto_connect and not self.is_connecting(): await self._switch_to_random_interface() # if auto_connect is not set, or still no main interface, retry current - if not self.is_connected() and not self.is_connecting(): + if not self.interface and not self.is_connecting(): if self._can_retry_addr(self.default_server, urgent=True): await self.switch_to_interface(self.default_server) @@ -1271,15 +1272,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): await self.interface.taskgroup.spawn(self._request_fee_estimates, self.interface) while True: - try: - await maybe_start_new_interfaces() - await maintain_healthy_spread_of_connected_servers() - await maintain_main_interface() - except asyncio.CancelledError: - # suppress spurious cancellations - group = self.taskgroup - if not group or group.joined: - raise + await maybe_start_new_interfaces() + await maintain_healthy_spread_of_connected_servers() + await maintain_main_interface() await asyncio.sleep(0.1) @classmethod