Browse Source

wallet: put Sync and Verifier in their own TaskGroup, and that into interface.group

3.3.3.1
SomberNight 6 years ago
parent
commit
e829d6bbcf
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 54
      electrum/address_synchronizer.py
  2. 2
      electrum/daemon.py
  3. 2
      electrum/gui/kivy/main_window.py
  4. 2
      electrum/gui/qt/__init__.py
  5. 2
      electrum/gui/stdio.py
  6. 2
      electrum/gui/text.py
  7. 22
      electrum/interface.py
  8. 14
      electrum/network.py
  9. 8
      electrum/synchronizer.py
  10. 18
      electrum/util.py
  11. 12
      electrum/verifier.py
  12. 2
      run_electrum

54
electrum/address_synchronizer.py

@ -26,11 +26,9 @@ import asyncio
import itertools import itertools
from collections import defaultdict from collections import defaultdict
from aiorpcx import TaskGroup
from . import bitcoin from . import bitcoin
from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY
from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus, aiosafe, CustomTaskGroup
from .transaction import Transaction, TxOutput from .transaction import Transaction, TxOutput
from .synchronizer import Synchronizer from .synchronizer import Synchronizer
from .verifier import SPV from .verifier import SPV
@ -62,6 +60,7 @@ class AddressSynchronizer(PrintError):
self.synchronizer = None self.synchronizer = None
self.verifier = None self.verifier = None
self.sync_restart_lock = asyncio.Lock() self.sync_restart_lock = asyncio.Lock()
self.group = None
# locks: if you need to take multiple ones, acquire them in the order they are defined here! # locks: if you need to take multiple ones, acquire them in the order they are defined here!
self.lock = threading.RLock() self.lock = threading.RLock()
self.transaction_lock = threading.RLock() self.transaction_lock = threading.RLock()
@ -138,34 +137,45 @@ class AddressSynchronizer(PrintError):
# add it in case it was previously unconfirmed # add it in case it was previously unconfirmed
self.add_unverified_tx(tx_hash, tx_height) self.add_unverified_tx(tx_hash, tx_height)
async def on_default_server_changed(self, evt): @aiosafe
async def on_default_server_changed(self, event):
async with self.sync_restart_lock: async with self.sync_restart_lock:
interface = self.network.interface self.stop_threads()
if interface is None: await self._start_threads()
return # we should get called again soon
self.verifier = SPV(self.network, self) def start_network(self, network):
self.synchronizer = Synchronizer(self)
await interface.group.spawn(self.verifier.main(interface))
await interface.group.spawn(self.synchronizer.send_subscriptions(interface))
await interface.group.spawn(self.synchronizer.handle_status(interface))
await interface.group.spawn(self.synchronizer.main())
def start_threads(self, network):
self.network = network self.network = network
if self.network is not None: if self.network is not None:
self.network.register_callback(self.on_default_server_changed, ['default_server_changed']) self.network.register_callback(self.on_default_server_changed, ['default_server_changed'])
self.network.trigger_callback('default_server_changed') asyncio.run_coroutine_threadsafe(self._start_threads(), network.asyncio_loop)
else:
self.verifier = None async def _start_threads(self):
self.synchronizer = None interface = self.network.interface
if interface is None:
return # we should get called again soon
self.verifier = SPV(self.network, self)
self.synchronizer = synchronizer = Synchronizer(self)
assert self.group is None, 'group already exists'
self.group = CustomTaskGroup()
async def job():
async with self.group as group:
await group.spawn(self.verifier.main(group))
await group.spawn(self.synchronizer.send_subscriptions(group))
await group.spawn(self.synchronizer.handle_status(group))
await group.spawn(self.synchronizer.main())
# we are being cancelled now
interface.session.unsubscribe(synchronizer.status_queue)
await interface.group.spawn(job)
def stop_threads(self): def stop_threads(self):
if self.network: if self.network:
#self.network.remove_jobs([self.verifier])
self.synchronizer = None self.synchronizer = None
self.verifier = None self.verifier = None
# Now no references to the synchronizer or verifier if self.group:
# remain so they will be GC-ed asyncio.run_coroutine_threadsafe(self.group.cancel_remaining(), self.network.asyncio_loop)
self.group = None
self.storage.put('stored_height', self.get_local_height()) self.storage.put('stored_height', self.get_local_height())
self.save_transactions() self.save_transactions()
self.save_verified_tx() self.save_verified_tx()

2
electrum/daemon.py

@ -243,7 +243,7 @@ class Daemon(DaemonThread):
if storage.get_action(): if storage.get_action():
return return
wallet = Wallet(storage) wallet = Wallet(storage)
wallet.start_threads(self.network) wallet.start_network(self.network)
self.wallets[path] = wallet self.wallets[path] = wallet
return wallet return wallet

2
electrum/gui/kivy/main_window.py

@ -512,7 +512,7 @@ class ElectrumWindow(App):
def on_wizard_complete(self, wizard, wallet): def on_wizard_complete(self, wizard, wallet):
if wallet: # wizard returned a wallet if wallet: # wizard returned a wallet
wallet.start_threads(self.daemon.network) wallet.start_network(self.daemon.network)
self.daemon.add_wallet(wallet) self.daemon.add_wallet(wallet)
self.load_wallet(wallet) self.load_wallet(wallet)
elif not self.wallet: elif not self.wallet:

2
electrum/gui/qt/__init__.py

@ -236,7 +236,7 @@ class ElectrumGui:
if not self.daemon.get_wallet(wallet.storage.path): if not self.daemon.get_wallet(wallet.storage.path):
# wallet was not in memory # wallet was not in memory
wallet.start_threads(self.daemon.network) wallet.start_network(self.daemon.network)
self.daemon.add_wallet(wallet) self.daemon.add_wallet(wallet)
try: try:
for w in self.windows: for w in self.windows:

2
electrum/gui/stdio.py

@ -34,7 +34,7 @@ class ElectrumGui:
self.str_fee = "" self.str_fee = ""
self.wallet = Wallet(storage) self.wallet = Wallet(storage)
self.wallet.start_threads(self.network) self.wallet.start_network(self.network)
self.contacts = self.wallet.contacts self.contacts = self.wallet.contacts
self.network.register_callback(self.on_network, ['updated', 'banner']) self.network.register_callback(self.on_network, ['updated', 'banner'])

2
electrum/gui/text.py

@ -30,7 +30,7 @@ class ElectrumGui:
password = getpass.getpass('Password:', stream=None) password = getpass.getpass('Password:', stream=None)
storage.decrypt(password) storage.decrypt(password)
self.wallet = Wallet(storage) self.wallet = Wallet(storage)
self.wallet.start_threads(self.network) self.wallet.start_network(self.network)
self.contacts = self.wallet.contacts self.contacts = self.wallet.contacts
locale.setlocale(locale.LC_ALL, '') locale.setlocale(locale.LC_ALL, '')

22
electrum/interface.py

@ -24,24 +24,21 @@
# SOFTWARE. # SOFTWARE.
import os import os
import re import re
import socket
import ssl import ssl
import sys import sys
import traceback import traceback
import asyncio import asyncio
import concurrent.futures
from typing import Tuple, Union from typing import Tuple, Union
import aiorpcx import aiorpcx
from aiorpcx import ClientSession, Notification, TaskGroup from aiorpcx import ClientSession, Notification
from .util import PrintError, aiosafe, bfh, AIOSafeSilentException from .util import PrintError, aiosafe, bfh, AIOSafeSilentException, CustomTaskGroup
from . import util from . import util
from . import x509 from . import x509
from . import pem from . import pem
from .version import ELECTRUM_VERSION, PROTOCOL_VERSION from .version import ELECTRUM_VERSION, PROTOCOL_VERSION
from . import blockchain from . import blockchain
from .blockchain import deserialize_header
from . import constants from . import constants
@ -83,6 +80,14 @@ class NotificationSession(ClientSession):
self.cache[key] = result self.cache[key] = result
await queue.put(params + [result]) await queue.put(params + [result])
def unsubscribe(self, queue):
"""Unsubscribe a callback to free object references to enable GC."""
# note: we can't unsubscribe from the server, so we keep receiving
# subsequent notifications
for v in self.subscriptions.values():
if queue in v:
v.remove(queue)
# FIXME this is often raised inside a TaskGroup, but then it's not silent :( # FIXME this is often raised inside a TaskGroup, but then it's not silent :(
class GracefulDisconnect(AIOSafeSilentException): pass class GracefulDisconnect(AIOSafeSilentException): pass
@ -94,13 +99,6 @@ class ErrorParsingSSLCert(Exception): pass
class ErrorGettingSSLCertFromServer(Exception): pass class ErrorGettingSSLCertFromServer(Exception): pass
class CustomTaskGroup(TaskGroup):
def spawn(self, *args, **kwargs):
if self._closed:
raise asyncio.CancelledError()
return super().spawn(*args, **kwargs)
def deserialize_server(server_str: str) -> Tuple[str, str, str]: def deserialize_server(server_str: str) -> Tuple[str, str, str]:
# host might be IPv6 address, hence do rsplit: # host might be IPv6 address, hence do rsplit:

14
electrum/network.py

@ -211,9 +211,6 @@ class Network(PrintError):
self.banner = '' self.banner = ''
self.donation_address = '' self.donation_address = ''
self.relay_fee = None self.relay_fee = None
# callbacks passed with subscriptions
self.subscriptions = defaultdict(list) # note: needs self.callback_lock
self.sub_cache = {} # note: needs self.interface_lock
# callbacks set by the GUI # callbacks set by the GUI
self.callbacks = defaultdict(list) # note: needs self.callback_lock self.callbacks = defaultdict(list) # note: needs self.callback_lock
@ -272,6 +269,7 @@ class Network(PrintError):
callbacks = self.callbacks[event][:] callbacks = self.callbacks[event][:]
for callback in callbacks: for callback in callbacks:
if asyncio.iscoroutinefunction(callback): if asyncio.iscoroutinefunction(callback):
# FIXME: if callback throws, we will lose the traceback
asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
else: else:
callback(event, *args) callback(event, *args)
@ -605,16 +603,6 @@ class Network(PrintError):
""" hashable index for subscriptions and cache""" """ hashable index for subscriptions and cache"""
return str(method) + (':' + str(params[0]) if params else '') return str(method) + (':' + str(params[0]) if params else '')
def unsubscribe(self, callback):
'''Unsubscribe a callback to free object references to enable GC.'''
# Note: we can't unsubscribe from the server, so if we receive
# subsequent notifications process_response() will emit a harmless
# "received unexpected notification" warning
with self.callback_lock:
for v in self.subscriptions.values():
if callback in v:
v.remove(callback)
@with_interface_lock @with_interface_lock
def connection_down(self, server): def connection_down(self, server):
'''A connection to server either went down, or was never made. '''A connection to server either went down, or was never made.

8
electrum/synchronizer.py

@ -144,16 +144,16 @@ class Synchronizer(PrintError):
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
self.requested_addrs.remove(addr) self.requested_addrs.remove(addr)
async def send_subscriptions(self, interface): async def send_subscriptions(self, group: TaskGroup):
while True: while True:
addr = await self.add_queue.get() addr = await self.add_queue.get()
await interface.group.spawn(self.subscribe_to_address, addr) await group.spawn(self.subscribe_to_address, addr)
async def handle_status(self, interface): async def handle_status(self, group: TaskGroup):
while True: while True:
h, status = await self.status_queue.get() h, status = await self.status_queue.get()
addr = self.scripthash_to_address[h] addr = self.scripthash_to_address[h]
await interface.group.spawn(self.on_address_status, addr, status) await group.spawn(self.on_address_status, addr, status)
@property @property
def session(self): def session(self):

18
electrum/util.py

@ -35,14 +35,15 @@ import stat
import inspect import inspect
from locale import localeconv from locale import localeconv
import asyncio import asyncio
import urllib.request, urllib.parse, urllib.error
from .i18n import _ import queue
import aiohttp import aiohttp
from aiohttp_socks import SocksConnector, SocksVer from aiohttp_socks import SocksConnector, SocksVer
from aiorpcx import TaskGroup
from .i18n import _
import urllib.request, urllib.parse, urllib.error
import queue
def inv_dict(d): def inv_dict(d):
return {v: k for k, v in d.items()} return {v: k for k, v in d.items()}
@ -972,3 +973,12 @@ def make_aiohttp_session(proxy):
return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10), connector=connector) return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10), connector=connector)
else: else:
return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10)) return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10))
class CustomTaskGroup(TaskGroup):
def spawn(self, *args, **kwargs):
# don't complain if group is already closed.
if self._closed:
raise asyncio.CancelledError()
return super().spawn(*args, **kwargs)

12
electrum/verifier.py

@ -24,6 +24,8 @@
import asyncio import asyncio
from typing import Sequence, Optional from typing import Sequence, Optional
from aiorpcx import TaskGroup
from .util import ThreadJob, bh2u, VerifiedTxInfo from .util import ThreadJob, bh2u, VerifiedTxInfo
from .bitcoin import Hash, hash_decode, hash_encode from .bitcoin import Hash, hash_decode, hash_encode
from .transaction import Transaction from .transaction import Transaction
@ -47,12 +49,12 @@ class SPV(ThreadJob):
self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.merkle_roots = {} # txid -> merkle root (once it has been verified)
self.requested_merkle = set() # txid set of pending requests self.requested_merkle = set() # txid set of pending requests
async def main(self, interface): async def main(self, group: TaskGroup):
while True: while True:
await self._request_proofs(interface) await self._request_proofs(group)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
async def _request_proofs(self, interface): async def _request_proofs(self, group: TaskGroup):
blockchain = self.network.blockchain() blockchain = self.network.blockchain()
if not blockchain: if not blockchain:
self.print_error("no blockchain") self.print_error("no blockchain")
@ -70,12 +72,12 @@ class SPV(ThreadJob):
if header is None: if header is None:
index = tx_height // 2016 index = tx_height // 2016
if index < len(blockchain.checkpoints): if index < len(blockchain.checkpoints):
await interface.group.spawn(self.network.request_chunk(tx_height, None, can_return_early=True)) await group.spawn(self.network.request_chunk(tx_height, None, can_return_early=True))
elif (tx_hash not in self.requested_merkle elif (tx_hash not in self.requested_merkle
and tx_hash not in self.merkle_roots): and tx_hash not in self.merkle_roots):
self.print_error('requested merkle', tx_hash) self.print_error('requested merkle', tx_hash)
self.requested_merkle.add(tx_hash) self.requested_merkle.add(tx_hash)
await interface.group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height)
if self.network.blockchain() != self.blockchain: if self.network.blockchain() != self.blockchain:
self.blockchain = self.network.blockchain() self.blockchain = self.network.blockchain()

2
run_electrum

@ -135,7 +135,7 @@ def run_non_RPC(config):
if not config.get('offline'): if not config.get('offline'):
network = Network(config) network = Network(config)
network.start() network.start()
wallet.start_threads(network) wallet.start_network(network)
print_msg("Recovering wallet...") print_msg("Recovering wallet...")
wallet.synchronize() wallet.synchronize()
wallet.wait_until_synchronized() wallet.wait_until_synchronized()

Loading…
Cancel
Save