diff --git a/electrum/lnutil.py b/electrum/lnutil.py index 2f40af4a7..3b1bec8da 100644 --- a/electrum/lnutil.py +++ b/electrum/lnutil.py @@ -617,6 +617,9 @@ class EncumberedTransaction(NamedTuple("EncumberedTransaction", [('name', str), d2['tx'] = Transaction(d['tx']) return EncumberedTransaction(**d2) + def __str__(self): + return super().__str__()[:-1] + ", txid: " + self.tx.txid() + ")" + NUM_MAX_HOPS_IN_PAYMENT_PATH = 20 NUM_MAX_EDGES_IN_PAYMENT_PATH = NUM_MAX_HOPS_IN_PAYMENT_PATH + 1 diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py index 05fd85971..dd4f1d7a9 100644 --- a/electrum/lnwatcher.py +++ b/electrum/lnwatcher.py @@ -8,6 +8,7 @@ import os from collections import defaultdict import asyncio from enum import IntEnum, auto +from typing import NamedTuple, Dict import jsonrpclib @@ -20,6 +21,11 @@ from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGH if TYPE_CHECKING: from .network import Network +class ListenerItem(NamedTuple): + # this is triggered when the lnwatcher is all done with the outpoint used as index in LNWatcher.tx_progress + all_done : asyncio.Event + # txs we broadcast are put on this queue so that the test can wait for them to get mined + tx_queue : asyncio.Queue class TxMinedDepth(IntEnum): """ IntEnum because we call min() in get_deepest_tx_mined_depth_for_txids """ @@ -62,6 +68,9 @@ class LNWatcher(PrintError): watchtower_url = self.config.get('watchtower_url') self.watchtower = jsonrpclib.Server(watchtower_url) if watchtower_url else None self.watchtower_queue = asyncio.Queue() + # this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done, + # and a queue for seeing which txs are being published + self.tx_progress = {} # type: Dict[str, ListenerItem] def with_watchtower(func): def wrapper(self, *args, **kwargs): @@ -151,8 +160,9 @@ class LNWatcher(PrintError): self.stop_and_delete(funding_outpoint) def stop_and_delete(self, funding_outpoint): + if funding_outpoint in self.tx_progress: + self.tx_progress[funding_outpoint].all_done.set() # TODO delete channel from watcher_db - pass async def inspect_tx_candidate(self, funding_outpoint, prev_tx): """Returns True iff found any not-deeply-spent outputs that we could @@ -164,6 +174,7 @@ class LNWatcher(PrintError): self.watch_address(o.address) not_yet_watching = True if not_yet_watching: + self.print_error('prev_tx', prev_tx, 'not yet watching') return True # get all possible responses we have prev_txid = prev_tx.txid() @@ -171,10 +182,12 @@ class LNWatcher(PrintError): encumbered_sweep_txns = self.sweepstore[funding_outpoint][prev_txid] if len(encumbered_sweep_txns) == 0: if self.get_tx_mined_depth(prev_txid) == TxMinedDepth.DEEP: + self.print_error(e_tx.name, 'have no follow-up transactions and prevtx mined deep, returning') return False # check if any response applies keep_watching_this = False local_height = self.network.get_local_height() + self.print_error(funding_outpoint, 'iterating over encumbered txs') for e_tx in list(encumbered_sweep_txns): conflicts = self.addr_sync.get_conflicting_transactions(e_tx.tx.txid(), e_tx.tx, include_self=True) conflict_mined_depth = self.get_deepest_tx_mined_depth_for_txids(conflicts) @@ -188,32 +201,46 @@ class LNWatcher(PrintError): broadcast = True if e_tx.cltv_expiry: if local_height > e_tx.cltv_expiry: - self.print_error('CLTV ({} > {}) fulfilled'.format(local_height, e_tx.cltv_expiry)) + self.print_error(e_tx.name, 'CLTV ({} > {}) fulfilled'.format(local_height, e_tx.cltv_expiry)) else: - self.print_error('waiting for {}: CLTV ({} > {}), funding outpoint {} and tx {}' + self.print_error(e_tx.name, 'waiting for {}: CLTV ({} > {}), funding outpoint {} and tx {}' .format(e_tx.name, local_height, e_tx.cltv_expiry, funding_outpoint[:8], prev_tx.txid()[:8])) broadcast = False if e_tx.csv_delay: if num_conf < e_tx.csv_delay: - self.print_error('waiting for {}: CSV ({} >= {}), funding outpoint {} and tx {}' + self.print_error(e_tx.name, 'waiting for {}: CSV ({} >= {}), funding outpoint {} and tx {}' .format(e_tx.name, num_conf, e_tx.csv_delay, funding_outpoint[:8], prev_tx.txid()[:8])) broadcast = False if broadcast: - if not await self.broadcast_or_log(e_tx): - self.print_error(f'encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}') + if not await self.broadcast_or_log(funding_outpoint, e_tx): + self.print_error(e_tx.name, f'could not publish encumbered tx: {str(e_tx)}, prev_txid: {prev_txid}, prev_tx height:', tx_height, 'local_height', local_height) else: - # not mined or in mempool + self.print_error(e_tx.name, 'status', conflict_mined_depth, 'recursing...') + # mined or in mempool keep_watching_this |= await self.inspect_tx_candidate(funding_outpoint, e_tx.tx) return keep_watching_this - async def broadcast_or_log(self, e_tx): + async def broadcast_or_log(self, funding_outpoint, e_tx): + height = self.addr_sync.get_tx_height(e_tx.tx.txid()).height + if height != TX_HEIGHT_LOCAL: + return + try: + await self.network.get_transaction(e_tx.tx.txid()) + except: + pass + else: + self.print_error('already published, returning') + return try: txid = await self.network.broadcast_transaction(e_tx.tx) except Exception as e: self.print_error(f'broadcast: {e_tx.name}: failure: {repr(e)}') else: self.print_error(f'broadcast: {e_tx.name}: success. txid: {txid}') + if funding_outpoint in self.tx_progress: + await self.tx_progress[funding_outpoint].tx_queue.put(e_tx) + return txid @with_watchtower def add_sweep_tx(self, funding_outpoint: str, prev_txid: str, sweeptx):