diff --git a/electrum/gui/qt/bip39_recovery_dialog.py b/electrum/gui/qt/bip39_recovery_dialog.py index 0588296fa..875eaaeb5 100644 --- a/electrum/gui/qt/bip39_recovery_dialog.py +++ b/electrum/gui/qt/bip39_recovery_dialog.py @@ -2,6 +2,9 @@ # Distributed under the MIT software license, see the accompanying # file LICENCE or http://www.opensource.org/licenses/mit-license.php +import asyncio +import concurrent.futures + from PyQt5.QtCore import Qt from PyQt5.QtWidgets import QWidget, QVBoxLayout, QGridLayout, QLabel, QListWidget, QListWidgetItem @@ -29,15 +32,27 @@ class Bip39RecoveryDialog(WindowModalDialog): self.content = QVBoxLayout() self.content.addWidget(QLabel(_('Scanning common paths for existing accounts...'))) vbox.addLayout(self.content) + + self.thread = TaskThread(self) + self.thread.finished.connect(self.deleteLater) # see #3956 + network = Network.get_instance() + coro = account_discovery(network, self.get_account_xpub) + fut = asyncio.run_coroutine_threadsafe(coro, network.asyncio_loop) + self.thread.add( + fut.result, + on_success=self.on_recovery_success, + on_error=self.on_recovery_error, + cancel=fut.cancel, + ) + self.ok_button = OkButton(self) self.ok_button.clicked.connect(self.on_ok_button_click) self.ok_button.setEnabled(False) - vbox.addLayout(Buttons(CancelButton(self), self.ok_button)) + cancel_button = CancelButton(self) + cancel_button.clicked.connect(fut.cancel) + vbox.addLayout(Buttons(cancel_button, self.ok_button)) self.finished.connect(self.on_finished) self.show() - self.thread = TaskThread(self) - self.thread.finished.connect(self.deleteLater) # see #3956 - self.thread.add(self.recovery, self.on_recovery_success, None, self.on_recovery_error) def on_finished(self): self.thread.stop() @@ -47,11 +62,6 @@ class Bip39RecoveryDialog(WindowModalDialog): account = item.data(self.ROLE_ACCOUNT) self.on_account_select(account) - def recovery(self): - network = Network.get_instance() - coroutine = account_discovery(network, self.get_account_xpub) - return network.run_from_another_thread(coroutine) - def on_recovery_success(self, accounts): self.clear_content() if len(accounts) == 0: @@ -67,6 +77,9 @@ class Bip39RecoveryDialog(WindowModalDialog): self.content.addWidget(self.list) def on_recovery_error(self, exc_info): + e = exc_info[1] + if isinstance(e, concurrent.futures.CancelledError): + return self.clear_content() self.content.addWidget(QLabel(_('Error: Account discovery failed.'))) _logger.error(f"recovery error", exc_info=exc_info) diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index d7d090131..7866eb370 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -36,7 +36,8 @@ import base64 from functools import partial import queue import asyncio -from typing import Optional, TYPE_CHECKING, Sequence, List, Union +from typing import Optional, TYPE_CHECKING, Sequence, List, Union, Dict, Set +import concurrent.futures from PyQt5.QtGui import QPixmap, QKeySequence, QIcon, QCursor, QFont from PyQt5.QtCore import Qt, QRect, QStringListModel, QSize, pyqtSignal, QPoint @@ -203,6 +204,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): self.pending_invoice = None Logger.__init__(self) + self._coroutines_scheduled = set() # type: Set[concurrent.futures.Future] + self.thread = TaskThread(self, self.on_error) + self.tx_notification_queue = queue.Queue() self.tx_notification_last_time = 0 @@ -318,16 +322,23 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): self._update_check_thread.start() def run_coroutine_from_thread(self, coro, on_result=None): - def task(): + if self._cleaned_up: + self.logger.warning(f"stopping or already stopped but run_coroutine_from_thread was called.") + return + async def wrapper(): try: - f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) - r = f.result() - if on_result: - on_result(r) + res = await coro except Exception as e: self.logger.exception("exception in coro scheduled via window.wallet") - self.show_error_signal.emit(str(e)) - self.wallet.thread.add(task) + self.show_error_signal.emit(repr(e)) + else: + if on_result: + on_result(res) + finally: + self._coroutines_scheduled.discard(fut) + + fut = asyncio.run_coroutine_threadsafe(wrapper(), self.network.asyncio_loop) + self._coroutines_scheduled.add(fut) def on_fx_history(self): self.history_model.refresh('fx_history') @@ -400,7 +411,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): def on_error(self, exc_info): e = exc_info[1] - if isinstance(e, UserCancelled): + if isinstance(e, (UserCancelled, concurrent.futures.CancelledError)): pass elif isinstance(e, UserFacingException): self.show_error(str(e)) @@ -502,7 +513,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): @profiler def load_wallet(self, wallet: Abstract_Wallet): - wallet.thread = TaskThread(self, self.on_error) self.update_recently_visited(wallet.storage.path) if wallet.has_lightning(): util.trigger_callback('channels_updated', wallet) @@ -1592,11 +1602,8 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): if not self.question(msg): return self.save_pending_invoice() - def task(): - coro = self.wallet.lnworker.pay_invoice(invoice, amount_msat=amount_msat, attempts=LN_NUM_PAYMENT_ATTEMPTS) - fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) - return fut.result() - self.wallet.thread.add(task) + coro = self.wallet.lnworker.pay_invoice(invoice, amount_msat=amount_msat, attempts=LN_NUM_PAYMENT_ATTEMPTS) + self.run_coroutine_from_thread(coro) def on_request_status(self, wallet, key, status): if wallet != self.wallet: @@ -2709,7 +2716,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): # (signature) wrapped C/C++ object has been deleted pass - self.wallet.thread.add(task, on_success=show_signed_message) + self.thread.add(task, on_success=show_signed_message) def do_verify(self, address, message, signature): address = address.text().strip() @@ -2782,7 +2789,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): # (message_e) wrapped C/C++ object has been deleted pass - self.wallet.thread.add(task, on_success=setText) + self.thread.add(task, on_success=setText) def do_encrypt(self, message_e, pubkey_e, encrypted_e): message = message_e.toPlainText() @@ -3234,9 +3241,11 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): if self._cleaned_up: return self._cleaned_up = True - if self.wallet.thread: - self.wallet.thread.stop() - self.wallet.thread = None + if self.thread: + self.thread.stop() + self.thread = None + for fut in self._coroutines_scheduled: + fut.cancel() util.unregister_callback(self.on_network) self.config.set_key("is_maximized", self.isMaximized()) if not self.isMaximized(): diff --git a/electrum/gui/qt/util.py b/electrum/gui/qt/util.py index 40a4c1af9..d9158ecc3 100644 --- a/electrum/gui/qt/util.py +++ b/electrum/gui/qt/util.py @@ -29,6 +29,7 @@ from PyQt5.QtWidgets import (QPushButton, QLabel, QMessageBox, QHBoxLayout, from electrum.i18n import _, languages from electrum.util import FileImportFailed, FileExportFailed, make_aiohttp_session, resource_path from electrum.invoices import PR_UNPAID, PR_PAID, PR_EXPIRED, PR_INFLIGHT, PR_UNKNOWN, PR_FAILED, PR_ROUTING, PR_UNCONFIRMED +from electrum.logging import Logger if TYPE_CHECKING: from .main_window import ElectrumWindow @@ -902,7 +903,7 @@ class PasswordLineEdit(QLineEdit): super().clear() -class TaskThread(QThread): +class TaskThread(QThread, Logger): '''Thread that runs background tasks. Callbacks are guaranteed to happen in the context of its parent.''' @@ -911,24 +912,35 @@ class TaskThread(QThread): cb_success: Optional[Callable] cb_done: Optional[Callable] cb_error: Optional[Callable] + cancel: Optional[Callable] = None doneSig = pyqtSignal(object, object, object) def __init__(self, parent, on_error=None): - super(TaskThread, self).__init__(parent) + QThread.__init__(self, parent) + Logger.__init__(self) self.on_error = on_error self.tasks = queue.Queue() + self._cur_task = None # type: Optional[TaskThread.Task] + self._stopping = False self.doneSig.connect(self.on_done) self.start() - def add(self, task, on_success=None, on_done=None, on_error=None): + def add(self, task, on_success=None, on_done=None, on_error=None, *, cancel=None): + if self._stopping: + self.logger.warning(f"stopping or already stopped but tried to add new task.") + return on_error = on_error or self.on_error - self.tasks.put(TaskThread.Task(task, on_success, on_done, on_error)) + task_ = TaskThread.Task(task, on_success, on_done, on_error, cancel=cancel) + self.tasks.put(task_) def run(self): while True: + if self._stopping: + break task = self.tasks.get() # type: TaskThread.Task - if not task: + self._cur_task = task + if not task or self._stopping: break try: result = task.task() @@ -944,7 +956,21 @@ class TaskThread(QThread): cb_result(result) def stop(self): - self.tasks.put(None) + self._stopping = True + # try to cancel currently running task now. + # if the task does not implement "cancel", we will have to wait until it finishes. + task = self._cur_task + if task and task.cancel: + task.cancel() + # cancel the remaining tasks in the queue + while True: + try: + task = self.tasks.get_nowait() + except queue.Empty: + break + if task and task.cancel: + task.cancel() + self.tasks.put(None) # in case the thread is still waiting on the queue self.exit() self.wait() diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 5b934f449..3ae2ed4cd 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -1106,7 +1106,9 @@ class LNWallet(LNWorker): self.save_payment_info(info) self.wallet.set_label(key, lnaddr.get_description()) + self.logger.info(f"pay_invoice starting session for RHASH={payment_hash.hex()}") self.set_invoice_status(key, PR_INFLIGHT) + success = False try: await self.pay_to_node( node_pubkey=invoice_pubkey, @@ -1121,8 +1123,9 @@ class LNWallet(LNWorker): success = True except PaymentFailure as e: self.logger.info(f'payment failure: {e!r}') - success = False reason = str(e) + finally: + self.logger.info(f"pay_invoice ending session for RHASH={payment_hash.hex()}. {success=}") if success: self.set_invoice_status(key, PR_PAID) util.trigger_callback('payment_succeeded', self.wallet, key)