From 223b62554ead397bb94013c0d9c95b63a0708ea6 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Wed, 15 Apr 2020 21:41:33 +0200 Subject: [PATCH] lntransport: use network proxy if available fixes #4824 --- electrum/interface.py | 21 ++------------------ electrum/lnpeer.py | 3 ++- electrum/lntransport.py | 13 ++++++++++--- electrum/lnworker.py | 13 +++++++++++-- electrum/tests/test_lntransport.py | 2 +- electrum/util.py | 31 ++++++++++++++++++++++++++++++ 6 files changed, 57 insertions(+), 26 deletions(-) diff --git a/electrum/interface.py b/electrum/interface.py index 86fa5e0c8..26677aefe 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -43,7 +43,7 @@ from aiorpcx.jsonrpc import JSONRPC, CodeMessageError from aiorpcx.rawsocket import RSClient import certifi -from .util import ignore_exceptions, log_exceptions, bfh, SilentTaskGroup +from .util import ignore_exceptions, log_exceptions, bfh, SilentTaskGroup, MySocksProxy from . import util from . import x509 from . import pem @@ -277,7 +277,7 @@ class Interface(Logger): self.blockchain = None # type: Optional[Blockchain] self._requested_chunks = set() # type: Set[int] self.network = network - self._set_proxy(proxy) + self.proxy = MySocksProxy.from_proxy_dict(proxy) self.session = None # type: Optional[NotificationSession] self._ipaddr_bucket = None @@ -310,23 +310,6 @@ class Interface(Logger): def __str__(self): return f"" - def _set_proxy(self, proxy: dict): - if proxy: - username, pw = proxy.get('user'), proxy.get('password') - if not username or not pw: - auth = None - else: - auth = aiorpcx.socks.SOCKSUserAuth(username, pw) - addr = NetAddress(proxy['host'], proxy['port']) - if proxy['mode'] == "socks4": - self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS4a, auth) - elif proxy['mode'] == "socks5": - self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS5, auth) - else: - raise NotImplementedError # http proxy not available with aiorpcx - else: - self.proxy = None - async def is_server_ca_signed(self, ca_ssl_context): """Given a CA enforcing SSL context, returns True if the connection can be established. Returns False if the server has a self-signed diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 4479234ea..e92a30cb6 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -251,7 +251,8 @@ class Peer(Logger): return await func(self, *args, **kwargs) except GracefulDisconnect as e: self.logger.log(e.log_level, f"Disconnecting: {repr(e)}") - except (LightningPeerConnectionClosed, IncompatibleLightningFeatures) as e: + except (LightningPeerConnectionClosed, IncompatibleLightningFeatures, + aiorpcx.socks.SOCKSError) as e: self.logger.info(f"Disconnecting: {repr(e)}") finally: self.close_and_cleanup() diff --git a/electrum/lntransport.py b/electrum/lntransport.py index 257f02b12..a28d2da24 100644 --- a/electrum/lntransport.py +++ b/electrum/lntransport.py @@ -8,12 +8,14 @@ import hashlib import asyncio from asyncio import StreamReader, StreamWriter +from typing import Optional from .crypto import sha256, hmac_oneshot, chacha20_poly1305_encrypt, chacha20_poly1305_decrypt from .lnutil import (get_ecdh, privkey_to_pubkey, LightningPeerConnectionClosed, HandshakeFailed, LNPeerAddr) from . import ecc -from .util import bh2u +from .util import bh2u, MySocksProxy + class HandshakeState(object): prologue = b"lightning" @@ -217,17 +219,22 @@ class LNResponderTransport(LNTransportBase): class LNTransport(LNTransportBase): """Transport initiated by local party.""" - def __init__(self, privkey: bytes, peer_addr: LNPeerAddr): + def __init__(self, privkey: bytes, peer_addr: LNPeerAddr, *, + proxy: Optional[dict]): LNTransportBase.__init__(self) assert type(privkey) is bytes and len(privkey) == 32 self.privkey = privkey self.peer_addr = peer_addr + self.proxy = MySocksProxy.from_proxy_dict(proxy) def name(self): return self.peer_addr.net_addr_str() async def handshake(self): - self.reader, self.writer = await asyncio.open_connection(self.peer_addr.host, self.peer_addr.port) + if not self.proxy: + self.reader, self.writer = await asyncio.open_connection(self.peer_addr.host, self.peer_addr.port) + else: + self.reader, self.writer = await self.proxy.open_connection(self.peer_addr.host, self.peer_addr.port) hs = HandshakeState(self.peer_addr.pubkey) # Get a new ephemeral key epriv, epub = create_ephemeral_key() diff --git a/electrum/lnworker.py b/electrum/lnworker.py index f8d863db0..49999b703 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -162,6 +162,8 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): self.features |= LnFeatures.VAR_ONION_OPT self.features |= LnFeatures.PAYMENT_SECRET_OPT + util.register_callback(self.on_proxy_changed, ['proxy_set']) + @property def peers(self) -> Mapping[bytes, Peer]: """Returns a read-only copy of peers.""" @@ -191,6 +193,7 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): await self.taskgroup.spawn(peer.main_loop()) try: # FIXME: server.close(), server.wait_closed(), etc... ? + # TODO: onion hidden service? server = await asyncio.start_server(cb, addr, int(port)) except OSError as e: self.logger.error(f"cannot listen for lightning p2p. error: {e!r}") @@ -224,7 +227,8 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): return self._peers[node_id] port = int(port) peer_addr = LNPeerAddr(host, port, node_id) - transport = LNTransport(self.node_keypair.privkey, peer_addr) + transport = LNTransport(self.node_keypair.privkey, peer_addr, + proxy=self.network.proxy) self._trying_addr_now(peer_addr) self.logger.info(f"adding peer {peer_addr}") peer = Peer(self, node_id, transport) @@ -381,6 +385,10 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]): choice = random.choice(addr_list) return choice + def on_proxy_changed(self, event, *args): + for peer in self.peers.values(): + peer.close_and_cleanup() + class LNGossip(LNWorker): max_age = 14*24*3600 @@ -1415,7 +1423,8 @@ class LNBackups(Logger): async def request_force_close(self, channel_id): cb = self.channel_backups[channel_id].cb peer_addr = LNPeerAddr(cb.host, cb.port, cb.node_id) - transport = LNTransport(cb.privkey, peer_addr) + transport = LNTransport(cb.privkey, peer_addr, + proxy=self.network.proxy) peer = Peer(self, cb.node_id, transport) await self.taskgroup.spawn(peer._message_loop()) await peer.initialized diff --git a/electrum/tests/test_lntransport.py b/electrum/tests/test_lntransport.py index dff48ca5c..2dc1b4950 100644 --- a/electrum/tests/test_lntransport.py +++ b/electrum/tests/test_lntransport.py @@ -57,7 +57,7 @@ class TestLNTransport(ElectrumTestCase): server = server_future.result() # type: asyncio.Server async def connect(): peer_addr = LNPeerAddr('127.0.0.1', 42898, responder_key.get_public_key_bytes()) - t = LNTransport(initiator_key.get_secret_bytes(), peer_addr) + t = LNTransport(initiator_key.get_secret_bytes(), peer_addr, proxy=None) await t.handshake() t.send_bytes(b'hello from client') self.assertEqual(await t.read_messages().__anext__(), b'hello from server') diff --git a/electrum/util.py b/electrum/util.py index 8e7d441d2..6f998bb7f 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -46,6 +46,7 @@ import random import aiohttp from aiohttp_socks import ProxyConnector, ProxyType +import aiorpcx from aiorpcx import TaskGroup import certifi import dns.resolver @@ -1397,3 +1398,33 @@ class NetworkRetryManager(Generic[_NetAddrType]): def _clear_addr_retry_times(self) -> None: self._last_tried_addr.clear() + + +class MySocksProxy(aiorpcx.SOCKSProxy): + + async def open_connection(self, host=None, port=None, **kwargs): + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader, loop=loop) + transport, _ = await self.create_connection( + lambda: protocol, host, port, **kwargs) + writer = asyncio.StreamWriter(transport, protocol, reader, loop) + return reader, writer + + @classmethod + def from_proxy_dict(cls, proxy: dict = None) -> Optional['MySocksProxy']: + if not proxy: + return None + username, pw = proxy.get('user'), proxy.get('password') + if not username or not pw: + auth = None + else: + auth = aiorpcx.socks.SOCKSUserAuth(username, pw) + addr = aiorpcx.NetAddress(proxy['host'], proxy['port']) + if proxy['mode'] == "socks4": + ret = cls(addr, aiorpcx.socks.SOCKS4a, auth) + elif proxy['mode'] == "socks5": + ret = cls(addr, aiorpcx.socks.SOCKS5, auth) + else: + raise NotImplementedError # http proxy not available with aiorpcx + return ret