From 136df7e5ee8bb51636feed4d4d2d342411416574 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Sat, 8 Sep 2018 01:34:33 +0200 Subject: [PATCH] wallet: recreate Synchronizer and Verifier when switching servers not that nice but solves races --- electrum/address_synchronizer.py | 4 ++-- electrum/interface.py | 12 ++++++++++-- electrum/network.py | 26 ++++++++++++-------------- electrum/synchronizer.py | 4 ++-- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 80708b18d..f976b1f55 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -143,6 +143,8 @@ class AddressSynchronizer(PrintError): interface = self.network.interface if interface is None: return # we should get called again soon + self.verifier = SPV(self.network, self) + self.synchronizer = Synchronizer(self) await interface.group.spawn(self.verifier.main(interface)) await interface.group.spawn(self.synchronizer.send_subscriptions(interface)) await interface.group.spawn(self.synchronizer.handle_status(interface)) @@ -151,8 +153,6 @@ class AddressSynchronizer(PrintError): def start_threads(self, network): self.network = network if self.network is not None: - self.verifier = SPV(self.network, self) - self.synchronizer = Synchronizer(self) self.network.register_callback(self.on_default_server_changed, ['default_server_changed']) self.network.trigger_callback('default_server_changed') else: diff --git a/electrum/interface.py b/electrum/interface.py index a0c928b34..d0a06c87f 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -72,6 +72,14 @@ class NotificationSession(ClientSession): class GracefulDisconnect(AIOSafeSilentException): pass +class CustomTaskGroup(TaskGroup): + + def spawn(self, *args, **kwargs): + if self._closed: + raise asyncio.CancelledError() + return super().spawn(*args, **kwargs) + + class Interface(PrintError): def __init__(self, network, server, config_path, proxy): @@ -89,7 +97,7 @@ class Interface(PrintError): # TODO combine? self.fut = asyncio.get_event_loop().create_task(self.run()) - self.group = TaskGroup() + self.group = CustomTaskGroup() if proxy: username, pw = proxy.get('user'), proxy.get('password') @@ -255,7 +263,7 @@ class Interface(PrintError): self.tip = new_header['block_height'] await copy_header_queue.put(new_header) except concurrent.futures.TimeoutError: - await asyncio.wait_for(self.session.send_request('server.ping'), 5) + await asyncio.wait_for(self.session.send_request('server.ping'), 10) def close(self): self.fut.cancel() diff --git a/electrum/network.py b/electrum/network.py index 497ca02b1..c3fe875f9 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -242,9 +242,6 @@ class Network(PrintError): self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) self.asyncio_loop = asyncio.get_event_loop() - self.server_info_job = asyncio.Future() - # just to not trigger a warning from switch_to_interface the first time we change default_server - self.server_info_job.set_result(1) @staticmethod def get_instance(): @@ -330,7 +327,6 @@ class Network(PrintError): def is_connecting(self): return self.connection_status == 'connecting' - @util.aiosafe async def request_server_info(self, interface): await interface.ready session = interface.session @@ -560,7 +556,6 @@ class Network(PrintError): being opened, start a thread to connect. The actual switch will happen on receipt of the connection notification. Do nothing if server already is our interface.''' - old_default_server = self.default_server self.default_server = server if server not in self.interfaces: self.interface = None @@ -570,14 +565,18 @@ class Network(PrintError): i = self.interfaces[server] if self.interface != i: self.print_error("switching to", server) - # stop any current interface in order to terminate subscriptions - # fixme: we don't want to close headers sub - #self.close_interface(self.interface) + if self.interface is not None: + # Stop any current interface in order to terminate subscriptions, + # and to cancel tasks in interface.group. + # However, for headers sub, give preference to this interface + # over unknown ones, i.e. start it again right away. + self.close_interface(self.interface) + if len(self.interfaces) <= self.num_server: + self.start_interface(self.interface.server) + self.interface = i - if not self.server_info_job.done(): - self.print_error('cancelled previous request_server_info job, was it too slow? server was:', old_default_server) - self.server_info_job.cancel() - self.server_info_job = asyncio.get_event_loop().create_task(self.request_server_info(i)) + asyncio.get_event_loop().create_task( + i.group.spawn(self.request_server_info(i))) self.trigger_callback('default_server_changed') self.set_status('connected') self.notify('updated') @@ -876,12 +875,11 @@ class Network(PrintError): changed = True else: if self.config.is_fee_estimates_update_required(): - asyncio.get_event_loop().create_task(self.attempt_fee_estimate_update()) + await self.interface.group.spawn(self.attempt_fee_estimate_update()) if changed: self.notify('updated') await asyncio.sleep(1) - @util.aiosafe async def attempt_fee_estimate_update(self): await asyncio.wait_for(self.request_fee_estimates(self.interface), 5) diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index f03b18cf3..d8a887607 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -151,13 +151,13 @@ class Synchronizer(PrintError): async def send_subscriptions(self, interface): while True: addr = await self.add_queue.get() - await interface.group.spawn(self.subscribe_to_address(addr)) + await interface.group.spawn(self.subscribe_to_address, addr) async def handle_status(self, interface): while True: h, status = await self.status_queue.get() addr = self.scripthash_to_address[h] - await interface.group.spawn(self.on_address_status(addr, status)) + await interface.group.spawn(self.on_address_status, addr, status) @property def session(self):