Browse Source

Merge pull request #7756 from SomberNight/202204_qt_taskthread_cancel

qt taskthread cleaner shutdown
patch-4
ThomasV 3 years ago
committed by GitHub
parent
commit
3e6595ae98
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      electrum/gui/qt/bip39_recovery_dialog.py
  2. 47
      electrum/gui/qt/main_window.py
  3. 38
      electrum/gui/qt/util.py
  4. 5
      electrum/lnworker.py

31
electrum/gui/qt/bip39_recovery_dialog.py

@ -2,6 +2,9 @@
# Distributed under the MIT software license, see the accompanying # Distributed under the MIT software license, see the accompanying
# file LICENCE or http://www.opensource.org/licenses/mit-license.php # file LICENCE or http://www.opensource.org/licenses/mit-license.php
import asyncio
import concurrent.futures
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QGridLayout, QLabel, QListWidget, QListWidgetItem from PyQt5.QtWidgets import QWidget, QVBoxLayout, QGridLayout, QLabel, QListWidget, QListWidgetItem
@ -29,15 +32,27 @@ class Bip39RecoveryDialog(WindowModalDialog):
self.content = QVBoxLayout() self.content = QVBoxLayout()
self.content.addWidget(QLabel(_('Scanning common paths for existing accounts...'))) self.content.addWidget(QLabel(_('Scanning common paths for existing accounts...')))
vbox.addLayout(self.content) vbox.addLayout(self.content)
self.thread = TaskThread(self)
self.thread.finished.connect(self.deleteLater) # see #3956
network = Network.get_instance()
coro = account_discovery(network, self.get_account_xpub)
fut = asyncio.run_coroutine_threadsafe(coro, network.asyncio_loop)
self.thread.add(
fut.result,
on_success=self.on_recovery_success,
on_error=self.on_recovery_error,
cancel=fut.cancel,
)
self.ok_button = OkButton(self) self.ok_button = OkButton(self)
self.ok_button.clicked.connect(self.on_ok_button_click) self.ok_button.clicked.connect(self.on_ok_button_click)
self.ok_button.setEnabled(False) self.ok_button.setEnabled(False)
vbox.addLayout(Buttons(CancelButton(self), self.ok_button)) cancel_button = CancelButton(self)
cancel_button.clicked.connect(fut.cancel)
vbox.addLayout(Buttons(cancel_button, self.ok_button))
self.finished.connect(self.on_finished) self.finished.connect(self.on_finished)
self.show() self.show()
self.thread = TaskThread(self)
self.thread.finished.connect(self.deleteLater) # see #3956
self.thread.add(self.recovery, self.on_recovery_success, None, self.on_recovery_error)
def on_finished(self): def on_finished(self):
self.thread.stop() self.thread.stop()
@ -47,11 +62,6 @@ class Bip39RecoveryDialog(WindowModalDialog):
account = item.data(self.ROLE_ACCOUNT) account = item.data(self.ROLE_ACCOUNT)
self.on_account_select(account) self.on_account_select(account)
def recovery(self):
network = Network.get_instance()
coroutine = account_discovery(network, self.get_account_xpub)
return network.run_from_another_thread(coroutine)
def on_recovery_success(self, accounts): def on_recovery_success(self, accounts):
self.clear_content() self.clear_content()
if len(accounts) == 0: if len(accounts) == 0:
@ -67,6 +77,9 @@ class Bip39RecoveryDialog(WindowModalDialog):
self.content.addWidget(self.list) self.content.addWidget(self.list)
def on_recovery_error(self, exc_info): def on_recovery_error(self, exc_info):
e = exc_info[1]
if isinstance(e, concurrent.futures.CancelledError):
return
self.clear_content() self.clear_content()
self.content.addWidget(QLabel(_('Error: Account discovery failed.'))) self.content.addWidget(QLabel(_('Error: Account discovery failed.')))
_logger.error(f"recovery error", exc_info=exc_info) _logger.error(f"recovery error", exc_info=exc_info)

47
electrum/gui/qt/main_window.py

@ -36,7 +36,8 @@ import base64
from functools import partial from functools import partial
import queue import queue
import asyncio import asyncio
from typing import Optional, TYPE_CHECKING, Sequence, List, Union from typing import Optional, TYPE_CHECKING, Sequence, List, Union, Dict, Set
import concurrent.futures
from PyQt5.QtGui import QPixmap, QKeySequence, QIcon, QCursor, QFont from PyQt5.QtGui import QPixmap, QKeySequence, QIcon, QCursor, QFont
from PyQt5.QtCore import Qt, QRect, QStringListModel, QSize, pyqtSignal, QPoint from PyQt5.QtCore import Qt, QRect, QStringListModel, QSize, pyqtSignal, QPoint
@ -203,6 +204,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
self.pending_invoice = None self.pending_invoice = None
Logger.__init__(self) Logger.__init__(self)
self._coroutines_scheduled = set() # type: Set[concurrent.futures.Future]
self.thread = TaskThread(self, self.on_error)
self.tx_notification_queue = queue.Queue() self.tx_notification_queue = queue.Queue()
self.tx_notification_last_time = 0 self.tx_notification_last_time = 0
@ -318,16 +322,23 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
self._update_check_thread.start() self._update_check_thread.start()
def run_coroutine_from_thread(self, coro, on_result=None): def run_coroutine_from_thread(self, coro, on_result=None):
def task(): if self._cleaned_up:
self.logger.warning(f"stopping or already stopped but run_coroutine_from_thread was called.")
return
async def wrapper():
try: try:
f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) res = await coro
r = f.result()
if on_result:
on_result(r)
except Exception as e: except Exception as e:
self.logger.exception("exception in coro scheduled via window.wallet") self.logger.exception("exception in coro scheduled via window.wallet")
self.show_error_signal.emit(str(e)) self.show_error_signal.emit(repr(e))
self.wallet.thread.add(task) else:
if on_result:
on_result(res)
finally:
self._coroutines_scheduled.discard(fut)
fut = asyncio.run_coroutine_threadsafe(wrapper(), self.network.asyncio_loop)
self._coroutines_scheduled.add(fut)
def on_fx_history(self): def on_fx_history(self):
self.history_model.refresh('fx_history') self.history_model.refresh('fx_history')
@ -400,7 +411,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
def on_error(self, exc_info): def on_error(self, exc_info):
e = exc_info[1] e = exc_info[1]
if isinstance(e, UserCancelled): if isinstance(e, (UserCancelled, concurrent.futures.CancelledError)):
pass pass
elif isinstance(e, UserFacingException): elif isinstance(e, UserFacingException):
self.show_error(str(e)) self.show_error(str(e))
@ -502,7 +513,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
@profiler @profiler
def load_wallet(self, wallet: Abstract_Wallet): def load_wallet(self, wallet: Abstract_Wallet):
wallet.thread = TaskThread(self, self.on_error)
self.update_recently_visited(wallet.storage.path) self.update_recently_visited(wallet.storage.path)
if wallet.has_lightning(): if wallet.has_lightning():
util.trigger_callback('channels_updated', wallet) util.trigger_callback('channels_updated', wallet)
@ -1592,11 +1602,8 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
if not self.question(msg): if not self.question(msg):
return return
self.save_pending_invoice() self.save_pending_invoice()
def task():
coro = self.wallet.lnworker.pay_invoice(invoice, amount_msat=amount_msat, attempts=LN_NUM_PAYMENT_ATTEMPTS) coro = self.wallet.lnworker.pay_invoice(invoice, amount_msat=amount_msat, attempts=LN_NUM_PAYMENT_ATTEMPTS)
fut = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) self.run_coroutine_from_thread(coro)
return fut.result()
self.wallet.thread.add(task)
def on_request_status(self, wallet, key, status): def on_request_status(self, wallet, key, status):
if wallet != self.wallet: if wallet != self.wallet:
@ -2709,7 +2716,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
# (signature) wrapped C/C++ object has been deleted # (signature) wrapped C/C++ object has been deleted
pass pass
self.wallet.thread.add(task, on_success=show_signed_message) self.thread.add(task, on_success=show_signed_message)
def do_verify(self, address, message, signature): def do_verify(self, address, message, signature):
address = address.text().strip() address = address.text().strip()
@ -2782,7 +2789,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
# (message_e) wrapped C/C++ object has been deleted # (message_e) wrapped C/C++ object has been deleted
pass pass
self.wallet.thread.add(task, on_success=setText) self.thread.add(task, on_success=setText)
def do_encrypt(self, message_e, pubkey_e, encrypted_e): def do_encrypt(self, message_e, pubkey_e, encrypted_e):
message = message_e.toPlainText() message = message_e.toPlainText()
@ -3234,9 +3241,11 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
if self._cleaned_up: if self._cleaned_up:
return return
self._cleaned_up = True self._cleaned_up = True
if self.wallet.thread: if self.thread:
self.wallet.thread.stop() self.thread.stop()
self.wallet.thread = None self.thread = None
for fut in self._coroutines_scheduled:
fut.cancel()
util.unregister_callback(self.on_network) util.unregister_callback(self.on_network)
self.config.set_key("is_maximized", self.isMaximized()) self.config.set_key("is_maximized", self.isMaximized())
if not self.isMaximized(): if not self.isMaximized():

38
electrum/gui/qt/util.py

@ -29,6 +29,7 @@ from PyQt5.QtWidgets import (QPushButton, QLabel, QMessageBox, QHBoxLayout,
from electrum.i18n import _, languages from electrum.i18n import _, languages
from electrum.util import FileImportFailed, FileExportFailed, make_aiohttp_session, resource_path from electrum.util import FileImportFailed, FileExportFailed, make_aiohttp_session, resource_path
from electrum.invoices import PR_UNPAID, PR_PAID, PR_EXPIRED, PR_INFLIGHT, PR_UNKNOWN, PR_FAILED, PR_ROUTING, PR_UNCONFIRMED from electrum.invoices import PR_UNPAID, PR_PAID, PR_EXPIRED, PR_INFLIGHT, PR_UNKNOWN, PR_FAILED, PR_ROUTING, PR_UNCONFIRMED
from electrum.logging import Logger
if TYPE_CHECKING: if TYPE_CHECKING:
from .main_window import ElectrumWindow from .main_window import ElectrumWindow
@ -902,7 +903,7 @@ class PasswordLineEdit(QLineEdit):
super().clear() super().clear()
class TaskThread(QThread): class TaskThread(QThread, Logger):
'''Thread that runs background tasks. Callbacks are guaranteed '''Thread that runs background tasks. Callbacks are guaranteed
to happen in the context of its parent.''' to happen in the context of its parent.'''
@ -911,24 +912,35 @@ class TaskThread(QThread):
cb_success: Optional[Callable] cb_success: Optional[Callable]
cb_done: Optional[Callable] cb_done: Optional[Callable]
cb_error: Optional[Callable] cb_error: Optional[Callable]
cancel: Optional[Callable] = None
doneSig = pyqtSignal(object, object, object) doneSig = pyqtSignal(object, object, object)
def __init__(self, parent, on_error=None): def __init__(self, parent, on_error=None):
super(TaskThread, self).__init__(parent) QThread.__init__(self, parent)
Logger.__init__(self)
self.on_error = on_error self.on_error = on_error
self.tasks = queue.Queue() self.tasks = queue.Queue()
self._cur_task = None # type: Optional[TaskThread.Task]
self._stopping = False
self.doneSig.connect(self.on_done) self.doneSig.connect(self.on_done)
self.start() self.start()
def add(self, task, on_success=None, on_done=None, on_error=None): def add(self, task, on_success=None, on_done=None, on_error=None, *, cancel=None):
if self._stopping:
self.logger.warning(f"stopping or already stopped but tried to add new task.")
return
on_error = on_error or self.on_error on_error = on_error or self.on_error
self.tasks.put(TaskThread.Task(task, on_success, on_done, on_error)) task_ = TaskThread.Task(task, on_success, on_done, on_error, cancel=cancel)
self.tasks.put(task_)
def run(self): def run(self):
while True: while True:
if self._stopping:
break
task = self.tasks.get() # type: TaskThread.Task task = self.tasks.get() # type: TaskThread.Task
if not task: self._cur_task = task
if not task or self._stopping:
break break
try: try:
result = task.task() result = task.task()
@ -944,7 +956,21 @@ class TaskThread(QThread):
cb_result(result) cb_result(result)
def stop(self): def stop(self):
self.tasks.put(None) self._stopping = True
# try to cancel currently running task now.
# if the task does not implement "cancel", we will have to wait until it finishes.
task = self._cur_task
if task and task.cancel:
task.cancel()
# cancel the remaining tasks in the queue
while True:
try:
task = self.tasks.get_nowait()
except queue.Empty:
break
if task and task.cancel:
task.cancel()
self.tasks.put(None) # in case the thread is still waiting on the queue
self.exit() self.exit()
self.wait() self.wait()

5
electrum/lnworker.py

@ -1106,7 +1106,9 @@ class LNWallet(LNWorker):
self.save_payment_info(info) self.save_payment_info(info)
self.wallet.set_label(key, lnaddr.get_description()) self.wallet.set_label(key, lnaddr.get_description())
self.logger.info(f"pay_invoice starting session for RHASH={payment_hash.hex()}")
self.set_invoice_status(key, PR_INFLIGHT) self.set_invoice_status(key, PR_INFLIGHT)
success = False
try: try:
await self.pay_to_node( await self.pay_to_node(
node_pubkey=invoice_pubkey, node_pubkey=invoice_pubkey,
@ -1121,8 +1123,9 @@ class LNWallet(LNWorker):
success = True success = True
except PaymentFailure as e: except PaymentFailure as e:
self.logger.info(f'payment failure: {e!r}') self.logger.info(f'payment failure: {e!r}')
success = False
reason = str(e) reason = str(e)
finally:
self.logger.info(f"pay_invoice ending session for RHASH={payment_hash.hex()}. {success=}")
if success: if success:
self.set_invoice_status(key, PR_PAID) self.set_invoice_status(key, PR_PAID)
util.trigger_callback('payment_succeeded', self.wallet, key) util.trigger_callback('payment_succeeded', self.wallet, key)

Loading…
Cancel
Save