Browse Source

Merge pull request #7792 from SomberNight/202204_asyncio_cleanups

asyncio: stop using get_event_loop(). introduce ~singleton loop.
patch-4
ThomasV 3 years ago
committed by GitHub
parent
commit
6a9df7d827
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      electrum/commands.py
  2. 4
      electrum/daemon.py
  3. 4
      electrum/exchange_rate.py
  4. 2
      electrum/gui/kivy/main_window.py
  5. 2
      electrum/lnworker.py
  6. 12
      electrum/network.py
  7. 1
      electrum/sql_db.py
  8. 4
      electrum/tests/__init__.py
  9. 13
      electrum/tests/test_commands.py
  10. 10
      electrum/tests/test_lnpeer.py
  11. 8
      electrum/tests/test_lnrouter.py
  12. 7
      electrum/tests/test_lntransport.py
  13. 17
      electrum/tests/test_network.py
  14. 9
      electrum/tests/test_storage_upgrade.py
  15. 3
      electrum/tests/test_wallet.py
  16. 35
      electrum/tests/test_wallet_vertical.py
  17. 56
      electrum/util.py

2
electrum/commands.py

@ -187,7 +187,7 @@ class Commands:
kwargs.pop('wallet')
coro = f(*args, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, asyncio.get_event_loop())
fut = asyncio.run_coroutine_threadsafe(coro, util.get_asyncio_loop())
result = fut.result()
if self._callback:

4
electrum/daemon.py

@ -124,7 +124,7 @@ def request(config: SimpleConfig, endpoint, args=(), timeout=60):
rpc_user, rpc_password = get_rpc_credentials(config)
server_url = 'http://%s:%d' % (host, port)
auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
async def request_coroutine():
if socktype == 'unix':
connector = aiohttp.UnixConnector(path=path)
@ -467,7 +467,7 @@ class Daemon(Logger):
if 'wallet_path' in config.cmdline_options:
self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
"Use the load_wallet command instead.")
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.network = None
if not config.get('offline'):
self.network = Network(config, daemon=self)

4
electrum/exchange_rate.py

@ -148,7 +148,7 @@ class ExchangeBase(Logger):
if h is None:
h = self.read_historical_rates(ccy, cache_dir)
if h is None or h['timestamp'] < time.time() - 24*3600:
asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
util.get_asyncio_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
def history_ccys(self) -> Sequence[str]:
return []
@ -471,7 +471,7 @@ def get_exchanges_and_currencies():
for name, klass in exchanges.items():
exchange = klass(None, None)
await group.spawn(get_currencies_safe(name, exchange))
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
try:
loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network())
except Exception as e:

2
electrum/gui/kivy/main_window.py

@ -394,7 +394,7 @@ class ElectrumWindow(App, Logger):
self.is_exit = False
self.wallet = None # type: Optional[Abstract_Wallet]
self.pause_time = 0
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.password = None
self._use_single_password = False
self.resume_dialog = None

2
electrum/lnworker.py

@ -496,7 +496,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
# Try DNS-resolving the host (if needed). This is simply so that
# the caller gets a nice exception if it cannot be resolved.
try:
await asyncio.get_event_loop().getaddrinfo(host, port)
await asyncio.get_running_loop().getaddrinfo(host, port)
except socket.gaierror:
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
# add peer

12
electrum/network.py

@ -273,7 +273,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
init_retry_delay_urgent=1,
)
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
assert self.asyncio_loop.is_running(), "event loop not running"
assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
@ -381,9 +381,11 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
self.channel_db = None
self.path_finder = None
def run_from_another_thread(self, coro, *, timeout=None):
assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread'
fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
@classmethod
def run_from_another_thread(cls, coro, *, timeout=None):
loop = util.get_asyncio_loop()
assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
fut = asyncio.run_coroutine_threadsafe(coro, loop)
return fut.result(timeout)
@staticmethod
@ -1321,7 +1323,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
assert util.get_running_loop() != network.asyncio_loop
loop = network.asyncio_loop
else:
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
# note: _send_http_on_proxy has its own timeout, so no timeout here:
return coro.result()

1
electrum/sql_db.py

@ -26,7 +26,6 @@ class SqlDB(Logger):
def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None):
Logger.__init__(self)
self.asyncio_loop = asyncio_loop
asyncio.set_event_loop(asyncio_loop)
self.stopping = False
self.stopped_event = asyncio.Event()
self.path = path

4
electrum/tests/__init__.py

@ -6,6 +6,7 @@ import shutil
import electrum
import electrum.logging
from electrum import constants
from electrum import util
# Set this locally to make the test suite run faster.
@ -37,9 +38,12 @@ class ElectrumTestCase(SequentialTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.electrum_path = tempfile.mkdtemp()
def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
shutil.rmtree(self.electrum_path)

13
electrum/tests/test_commands.py

@ -2,7 +2,6 @@ import unittest
from unittest import mock
from decimal import Decimal
from electrum.util import create_and_start_event_loop
from electrum.commands import Commands, eval_bool
from electrum import storage, wallet
from electrum.wallet import restore_wallet_from_text
@ -18,14 +17,8 @@ class TestCommands(ElectrumTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
def tearDown(self):
super().tearDown()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
def test_setconfig_non_auth_number(self):
self.assertEqual(7777, Commands._setconfig_normalize_value('rpcport', "7777"))
self.assertEqual(7777, Commands._setconfig_normalize_value('rpcport', '7777'))
@ -135,14 +128,8 @@ class TestCommandsTestnet(TestCaseForTestnet):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
def tearDown(self):
super().tearDown()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
def test_convert_xkey(self):
cmds = Commands(config=self.config)
xpubs = {

10
electrum/tests/test_lnpeer.py

@ -15,13 +15,14 @@ from aiorpcx import timeout_after, TaskTimeout
import electrum
import electrum.trampoline
from electrum import bitcoin
from electrum import util
from electrum import constants
from electrum.network import Network
from electrum.ecc import ECPrivkey
from electrum import simple_config, lnutil
from electrum.lnaddr import lnencode, LnAddr, lndecode
from electrum.bitcoin import COIN, sha256
from electrum.util import bh2u, create_and_start_event_loop, NetworkRetryManager, bfh, OldTaskGroup
from electrum.util import bh2u, NetworkRetryManager, bfh, OldTaskGroup
from electrum.lnpeer import Peer
from electrum.lnutil import LNPeerAddr, Keypair, privkey_to_pubkey
from electrum.lnutil import PaymentFailure, LnFeatures, HTLCOwner
@ -62,7 +63,7 @@ class MockNetwork:
user_config = {}
user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-")
self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir)
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.channel_db = ChannelDB(self)
self.channel_db.data_loaded.set()
self.path_finder = LNPathFinder(self.channel_db)
@ -368,7 +369,6 @@ class TestPeer(TestCaseForTestnet):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
self._lnworkers_created = [] # type: List[MockLNWallet]
def tearDown(self):
@ -379,8 +379,6 @@ class TestPeer(TestCaseForTestnet):
self._lnworkers_created.clear()
run(cleanup_lnworkers())
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
def prepare_peers(self, alice_channel: Channel, bob_channel: Channel):
@ -1361,4 +1359,4 @@ class TestPeer(TestCaseForTestnet):
def run(coro):
return asyncio.run_coroutine_threadsafe(coro, loop=asyncio.get_event_loop()).result()
return asyncio.run_coroutine_threadsafe(coro, loop=util.get_asyncio_loop()).result()

8
electrum/tests/test_lnrouter.py

@ -4,7 +4,8 @@ import tempfile
import shutil
import asyncio
from electrum.util import bh2u, bfh, create_and_start_event_loop
from electrum import util
from electrum.util import bh2u, bfh
from electrum.lnutil import ShortChannelID
from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet,
process_onion_packet, _decode_onion_error, decode_onion_error,
@ -32,7 +33,6 @@ class Test_LNRouter(TestCaseForTestnet):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
def tearDown(self):
@ -40,8 +40,6 @@ class Test_LNRouter(TestCaseForTestnet):
if self.cdb:
self.cdb.stop()
asyncio.run_coroutine_threadsafe(self.cdb.stopped_event.wait(), self.asyncio_loop).result()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()
def prepare_graph(self):
@ -64,7 +62,7 @@ class Test_LNRouter(TestCaseForTestnet):
"""
class fake_network:
config = self.config
asyncio_loop = asyncio.get_event_loop()
asyncio_loop = util.get_asyncio_loop()
trigger_callback = lambda *args: None
register_callback = lambda *args: None
interface = None

7
electrum/tests/test_lntransport.py

@ -1,5 +1,6 @@
import asyncio
from electrum import util
from electrum.ecc import ECPrivkey
from electrum.lnutil import LNPeerAddr
from electrum.lntransport import LNResponderTransport, LNTransport
@ -38,11 +39,11 @@ class TestLNTransport(ElectrumTestCase):
assert num_bytes == 66
return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba')
transport = LNResponderTransport(ls_priv, Reader(), Writer())
asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv))
asyncio.run_coroutine_threadsafe(
transport.handshake(epriv=e_priv), self.asyncio_loop).result()
@needs_test_with_all_chacha20_implementations
def test_loop(self):
loop = asyncio.get_event_loop()
responder_shaked = asyncio.Event()
server_shaked = asyncio.Event()
responder_key = ECPrivkey.generate_random_key()
@ -96,4 +97,4 @@ class TestLNTransport(ElectrumTestCase):
server.close()
await server.wait_closed()
loop.run_until_complete(f())
asyncio.run_coroutine_threadsafe(f(), self.asyncio_loop).result()

17
electrum/tests/test_network.py

@ -8,6 +8,7 @@ from electrum import blockchain
from electrum.interface import Interface, ServerAddr
from electrum.crypto import sha256
from electrum.util import bh2u
from electrum import util
from . import ElectrumTestCase
@ -17,7 +18,9 @@ class MockTaskGroup:
class MockNetwork:
taskgroup = MockTaskGroup()
asyncio_loop = asyncio.get_event_loop()
def __init__(self):
self.asyncio_loop = util.get_asyncio_loop()
class MockInterface(Interface):
def __init__(self, config):
@ -66,7 +69,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
self.assertEqual(('fork', 8), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)
def test_fork_conflict(self):
@ -80,7 +84,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
self.assertEqual(('fork', 8), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)
def test_can_connect_during_backward(self):
@ -93,7 +98,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=4), util.get_asyncio_loop())
self.assertEqual(('catchup', 5), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)
def mock_fork(self, bad_header):
@ -113,7 +119,8 @@ class TestNetwork(ElectrumTestCase):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=6), util.get_asyncio_loop())
self.assertEqual(('catchup', 7), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)

9
electrum/tests/test_storage_upgrade.py

@ -18,15 +18,6 @@ from .test_wallet import WalletTestCase
# TODO hw wallet with client version 2.6.x (single-, and multiacc)
class TestStorageUpgrade(WalletTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
def tearDown(self):
super().tearDown()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
def testnet_wallet(func):
# note: it's ok to modify global network constants in subclasses of SequentialTestCase
def wrapper(self, *args, **kwargs):

3
electrum/tests/test_wallet.py

@ -241,12 +241,9 @@ class TestWalletPassword(WalletTestCase):
def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
def tearDown(self):
super().tearDown()
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
def test_update_password_of_imported_wallet(self):
wallet_str = '{"addr_history":{"1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr":[],"15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA":[],"1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6":[]},"addresses":{"change":[],"receiving":["1364Js2VG66BwRdkaoxAaFtdPb1eQgn8Dr","1Exet2BhHsFxKTwhnfdsBMkPYLGvobxuW6","15CyDgLffJsJgQrhcyooFH4gnVDG82pUrA"]},"keystore":{"keypairs":{"0344b1588589958b0bcab03435061539e9bcf54677c104904044e4f8901f4ebdf5":"L2sED74axVXC4H8szBJ4rQJrkfem7UMc6usLCPUoEWxDCFGUaGUM","0389508c13999d08ffae0f434a085f4185922d64765c0bff2f66e36ad7f745cc5f":"L3Gi6EQLvYw8gEEUckmqawkevfj9s8hxoQDFveQJGZHTfyWnbk1U","04575f52b82f159fa649d2a4c353eb7435f30206f0a6cb9674fbd659f45082c37d559ffd19bea9c0d3b7dcc07a7b79f4cffb76026d5d4dff35341efe99056e22d2":"5JyVyXU1LiRXATvRTQvR9Kp8Rx1X84j2x49iGkjSsXipydtByUq"},"type":"imported"},"pruned_txo":{},"seed_version":13,"stored_height":-1,"transactions":{},"tx_fees":{},"txi":{},"txo":{},"use_encryption":false,"verified_tx3":{},"wallet_type":"standard","winpos-qt":[100,100,840,405]}'

35
electrum/tests/test_wallet_vertical.py

@ -9,15 +9,17 @@ import copy
from electrum import storage, bitcoin, keystore, bip32, slip39, wallet
from electrum import Transaction
from electrum import SimpleConfig
from electrum import util
from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet,
restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy)
from electrum.util import (
bfh, bh2u, create_and_start_event_loop, NotEnoughFunds, UnrelatedTransactionException,
bfh, bh2u, NotEnoughFunds, UnrelatedTransactionException,
UserFacingException)
from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput,
PartialTxInput, tx_from_any, TxOutpoint)
from electrum.mnemonic import seed_type
from electrum.network import Network
from electrum.plugins.trustedcoin import trustedcoin
@ -1369,14 +1371,7 @@ class TestWalletSending(TestCaseForTestnet):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
@ -1429,14 +1424,7 @@ class TestWalletSending(TestCaseForTestnet):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
@ -1844,8 +1832,8 @@ class TestWalletSending(TestCaseForTestnet):
network = NetworkMock()
dest_addr = 'tb1q3ws2p0qjk5vrravv065xqlnkckvzcpclk79eu2'
sweep_coro = sweep(privkeys, network=network, config=self.config, to_address=dest_addr, fee=5000, locktime=1325785, tx_version=1)
loop = asyncio.get_event_loop()
tx = loop.run_until_complete(sweep_coro)
loop = util.get_asyncio_loop()
tx = asyncio.run_coroutine_threadsafe(sweep_coro, loop).result()
tx_copy = tx_from_any(tx.serialize())
self.assertEqual('010000000129349e5641d79915e9d0282fdbaee8c3df0b6731bab9d70bf626e8588bde24ac010000004847304402206bf0d0a93abae0d5873a62ebf277a5dd2f33837821e8b93e74d04e19d71b578002201a6d729bc159941ef5c4c9e5fe13ece9fc544351ba531b00f68ba549c8b38a9a01fdffffff01b82e0f00000000001600148ba0a0bc12b51831f58c7ea8607e76c5982c071fd93a1400',
@ -2199,14 +2187,7 @@ class TestWalletSending(TestCaseForTestnet):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):

56
electrum/util.py

@ -1031,7 +1031,7 @@ def parse_URI(uri: str, on_pr: Callable = None, *, loop=None) -> dict:
request = await pr.get_payment_request(r)
if on_pr:
on_pr(request)
loop = loop or asyncio.get_event_loop()
loop = loop or get_asyncio_loop()
asyncio.run_coroutine_threadsafe(get_payment_request(), loop)
return out
@ -1319,7 +1319,6 @@ class NetworkJobOnDefaultServer(Logger, ABC):
"""
def __init__(self, network: 'Network'):
Logger.__init__(self)
asyncio.set_event_loop(network.asyncio_loop)
self.network = network
self.interface = None # type: Interface
self._restart_lock = asyncio.Lock()
@ -1384,9 +1383,41 @@ class NetworkJobOnDefaultServer(Logger, ABC):
return s
_asyncio_event_loop = None # type: Optional[asyncio.AbstractEventLoop]
def get_asyncio_loop() -> asyncio.AbstractEventLoop:
"""Returns the global asyncio event loop we use."""
if _asyncio_event_loop is None:
raise Exception("event loop not created yet")
return _asyncio_event_loop
def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
asyncio.Future,
threading.Thread]:
global _asyncio_event_loop
if _asyncio_event_loop is not None:
raise Exception("there is already a running event loop")
# asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
# We set a custom event loop policy purely to be compatible with code that
# relies on asyncio.get_event_loop().
# - in python 3.8-3.9, asyncio.Event.__init__, asyncio.Lock.__init__,
# and similar, calls get_event_loop. see https://github.com/python/cpython/pull/23420
class MyEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
# In case electrum is being used as a library, there might be other
# event loops in use besides ours. To minimise interfering with those,
# if there is a loop running in the current thread, return that:
running_loop = get_running_loop()
if running_loop is not None:
return running_loop
# Otherwise, return our global loop:
return get_asyncio_loop()
asyncio.set_event_loop_policy(MyEventLoopPolicy())
loop = asyncio.new_event_loop()
_asyncio_event_loop = loop
def on_exception(loop, context):
"""Suppress spurious messages it appears we cannot control."""
SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|'
@ -1396,13 +1427,21 @@ def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop,
return
loop.default_exception_handler(context)
loop = asyncio.get_event_loop()
def run_event_loop():
try:
loop.run_until_complete(stopping_fut)
finally:
# clean-up
global _asyncio_event_loop
_asyncio_event_loop = None
loop.set_exception_handler(on_exception)
# loop.set_debug(1)
stopping_fut = loop.create_future()
loop_thread = threading.Thread(target=loop.run_until_complete,
args=(stopping_fut,),
name='EventLoop')
loop_thread = threading.Thread(
target=run_event_loop,
name='EventLoop',
)
loop_thread.start()
return loop, stopping_fut, loop_thread
@ -1558,7 +1597,7 @@ class CallbackManager:
on the event loop.
"""
if self.asyncio_loop is None:
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = get_asyncio_loop()
assert self.asyncio_loop.is_running(), "event loop not running"
with self.callback_lock:
callbacks = self.callbacks[event][:]
@ -1643,7 +1682,7 @@ class NetworkRetryManager(Generic[_NetAddrType]):
class MySocksProxy(aiorpcx.SOCKSProxy):
async def open_connection(self, host=None, port=None, **kwargs):
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
reader = asyncio.StreamReader(loop=loop)
protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
transport, _ = await self.create_connection(
@ -1753,6 +1792,7 @@ class nullcontext:
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
"""Returns the asyncio event loop that is *running in this thread*, if any."""
try:
return asyncio.get_running_loop()
except RuntimeError:

Loading…
Cancel
Save