From 2c57c78ebe77287ba9698225bf8e20880b8fee62 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 29 Apr 2022 18:24:49 +0200 Subject: [PATCH 1/2] asyncio: stop using get_event_loop(). introduce ~singleton loop. asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710) ``` .../electrum/electrum/daemon.py:470: DeprecationWarning: There is no current event loop self.asyncio_loop = asyncio.get_event_loop() .../electrum/electrum/network.py:276: DeprecationWarning: There is no current event loop self.asyncio_loop = asyncio.get_event_loop() ``` Also, according to that thread, "set_event_loop() [... is] not deprecated by oversight". So, we stop using get_event_loop() and set_event_loop() in our own code. Note that libraries we use (such as the stdlib for python <3.10), might call get_event_loop, which then relies on us having called set_event_loop e.g. for the GUI thread. To work around this, a custom event loop policy providing a get_event_loop implementation is used. Previously, we have been using a single asyncio event loop, created with util.create_and_start_event_loop, and code in many places got a reference to this loop using asyncio.get_event_loop(). Now, we still use a single asyncio event loop, but it is now stored as a global in util._asyncio_event_loop (access with util.get_asyncio_loop()). I believe these changes also fix https://github.com/spesmilo/electrum/issues/5376 --- electrum/commands.py | 2 +- electrum/daemon.py | 4 +- electrum/exchange_rate.py | 4 +- electrum/gui/kivy/main_window.py | 2 +- electrum/lnworker.py | 2 +- electrum/network.py | 12 +++--- electrum/sql_db.py | 1 - electrum/tests/test_lnpeer.py | 5 ++- electrum/tests/test_lnrouter.py | 3 +- electrum/tests/test_lntransport.py | 16 ++++++-- electrum/tests/test_network.py | 23 ++++++++--- electrum/tests/test_storage_upgrade.py | 3 -- electrum/tests/test_wallet.py | 6 +-- electrum/tests/test_wallet_vertical.py | 45 +++++++++------------ electrum/util.py | 56 ++++++++++++++++++++++---- 15 files changed, 120 insertions(+), 64 deletions(-) diff --git a/electrum/commands.py b/electrum/commands.py index 6058eed70..9de9b3a6a 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -187,7 +187,7 @@ class Commands: kwargs.pop('wallet') coro = f(*args, **kwargs) - fut = asyncio.run_coroutine_threadsafe(coro, asyncio.get_event_loop()) + fut = asyncio.run_coroutine_threadsafe(coro, util.get_asyncio_loop()) result = fut.result() if self._callback: diff --git a/electrum/daemon.py b/electrum/daemon.py index 8bb8fc6b9..cf8d96481 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -124,7 +124,7 @@ def request(config: SimpleConfig, endpoint, args=(), timeout=60): rpc_user, rpc_password = get_rpc_credentials(config) server_url = 'http://%s:%d' % (host, port) auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password) - loop = asyncio.get_event_loop() + loop = util.get_asyncio_loop() async def request_coroutine(): if socktype == 'unix': connector = aiohttp.UnixConnector(path=path) @@ -467,7 +467,7 @@ class Daemon(Logger): if 'wallet_path' in config.cmdline_options: self.logger.warning("Ignoring parameter 'wallet_path' for daemon. " "Use the load_wallet command instead.") - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() self.network = None if not config.get('offline'): self.network = Network(config, daemon=self) diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py index aacc6dee1..5e033e4ea 100644 --- a/electrum/exchange_rate.py +++ b/electrum/exchange_rate.py @@ -148,7 +148,7 @@ class ExchangeBase(Logger): if h is None: h = self.read_historical_rates(ccy, cache_dir) if h is None or h['timestamp'] < time.time() - 24*3600: - asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) + util.get_asyncio_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) def history_ccys(self) -> Sequence[str]: return [] @@ -471,7 +471,7 @@ def get_exchanges_and_currencies(): for name, klass in exchanges.items(): exchange = klass(None, None) await group.spawn(get_currencies_safe(name, exchange)) - loop = asyncio.get_event_loop() + loop = util.get_asyncio_loop() try: loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network()) except Exception as e: diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py index 70d17a434..163fe9a60 100644 --- a/electrum/gui/kivy/main_window.py +++ b/electrum/gui/kivy/main_window.py @@ -394,7 +394,7 @@ class ElectrumWindow(App, Logger): self.is_exit = False self.wallet = None # type: Optional[Abstract_Wallet] self.pause_time = 0 - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() self.password = None self._use_single_password = False self.resume_dialog = None diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 42098ad32..91a442589 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -496,7 +496,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): # Try DNS-resolving the host (if needed). This is simply so that # the caller gets a nice exception if it cannot be resolved. try: - await asyncio.get_event_loop().getaddrinfo(host, port) + await asyncio.get_running_loop().getaddrinfo(host, port) except socket.gaierror: raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)')) # add peer diff --git a/electrum/network.py b/electrum/network.py index 8b9c4aca0..cb0c5e1c1 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -273,7 +273,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): init_retry_delay_urgent=1, ) - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() assert self.asyncio_loop.is_running(), "event loop not running" assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}" @@ -381,9 +381,11 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): self.channel_db = None self.path_finder = None - def run_from_another_thread(self, coro, *, timeout=None): - assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread' - fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop) + @classmethod + def run_from_another_thread(cls, coro, *, timeout=None): + loop = util.get_asyncio_loop() + assert util.get_running_loop() != loop, 'must not be called from asyncio thread' + fut = asyncio.run_coroutine_threadsafe(coro, loop) return fut.result(timeout) @staticmethod @@ -1321,7 +1323,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]): assert util.get_running_loop() != network.asyncio_loop loop = network.asyncio_loop else: - loop = asyncio.get_event_loop() + loop = util.get_asyncio_loop() coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop) # note: _send_http_on_proxy has its own timeout, so no timeout here: return coro.result() diff --git a/electrum/sql_db.py b/electrum/sql_db.py index 3d9dba7de..a6459fc06 100644 --- a/electrum/sql_db.py +++ b/electrum/sql_db.py @@ -26,7 +26,6 @@ class SqlDB(Logger): def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None): Logger.__init__(self) self.asyncio_loop = asyncio_loop - asyncio.set_event_loop(asyncio_loop) self.stopping = False self.stopped_event = asyncio.Event() self.path = path diff --git a/electrum/tests/test_lnpeer.py b/electrum/tests/test_lnpeer.py index 9ba096a1c..dbe948495 100644 --- a/electrum/tests/test_lnpeer.py +++ b/electrum/tests/test_lnpeer.py @@ -15,6 +15,7 @@ from aiorpcx import timeout_after, TaskTimeout import electrum import electrum.trampoline from electrum import bitcoin +from electrum import util from electrum import constants from electrum.network import Network from electrum.ecc import ECPrivkey @@ -62,7 +63,7 @@ class MockNetwork: user_config = {} user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-") self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir) - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = util.get_asyncio_loop() self.channel_db = ChannelDB(self) self.channel_db.data_loaded.set() self.path_finder = LNPathFinder(self.channel_db) @@ -1361,4 +1362,4 @@ class TestPeer(TestCaseForTestnet): def run(coro): - return asyncio.run_coroutine_threadsafe(coro, loop=asyncio.get_event_loop()).result() + return asyncio.run_coroutine_threadsafe(coro, loop=util.get_asyncio_loop()).result() diff --git a/electrum/tests/test_lnrouter.py b/electrum/tests/test_lnrouter.py index 09b045f6e..863842291 100644 --- a/electrum/tests/test_lnrouter.py +++ b/electrum/tests/test_lnrouter.py @@ -4,6 +4,7 @@ import tempfile import shutil import asyncio +from electrum import util from electrum.util import bh2u, bfh, create_and_start_event_loop from electrum.lnutil import ShortChannelID from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet, @@ -64,7 +65,7 @@ class Test_LNRouter(TestCaseForTestnet): """ class fake_network: config = self.config - asyncio_loop = asyncio.get_event_loop() + asyncio_loop = util.get_asyncio_loop() trigger_callback = lambda *args: None register_callback = lambda *args: None interface = None diff --git a/electrum/tests/test_lntransport.py b/electrum/tests/test_lntransport.py index 4d355bb83..54ca0e30d 100644 --- a/electrum/tests/test_lntransport.py +++ b/electrum/tests/test_lntransport.py @@ -1,5 +1,6 @@ import asyncio +from electrum import util from electrum.ecc import ECPrivkey from electrum.lnutil import LNPeerAddr from electrum.lntransport import LNResponderTransport, LNTransport @@ -11,6 +12,15 @@ from .test_bitcoin import needs_test_with_all_chacha20_implementations class TestLNTransport(ElectrumTestCase): + def setUp(self): + super().setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() + + def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) + super().tearDown() + @needs_test_with_all_chacha20_implementations def test_responder(self): # local static @@ -38,11 +48,11 @@ class TestLNTransport(ElectrumTestCase): assert num_bytes == 66 return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba') transport = LNResponderTransport(ls_priv, Reader(), Writer()) - asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv)) + asyncio.run_coroutine_threadsafe( + transport.handshake(epriv=e_priv), self.asyncio_loop).result() @needs_test_with_all_chacha20_implementations def test_loop(self): - loop = asyncio.get_event_loop() responder_shaked = asyncio.Event() server_shaked = asyncio.Event() responder_key = ECPrivkey.generate_random_key() @@ -96,4 +106,4 @@ class TestLNTransport(ElectrumTestCase): server.close() await server.wait_closed() - loop.run_until_complete(f()) + asyncio.run_coroutine_threadsafe(f(), self.asyncio_loop).result() diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py index b433a5e26..36ff1b451 100644 --- a/electrum/tests/test_network.py +++ b/electrum/tests/test_network.py @@ -8,6 +8,7 @@ from electrum import blockchain from electrum.interface import Interface, ServerAddr from electrum.crypto import sha256 from electrum.util import bh2u +from electrum import util from . import ElectrumTestCase @@ -17,7 +18,9 @@ class MockTaskGroup: class MockNetwork: taskgroup = MockTaskGroup() - asyncio_loop = asyncio.get_event_loop() + + def __init__(self): + self.asyncio_loop = util.get_asyncio_loop() class MockInterface(Interface): def __init__(self, config): @@ -52,9 +55,15 @@ class TestNetwork(ElectrumTestCase): def setUp(self): super().setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) self.interface = MockInterface(self.config) + def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) + super().tearDown() + def test_fork_noconflict(self): blockchain.blockchains = {} self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) @@ -66,7 +75,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop()) + self.assertEqual(('fork', 8), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) def test_fork_conflict(self): @@ -80,7 +90,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop()) + self.assertEqual(('fork', 8), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) def test_can_connect_during_backward(self): @@ -93,7 +104,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=4), util.get_asyncio_loop()) + self.assertEqual(('catchup', 5), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) def mock_fork(self, bad_header): @@ -113,7 +125,8 @@ class TestNetwork(ElectrumTestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6))) + fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=6), util.get_asyncio_loop()) + self.assertEqual(('catchup', 7), fut.result()) self.assertEqual(self.interface.q.qsize(), 0) diff --git a/electrum/tests/test_storage_upgrade.py b/electrum/tests/test_storage_upgrade.py index 47f64869e..e8ca06f0e 100644 --- a/electrum/tests/test_storage_upgrade.py +++ b/electrum/tests/test_storage_upgrade.py @@ -20,12 +20,9 @@ class TestStorageUpgrade(WalletTestCase): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() def tearDown(self): super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) def testnet_wallet(func): # note: it's ok to modify global network constants in subclasses of SequentialTestCase diff --git a/electrum/tests/test_wallet.py b/electrum/tests/test_wallet.py index 74c3bca57..7a90a3e35 100644 --- a/electrum/tests/test_wallet.py +++ b/electrum/tests/test_wallet.py @@ -35,6 +35,7 @@ class WalletTestCase(ElectrumTestCase): def setUp(self): super(WalletTestCase, self).setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.user_dir = tempfile.mkdtemp() self.config = SimpleConfig({'electrum_path': self.user_dir}) @@ -45,6 +46,8 @@ class WalletTestCase(ElectrumTestCase): sys.stdout = self._stdout_buffer def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) super(WalletTestCase, self).tearDown() shutil.rmtree(self.user_dir) # Restore the "real" stdout @@ -241,12 +244,9 @@ class TestWalletPassword(WalletTestCase): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() def tearDown(self): super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) def test_update_password_of_imported_wallet(self): wallet_str = '{"addr_history":{"1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr":[],"15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA":[],"1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6":[]},"addresses":{"change":[],"receiving":["1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr","1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6","15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA"]},"keystore":{"keypairs":{"0344b1588589958b0bcab03435061539e9bcf54677c104904044e4f8901f4ebdf5":"L2sED74axVXC4H8szBJ4rQJrkfem7UMc6usLCPUoEWxDCFGUaGUM","0389508c13999d08ffae0f434a085f4185922d64765c0bff2f66e36ad7f745cc5f":"L3Gi6EQLvYw8gEEUckmqawkevfj9s8hxoQDFveQJGZHTfyWnbk1U","04575f52b82f159fa649d2a4c353eb7435f30206f0a6cb9674fbd659f45082c37d559ffd19bea9c0d3b7dcc07a7b79f4cffb76026d5d4dff35341efe99056e22d2":"5JyVyXU1LiRXATvRTQvR9Kp8Rx1X84j2x49iGkjSsXipydtByUq"},"type":"imported"},"pruned_txo":{},"seed_version":13,"stored_height":-1,"transactions":{},"tx_fees":{},"txi":{},"txo":{},"use_encryption":false,"verified_tx3":{},"wallet_type":"standard","winpos-qt":[100,100,840,405]}' diff --git a/electrum/tests/test_wallet_vertical.py b/electrum/tests/test_wallet_vertical.py index 7037e77e0..6f41f65b0 100644 --- a/electrum/tests/test_wallet_vertical.py +++ b/electrum/tests/test_wallet_vertical.py @@ -9,6 +9,7 @@ import copy from electrum import storage, bitcoin, keystore, bip32, slip39, wallet from electrum import Transaction from electrum import SimpleConfig +from electrum import util from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet, restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy) @@ -18,6 +19,7 @@ from electrum.util import ( from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput, PartialTxInput, tx_from_any, TxOutpoint) from electrum.mnemonic import seed_type +from electrum.network import Network from electrum.plugins.trustedcoin import trustedcoin @@ -699,8 +701,14 @@ class TestWalletSending(TestCaseForTestnet): def setUp(self): super().setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) + def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) + super().tearDown() + def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2): if config is None: config = self.config @@ -1369,14 +1377,7 @@ class TestWalletSending(TestCaseForTestnet): raise Exception("unexpected txid") def has_internet_connection(self): return True - def run_from_another_thread(self, coro, *, timeout=None): - loop, stop_loop, loop_thread = create_and_start_event_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) - try: - return fut.result(timeout) - finally: - loop.call_soon_threadsafe(stop_loop.set_result, 1) - loop_thread.join(timeout=1) + run_from_another_thread = Network.run_from_another_thread def get_local_height(self): return 0 def blockchain(self): @@ -1429,14 +1430,7 @@ class TestWalletSending(TestCaseForTestnet): raise Exception("unexpected txid") def has_internet_connection(self): return True - def run_from_another_thread(self, coro, *, timeout=None): - loop, stop_loop, loop_thread = create_and_start_event_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) - try: - return fut.result(timeout) - finally: - loop.call_soon_threadsafe(stop_loop.set_result, 1) - loop_thread.join(timeout=1) + run_from_another_thread = Network.run_from_another_thread def get_local_height(self): return 0 def blockchain(self): @@ -1844,8 +1838,8 @@ class TestWalletSending(TestCaseForTestnet): network = NetworkMock() dest_addr = 'tb1q3ws2p0qjk5vrravv065xqlnkckvzcpclk79eu2' sweep_coro = sweep(privkeys, network=network, config=self.config, to_address=dest_addr, fee=5000, locktime=1325785, tx_version=1) - loop = asyncio.get_event_loop() - tx = loop.run_until_complete(sweep_coro) + loop = util.get_asyncio_loop() + tx = asyncio.run_coroutine_threadsafe(sweep_coro, loop).result() tx_copy = tx_from_any(tx.serialize()) self.assertEqual('010000000129349e5641d79915e9d0282fdbaee8c3df0b6731bab9d70bf626e8588bde24ac010000004847304402206bf0d0a93abae0d5873a62ebf277a5dd2f33837821e8b93e74d04e19d71b578002201a6d729bc159941ef5c4c9e5fe13ece9fc544351ba531b00f68ba549c8b38a9a01fdffffff01b82e0f00000000001600148ba0a0bc12b51831f58c7ea8607e76c5982c071fd93a1400', @@ -2199,14 +2193,7 @@ class TestWalletSending(TestCaseForTestnet): raise Exception("unexpected txid") def has_internet_connection(self): return True - def run_from_another_thread(self, coro, *, timeout=None): - loop, stop_loop, loop_thread = create_and_start_event_loop() - fut = asyncio.run_coroutine_threadsafe(coro, loop) - try: - return fut.result(timeout) - finally: - loop.call_soon_threadsafe(stop_loop.set_result, 1) - loop_thread.join(timeout=1) + run_from_another_thread = Network.run_from_another_thread def get_local_height(self): return 0 def blockchain(self): @@ -3284,8 +3271,14 @@ class TestWalletHistory_DoubleSpend(TestCaseForTestnet): def setUp(self): super().setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) + def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) + super().tearDown() + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') def test_restoring_wallet_without_manual_delete(self, mock_save_db): w = restore_wallet_from_text("small rapid pattern language comic denial donate extend tide fever burden barrel", diff --git a/electrum/util.py b/electrum/util.py index fb0657cb8..eaf584792 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -1031,7 +1031,7 @@ def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict: request = await pr.get_payment_request(r) if on_pr: on_pr(request) - loop = loop or asyncio.get_event_loop() + loop = loop or get_asyncio_loop() asyncio.run_coroutine_threadsafe(get_payment_request(), loop) return out @@ -1319,7 +1319,6 @@ class NetworkJobOnDefaultServer(Logger, ABC): """ def __init__(self, network: 'Network'): Logger.__init__(self) - asyncio.set_event_loop(network.asyncio_loop) self.network = network self.interface = None # type: Interface self._restart_lock = asyncio.Lock() @@ -1384,9 +1383,41 @@ class NetworkJobOnDefaultServer(Logger, ABC): return s +_asyncio_event_loop = None # type: Optional[asyncio.AbstractEventLoop] +def get_asyncio_loop() -> asyncio.AbstractEventLoop: + """Returns the global asyncio event loop we use.""" + if _asyncio_event_loop is None: + raise Exception("event loop not created yet") + return _asyncio_event_loop + + def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop, asyncio.Future, threading.Thread]: + global _asyncio_event_loop + if _asyncio_event_loop is not None: + raise Exception("there is already a running event loop") + + # asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710) + # We set a custom event loop policy purely to be compatible with code that + # relies on asyncio.get_event_loop(). + # - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__, + # and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420 + class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy): + def get_event_loop(self): + # In case electrum is being used as a library, there might be other + # event loops in use besides ours. To minimise interfering with those, + # if there is a loop running in the current thread, return that: + running_loop = get_running_loop() + if running_loop is not None: + return running_loop + # Otherwise, return our global loop: + return get_asyncio_loop() + asyncio.set_event_loop_policy(MyEventLoopPolicy()) + + loop = asyncio.new_event_loop() + _asyncio_event_loop = loop + def on_exception(loop, context): """Suppress spurious messages it appears we cannot control.""" SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' @@ -1396,13 +1427,21 @@ def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop, return loop.default_exception_handler(context) - loop = asyncio.get_event_loop() + def run_event_loop(): + try: + loop.run_until_complete(stopping_fut) + finally: + # clean-up + global _asyncio_event_loop + _asyncio_event_loop = None + loop.set_exception_handler(on_exception) # loop.set_debug(1) stopping_fut = loop.create_future() - loop_thread = threading.Thread(target=loop.run_until_complete, - args=(stopping_fut,), - name='EventLoop') + loop_thread = threading.Thread( + target=run_event_loop, + name='EventLoop', + ) loop_thread.start() return loop, stopping_fut, loop_thread @@ -1558,7 +1597,7 @@ class CallbackManager: on the event loop. """ if self.asyncio_loop is None: - self.asyncio_loop = asyncio.get_event_loop() + self.asyncio_loop = get_asyncio_loop() assert self.asyncio_loop.is_running(), "event loop not running" with self.callback_lock: callbacks = self.callbacks[event][:] @@ -1643,7 +1682,7 @@ class NetworkRetryManager(Generic[_NetAddrType]): class MySocksProxy(aiorpcx.SOCKSProxy): async def open_connection(self, host=None, port=None, **kwargs): - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() reader = asyncio.StreamReader(loop=loop) protocol = asyncio.StreamReaderProtocol(reader, loop=loop) transport, _ = await self.create_connection( @@ -1753,6 +1792,7 @@ class nullcontext: def get_running_loop() -> Optional[asyncio.AbstractEventLoop]: + """Returns the asyncio event loop that is *running in this thread*, if any.""" try: return asyncio.get_running_loop() except RuntimeError: From 872ce82418b1876cf4c9dc49ec40c03deef806f8 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 29 Apr 2022 18:48:33 +0200 Subject: [PATCH 2/2] tests: clean up event-loop creation --- electrum/tests/__init__.py | 4 ++++ electrum/tests/test_commands.py | 13 ------------- electrum/tests/test_lnpeer.py | 5 +---- electrum/tests/test_lnrouter.py | 5 +---- electrum/tests/test_lntransport.py | 9 --------- electrum/tests/test_network.py | 6 ------ electrum/tests/test_storage_upgrade.py | 6 ------ electrum/tests/test_wallet.py | 3 --- electrum/tests/test_wallet_vertical.py | 14 +------------- 9 files changed, 7 insertions(+), 58 deletions(-) diff --git a/electrum/tests/__init__.py b/electrum/tests/__init__.py index dbfc9ada0..b9c7eefe1 100644 --- a/electrum/tests/__init__.py +++ b/electrum/tests/__init__.py @@ -6,6 +6,7 @@ import shutil import electrum import electrum.logging from electrum import constants +from electrum import util # Set this locally to make the test suite run faster. @@ -37,9 +38,12 @@ class ElectrumTestCase(SequentialTestCase): def setUp(self): super().setUp() + self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.electrum_path = tempfile.mkdtemp() def tearDown(self): + self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) + self._loop_thread.join(timeout=1) super().tearDown() shutil.rmtree(self.electrum_path) diff --git a/electrum/tests/test_commands.py b/electrum/tests/test_commands.py index 053976749..c16d2ccd6 100644 --- a/electrum/tests/test_commands.py +++ b/electrum/tests/test_commands.py @@ -2,7 +2,6 @@ import unittest from unittest import mock from decimal import Decimal -from electrum.util import create_and_start_event_loop from electrum.commands import Commands, eval_bool from electrum import storage, wallet from electrum.wallet import restore_wallet_from_text @@ -18,14 +17,8 @@ class TestCommands(ElectrumTestCase): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) - def tearDown(self): - super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - def test_setconfig_non_auth_number(self): self.assertEqual(7777, Commands._setconfig_normalize_value('rpcport', "7777")) self.assertEqual(7777, Commands._setconfig_normalize_value('rpcport', '7777')) @@ -135,14 +128,8 @@ class TestCommandsTestnet(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) - def tearDown(self): - super().tearDown() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - def test_convert_xkey(self): cmds = Commands(config=self.config) xpubs = { diff --git a/electrum/tests/test_lnpeer.py b/electrum/tests/test_lnpeer.py index dbe948495..de9bf535a 100644 --- a/electrum/tests/test_lnpeer.py +++ b/electrum/tests/test_lnpeer.py @@ -22,7 +22,7 @@ from electrum.ecc import ECPrivkey from electrum import simple_config, lnutil from electrum.lnaddr import lnencode, LnAddr, lndecode from electrum.bitcoin import COIN, sha256 -from electrum.util import bh2u, create_and_start_event_loop, NetworkRetryManager, bfh, OldTaskGroup +from electrum.util import bh2u, NetworkRetryManager, bfh, OldTaskGroup from electrum.lnpeer import Peer from electrum.lnutil import LNPeerAddr, Keypair, privkey_to_pubkey from electrum.lnutil import PaymentFailure, LnFeatures, HTLCOwner @@ -369,7 +369,6 @@ class TestPeer(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self._lnworkers_created = [] # type: List[MockLNWallet] def tearDown(self): @@ -380,8 +379,6 @@ class TestPeer(TestCaseForTestnet): self._lnworkers_created.clear() run(cleanup_lnworkers()) - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) super().tearDown() def prepare_peers(self, alice_channel: Channel, bob_channel: Channel): diff --git a/electrum/tests/test_lnrouter.py b/electrum/tests/test_lnrouter.py index 863842291..1f466bd38 100644 --- a/electrum/tests/test_lnrouter.py +++ b/electrum/tests/test_lnrouter.py @@ -5,7 +5,7 @@ import shutil import asyncio from electrum import util -from electrum.util import bh2u, bfh, create_and_start_event_loop +from electrum.util import bh2u, bfh from electrum.lnutil import ShortChannelID from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet, process_onion_packet, _decode_onion_error, decode_onion_error, @@ -33,7 +33,6 @@ class Test_LNRouter(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) def tearDown(self): @@ -41,8 +40,6 @@ class Test_LNRouter(TestCaseForTestnet): if self.cdb: self.cdb.stop() asyncio.run_coroutine_threadsafe(self.cdb.stopped_event.wait(), self.asyncio_loop).result() - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) super().tearDown() def prepare_graph(self): diff --git a/electrum/tests/test_lntransport.py b/electrum/tests/test_lntransport.py index 54ca0e30d..c24b38e32 100644 --- a/electrum/tests/test_lntransport.py +++ b/electrum/tests/test_lntransport.py @@ -12,15 +12,6 @@ from .test_bitcoin import needs_test_with_all_chacha20_implementations class TestLNTransport(ElectrumTestCase): - def setUp(self): - super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() - - def tearDown(self): - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - super().tearDown() - @needs_test_with_all_chacha20_implementations def test_responder(self): # local static diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py index 36ff1b451..66d905b76 100644 --- a/electrum/tests/test_network.py +++ b/electrum/tests/test_network.py @@ -55,15 +55,9 @@ class TestNetwork(ElectrumTestCase): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) self.interface = MockInterface(self.config) - def tearDown(self): - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - super().tearDown() - def test_fork_noconflict(self): blockchain.blockchains = {} self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) diff --git a/electrum/tests/test_storage_upgrade.py b/electrum/tests/test_storage_upgrade.py index e8ca06f0e..e5c582db0 100644 --- a/electrum/tests/test_storage_upgrade.py +++ b/electrum/tests/test_storage_upgrade.py @@ -18,12 +18,6 @@ from .test_wallet import WalletTestCase # TODO hw wallet with client version 2.6.x (single-, and multiacc) class TestStorageUpgrade(WalletTestCase): - def setUp(self): - super().setUp() - - def tearDown(self): - super().tearDown() - def testnet_wallet(func): # note: it's ok to modify global network constants in subclasses of SequentialTestCase def wrapper(self, *args, **kwargs): diff --git a/electrum/tests/test_wallet.py b/electrum/tests/test_wallet.py index 7a90a3e35..fa49c7c4c 100644 --- a/electrum/tests/test_wallet.py +++ b/electrum/tests/test_wallet.py @@ -35,7 +35,6 @@ class WalletTestCase(ElectrumTestCase): def setUp(self): super(WalletTestCase, self).setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.user_dir = tempfile.mkdtemp() self.config = SimpleConfig({'electrum_path': self.user_dir}) @@ -46,8 +45,6 @@ class WalletTestCase(ElectrumTestCase): sys.stdout = self._stdout_buffer def tearDown(self): - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) super(WalletTestCase, self).tearDown() shutil.rmtree(self.user_dir) # Restore the "real" stdout diff --git a/electrum/tests/test_wallet_vertical.py b/electrum/tests/test_wallet_vertical.py index 6f41f65b0..130b72e9a 100644 --- a/electrum/tests/test_wallet_vertical.py +++ b/electrum/tests/test_wallet_vertical.py @@ -14,7 +14,7 @@ from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCON from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet, restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy) from electrum.util import ( - bfh, bh2u, create_and_start_event_loop, NotEnoughFunds, UnrelatedTransactionException, + bfh, bh2u, NotEnoughFunds, UnrelatedTransactionException, UserFacingException) from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput, PartialTxInput, tx_from_any, TxOutpoint) @@ -701,14 +701,8 @@ class TestWalletSending(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) - def tearDown(self): - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - super().tearDown() - def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2): if config is None: config = self.config @@ -3271,14 +3265,8 @@ class TestWalletHistory_DoubleSpend(TestCaseForTestnet): def setUp(self): super().setUp() - self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop() self.config = SimpleConfig({'electrum_path': self.electrum_path}) - def tearDown(self): - self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1) - self._loop_thread.join(timeout=1) - super().tearDown() - @mock.patch.object(wallet.Abstract_Wallet, 'save_db') def test_restoring_wallet_without_manual_delete(self, mock_save_db): w = restore_wallet_from_text("small rapid pattern language comic denial donate extend tide fever burden barrel",