Browse Source

asyncio: stop using get_event_loop(). introduce ~singleton loop.

asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
```
.../electrum/electrum/daemon.py:470: DeprecationWarning: There is no current event loop
  self.asyncio_loop = asyncio.get_event_loop()
.../electrum/electrum/network.py:276: DeprecationWarning: There is no current event loop
  self.asyncio_loop = asyncio.get_event_loop()
```
Also, according to that thread, "set_event_loop() [... is] not deprecated by oversight".
So, we stop using get_event_loop() and set_event_loop() in our own code.
Note that libraries we use (such as the stdlib for python <3.10), might call get_event_loop,
which then relies on us having called set_event_loop e.g. for the GUI thread. To work around
this, a custom event loop policy providing a get_event_loop implementation is used.

Previously, we have been using a single asyncio event loop, created with
util.create_and_start_event_loop, and code in many places got a reference to this loop
using asyncio.get_event_loop().
Now, we still use a single asyncio event loop, but it is now stored as a global in
util._asyncio_event_loop (access with util.get_asyncio_loop()).

I believe these changes also fix https://github.com/spesmilo/electrum/issues/5376
patch-4
SomberNight 3 years ago
parent
commit
2c57c78ebe
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 2
      electrum/commands.py
  2. 4
      electrum/daemon.py
  3. 4
      electrum/exchange_rate.py
  4. 2
      electrum/gui/kivy/main_window.py
  5. 2
      electrum/lnworker.py
  6. 12
      electrum/network.py
  7. 1
      electrum/sql_db.py
  8. 5
      electrum/tests/test_lnpeer.py
  9. 3
      electrum/tests/test_lnrouter.py
  10. 16
      electrum/tests/test_lntransport.py
  11. 23
      electrum/tests/test_network.py
  12. 3
      electrum/tests/test_storage_upgrade.py
  13. 6
      electrum/tests/test_wallet.py
  14. 45
      electrum/tests/test_wallet_vertical.py
  15. 56
      electrum/util.py

2
electrum/commands.py

@ -187,7 +187,7 @@ class Commands:
kwargs.pop('wallet')
coro = f(*args, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, asyncio.get_event_loop())
fut = asyncio.run_coroutine_threadsafe(coro, util.get_asyncio_loop())
result = fut.result()
if self._callback:

4
electrum/daemon.py

@ -124,7 +124,7 @@ def request(config: SimpleConfig, endpoint, args=(), timeout=60):
rpc_user, rpc_password = get_rpc_credentials(config)
server_url = 'http://%s:%d' % (host, port)
auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
async def request_coroutine():
if socktype == 'unix':
connector = aiohttp.UnixConnector(path=path)
@ -467,7 +467,7 @@ class Daemon(Logger):
if 'wallet_path' in config.cmdline_options:
self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
"Use the load_wallet command instead.")
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.network = None
if not config.get('offline'):
self.network = Network(config, daemon=self)

4
electrum/exchange_rate.py

@ -148,7 +148,7 @@ class ExchangeBase(Logger):
if h is None:
h = self.read_historical_rates(ccy, cache_dir)
if h is None or h['timestamp'] < time.time() - 24*3600:
asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
util.get_asyncio_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
def history_ccys(self) -> Sequence[str]:
return []
@ -471,7 +471,7 @@ def get_exchanges_and_currencies():
for name, klass in exchanges.items():
exchange = klass(None, None)
await group.spawn(get_currencies_safe(name, exchange))
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
try:
loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network())
except Exception as e:

2
electrum/gui/kivy/main_window.py

@ -394,7 +394,7 @@ class ElectrumWindow(App, Logger):
self.is_exit = False
self.wallet = None # type: Optional[Abstract_Wallet]
self.pause_time = 0
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.password = None
self._use_single_password = False
self.resume_dialog = None

2
electrum/lnworker.py

@ -496,7 +496,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
# Try DNS-resolving the host (if needed). This is simply so that
# the caller gets a nice exception if it cannot be resolved.
try:
await asyncio.get_event_loop().getaddrinfo(host, port)
await asyncio.get_running_loop().getaddrinfo(host, port)
except socket.gaierror:
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
# add peer

12
electrum/network.py

@ -273,7 +273,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
init_retry_delay_urgent=1,
)
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
assert self.asyncio_loop.is_running(), "event loop not running"
assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
@ -381,9 +381,11 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
self.channel_db = None
self.path_finder = None
def run_from_another_thread(self, coro, *, timeout=None):
assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread'
fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
@classmethod
def run_from_another_thread(cls, coro, *, timeout=None):
loop = util.get_asyncio_loop()
assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
fut = asyncio.run_coroutine_threadsafe(coro, loop)
return fut.result(timeout)
@staticmethod
@ -1321,7 +1323,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
assert util.get_running_loop() != network.asyncio_loop
loop = network.asyncio_loop
else:
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
# note: _send_http_on_proxy has its own timeout, so no timeout here:
return coro.result()

1
electrum/sql_db.py

@ -26,7 +26,6 @@ class SqlDB(Logger):
def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None):
Logger.__init__(self)
self.asyncio_loop = asyncio_loop
asyncio.set_event_loop(asyncio_loop)
self.stopping = False
self.stopped_event = asyncio.Event()
self.path = path

5
electrum/tests/test_lnpeer.py

@ -15,6 +15,7 @@ from aiorpcx import timeout_after, TaskTimeout
import electrum
import electrum.trampoline
from electrum import bitcoin
from electrum import util
from electrum import constants
from electrum.network import Network
from electrum.ecc import ECPrivkey
@ -62,7 +63,7 @@ class MockNetwork:
user_config = {}
user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-")
self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir)
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.channel_db = ChannelDB(self)
self.channel_db.data_loaded.set()
self.path_finder = LNPathFinder(self.channel_db)
@ -1361,4 +1362,4 @@ class TestPeer(TestCaseForTestnet):
def run(coro):
return asyncio.run_coroutine_threadsafe(coro, loop=asyncio.get_event_loop()).result()
return asyncio.run_coroutine_threadsafe(coro, loop=util.get_asyncio_loop()).result()

3
electrum/tests/test_lnrouter.py

@ -4,6 +4,7 @@ import tempfile
import shutil
import asyncio
from electrum import util
from electrum.util import bh2u, bfh, create_and_start_event_loop
from electrum.lnutil import ShortChannelID
from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet,
@ -64,7 +65,7 @@ class Test_LNRouter(TestCaseForTestnet):
"""
class fake_network:
config = self.config
asyncio_loop = asyncio.get_event_loop()
asyncio_loop = util.get_asyncio_loop()
trigger_callback = lambda *args: None
register_callback = lambda *args: None
interface = None

16
electrum/tests/test_lntransport.py

@ -1,5 +1,6 @@
import asyncio
from electrum import util
from electrum.ecc import ECPrivkey
from electrum.lnutil import LNPeerAddr
from electrum.lntransport import LNResponderTransport, LNTransport
@ -11,6 +12,15 @@ from .test_bitcoin import needs_test_with_all_chacha20_implementations
class TestLNTransport(ElectrumTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
@needs_test_with_all_chacha20_implementations
def test_responder(self):
# local static
@ -38,11 +48,11 @@ class TestLNTransport(ElectrumTestCase):
assert num_bytes == 66
return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba')
transport = LNResponderTransport(ls_priv, Reader(), Writer())
asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv))
asyncio.run_coroutine_threadsafe(
transport.handshake(epriv=e_priv), self.asyncio_loop).result()
@needs_test_with_all_chacha20_implementations
def test_loop(self):
loop = asyncio.get_event_loop()
responder_shaked = asyncio.Event()
server_shaked = asyncio.Event()
responder_key = ECPrivkey.generate_random_key()
@ -96,4 +106,4 @@ class TestLNTransport(ElectrumTestCase):
server.close()
await server.wait_closed()
loop.run_until_complete(f())
asyncio.run_coroutine_threadsafe(f(), self.asyncio_loop).result()

23
electrum/tests/test_network.py

@ -8,6 +8,7 @@ from electrum import blockchain
from electrum.interface import Interface, ServerAddr
from electrum.crypto import sha256
from electrum.util import bh2u
from electrum import util
from . import ElectrumTestCase
@ -17,7 +18,9 @@ class MockTaskGroup:
class MockNetwork:
taskgroup = MockTaskGroup()
asyncio_loop = asyncio.get_event_loop()
def __init__(self):
self.asyncio_loop = util.get_asyncio_loop()
class MockInterface(Interface):
def __init__(self, config):
@ -52,9 +55,15 @@ class TestNetwork(ElectrumTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
self.interface = MockInterface(self.config)
def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
def test_fork_noconflict(self):
blockchain.blockchains = {}
self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}})
@ -66,7 +75,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
self.assertEqual(('fork', 8), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)
def test_fork_conflict(self):
@ -80,7 +90,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
self.assertEqual(('fork', 8), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)
def test_can_connect_during_backward(self):
@ -93,7 +104,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=4), util.get_asyncio_loop())
self.assertEqual(('catchup', 5), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)
def mock_fork(self, bad_header):
@ -113,7 +125,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=6), util.get_asyncio_loop())
self.assertEqual(('catchup', 7), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)

3
electrum/tests/test_storage_upgrade.py

@ -20,12 +20,9 @@ class TestStorageUpgrade(WalletTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
def tearDown(self):
super().tearDown()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
def testnet_wallet(func):
# note: it's ok to modify global network constants in subclasses of SequentialTestCase

6
electrum/tests/test_wallet.py

@ -35,6 +35,7 @@ class WalletTestCase(ElectrumTestCase):
def setUp(self):
super(WalletTestCase, self).setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.user_dir = tempfile.mkdtemp()
self.config = SimpleConfig({'electrum_path': self.user_dir})
@ -45,6 +46,8 @@ class WalletTestCase(ElectrumTestCase):
sys.stdout = self._stdout_buffer
def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super(WalletTestCase, self).tearDown()
shutil.rmtree(self.user_dir)
# Restore the "real" stdout
@ -241,12 +244,9 @@ class TestWalletPassword(WalletTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
def tearDown(self):
super().tearDown()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
def test_update_password_of_imported_wallet(self):
wallet_str = '{"addr_history":{"1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr":[],"15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA":[],"1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6":[]},"addresses":{"change":[],"receiving":["1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr","1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6","15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA"]},"keystore":{"keypairs":{"0344b1588589958b0bcab03435061539e9bcf54677c104904044e4f8901f4ebdf5":"L2sED74axVXC4H8szBJ4rQJrkfem7UMc6usLCPUoEWxDCFGUaGUM","0389508c13999d08ffae0f434a085f4185922d64765c0bff2f66e36ad7f745cc5f":"L3Gi6EQLvYw8gEEUckmqawkevfj9s8hxoQDFveQJGZHTfyWnbk1U","04575f52b82f159fa649d2a4c353eb7435f30206f0a6cb9674fbd659f45082c37d559ffd19bea9c0d3b7dcc07a7b79f4cffb76026d5d4dff35341efe99056e22d2":"5JyVyXU1LiRXATvRTQvR9Kp8Rx1X84j2x49iGkjSsXipydtByUq"},"type":"imported"},"pruned_txo":{},"seed_version":13,"stored_height":-1,"transactions":{},"tx_fees":{},"txi":{},"txo":{},"use_encryption":false,"verified_tx3":{},"wallet_type":"standard","winpos-qt":[100,100,840,405]}'

45
electrum/tests/test_wallet_vertical.py

@ -9,6 +9,7 @@ import copy
from electrum import storage, bitcoin, keystore, bip32, slip39, wallet
from electrum import Transaction
from electrum import SimpleConfig
from electrum import util
from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet,
restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy)
@ -18,6 +19,7 @@ from electrum.util import (
from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput,
PartialTxInput, tx_from_any, TxOutpoint)
from electrum.mnemonic import seed_type
from electrum.network import Network
from electrum.plugins.trustedcoin import trustedcoin
@ -699,8 +701,14 @@ class TestWalletSending(TestCaseForTestnet):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2):
if config is None:
config = self.config
@ -1369,14 +1377,7 @@ class TestWalletSending(TestCaseForTestnet):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
@ -1429,14 +1430,7 @@ class TestWalletSending(TestCaseForTestnet):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
@ -1844,8 +1838,8 @@ class TestWalletSending(TestCaseForTestnet):
network = NetworkMock()
dest_addr = 'tb1q3ws2p0qjk5vrravv065xqlnkckvzcpclk79eu2'
sweep_coro = sweep(privkeys, network=network, config=self.config, to_address=dest_addr, fee=5000, locktime=1325785, tx_version=1)
loop = asyncio.get_event_loop()
tx = loop.run_until_complete(sweep_coro)
loop = util.get_asyncio_loop()
tx = asyncio.run_coroutine_threadsafe(sweep_coro, loop).result()
tx_copy = tx_from_any(tx.serialize())
self.assertEqual('010000000129349e5641d79915e9d0282fdbaee8c3df0b6731bab9d70bf626e8588bde24ac010000004847304402206bf0d0a93abae0d5873a62ebf277a5dd2f33837821e8b93e74d04e19d71b578002201a6d729bc159941ef5c4c9e5fe13ece9fc544351ba531b00f68ba549c8b38a9a01fdffffff01b82e0f00000000001600148ba0a0bc12b51831f58c7ea8607e76c5982c071fd93a1400',
@ -2199,14 +2193,7 @@ class TestWalletSending(TestCaseForTestnet):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
@ -3284,8 +3271,14 @@ class TestWalletHistory_DoubleSpend(TestCaseForTestnet):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
@mock.patch.object(wallet.Abstract_Wallet, 'save_db')
def test_restoring_wallet_without_manual_delete(self, mock_save_db):
w = restore_wallet_from_text("small rapid pattern language comic denial donate extend tide fever burden barrel",

56
electrum/util.py

@ -1031,7 +1031,7 @@ def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict:
request = await pr.get_payment_request(r)
if on_pr:
on_pr(request)
loop = loop or asyncio.get_event_loop()
loop = loop or get_asyncio_loop()
asyncio.run_coroutine_threadsafe(get_payment_request(), loop)
return out
@ -1319,7 +1319,6 @@ class NetworkJobOnDefaultServer(Logger, ABC):
"""
def __init__(self, network: 'Network'):
Logger.__init__(self)
asyncio.set_event_loop(network.asyncio_loop)
self.network = network
self.interface = None # type: Interface
self._restart_lock = asyncio.Lock()
@ -1384,9 +1383,41 @@ class NetworkJobOnDefaultServer(Logger, ABC):
return s
_asyncio_event_loop = None # type: Optional[asyncio.AbstractEventLoop]
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
"""Returns the global asyncio event loop we use."""
if _asyncio_event_loop is None:
raise Exception("event loop not created yet")
return _asyncio_event_loop
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
asyncio.Future,
threading.Thread]:
global _asyncio_event_loop
if _asyncio_event_loop is not None:
raise Exception("there is already a running event loop")
# asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
# We set a custom event loop policy purely to be compatible with code that
# relies on asyncio.get_event_loop().
# - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
# and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
# In case electrum is being used as a library, there might be other
# event loops in use besides ours. To minimise interfering with those,
# if there is a loop running in the current thread, return that:
running_loop = get_running_loop()
if running_loop is not None:
return running_loop
# Otherwise, return our global loop:
return get_asyncio_loop()
asyncio.set_event_loop_policy(MyEventLoopPolicy())
loop = asyncio.new_event_loop()
_asyncio_event_loop = loop
def on_exception(loop, context):
"""Suppress spurious messages it appears we cannot control."""
SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
@ -1396,13 +1427,21 @@ def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
return
loop.default_exception_handler(context)
loop = asyncio.get_event_loop()
def run_event_loop():
try:
loop.run_until_complete(stopping_fut)
finally:
# clean-up
global _asyncio_event_loop
_asyncio_event_loop = None
loop.set_exception_handler(on_exception)
# loop.set_debug(1)
stopping_fut = loop.create_future()
loop_thread = threading.Thread(target=loop.run_until_complete,
args=(stopping_fut,),
name='EventLoop')
loop_thread = threading.Thread(
target=run_event_loop,
name='EventLoop',
)
loop_thread.start()
return loop, stopping_fut, loop_thread
@ -1558,7 +1597,7 @@ class CallbackManager:
on the event loop.
"""
if self.asyncio_loop is None:
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = get_asyncio_loop()
assert self.asyncio_loop.is_running(), "event loop not running"
with self.callback_lock:
callbacks = self.callbacks[event][:]
@ -1643,7 +1682,7 @@ class NetworkRetryManager(Generic[_NetAddrType]):
class MySocksProxy(aiorpcx.SOCKSProxy):
async def open_connection(self, host=None, port=None, **kwargs):
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
reader = asyncio.StreamReader(loop=loop)
protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
transport, _ = await self.create_connection(
@ -1753,6 +1792,7 @@ class nullcontext:
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
"""Returns the asyncio event loop that is *running in this thread*, if any."""
try:
return asyncio.get_running_loop()
except RuntimeError:

Loading…
Cancel
Save