Browse Source

Merge pull request #7661 from SomberNight/202202_py38

bump min python to 3.8, and migrate to aiorpcx 0.22
patch-4
ghost43 3 years ago
committed by GitHub
parent
commit
586d3a4361
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      .cirrus.yml
  2. 2
      README.rst
  3. 6
      contrib/deterministic-build/requirements.txt
  4. 2
      contrib/freeze_packages.sh
  5. 2
      contrib/requirements/requirements.txt
  6. 6
      electrum/address_synchronizer.py
  7. 7
      electrum/bip39_recovery.py
  8. 12
      electrum/daemon.py
  9. 9
      electrum/exchange_rate.py
  10. 12
      electrum/interface.py
  11. 10
      electrum/lnpeer.py
  12. 2
      electrum/lntransport.py
  13. 16
      electrum/lnworker.py
  14. 22
      electrum/network.py
  15. 6
      electrum/synchronizer.py
  16. 38
      electrum/tests/test_lnpeer.py
  17. 9
      electrum/tests/test_lntransport.py
  18. 89
      electrum/util.py
  19. 8
      electrum/wallet.py
  20. 6
      run_electrum
  21. 2
      setup.py
  22. 1
      tox.ini

16
.cirrus.yml

@ -10,14 +10,12 @@ task:
TOXENV: py3 TOXENV: py3
ELECTRUM_PYTHON_NAME: python3 ELECTRUM_PYTHON_NAME: python3
matrix: matrix:
- env:
ELECTRUM_PYTHON_VERSION: 3.6
- env:
ELECTRUM_PYTHON_VERSION: 3.7
- env: - env:
ELECTRUM_PYTHON_VERSION: 3.8 ELECTRUM_PYTHON_VERSION: 3.8
- env: - env:
ELECTRUM_PYTHON_VERSION: 3.9 ELECTRUM_PYTHON_VERSION: 3.9
- env:
ELECTRUM_PYTHON_VERSION: 3.10
- env: - env:
ELECTRUM_PYTHON_VERSION: 3 ELECTRUM_PYTHON_VERSION: 3
- env: - env:
@ -80,7 +78,7 @@ task:
locale_script: locale_script:
- contrib/push_locale - contrib/push_locale
env: env:
ELECTRUM_IMAGE: python:3.7 ELECTRUM_IMAGE: python:3.8
ELECTRUM_REQUIREMENTS_CI: contrib/requirements/requirements-travis.txt ELECTRUM_REQUIREMENTS_CI: contrib/requirements/requirements-travis.txt
# in addition, crowdin_api_key is set as an "override" in https://cirrus-ci.com/settings/... # in addition, crowdin_api_key is set as an "override" in https://cirrus-ci.com/settings/...
depends_on: depends_on:
@ -109,8 +107,8 @@ task:
- apt-get update - apt-get update
- apt-get -y install libsecp256k1-0 curl jq bc - apt-get -y install libsecp256k1-0 curl jq bc
- pip3 install .[tests] - pip3 install .[tests]
#- pip3 install e-x # Broken because of https://github.com/spesmilo/electrumx/issues/117 , use older version for now. # install e-x some commits after 1.16.0 tag, where it uses same aiorpcx as electrum
- pip3 install electrumx==1.15.0 - pip3 install git+https://github.com/spesmilo/electrumx.git@c8d2cc0d5cf9e549a90ca876d85fed9a90b8c4ed
- "BITCOIND_VERSION=$(curl https://bitcoincore.org/en/download/ | grep -E -i --only-matching 'Latest version: [0-9\\.]+' | grep -E --only-matching '[0-9\\.]+')" - "BITCOIND_VERSION=$(curl https://bitcoincore.org/en/download/ | grep -E -i --only-matching 'Latest version: [0-9\\.]+' | grep -E --only-matching '[0-9\\.]+')"
- BITCOIND_FILENAME=bitcoin-$BITCOIND_VERSION-x86_64-linux-gnu.tar.gz - BITCOIND_FILENAME=bitcoin-$BITCOIND_VERSION-x86_64-linux-gnu.tar.gz
- BITCOIND_PATH=/tmp/bitcoind/$BITCOIND_FILENAME - BITCOIND_PATH=/tmp/bitcoind/$BITCOIND_FILENAME
@ -146,7 +144,7 @@ task:
flake8_script: flake8_script:
- flake8 . --count --select=$ELECTRUM_LINTERS --show-source --statistics - flake8 . --count --select=$ELECTRUM_LINTERS --show-source --statistics
env: env:
ELECTRUM_IMAGE: python:3.7 ELECTRUM_IMAGE: python:3.8
ELECTRUM_REQUIREMENTS: contrib/requirements/requirements.txt ELECTRUM_REQUIREMENTS: contrib/requirements/requirements.txt
matrix: matrix:
- name: Flake8 Mandatory - name: Flake8 Mandatory
@ -223,7 +221,7 @@ task:
task: task:
name: Submodules name: Submodules
container: container:
image: python:3.7 image: python:3.8
cpu: 1 cpu: 1
memory: 1G memory: 1G
fetch_script: fetch_script:

2
README.rst

@ -5,7 +5,7 @@ Electrum - Lightweight Bitcoin client
Licence: MIT Licence Licence: MIT Licence
Author: Thomas Voegtlin Author: Thomas Voegtlin
Language: Python (>= 3.6) Language: Python (>= 3.8)
Homepage: https://electrum.org/ Homepage: https://electrum.org/

6
contrib/deterministic-build/requirements.txt

@ -74,9 +74,9 @@ aiohttp==3.8.1 \
aiohttp-socks==0.7.1 \ aiohttp-socks==0.7.1 \
--hash=sha256:2215cac4891ef3fa14b7d600ed343ed0f0a670c23b10e4142aa862b3db20341a \ --hash=sha256:2215cac4891ef3fa14b7d600ed343ed0f0a670c23b10e4142aa862b3db20341a \
--hash=sha256:94bcff5ef73611c6c6231c2ffc1be4af1599abec90dbd2fdbbd63233ec2fb0ff --hash=sha256:94bcff5ef73611c6c6231c2ffc1be4af1599abec90dbd2fdbbd63233ec2fb0ff
aiorpcX==0.18.7 \ aiorpcX==0.22.1 \
--hash=sha256:7fa48423e1c06cd0ffb7b60f2cca7e819b6cbbf57d4bc8a82944994ef5038f05 \ --hash=sha256:6026f7bed3432e206589c94dcf599be8cd85b5736b118c7275845c1bd922a553 \
--hash=sha256:808a9ec9172df11677a0f7b459b69d1a6cf8b19c19da55541fa31fb1afce5ce7 --hash=sha256:e74f9fbed3fd21598e71fe05066618fc2c06feec504fe29490ddda05fdbdde62
aiosignal==1.2.0 \ aiosignal==1.2.0 \
--hash=sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a \ --hash=sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a \
--hash=sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2 --hash=sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2

2
contrib/freeze_packages.sh

@ -8,7 +8,7 @@ contrib=$(dirname "$0")
# note: we should not use a higher version of python than what the binaries bundle # note: we should not use a higher version of python than what the binaries bundle
if [[ ! "$SYSTEM_PYTHON" ]] ; then if [[ ! "$SYSTEM_PYTHON" ]] ; then
SYSTEM_PYTHON=$(which python3.6) || printf "" SYSTEM_PYTHON=$(which python3.8) || printf ""
else else
SYSTEM_PYTHON=$(which $SYSTEM_PYTHON) || printf "" SYSTEM_PYTHON=$(which $SYSTEM_PYTHON) || printf ""
fi fi

2
contrib/requirements/requirements.txt

@ -1,7 +1,7 @@
qrcode qrcode
protobuf>=3.12 protobuf>=3.12
qdarkstyle>=2.7 qdarkstyle>=2.7
aiorpcx>=0.18.7,<0.19 aiorpcx>=0.22.0,<0.23
aiohttp>=3.3.0,<4.0.0 aiohttp>=3.3.0,<4.0.0
aiohttp_socks>=0.3 aiohttp_socks>=0.3
certifi certifi

6
electrum/address_synchronizer.py

@ -28,11 +28,9 @@ import itertools
from collections import defaultdict from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List
from aiorpcx import TaskGroup
from . import bitcoin, util from . import bitcoin, util
from .bitcoin import COINBASE_MATURITY from .bitcoin import COINBASE_MATURITY
from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException, with_lock from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException, with_lock, OldTaskGroup
from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
from .synchronizer import Synchronizer from .synchronizer import Synchronizer
from .verifier import SPV from .verifier import SPV
@ -193,7 +191,7 @@ class AddressSynchronizer(Logger):
async def stop(self): async def stop(self):
if self.network: if self.network:
try: try:
async with TaskGroup() as group: async with OldTaskGroup() as group:
if self.synchronizer: if self.synchronizer:
await group.spawn(self.synchronizer.stop()) await group.spawn(self.synchronizer.stop())
if self.verifier: if self.verifier:

7
electrum/bip39_recovery.py

@ -4,20 +4,19 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from aiorpcx import TaskGroup
from . import bitcoin from . import bitcoin
from .constants import BIP39_WALLET_FORMATS from .constants import BIP39_WALLET_FORMATS
from .bip32 import BIP32_PRIME, BIP32Node from .bip32 import BIP32_PRIME, BIP32Node
from .bip32 import convert_bip32_path_to_list_of_uint32 as bip32_str_to_ints from .bip32 import convert_bip32_path_to_list_of_uint32 as bip32_str_to_ints
from .bip32 import convert_bip32_intpath_to_strpath as bip32_ints_to_str from .bip32 import convert_bip32_intpath_to_strpath as bip32_ints_to_str
from .util import OldTaskGroup
if TYPE_CHECKING: if TYPE_CHECKING:
from .network import Network from .network import Network
async def account_discovery(network: 'Network', get_account_xpub): async def account_discovery(network: 'Network', get_account_xpub):
async with TaskGroup() as group: async with OldTaskGroup() as group:
account_scan_tasks = [] account_scan_tasks = []
for wallet_format in BIP39_WALLET_FORMATS: for wallet_format in BIP39_WALLET_FORMATS:
account_scan = scan_for_active_accounts(network, get_account_xpub, wallet_format) account_scan = scan_for_active_accounts(network, get_account_xpub, wallet_format)
@ -46,7 +45,7 @@ async def scan_for_active_accounts(network: 'Network', get_account_xpub, wallet_
async def account_has_history(network: 'Network', account_node: BIP32Node, script_type: str) -> bool: async def account_has_history(network: 'Network', account_node: BIP32Node, script_type: str) -> bool:
gap_limit = 20 gap_limit = 20
async with TaskGroup() as group: async with OldTaskGroup() as group:
get_history_tasks = [] get_history_tasks = []
for address_index in range(gap_limit): for address_index in range(gap_limit):
address_node = account_node.subkey_at_public_derivation("0/" + str(address_index)) address_node = account_node.subkey_at_public_derivation("0/" + str(address_index))

12
electrum/daemon.py

@ -36,13 +36,13 @@ import json
import aiohttp import aiohttp
from aiohttp import web, client_exceptions from aiohttp import web, client_exceptions
from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after from aiorpcx import timeout_after, TaskTimeout, ignore_after
from . import util from . import util
from .network import Network from .network import Network
from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare) from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
from .invoices import PR_PAID, PR_EXPIRED from .invoices import PR_PAID, PR_EXPIRED
from .util import log_exceptions, ignore_exceptions, randrange from .util import log_exceptions, ignore_exceptions, randrange, OldTaskGroup
from .wallet import Wallet, Abstract_Wallet from .wallet import Wallet, Abstract_Wallet
from .storage import WalletStorage from .storage import WalletStorage
from .wallet_db import WalletDB from .wallet_db import WalletDB
@ -493,7 +493,7 @@ class Daemon(Logger):
self._stop_entered = False self._stop_entered = False
self._stopping_soon_or_errored = threading.Event() self._stopping_soon_or_errored = threading.Event()
self._stopped_event = threading.Event() self._stopped_event = threading.Event()
self.taskgroup = TaskGroup() self.taskgroup = OldTaskGroup()
asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop) asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)
@log_exceptions @log_exceptions
@ -505,8 +505,6 @@ class Daemon(Logger):
async with self.taskgroup as group: async with self.taskgroup as group:
[await group.spawn(job) for job in jobs] [await group.spawn(job) for job in jobs]
await group.spawn(asyncio.Event().wait) # run forever (until cancel) await group.spawn(asyncio.Event().wait) # run forever (until cancel)
except asyncio.CancelledError:
raise
except Exception as e: except Exception as e:
self.logger.exception("taskgroup died.") self.logger.exception("taskgroup died.")
util.send_exception_to_crash_reporter(e) util.send_exception_to_crash_reporter(e)
@ -593,12 +591,12 @@ class Daemon(Logger):
if self.gui_object: if self.gui_object:
self.gui_object.stop() self.gui_object.stop()
self.logger.info("stopping all wallets") self.logger.info("stopping all wallets")
async with TaskGroup() as group: async with OldTaskGroup() as group:
for k, wallet in self._wallets.items(): for k, wallet in self._wallets.items():
await group.spawn(wallet.stop()) await group.spawn(wallet.stop())
self.logger.info("stopping network and taskgroup") self.logger.info("stopping network and taskgroup")
async with ignore_after(2): async with ignore_after(2):
async with TaskGroup() as group: async with OldTaskGroup() as group:
if self.network: if self.network:
await group.spawn(self.network.stop(full_shutdown=True)) await group.spawn(self.network.stop(full_shutdown=True))
await group.spawn(self.taskgroup.cancel_remaining()) await group.spawn(self.taskgroup.cancel_remaining())

9
electrum/exchange_rate.py

@ -10,13 +10,13 @@ import decimal
from decimal import Decimal from decimal import Decimal
from typing import Sequence, Optional from typing import Sequence, Optional
from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup from aiorpcx.curio import timeout_after, TaskTimeout
import aiohttp import aiohttp
from . import util from . import util
from .bitcoin import COIN from .bitcoin import COIN
from .i18n import _ from .i18n import _
from .util import (ThreadJob, make_dir, log_exceptions, from .util import (ThreadJob, make_dir, log_exceptions, OldTaskGroup,
make_aiohttp_session, resource_path) make_aiohttp_session, resource_path)
from .network import Network from .network import Network
from .simple_config import SimpleConfig from .simple_config import SimpleConfig
@ -80,9 +80,6 @@ class ExchangeBase(Logger):
self.logger.info(f"getting fx quotes for {ccy}") self.logger.info(f"getting fx quotes for {ccy}")
self.quotes = await self.get_rates(ccy) self.quotes = await self.get_rates(ccy)
self.logger.info("received fx quotes") self.logger.info("received fx quotes")
except asyncio.CancelledError:
# CancelledError must be passed-through for cancellation to work
raise
except aiohttp.ClientError as e: except aiohttp.ClientError as e:
self.logger.info(f"failed fx quotes: {repr(e)}") self.logger.info(f"failed fx quotes: {repr(e)}")
self.quotes = {} self.quotes = {}
@ -452,7 +449,7 @@ def get_exchanges_and_currencies():
async def query_all_exchanges_for_their_ccys_over_network(): async def query_all_exchanges_for_their_ccys_over_network():
async with timeout_after(10): async with timeout_after(10):
async with TaskGroup() as group: async with OldTaskGroup() as group:
for name, klass in exchanges.items(): for name, klass in exchanges.items():
exchange = klass(None, None) exchange = klass(None, None)
await group.spawn(get_currencies_safe(name, exchange)) await group.spawn(get_currencies_safe(name, exchange))

12
electrum/interface.py

@ -38,16 +38,15 @@ import hashlib
import functools import functools
import aiorpcx import aiorpcx
from aiorpcx import TaskGroup
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
from aiorpcx.rawsocket import RSClient from aiorpcx.rawsocket import RSClient
import certifi import certifi
from .util import (ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy, from .util import (ignore_exceptions, log_exceptions, bfh, MySocksProxy,
is_integer, is_non_negative_integer, is_hash256_str, is_hex_str, is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
is_int_or_float, is_non_negative_int_or_float) is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
from . import util from . import util
from . import x509 from . import x509
from . import pem from . import pem
@ -376,12 +375,11 @@ class Interface(Logger):
# Dump network messages (only for this interface). Set at runtime from the console. # Dump network messages (only for this interface). Set at runtime from the console.
self.debug = False self.debug = False
self.taskgroup = SilentTaskGroup() self.taskgroup = OldTaskGroup()
async def spawn_task(): async def spawn_task():
task = await self.network.taskgroup.spawn(self.run()) task = await self.network.taskgroup.spawn(self.run())
if sys.version_info >= (3, 8): task.set_name(f"interface::{str(server)}")
task.set_name(f"interface::{str(server)}")
asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop) asyncio.run_coroutine_threadsafe(spawn_task(), self.network.asyncio_loop)
@property @property
@ -676,7 +674,7 @@ class Interface(Logger):
async def request_fee_estimates(self): async def request_fee_estimates(self):
from .simple_config import FEE_ETA_TARGETS from .simple_config import FEE_ETA_TARGETS
while True: while True:
async with TaskGroup() as group: async with OldTaskGroup() as group:
fee_tasks = [] fee_tasks = []
for i in FEE_ETA_TARGETS: for i in FEE_ETA_TARGETS:
fee_tasks.append((i, await group.spawn(self.get_estimatefee(i)))) fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))

10
electrum/lnpeer.py

@ -14,14 +14,14 @@ from datetime import datetime
import functools import functools
import aiorpcx import aiorpcx
from aiorpcx import TaskGroup, ignore_after from aiorpcx import ignore_after
from .crypto import sha256, sha256d from .crypto import sha256, sha256d
from . import bitcoin, util from . import bitcoin, util
from . import ecc from . import ecc
from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
from . import constants from . import constants
from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup, from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup,
UnrelatedTransactionException) UnrelatedTransactionException)
from . import transaction from . import transaction
from .bitcoin import make_op_return from .bitcoin import make_op_return
@ -105,7 +105,7 @@ class Peer(Logger):
self.announcement_signatures = defaultdict(asyncio.Queue) self.announcement_signatures = defaultdict(asyncio.Queue)
self.orphan_channel_updates = OrderedDict() # type: OrderedDict[ShortChannelID, dict] self.orphan_channel_updates = OrderedDict() # type: OrderedDict[ShortChannelID, dict]
Logger.__init__(self) Logger.__init__(self)
self.taskgroup = SilentTaskGroup() self.taskgroup = OldTaskGroup()
# HTLCs offered by REMOTE, that we started removing but are still active: # HTLCs offered by REMOTE, that we started removing but are still active:
self.received_htlcs_pending_removal = set() # type: Set[Tuple[Channel, int]] self.received_htlcs_pending_removal = set() # type: Set[Tuple[Channel, int]]
self.received_htlc_removed_event = asyncio.Event() self.received_htlc_removed_event = asyncio.Event()
@ -1859,7 +1859,7 @@ class Peer(Logger):
# we can get triggered for events that happen on the downstream peer. # we can get triggered for events that happen on the downstream peer.
# TODO: trampoline forwarding relies on the polling # TODO: trampoline forwarding relies on the polling
async with ignore_after(0.1): async with ignore_after(0.1):
async with TaskGroup(wait=any) as group: async with OldTaskGroup(wait=any) as group:
await group.spawn(self._received_revack_event.wait()) await group.spawn(self._received_revack_event.wait())
await group.spawn(self.downstream_htlc_resolved_event.wait()) await group.spawn(self.downstream_htlc_resolved_event.wait())
self._htlc_switch_iterstart_event.set() self._htlc_switch_iterstart_event.set()
@ -1943,7 +1943,7 @@ class Peer(Logger):
await self._htlc_switch_iterstart_event.wait() await self._htlc_switch_iterstart_event.wait()
await self._htlc_switch_iterdone_event.wait() await self._htlc_switch_iterdone_event.wait()
async with TaskGroup(wait=any) as group: async with OldTaskGroup(wait=any) as group:
await group.spawn(htlc_switch_iteration()) await group.spawn(htlc_switch_iteration())
await group.spawn(self.got_disconnected.wait()) await group.spawn(self.got_disconnected.wait())

2
electrum/lntransport.py

@ -123,8 +123,6 @@ class LNTransportBase:
break break
try: try:
s = await self.reader.read(2**10) s = await self.reader.read(2**10)
except asyncio.CancelledError:
raise
except Exception: except Exception:
s = None s = None
if not s: if not s:

16
electrum/lnworker.py

@ -22,11 +22,11 @@ import urllib.parse
import dns.resolver import dns.resolver
import dns.exception import dns.exception
from aiorpcx import run_in_thread, TaskGroup, NetAddress, ignore_after from aiorpcx import run_in_thread, NetAddress, ignore_after
from . import constants, util from . import constants, util
from . import keystore from . import keystore
from .util import profiler, chunks from .util import profiler, chunks, OldTaskGroup
from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER
from .util import NetworkRetryManager, JsonRPCClient from .util import NetworkRetryManager, JsonRPCClient
from .lnutil import LN_MAX_FUNDING_SAT from .lnutil import LN_MAX_FUNDING_SAT
@ -39,7 +39,7 @@ from .crypto import sha256
from .bip32 import BIP32Node from .bip32 import BIP32Node
from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions from .util import bh2u, bfh, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
from .crypto import chacha20_encrypt, chacha20_decrypt from .crypto import chacha20_encrypt, chacha20_decrypt
from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup from .util import ignore_exceptions, make_aiohttp_session
from .util import timestamp_to_datetime, random_shuffled_copy from .util import timestamp_to_datetime, random_shuffled_copy
from .util import MyEncoder, is_private_netaddress from .util import MyEncoder, is_private_netaddress
from .logging import Logger from .logging import Logger
@ -200,7 +200,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY) self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey
self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock
self.taskgroup = SilentTaskGroup() self.taskgroup = OldTaskGroup()
self.listen_server = None # type: Optional[asyncio.AbstractServer] self.listen_server = None # type: Optional[asyncio.AbstractServer]
self.features = features self.features = features
self.network = None # type: Optional[Network] self.network = None # type: Optional[Network]
@ -266,8 +266,6 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
try: try:
async with self.taskgroup as group: async with self.taskgroup as group:
await group.spawn(self._maintain_connectivity()) await group.spawn(self._maintain_connectivity())
except asyncio.CancelledError:
raise
except Exception as e: except Exception as e:
self.logger.exception("taskgroup died.") self.logger.exception("taskgroup died.")
finally: finally:
@ -769,13 +767,13 @@ class LNWallet(LNWorker):
# to wait a bit for it to become irrevocably removed. # to wait a bit for it to become irrevocably removed.
# Note: we don't wait for *all htlcs* to get removed, only for those # Note: we don't wait for *all htlcs* to get removed, only for those
# that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed # that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in self.peers.values(): for peer in self.peers.values():
await group.spawn(peer.wait_one_htlc_switch_iteration()) await group.spawn(peer.wait_one_htlc_switch_iteration())
while True: while True:
if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()): if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()):
break break
async with TaskGroup(wait=any) as group: async with OldTaskGroup(wait=any) as group:
for peer in self.peers.values(): for peer in self.peers.values():
await group.spawn(peer.received_htlc_removed_event.wait()) await group.spawn(peer.received_htlc_removed_event.wait())
@ -2271,7 +2269,7 @@ class LNWallet(LNWorker):
transport = LNTransport(privkey, peer_addr, proxy=self.network.proxy) transport = LNTransport(privkey, peer_addr, proxy=self.network.proxy)
peer = Peer(self, node_id, transport, is_channel_backup=True) peer = Peer(self, node_id, transport, is_channel_backup=True)
try: try:
async with TaskGroup(wait=any) as group: async with OldTaskGroup(wait=any) as group:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.trigger_force_close(channel_id)) await group.spawn(peer.trigger_force_close(channel_id))
return return

22
electrum/network.py

@ -40,12 +40,12 @@ import copy
import functools import functools
import aiorpcx import aiorpcx
from aiorpcx import TaskGroup, ignore_after from aiorpcx import ignore_after
from aiohttp import ClientResponse from aiohttp import ClientResponse
from . import util from . import util
from .util import (log_exceptions, ignore_exceptions, from .util import (log_exceptions, ignore_exceptions, OldTaskGroup,
bfh, SilentTaskGroup, make_aiohttp_session, send_exception_to_crash_reporter, bfh, make_aiohttp_session, send_exception_to_crash_reporter,
is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager, is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
nullcontext) nullcontext)
from .bitcoin import COIN from .bitcoin import COIN
@ -246,7 +246,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
LOGGING_SHORTCUT = 'n' LOGGING_SHORTCUT = 'n'
taskgroup: Optional[TaskGroup] taskgroup: Optional[OldTaskGroup]
interface: Optional[Interface] interface: Optional[Interface]
interfaces: Dict[ServerAddr, Interface] interfaces: Dict[ServerAddr, Interface]
_connecting_ifaces: Set[ServerAddr] _connecting_ifaces: Set[ServerAddr]
@ -462,7 +462,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async def get_relay_fee(): async def get_relay_fee():
self.relay_fee = await interface.get_relay_fee() self.relay_fee = await interface.get_relay_fee()
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(get_banner) await group.spawn(get_banner)
await group.spawn(get_donation_address) await group.spawn(get_donation_address)
await group.spawn(get_server_peers) await group.spawn(get_server_peers)
@ -839,7 +839,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
assert iface.ready.done(), "interface not ready yet" assert iface.ready.done(), "interface not ready yet"
# try actual request # try actual request
try: try:
async with TaskGroup(wait=any) as group: async with OldTaskGroup(wait=any) as group:
task = await group.spawn(func(self, *args, **kwargs)) task = await group.spawn(func(self, *args, **kwargs))
await group.spawn(iface.got_disconnected.wait()) await group.spawn(iface.got_disconnected.wait())
except RequestTimedOut: except RequestTimedOut:
@ -1184,7 +1184,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async def _start(self): async def _start(self):
assert not self.taskgroup assert not self.taskgroup
self.taskgroup = taskgroup = SilentTaskGroup() self.taskgroup = taskgroup = OldTaskGroup()
assert not self.interface and not self.interfaces assert not self.interface and not self.interfaces
assert not self._connecting_ifaces assert not self._connecting_ifaces
assert not self._closing_ifaces assert not self._closing_ifaces
@ -1202,8 +1202,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async with taskgroup as group: async with taskgroup as group:
await group.spawn(self._maintain_sessions()) await group.spawn(self._maintain_sessions())
[await group.spawn(job) for job in self._jobs] [await group.spawn(job) for job in self._jobs]
except asyncio.CancelledError:
raise
except Exception as e: except Exception as e:
self.logger.exception("taskgroup died.") self.logger.exception("taskgroup died.")
finally: finally:
@ -1227,7 +1225,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
# timeout: if full_shutdown, it is up to the caller to time us out, # timeout: if full_shutdown, it is up to the caller to time us out,
# otherwise if e.g. restarting due to proxy changes, we time out fast # otherwise if e.g. restarting due to proxy changes, we time out fast
async with (nullcontext() if full_shutdown else ignore_after(1)): async with (nullcontext() if full_shutdown else ignore_after(1)):
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(self.taskgroup.cancel_remaining()) await group.spawn(self.taskgroup.cancel_remaining())
if full_shutdown: if full_shutdown:
await group.spawn(self.stop_gossip(full_shutdown=full_shutdown)) await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
@ -1280,7 +1278,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
except asyncio.CancelledError: except asyncio.CancelledError:
# suppress spurious cancellations # suppress spurious cancellations
group = self.taskgroup group = self.taskgroup
if not group or group.closed(): if not group or group.joined:
raise raise
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@ -1354,7 +1352,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
except Exception as e: except Exception as e:
res = e res = e
responses[interface.server] = res responses[interface.server] = res
async with TaskGroup() as group: async with OldTaskGroup() as group:
for server in servers: for server in servers:
await group.spawn(get_response(server)) await group.spawn(get_response(server))
return responses return responses

6
electrum/synchronizer.py

@ -28,11 +28,11 @@ from typing import Dict, List, TYPE_CHECKING, Tuple, Set
from collections import defaultdict from collections import defaultdict
import logging import logging
from aiorpcx import TaskGroup, run_in_thread, RPCError from aiorpcx import run_in_thread, RPCError
from . import util from . import util
from .transaction import Transaction, PartialTransaction from .transaction import Transaction, PartialTransaction
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy, OldTaskGroup
from .bitcoin import address_to_scripthash, is_address from .bitcoin import address_to_scripthash, is_address
from .logging import Logger from .logging import Logger
from .interface import GracefulDisconnect, NetworkTimeout from .interface import GracefulDisconnect, NetworkTimeout
@ -218,7 +218,7 @@ class Synchronizer(SynchronizerBase):
self.requested_tx[tx_hash] = tx_height self.requested_tx[tx_hash] = tx_height
if not transaction_hashes: return if not transaction_hashes: return
async with TaskGroup() as group: async with OldTaskGroup() as group:
for tx_hash in transaction_hashes: for tx_hash in transaction_hashes:
await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx)) await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))

38
electrum/tests/test_lnpeer.py

@ -10,7 +10,7 @@ from concurrent import futures
import unittest import unittest
from typing import Iterable, NamedTuple, Tuple, List, Dict from typing import Iterable, NamedTuple, Tuple, List, Dict
from aiorpcx import TaskGroup, timeout_after, TaskTimeout from aiorpcx import timeout_after, TaskTimeout
import electrum import electrum
import electrum.trampoline import electrum.trampoline
@ -21,7 +21,7 @@ from electrum.ecc import ECPrivkey
from electrum import simple_config, lnutil from electrum import simple_config, lnutil
from electrum.lnaddr import lnencode, LnAddr, lndecode from electrum.lnaddr import lnencode, LnAddr, lndecode
from electrum.bitcoin import COIN, sha256 from electrum.bitcoin import COIN, sha256
from electrum.util import bh2u, create_and_start_event_loop, NetworkRetryManager, bfh from electrum.util import bh2u, create_and_start_event_loop, NetworkRetryManager, bfh, OldTaskGroup
from electrum.lnpeer import Peer, UpfrontShutdownScriptViolation from electrum.lnpeer import Peer, UpfrontShutdownScriptViolation
from electrum.lnutil import LNPeerAddr, Keypair, privkey_to_pubkey from electrum.lnutil import LNPeerAddr, Keypair, privkey_to_pubkey
from electrum.lnutil import LightningPeerConnectionClosed, RemoteMisbehaving from electrum.lnutil import LightningPeerConnectionClosed, RemoteMisbehaving
@ -125,7 +125,7 @@ class MockLNWallet(Logger, NetworkRetryManager[LNPeerAddr]):
NetworkRetryManager.__init__(self, max_retry_delay_normal=1, init_retry_delay_normal=1) NetworkRetryManager.__init__(self, max_retry_delay_normal=1, init_retry_delay_normal=1)
self.node_keypair = local_keypair self.node_keypair = local_keypair
self.network = MockNetwork(tx_queue) self.network = MockNetwork(tx_queue)
self.taskgroup = TaskGroup() self.taskgroup = OldTaskGroup()
self.lnwatcher = None self.lnwatcher = None
self.listen_server = None self.listen_server = None
self._channels = {chan.channel_id: chan for chan in chans} self._channels = {chan.channel_id: chan for chan in chans}
@ -365,7 +365,7 @@ class TestPeer(TestCaseForTestnet):
def tearDown(self): def tearDown(self):
async def cleanup_lnworkers(): async def cleanup_lnworkers():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for lnworker in self._lnworkers_created: for lnworker in self._lnworkers_created:
await group.spawn(lnworker.stop()) await group.spawn(lnworker.stop())
self._lnworkers_created.clear() self._lnworkers_created.clear()
@ -569,7 +569,7 @@ class TestPeer(TestCaseForTestnet):
self.assertEqual(PR_PAID, w2.get_payment_status(lnaddr.paymenthash)) self.assertEqual(PR_PAID, w2.get_payment_status(lnaddr.paymenthash))
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(p1._message_loop()) await group.spawn(p1._message_loop())
await group.spawn(p1.htlc_switch()) await group.spawn(p1.htlc_switch())
await group.spawn(p2._message_loop()) await group.spawn(p2._message_loop())
@ -643,7 +643,7 @@ class TestPeer(TestCaseForTestnet):
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(p1._message_loop()) await group.spawn(p1._message_loop())
await group.spawn(p1.htlc_switch()) await group.spawn(p1.htlc_switch())
await group.spawn(p2._message_loop()) await group.spawn(p2._message_loop())
@ -667,10 +667,10 @@ class TestPeer(TestCaseForTestnet):
async with max_htlcs_in_flight: async with max_htlcs_in_flight:
await w1.pay_invoice(pay_req) await w1.pay_invoice(pay_req)
async def many_payments(): async def many_payments():
async with TaskGroup() as group: async with OldTaskGroup() as group:
pay_reqs_tasks = [await group.spawn(self.prepare_invoice(w2, amount_msat=payment_value_msat)) pay_reqs_tasks = [await group.spawn(self.prepare_invoice(w2, amount_msat=payment_value_msat))
for i in range(num_payments)] for i in range(num_payments)]
async with TaskGroup() as group: async with OldTaskGroup() as group:
for pay_req_task in pay_reqs_tasks: for pay_req_task in pay_reqs_tasks:
lnaddr, pay_req = pay_req_task.result() lnaddr, pay_req = pay_req_task.result()
await group.spawn(single_payment(pay_req)) await group.spawn(single_payment(pay_req))
@ -696,7 +696,7 @@ class TestPeer(TestCaseForTestnet):
self.assertEqual(PR_PAID, graph.workers['dave'].get_payment_status(lnaddr.paymenthash)) self.assertEqual(PR_PAID, graph.workers['dave'].get_payment_status(lnaddr.paymenthash))
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -740,7 +740,7 @@ class TestPeer(TestCaseForTestnet):
[edge.short_channel_id for edge in log[0].route]) [edge.short_channel_id for edge in log[0].route])
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -764,7 +764,7 @@ class TestPeer(TestCaseForTestnet):
self.assertEqual(OnionFailureCode.TEMPORARY_NODE_FAILURE, log[0].failure_msg.code) self.assertEqual(OnionFailureCode.TEMPORARY_NODE_FAILURE, log[0].failure_msg.code)
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -799,7 +799,7 @@ class TestPeer(TestCaseForTestnet):
self.assertEqual(500100000000, graph.channels[('dave', 'bob')].balance(LOCAL)) self.assertEqual(500100000000, graph.channels[('dave', 'bob')].balance(LOCAL))
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -862,7 +862,7 @@ class TestPeer(TestCaseForTestnet):
raise PaymentDone() raise PaymentDone()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -909,7 +909,7 @@ class TestPeer(TestCaseForTestnet):
raise NoPathFound() raise NoPathFound()
async def f(kwargs): async def f(kwargs):
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -948,7 +948,7 @@ class TestPeer(TestCaseForTestnet):
async def f(): async def f():
await turn_on_trampoline_alice() await turn_on_trampoline_alice()
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -1026,7 +1026,7 @@ class TestPeer(TestCaseForTestnet):
raise SuccessfulTest() raise SuccessfulTest()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in peers: for peer in peers:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -1196,7 +1196,7 @@ class TestPeer(TestCaseForTestnet):
raise SuccessfulTest() raise SuccessfulTest()
async def f(): async def f():
async with TaskGroup() as group: async with OldTaskGroup() as group:
for peer in [p1, p2]: for peer in [p1, p2]:
await group.spawn(peer._message_loop()) await group.spawn(peer._message_loop())
await group.spawn(peer.htlc_switch()) await group.spawn(peer.htlc_switch())
@ -1223,7 +1223,7 @@ class TestPeer(TestCaseForTestnet):
failing_task = None failing_task = None
async def f(): async def f():
nonlocal failing_task nonlocal failing_task
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(p1._message_loop()) await group.spawn(p1._message_loop())
await group.spawn(p1.htlc_switch()) await group.spawn(p1.htlc_switch())
failing_task = await group.spawn(p2._message_loop()) failing_task = await group.spawn(p2._message_loop())
@ -1252,7 +1252,7 @@ class TestPeer(TestCaseForTestnet):
failing_task = None failing_task = None
async def f(): async def f():
nonlocal failing_task nonlocal failing_task
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(p1._message_loop()) await group.spawn(p1._message_loop())
await group.spawn(p1.htlc_switch()) await group.spawn(p1.htlc_switch())
failing_task = await group.spawn(p2._message_loop()) failing_task = await group.spawn(p2._message_loop())

9
electrum/tests/test_lntransport.py

@ -3,8 +3,7 @@ import asyncio
from electrum.ecc import ECPrivkey from electrum.ecc import ECPrivkey
from electrum.lnutil import LNPeerAddr from electrum.lnutil import LNPeerAddr
from electrum.lntransport import LNResponderTransport, LNTransport from electrum.lntransport import LNResponderTransport, LNTransport
from electrum.util import OldTaskGroup
from aiorpcx import TaskGroup
from . import ElectrumTestCase from . import ElectrumTestCase
from .test_bitcoin import needs_test_with_all_chacha20_implementations from .test_bitcoin import needs_test_with_all_chacha20_implementations
@ -73,7 +72,7 @@ class TestLNTransport(ElectrumTestCase):
async def cb(reader, writer): async def cb(reader, writer):
t = LNResponderTransport(responder_key.get_secret_bytes(), reader, writer) t = LNResponderTransport(responder_key.get_secret_bytes(), reader, writer)
self.assertEqual(await t.handshake(), initiator_key.get_public_key_bytes()) self.assertEqual(await t.handshake(), initiator_key.get_public_key_bytes())
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(read_messages(t, messages_sent_by_client)) await group.spawn(read_messages(t, messages_sent_by_client))
await group.spawn(write_messages(t, messages_sent_by_server)) await group.spawn(write_messages(t, messages_sent_by_server))
responder_shaked.set() responder_shaked.set()
@ -81,7 +80,7 @@ class TestLNTransport(ElectrumTestCase):
peer_addr = LNPeerAddr('127.0.0.1', 42898, responder_key.get_public_key_bytes()) peer_addr = LNPeerAddr('127.0.0.1', 42898, responder_key.get_public_key_bytes())
t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, proxy=None) t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, proxy=None)
await t.handshake() await t.handshake()
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(read_messages(t, messages_sent_by_server)) await group.spawn(read_messages(t, messages_sent_by_server))
await group.spawn(write_messages(t, messages_sent_by_client)) await group.spawn(write_messages(t, messages_sent_by_client))
server_shaked.set() server_shaked.set()
@ -89,7 +88,7 @@ class TestLNTransport(ElectrumTestCase):
async def f(): async def f():
server = await asyncio.start_server(cb, '127.0.0.1', 42898) server = await asyncio.start_server(cb, '127.0.0.1', 42898)
try: try:
async with TaskGroup() as group: async with OldTaskGroup() as group:
await group.spawn(connect()) await group.spawn(connect())
await group.spawn(responder_shaked.wait()) await group.spawn(responder_shaked.wait())
await group.spawn(server_shaked.wait()) await group.spawn(server_shaked.wait())

89
electrum/util.py

@ -52,7 +52,6 @@ import attr
import aiohttp import aiohttp
from aiohttp_socks import ProxyConnector, ProxyType from aiohttp_socks import ProxyConnector, ProxyType
import aiorpcx import aiorpcx
from aiorpcx import TaskGroup
import certifi import certifi
import dns.resolver import dns.resolver
@ -1178,9 +1177,6 @@ def ignore_exceptions(func):
async def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
try: try:
return await func(*args, **kwargs) return await func(*args, **kwargs)
except asyncio.CancelledError:
# note: with python 3.8, CancelledError no longer inherits Exception, so this catch is redundant
raise
except Exception as e: except Exception as e:
pass pass
return wrapper return wrapper
@ -1229,13 +1225,76 @@ def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector) return aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector)
class SilentTaskGroup(TaskGroup): class OldTaskGroup(aiorpcx.TaskGroup):
"""Automatically raises exceptions on join; as in aiorpcx prior to version 0.20.
That is, when using TaskGroup as a context manager, if any task encounters an exception,
we would like that exception to be re-raised (propagated out). For the wait=all case,
the OldTaskGroup class is emulating the following code-snippet:
```
async with TaskGroup() as group:
await group.spawn(task1())
await group.spawn(task2())
def spawn(self, *args, **kwargs): async for task in group:
# don't complain if group is already closed. if not task.cancelled():
if self._closed: task.result()
raise asyncio.CancelledError() ```
return super().spawn(*args, **kwargs) So instead of the above, one can just write:
```
async with OldTaskGroup() as group:
await group.spawn(task1())
await group.spawn(task2())
```
"""
async def join(self):
if self._wait is all:
exc = False
try:
async for task in self:
if not task.cancelled():
task.result()
except BaseException: # including asyncio.CancelledError
exc = True
raise
finally:
if exc:
await self.cancel_remaining()
await super().join()
else:
await super().join()
if self.completed:
self.completed.result()
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
# to fix a timing issue present in asyncio as a whole re timing out tasks.
# To see the issue we are trying to fix, consider example:
# async def outer_task():
# async with timeout_after(0.1):
# await inner_task()
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
# If around the same time (in terms of event loop iterations) another coroutine
# cancels outer_task (=external cancellation), there will be a race.
# Both cancellations work by propagating a CancelledError out to timeout_after, which then
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
# AFAICT asyncio provides no reliable way of distinguishing between the two.
# This patch tries to always give priority to external cancellations.
# see https://github.com/kyuupichan/aiorpcX/issues/44
# see https://github.com/aio-libs/async-timeout/issues/229
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
def timeout_task():
task._orig_cancel()
task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
def mycancel(*args, **kwargs):
task._orig_cancel(*args, **kwargs)
task._externally_cancelled = True
task._timed_out = None
if not hasattr(task, "_orig_cancel"):
task._orig_cancel = task.cancel
task.cancel = mycancel
task._deadline_handle = task._loop.call_at(deadline, timeout_task)
aiorpcx.curio._set_new_deadline = _aiorpcx_monkeypatched_set_new_deadline
class NetworkJobOnDefaultServer(Logger, ABC): class NetworkJobOnDefaultServer(Logger, ABC):
@ -1263,14 +1322,14 @@ class NetworkJobOnDefaultServer(Logger, ABC):
"""Initialise fields. Called every time the underlying """Initialise fields. Called every time the underlying
server connection changes. server connection changes.
""" """
self.taskgroup = SilentTaskGroup() self.taskgroup = OldTaskGroup()
async def _start(self, interface: 'Interface'): async def _start(self, interface: 'Interface'):
self.interface = interface self.interface = interface
await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup)) await interface.taskgroup.spawn(self._run_tasks(taskgroup=self.taskgroup))
@abstractmethod @abstractmethod
async def _run_tasks(self, *, taskgroup: TaskGroup) -> None: async def _run_tasks(self, *, taskgroup: OldTaskGroup) -> None:
"""Start tasks in taskgroup. Called every time the underlying """Start tasks in taskgroup. Called every time the underlying
server connection changes. server connection changes.
""" """
@ -1669,10 +1728,8 @@ class nullcontext:
async def __aexit__(self, *excinfo): async def __aexit__(self, *excinfo):
pass pass
def get_running_loop():
"""Mimics _get_running_loop convenient functionality for sanity checks on all python versions""" def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
if sys.version_info < (3, 7):
return asyncio._get_running_loop()
try: try:
return asyncio.get_running_loop() return asyncio.get_running_loop()
except RuntimeError: except RuntimeError:

8
electrum/wallet.py

@ -46,13 +46,13 @@ import itertools
import threading import threading
import enum import enum
from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after from aiorpcx import timeout_after, TaskTimeout, ignore_after
from .i18n import _ from .i18n import _
from .bip32 import BIP32Node, convert_bip32_intpath_to_strpath, convert_bip32_path_to_list_of_uint32 from .bip32 import BIP32Node, convert_bip32_intpath_to_strpath, convert_bip32_path_to_list_of_uint32
from .crypto import sha256 from .crypto import sha256
from . import util from . import util
from .util import (NotEnoughFunds, UserCancelled, profiler, from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup,
format_satoshis, format_fee_satoshis, NoDynamicFeeEstimates, format_satoshis, format_fee_satoshis, NoDynamicFeeEstimates,
WalletFileException, BitcoinException, WalletFileException, BitcoinException,
InvalidPassword, format_time, timestamp_to_datetime, Satoshis, InvalidPassword, format_time, timestamp_to_datetime, Satoshis,
@ -134,7 +134,7 @@ async def _append_utxos_to_inputs(*, inputs: List[PartialTxInput], network: 'Net
inputs.append(txin) inputs.append(txin)
u = await network.listunspent_for_scripthash(scripthash) u = await network.listunspent_for_scripthash(scripthash)
async with TaskGroup() as group: async with OldTaskGroup() as group:
for item in u: for item in u:
if len(inputs) >= imax: if len(inputs) >= imax:
break break
@ -155,7 +155,7 @@ async def sweep_preparations(privkeys, network: 'Network', imax=100):
inputs = [] # type: List[PartialTxInput] inputs = [] # type: List[PartialTxInput]
keypairs = {} keypairs = {}
async with TaskGroup() as group: async with OldTaskGroup() as group:
for sec in privkeys: for sec in privkeys:
txin_type, privkey, compressed = bitcoin.deserialize_privkey(sec) txin_type, privkey, compressed = bitcoin.deserialize_privkey(sec)
await group.spawn(find_utxos_for_privkey(txin_type, privkey, compressed)) await group.spawn(find_utxos_for_privkey(txin_type, privkey, compressed))

6
run_electrum

@ -27,7 +27,7 @@ import os
import sys import sys
MIN_PYTHON_VERSION = "3.6.1" # FIXME duplicated from setup.py MIN_PYTHON_VERSION = "3.8.0" # FIXME duplicated from setup.py
_min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split(".")))) _min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split("."))))
@ -63,8 +63,8 @@ def check_imports():
import aiorpcx import aiorpcx
except ImportError as e: except ImportError as e:
sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install <module-name>'") sys.exit(f"Error: {str(e)}. Try 'sudo python3 -m pip install <module-name>'")
if not ((0, 18, 7) <= aiorpcx._version < (0, 19)): if not ((0, 22, 0) <= aiorpcx._version < (0, 23)):
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.18.7<=ver<0.19') raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.22.0<=ver<0.23')
# the following imports are for pyinstaller # the following imports are for pyinstaller
from google.protobuf import descriptor from google.protobuf import descriptor
from google.protobuf import message from google.protobuf import message

2
setup.py

@ -12,7 +12,7 @@ import subprocess
from setuptools import setup, find_packages from setuptools import setup, find_packages
from setuptools.command.install import install from setuptools.command.install import install
MIN_PYTHON_VERSION = "3.6.1" MIN_PYTHON_VERSION = "3.8.0"
_min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split(".")))) _min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split("."))))

1
tox.ini

@ -1,5 +1,4 @@
[tox] [tox]
envlist = py36, py37
[testenv] [testenv]
deps= deps=

Loading…
Cancel
Save