diff --git a/electrum/interface.py b/electrum/interface.py index 522d291fb..fe27e7e32 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -953,6 +953,12 @@ class Interface(Logger): if height < prev_height: raise RequestCorrupted(f'heights of confirmed txs must be in increasing order') prev_height = height + hashes = set(map(lambda item: item['tx_hash'], res)) + if len(hashes) != len(res): + # Either server is sending garbage... or maybe if server is race-prone + # a recently mined tx could be included in both last block and mempool? + # Still, it's simplest to just disregard the response. + raise RequestCorrupted(f"server history has non-unique txids for sh={sh}") return res async def listunspent_for_scripthash(self, sh: str) -> List[dict]: diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 7f31de859..b653b0721 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -24,7 +24,7 @@ # SOFTWARE. import asyncio import hashlib -from typing import Dict, List, TYPE_CHECKING, Tuple +from typing import Dict, List, TYPE_CHECKING, Tuple, Set from collections import defaultdict import logging @@ -35,7 +35,7 @@ from .transaction import Transaction, PartialTransaction from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy from .bitcoin import address_to_scripthash, is_address from .logging import Logger -from .interface import GracefulDisconnect +from .interface import GracefulDisconnect, NetworkTimeout if TYPE_CHECKING: from .network import Network @@ -153,6 +153,7 @@ class Synchronizer(SynchronizerBase): super()._reset() self.requested_tx = {} self.requested_histories = set() + self._stale_histories = dict() # type: Dict[str, asyncio.Task] def diagnostic_name(self): return self.wallet.diagnostic_name() @@ -160,34 +161,43 @@ class Synchronizer(SynchronizerBase): def is_up_to_date(self): return (not self.requested_addrs and not self.requested_histories - and not self.requested_tx) + and not self.requested_tx + and not self._stale_histories) async def _on_address_status(self, addr, status): history = self.wallet.db.get_addr_history(addr) if history_status(history) == status: return + # No point in requesting history twice for the same announced status. + # However if we got announced a new status, we should request history again: if (addr, status) in self.requested_histories: return # request address history self.requested_histories.add((addr, status)) + self._stale_histories.pop(addr, asyncio.Future()).cancel() h = address_to_scripthash(addr) self._requests_sent += 1 async with self._network_request_semaphore: result = await self.interface.get_history_for_scripthash(h) self._requests_answered += 1 self.logger.info(f"receiving history {addr} {len(result)}") - hashes = set(map(lambda item: item['tx_hash'], result)) hist = list(map(lambda item: (item['tx_hash'], item['height']), result)) # tx_fees tx_fees = [(item['tx_hash'], item.get('fee')) for item in result] tx_fees = dict(filter(lambda x:x[1] is not None, tx_fees)) - # Check that txids are unique - if len(hashes) != len(result): - self.logger.info(f"error: server history has non-unique txids: {addr}") # Check that the status corresponds to what was announced - elif history_status(hist) != status: - self.logger.info(f"error: status mismatch: {addr}") + if history_status(hist) != status: + # could happen naturally if history changed between getting status and history (race) + self.logger.info(f"error: status mismatch: {addr}. we'll wait a bit for status update.") + # The server is supposed to send a new status notification, which will trigger a new + # get_history. We shall wait a bit for this to happen, otherwise we disconnect. + async def disconnect_if_still_stale(): + timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic) + await asyncio.sleep(timeout) + raise SynchronizerFailure(f"timeout reached waiting for addr {addr}: history still stale") + self._stale_histories[addr] = await self.taskgroup.spawn(disconnect_if_still_stale) else: + self._stale_histories.pop(addr, asyncio.Future()).cancel() # Store received history self.wallet.receive_history_callback(addr, hist, tx_fees) # Request transactions we don't have