diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 38c21acce..f7e8fd3f2 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -43,7 +43,7 @@ from .util import ignore_exceptions, make_aiohttp_session, SilentTaskGroup from .util import timestamp_to_datetime, random_shuffled_copy from .util import MyEncoder, is_private_netaddress from .logging import Logger -from .lntransport import LNTransport, LNResponderTransport +from .lntransport import LNTransport, LNResponderTransport, LNTransportBase from .lnpeer import Peer, LN_P2P_NETWORK_TIMEOUT from .lnaddr import lnencode, LnAddr, lndecode from .ecc import der_sig_from_sig_string @@ -254,10 +254,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): except Exception as e: self.logger.info(f'handshake failure from incoming connection: {e!r}') return - peer = Peer(self, node_id, transport) - with self.lock: - self._peers[node_id] = peer - await self.taskgroup.spawn(peer.main_loop()) + await self._add_peer_from_transport(node_id=node_id, transport=transport) try: self.listen_server = await asyncio.start_server(cb, addr, netaddr.port) except OSError as e: @@ -303,10 +300,18 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): raise ErrorAddingPeer("cannot connect to self") transport = LNTransport(self.node_keypair.privkey, peer_addr, proxy=self.network.proxy) + peer = await self._add_peer_from_transport(node_id=node_id, transport=transport) + return peer + + async def _add_peer_from_transport(self, *, node_id: bytes, transport: LNTransportBase) -> Peer: peer = Peer(self, node_id, transport) - await self.taskgroup.spawn(peer.main_loop()) with self.lock: + existing_peer = self._peers.get(node_id) + if existing_peer: + existing_peer.close_and_cleanup() + assert node_id not in self._peers self._peers[node_id] = peer + await self.taskgroup.spawn(peer.main_loop()) return peer def peer_closed(self, peer: Peer) -> None: @@ -2165,6 +2170,7 @@ class LNWallet(LNWorker): except Exception as e: self.logger.info(f'failed to connect {host} {e}') continue + # TODO close/cleanup the transport else: raise Exception('failed to connect')