From a05ef140d6542aed4f19ebe305d994dc6f8d7d94 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Mon, 4 Apr 2022 15:47:56 +0200 Subject: [PATCH 1/3] address_sync: split off unconfirmed_tx from unverified_tx --- electrum/address_synchronizer.py | 48 ++++++++++++++++++++++---------- electrum/verifier.py | 3 +- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 607aa9fcb..8782557f9 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -85,8 +85,10 @@ class AddressSynchronizer(Logger): self.lock = threading.RLock() self.transaction_lock = threading.RLock() self.future_tx = {} # type: Dict[str, int] # txid -> wanted height - # Transactions pending verification. txid -> tx_height. Access with self.lock. - self.unverified_tx = defaultdict(int) + # Txs the server claims are mined but still pending verification: + self.unverified_tx = defaultdict(int) # type: Dict[str, int] # txid -> height. Access with self.lock. + # Txs the server claims are in the mempool: + self.unconfirmed_tx = defaultdict(int) # type: Dict[str, int] # txid -> height. Access with self.lock. # true when synchronized self._up_to_date = False # thread local storage for caching stuff @@ -176,7 +178,7 @@ class AddressSynchronizer(Logger): hist = self.db.get_addr_history(addr) for tx_hash, tx_height in hist: # add it in case it was previously unconfirmed - self.add_unverified_tx(tx_hash, tx_height) + self.add_unverified_or_unconfirmed_tx(tx_hash, tx_height) def start_network(self, network: Optional['Network']) -> None: self.network = network @@ -379,6 +381,7 @@ class AddressSynchronizer(Logger): self.db.remove_tx_fee(tx_hash) self.db.remove_verified_tx(tx_hash) self.unverified_tx.pop(tx_hash, None) + self.unconfirmed_tx.pop(tx_hash, None) if tx: for idx, txo in enumerate(tx.outputs()): scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex()) @@ -396,7 +399,7 @@ class AddressSynchronizer(Logger): return children def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None: - self.add_unverified_tx(tx_hash, tx_height) + self.add_unverified_or_unconfirmed_tx(tx_hash, tx_height) self.add_transaction(tx, allow_unrelated=True) def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]): @@ -406,6 +409,7 @@ class AddressSynchronizer(Logger): if (tx_hash, height) not in hist: # make tx local self.unverified_tx.pop(tx_hash, None) + self.unconfirmed_tx.pop(tx_hash, None) self.db.remove_verified_tx(tx_hash) if self.verifier: self.verifier.remove_spv_proof_for_tx(tx_hash) @@ -413,7 +417,7 @@ class AddressSynchronizer(Logger): for tx_hash, tx_height in hist: # add it in case it was previously unconfirmed - self.add_unverified_tx(tx_hash, tx_height) + self.add_unverified_or_unconfirmed_tx(tx_hash, tx_height) # if addr is new, we have to recompute txi and txo tx = self.db.get_transaction(tx_hash) if tx is None: @@ -459,17 +463,26 @@ class AddressSynchronizer(Logger): self._history_local.clear() self._get_addr_balance_cache = {} # invalidate cache - def get_txpos(self, tx_hash): + def get_txpos(self, tx_hash: str) -> Tuple[int, int]: """Returns (height, txpos) tuple, even if the tx is unverified.""" with self.lock: verified_tx_mined_info = self.db.get_verified_tx(tx_hash) if verified_tx_mined_info: - return verified_tx_mined_info.height, verified_tx_mined_info.txpos + height = verified_tx_mined_info.height + txpos = verified_tx_mined_info.txpos + assert height > 0, height + assert txpos is not None + return height, txpos elif tx_hash in self.unverified_tx: height = self.unverified_tx[tx_hash] - return (height, -1) if height > 0 else ((1e9 - height), -1) + assert height > 0, height + return height, -1 + elif tx_hash in self.unconfirmed_tx: + height = self.unconfirmed_tx[tx_hash] + assert height <= 0, height + return (10**9 - height), -1 else: - return (1e9+1, -1) + return (10**9 + 1), -1 def with_local_height_cached(func): # get local height only once, as it's relatively expensive. @@ -558,17 +571,21 @@ class AddressSynchronizer(Logger): assert self.is_mine(addr), "address needs to be is_mine to be watched" await self._address_history_changed_events[addr].wait() - def add_unverified_tx(self, tx_hash, tx_height): + def add_unverified_or_unconfirmed_tx(self, tx_hash, tx_height): if self.db.is_in_verified_tx(tx_hash): - if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT): + if tx_height <= 0: + # tx was previously SPV-verified but now in mempool (probably reorg) with self.lock: self.db.remove_verified_tx(tx_hash) + self.unconfirmed_tx[tx_hash] = tx_height if self.verifier: self.verifier.remove_spv_proof_for_tx(tx_hash) else: with self.lock: - # tx will be verified only if height > 0 - self.unverified_tx[tx_hash] = tx_height + if tx_height > 0: + self.unverified_tx[tx_hash] = tx_height + else: + self.unconfirmed_tx[tx_hash] = tx_height def remove_unverified_tx(self, tx_hash, tx_height): with self.lock: @@ -584,7 +601,7 @@ class AddressSynchronizer(Logger): tx_mined_status = self.get_tx_height(tx_hash) util.trigger_callback('verified', self, tx_hash, tx_mined_status) - def get_unverified_txs(self): + def get_unverified_txs(self) -> Dict[str, int]: '''Returns a map from tx hash to transaction height''' with self.lock: return dict(self.unverified_tx) # copy @@ -638,6 +655,9 @@ class AddressSynchronizer(Logger): elif tx_hash in self.unverified_tx: height = self.unverified_tx[tx_hash] return TxMinedInfo(height=height, conf=0) + elif tx_hash in self.unconfirmed_tx: + height = self.unconfirmed_tx[tx_hash] + return TxMinedInfo(height=height, conf=0) elif tx_hash in self.future_tx: num_blocks_remainining = self.future_tx[tx_hash] - self.get_local_height() if num_blocks_remainining > 0: diff --git a/electrum/verifier.py b/electrum/verifier.py index dec907a77..8b2ef24ef 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -187,7 +187,8 @@ class SPV(NetworkJobOnDefaultServer): self.requested_merkle.discard(tx_hash) def is_up_to_date(self): - return not self.requested_merkle + return (not self.requested_merkle + and not self.wallet.unverified_tx) def verify_tx_is_in_block(tx_hash: str, merkle_branch: Sequence[str], From 30650c524cefd979963f60cef7b5da3e81ab67bc Mon Sep 17 00:00:00 2001 From: SomberNight Date: Mon, 4 Apr 2022 19:24:29 +0200 Subject: [PATCH 2/3] address_sync: "up_to_date" now waits for SPV --- electrum/address_synchronizer.py | 7 ++++--- electrum/gui/stdio.py | 4 +++- electrum/gui/text.py | 6 +++--- electrum/synchronizer.py | 10 ++++++++-- electrum/wallet.py | 12 +++++++++--- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 8782557f9..04cff9dfe 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -90,7 +90,7 @@ class AddressSynchronizer(Logger): # Txs the server claims are in the mempool: self.unconfirmed_tx = defaultdict(int) # type: Dict[str, int] # txid -> height. Access with self.lock. # true when synchronized - self._up_to_date = False + self._up_to_date = False # considers both Synchronizer and Verifier # thread local storage for caching stuff self.threadlocal_cache = threading.local() @@ -922,5 +922,6 @@ class AddressSynchronizer(Logger): c, u, x = self.get_addr_balance(address) return c+u+x == 0 - def synchronize(self): - pass + def synchronize(self) -> int: + """Returns the number of new addresses we generated.""" + return 0 diff --git a/electrum/gui/stdio.py b/electrum/gui/stdio.py index 80990da47..5b0030ce6 100644 --- a/electrum/gui/stdio.py +++ b/electrum/gui/stdio.py @@ -2,10 +2,12 @@ from decimal import Decimal import getpass import datetime import logging +from typing import Optional from electrum.gui import BaseElectrumGui from electrum import util from electrum import WalletStorage, Wallet +from electrum.wallet import Abstract_Wallet from electrum.wallet_db import WalletDB from electrum.util import format_satoshis from electrum.bitcoin import is_address, COIN @@ -41,7 +43,7 @@ class ElectrumGui(BaseElectrumGui): self.str_amount = "" self.str_fee = "" - self.wallet = Wallet(db, storage, config=config) + self.wallet = Wallet(db, storage, config=config) # type: Optional[Abstract_Wallet] self.wallet.start_network(self.network) self.contacts = self.wallet.contacts diff --git a/electrum/gui/text.py b/electrum/gui/text.py index 5814b8ef5..749661074 100644 --- a/electrum/gui/text.py +++ b/electrum/gui/text.py @@ -6,7 +6,7 @@ import locale from decimal import Decimal import getpass import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import electrum from electrum.gui import BaseElectrumGui @@ -14,7 +14,7 @@ from electrum import util from electrum.util import format_satoshis from electrum.bitcoin import is_address, COIN from electrum.transaction import PartialTxOutput -from electrum.wallet import Wallet +from electrum.wallet import Wallet, Abstract_Wallet from electrum.wallet_db import WalletDB from electrum.storage import WalletStorage from electrum.network import NetworkParameters, TxBroadcastError, BestEffortRequestFailed @@ -42,7 +42,7 @@ class ElectrumGui(BaseElectrumGui): password = getpass.getpass('Password:', stream=None) storage.decrypt(password) db = WalletDB(storage.read(), manual_upgrades=False) - self.wallet = Wallet(db, storage, config=config) + self.wallet = Wallet(db, storage, config=config) # type: Optional[Abstract_Wallet] self.wallet.start_network(self.network) self.contacts = self.wallet.contacts diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index dacfed265..14945a718 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -260,8 +260,14 @@ class Synchronizer(SynchronizerBase): # main loop while True: await asyncio.sleep(0.1) - await run_in_thread(self.wallet.synchronize) - up_to_date = self.is_up_to_date() + # note: we only generate new HD addresses if the existing ones + # have history that are mined and SPV-verified. This inherently couples + # the Sychronizer and the Verifier. + hist_done = self.is_up_to_date() + spv_done = self.wallet.verifier.is_up_to_date() if self.wallet.verifier else True + num_new_addrs = await run_in_thread(self.wallet.synchronize) + up_to_date = hist_done and spv_done and num_new_addrs == 0 + # see if status changed if (up_to_date != self.wallet.is_up_to_date() or up_to_date and self._processed_some_notifications): self._processed_some_notifications = False diff --git a/electrum/wallet.py b/electrum/wallet.py index 5a8edf2e8..aafdfa35e 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -3043,11 +3043,13 @@ class Deterministic_Wallet(Abstract_Wallet): self._not_old_change_addresses.append(address) return address - def synchronize_sequence(self, for_change): + def synchronize_sequence(self, for_change: bool) -> int: + count = 0 # num new addresses we generated limit = self.gap_limit_for_change if for_change else self.gap_limit while True: num_addr = self.db.num_change_addresses() if for_change else self.db.num_receiving_addresses() if num_addr < limit: + count += 1 self.create_new_address(for_change) continue if for_change: @@ -3055,15 +3057,19 @@ class Deterministic_Wallet(Abstract_Wallet): else: last_few_addresses = self.get_receiving_addresses(slice_start=-limit) if any(map(self.address_is_old, last_few_addresses)): + count += 1 self.create_new_address(for_change) else: break + return count @AddressSynchronizer.with_local_height_cached def synchronize(self): + count = 0 with self.lock: - self.synchronize_sequence(False) - self.synchronize_sequence(True) + count += self.synchronize_sequence(False) + count += self.synchronize_sequence(True) + return count def get_all_known_addresses_beyond_gap_limit(self): # note that we don't stop at first large gap From 428dff90f88e887a00ed0a2d2b06f3bde4ae68e8 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Mon, 4 Apr 2022 20:38:16 +0200 Subject: [PATCH 3/3] address_sync: include verifier in sync_state progress indicator --- electrum/address_synchronizer.py | 22 +++++++++++++++++----- electrum/synchronizer.py | 11 ----------- electrum/util.py | 8 ++++++++ electrum/verifier.py | 4 ++++ 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 04cff9dfe..fe9a35d86 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -672,8 +672,14 @@ class AddressSynchronizer(Logger): with self.lock: status_changed = self._up_to_date != up_to_date self._up_to_date = up_to_date - if self.network: - self.network.notify('status') + # reset sync state progress indicator + if up_to_date: + if self.synchronizer: + self.synchronizer.reset_request_counters() + if self.verifier: + self.verifier.reset_request_counters() + # fire triggers + util.trigger_callback('status') if status_changed: self.logger.info(f'set_up_to_date: {up_to_date}') @@ -681,10 +687,16 @@ class AddressSynchronizer(Logger): return self._up_to_date def get_history_sync_state_details(self) -> Tuple[int, int]: + nsent, nans = 0, 0 if self.synchronizer: - return self.synchronizer.num_requests_sent_and_answered() - else: - return 0, 0 + n1, n2 = self.synchronizer.num_requests_sent_and_answered() + nsent += n1 + nans += n2 + if self.verifier: + n1, n2 = self.verifier.num_requests_sent_and_answered() + nsent += n1 + nans += n2 + return nsent, nans @with_transaction_lock def get_tx_delta(self, tx_hash: str, address: str) -> int: diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 14945a718..f2ad4485b 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -60,7 +60,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer): """ def __init__(self, network: 'Network'): self.asyncio_loop = network.asyncio_loop - self._reset_request_counters() NetworkJobOnDefaultServer.__init__(self, network) @@ -69,7 +68,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer): self.requested_addrs = set() self.scripthash_to_address = {} self._processed_some_notifications = False # so that we don't miss them - self._reset_request_counters() # Queues self.add_queue = asyncio.Queue() self.status_queue = asyncio.Queue() @@ -85,10 +83,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer): # we are being cancelled now self.session.unsubscribe(self.status_queue) - def _reset_request_counters(self): - self._requests_sent = 0 - self._requests_answered = 0 - def add(self, addr): asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) @@ -129,9 +123,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer): await self.taskgroup.spawn(self._on_address_status, addr, status) self._processed_some_notifications = True - def num_requests_sent_and_answered(self) -> Tuple[int, int]: - return self._requests_sent, self._requests_answered - async def main(self): raise NotImplementedError() # implemented by subclasses @@ -271,8 +262,6 @@ class Synchronizer(SynchronizerBase): if (up_to_date != self.wallet.is_up_to_date() or up_to_date and self._processed_some_notifications): self._processed_some_notifications = False - if up_to_date: - self._reset_request_counters() self.wallet.set_up_to_date(up_to_date) util.trigger_callback('wallet_updated', self.wallet) diff --git a/electrum/util.py b/electrum/util.py index ee960c82e..19ebc462f 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -1326,6 +1326,7 @@ class NetworkJobOnDefaultServer(Logger, ABC): server connection changes. """ self.taskgroup = OldTaskGroup() + self.reset_request_counters() async def _start(self, interface: 'Interface'): self.interface = interface @@ -1357,6 +1358,13 @@ class NetworkJobOnDefaultServer(Logger, ABC): self._reset() await self._start(interface) + def reset_request_counters(self): + self._requests_sent = 0 + self._requests_answered = 0 + + def num_requests_sent_and_answered(self) -> Tuple[int, int]: + return self._requests_sent, self._requests_answered + @property def session(self): s = self.interface.session diff --git a/electrum/verifier.py b/electrum/verifier.py index 8b2ef24ef..73e7977e9 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -87,6 +87,7 @@ class SPV(NetworkJobOnDefaultServer): header = self.blockchain.read_header(tx_height) if header is None: if tx_height < constants.net.max_checkpoint(): + # FIXME these requests are not counted (self._requests_sent += 1) await self.taskgroup.spawn(self.interface.request_chunk(tx_height, None, can_return_early=True)) continue # request now @@ -96,6 +97,7 @@ class SPV(NetworkJobOnDefaultServer): async def _request_and_verify_single_proof(self, tx_hash, tx_height): try: + self._requests_sent += 1 async with self._network_request_semaphore: merkle = await self.interface.get_merkle_for_transaction(tx_hash, tx_height) except aiorpcx.jsonrpc.RPCError: @@ -103,6 +105,8 @@ class SPV(NetworkJobOnDefaultServer): self.wallet.remove_unverified_tx(tx_hash, tx_height) self.requested_merkle.discard(tx_hash) return + finally: + self._requests_answered += 1 # Verify the hash of the server-provided merkle branch to a # transaction matches the merkle root of its block if tx_height != merkle.get('block_height'):