diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 7157761a2..4479234ea 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -219,10 +219,7 @@ class Peer(Logger): if constants.net.rev_genesis_bytes() not in their_chains: raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})") # all checks passed - if self.channel_db and isinstance(self.transport, LNTransport): - self.channel_db.add_recent_peer(self.transport.peer_addr) - for chan in self.channels.values(): - chan.add_or_update_peer_addr(self.transport.peer_addr) + self.lnworker.on_peer_successfully_established(self) self._received_init = True self.maybe_set_initialized() diff --git a/electrum/lntransport.py b/electrum/lntransport.py index c006f36ec..257f02b12 100644 --- a/electrum/lntransport.py +++ b/electrum/lntransport.py @@ -155,6 +155,8 @@ class LNTransportBase: class LNResponderTransport(LNTransportBase): + """Transport initiated by remote party.""" + def __init__(self, privkey: bytes, reader: StreamReader, writer: StreamWriter): LNTransportBase.__init__(self) self.reader = reader @@ -211,7 +213,9 @@ class LNResponderTransport(LNTransportBase): self.init_counters(ck) return rs + class LNTransport(LNTransportBase): + """Transport initiated by local party.""" def __init__(self, privkey: bytes, peer_addr: LNPeerAddr): LNTransportBase.__init__(self) diff --git a/electrum/lnworker.py b/electrum/lnworker.py index cc8e55bd9..6c1fb4b6f 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -77,9 +77,11 @@ SAVED_PR_STATUS = [PR_PAID, PR_UNPAID, PR_INFLIGHT] # status that are persisted NUM_PEERS_TARGET = 4 -PEER_RETRY_INTERVAL = 600 # seconds -PEER_RETRY_INTERVAL_FOR_CHANNELS = 30 # seconds -GRAPH_DOWNLOAD_SECONDS = 600 + +MAX_RETRY_DELAY_FOR_PEERS = 3600 # sec +INIT_RETRY_DELAY_FOR_PEERS = 600 # sec +MAX_RETRY_DELAY_FOR_CHANNEL_PEERS = 300 # sec +INIT_RETRY_DELAY_FOR_CHANNEL_PEERS = 4 # sec FALLBACK_NODE_LIST_TESTNET = ( LNPeerAddr(host='203.132.95.10', port=9735, pubkey=bfh('038863cf8ab91046230f561cd5b386cbff8309fa02e3f0c3ed161a3aeb64a643b9')), @@ -156,6 +158,8 @@ class LNWorker(Logger): self.features |= LnFeatures.VAR_ONION_OPT self.features |= LnFeatures.PAYMENT_SECRET_OPT + self._last_tried_peer = {} # type: Dict[LNPeerAddr, Tuple[float, int]] # LNPeerAddr -> (unix ts, num_attempts) + def channels_for_peer(self, node_id): return {} @@ -204,8 +208,7 @@ class LNWorker(Logger): continue peers = await self._get_next_peers_to_try() for peer in peers: - last_tried = self._last_tried_peer.get(peer, 0) - if last_tried + PEER_RETRY_INTERVAL < now: + if self._can_retry_peer(peer, now=now): await self._add_peer(peer.host, peer.port, peer.pubkey) async def _add_peer(self, host, port, node_id) -> Peer: @@ -214,7 +217,8 @@ class LNWorker(Logger): port = int(port) peer_addr = LNPeerAddr(host, port, node_id) transport = LNTransport(self.node_keypair.privkey, peer_addr) - self._last_tried_peer[peer_addr] = time.time() + last_time, num_attempts = self._last_tried_peer.get(peer_addr, (0, 0)) + self._last_tried_peer[peer_addr] = time.time(), num_attempts + 1 self.logger.info(f"adding peer {peer_addr}") peer = Peer(self, node_id, transport) await self.taskgroup.spawn(peer.main_loop()) @@ -233,7 +237,6 @@ class LNWorker(Logger): self.network = network self.config = network.config self.channel_db = self.network.channel_db - self._last_tried_peer = {} # type: Dict[LNPeerAddr, float] # LNPeerAddr -> unix timestamp self._add_peers_from_config() asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) @@ -259,20 +262,43 @@ class LNWorker(Logger): #self.logger.info(f'is_good {peer.host}') return True + def on_peer_successfully_established(self, peer: Peer) -> None: + if isinstance(peer.transport, LNTransport): + peer_addr = peer.transport.peer_addr + # reset connection attempt count + self._last_tried_peer[peer_addr] = time.time(), 0 + # add into channel db + if self.channel_db: + self.channel_db.add_recent_peer(peer_addr) + # save network address into channels we might have with peer + for chan in peer.channels.values(): + chan.add_or_update_peer_addr(peer_addr) + + def _can_retry_peer(self, peer: LNPeerAddr, *, + now: float = None, for_channel: bool = False) -> bool: + if now is None: + now = time.time() + last_time, num_attempts = self._last_tried_peer.get(peer, (0, 0)) + if for_channel: + delay = min(MAX_RETRY_DELAY_FOR_CHANNEL_PEERS, + INIT_RETRY_DELAY_FOR_CHANNEL_PEERS * 2 ** num_attempts) + else: + delay = min(MAX_RETRY_DELAY_FOR_PEERS, + INIT_RETRY_DELAY_FOR_PEERS * 2 ** num_attempts) + next_time = last_time + delay + return next_time < now + async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]: now = time.time() await self.channel_db.data_loaded.wait() - recent_peers = self.channel_db.get_recent_peers() - # maintenance for last tried times - # due to this, below we can just test membership in _last_tried_peer - for peer in list(self._last_tried_peer): - if now >= self._last_tried_peer[peer] + PEER_RETRY_INTERVAL: - del self._last_tried_peer[peer] # first try from recent peers + recent_peers = self.channel_db.get_recent_peers() for peer in recent_peers: + if not peer: + continue if peer.pubkey in self.peers: continue - if peer in self._last_tried_peer: + if not self._can_retry_peer(peer, now=now): continue if not self.is_good_peer(peer): continue @@ -289,7 +315,7 @@ class LNWorker(Logger): peer = LNPeerAddr(host, port, node_id) except ValueError: continue - if peer in self._last_tried_peer: + if not self._can_retry_peer(peer, now=now): continue if not self.is_good_peer(peer): continue @@ -304,7 +330,7 @@ class LNWorker(Logger): else: return [] # regtest?? - fallback_list = [peer for peer in fallback_list if peer not in self._last_tried_peer] + fallback_list = [peer for peer in fallback_list if self._can_retry_peer(peer, now=now)] if fallback_list: return [random.choice(fallback_list)] @@ -1269,18 +1295,10 @@ class LNWallet(LNWorker): peer_addresses.append(LNPeerAddr(host, port, chan.node_id)) # will try addresses stored in channel storage peer_addresses += list(chan.get_peer_addresses()) + # Done gathering addresses. # Now select first one that has not failed recently. - # Use long retry interval to check. This ensures each address we gathered gets a chance. - for peer in peer_addresses: - last_tried = self._last_tried_peer.get(peer, 0) - if last_tried + PEER_RETRY_INTERVAL < now: - await self._add_peer(peer.host, peer.port, peer.pubkey) - return - # Still here? That means all addresses failed ~recently. - # Use short retry interval now. for peer in peer_addresses: - last_tried = self._last_tried_peer.get(peer, 0) - if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now: + if self._can_retry_peer(peer, for_channel=True, now=now): await self._add_peer(peer.host, peer.port, peer.pubkey) return