From f733cb89475872d5d59188ed8fccfbaa3f35b0ec Mon Sep 17 00:00:00 2001 From: Janus Date: Mon, 27 Aug 2018 20:39:36 +0200 Subject: [PATCH] aiorpcx: socks support --- electrum/gui/kivy/uix/ui_screens/proxy.kv | 2 +- electrum/gui/qt/network_dialog.py | 2 +- electrum/interface.py | 47 +++++++++++++++++------ electrum/network.py | 39 +++++++++---------- electrum/util.py | 22 +++++++++++ 5 files changed, 77 insertions(+), 35 deletions(-) diff --git a/electrum/gui/kivy/uix/ui_screens/proxy.kv b/electrum/gui/kivy/uix/ui_screens/proxy.kv index 738a8ffd5..b7231855e 100644 --- a/electrum/gui/kivy/uix/ui_screens/proxy.kv +++ b/electrum/gui/kivy/uix/ui_screens/proxy.kv @@ -14,7 +14,7 @@ Popup: height: '48dp' size_hint_y: None text: app.proxy_config.get('mode', 'none') - values: ['none', 'socks4', 'socks5', 'http'] + values: ['none', 'socks4', 'socks5'] Label: text: _('Host') TextInput: diff --git a/electrum/gui/qt/network_dialog.py b/electrum/gui/qt/network_dialog.py index ecc7695b8..0acdbcd25 100644 --- a/electrum/gui/qt/network_dialog.py +++ b/electrum/gui/qt/network_dialog.py @@ -239,7 +239,7 @@ class NetworkChoiceLayout(object): self.proxy_cb.clicked.connect(self.set_proxy) self.proxy_mode = QComboBox() - self.proxy_mode.addItems(['SOCKS4', 'SOCKS5', 'HTTP']) + self.proxy_mode.addItems(['SOCKS4', 'SOCKS5']) self.proxy_host = QLineEdit() self.proxy_host.setFixedWidth(200) self.proxy_port = QLineEdit() diff --git a/electrum/interface.py b/electrum/interface.py index 563a43e52..5dd9c60e3 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -34,7 +34,7 @@ import asyncio import requests -from .util import PrintError +from .util import PrintError, aiosafe, bfh ca_path = requests.certs.where() @@ -42,16 +42,38 @@ from . import util from . import x509 from . import pem from .version import ELECTRUM_VERSION, PROTOCOL_VERSION +from .util import NotificationSession class Interface(PrintError): - def __init__(self, server, config_path, connecting): + def __init__(self, server, config_path, connecting, proxy): + self.exception = None self.connecting = connecting self.server = server self.host, self.port, self.protocol = self.server.split(':') + self.port = int(self.port) self.config_path = config_path self.cert_path = os.path.join(self.config_path, 'certs', self.host) self.fut = asyncio.get_event_loop().create_task(self.run()) + if proxy: + proxy['user'] = proxy.get('user', '') + if proxy['user'] == '': + proxy['user'] = 'sampleuser' # aiorpcx doesn't allow empty user + proxy['password'] = proxy.get('password', '') + if proxy['password'] == '': + proxy['password'] = 'samplepassword' + try: + auth = aiorpcx.socks.SOCKSUserAuth(proxy['user'], proxy['password']) + except KeyError: + auth = None + if proxy['mode'] == "socks4": + self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS4a, auth) + elif proxy['mode'] == "socks5": + self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth) + else: + raise NotImplementedError + else: + self.proxy = None def diagnostic_name(self): return self.host @@ -67,8 +89,8 @@ class Interface(PrintError): @util.aiosafe async def run(self): if self.protocol != 's': - await self.open_session(None, execute_after_connect=lambda: self.connecting.remove(self.server)) - return + await self.open_session(None, execute_after_connect=self.mark_ready) + assert False ca_sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) exists = os.path.exists(self.cert_path) @@ -84,8 +106,8 @@ class Interface(PrintError): x = x509.X509(b) try: x.check_date() - except x509.CertificateError: - self.print_error("certificate has expired:", self.cert_path) + except x509.CertificateError as e: + self.print_error("certificate problem", e) os.unlink(self.cert_path) exists = False if not exists: @@ -102,7 +124,11 @@ class Interface(PrintError): else: sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path) sslc.check_hostname = 0 - await self.open_session(sslc, execute_after_connect=lambda: self.connecting.remove(self.server)) + await self.open_session(sslc, execute_after_connect=self.mark_ready) + assert False + + def mark_ready(self): + self.connecting.remove(self.server) async def save_certificate(self): if not os.path.exists(self.cert_path): @@ -129,13 +155,13 @@ class Interface(PrintError): async def get_certificate(self): sslc = ssl.SSLContext() try: - async with aiorpcx.ClientSession(self.host, self.port, ssl=sslc) as session: + async with aiorpcx.ClientSession(self.host, self.port, ssl=sslc, proxy=self.proxy) as session: return session.transport._ssl_protocol._sslpipe._sslobj.getpeercert(True) except ValueError: return None async def open_session(self, sslc, do_sleep=True, execute_after_connect=lambda: None): - async with aiorpcx.ClientSession(self.host, self.port, ssl=sslc) as session: + async with NotificationSession(None, None, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) print(ver) connect_hook_executed = False @@ -146,9 +172,6 @@ class Interface(PrintError): await asyncio.wait_for(session.send_request('server.ping'), 5) await asyncio.sleep(300) - def has_timed_out(self): - return self.fut.done() - def queue_request(self, method, params, msg_id): pass diff --git a/electrum/network.py b/electrum/network.py index 9e5b33d83..de1b7b8cf 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -36,7 +36,6 @@ import ipaddress import dns import dns.resolver -import socks from . import util from .util import print_error, PrintError @@ -434,6 +433,7 @@ class Network(PrintError): server = pick_random_server(self.get_servers(), self.protocol, exclude_set) if server: self.start_interface(server) + return server def start_interfaces(self): self.start_interface(self.default_server) @@ -444,22 +444,13 @@ class Network(PrintError): self.proxy = proxy # Store these somewhere so we can un-monkey-patch if not hasattr(socket, "_socketobject"): - socket._socketobject = socket.socket socket._getaddrinfo = socket.getaddrinfo if proxy: self.print_error('setting proxy', proxy) proxy_mode = proxy_modes.index(proxy["mode"]) + 1 - socks.setdefaultproxy(proxy_mode, - proxy["host"], - int(proxy["port"]), - # socks.py seems to want either None or a non-empty string - username=(proxy.get("user", "") or None), - password=(proxy.get("password", "") or None)) - socket.socket = socks.socksocket # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy socket.getaddrinfo = lambda *args: [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))] else: - socket.socket = socket._socketobject if sys.platform == 'win32': # On Windows, socket.getaddrinfo takes a mutex, and might hold it for up to 10 seconds # when dns-resolving. To speed it up drastically, we resolve dns ourselves, outside that lock. @@ -783,10 +774,10 @@ class Network(PrintError): if b.catch_up == server: b.catch_up = None - def new_interface(self, server): + async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, self.config.path, self.connecting) + interface = Interface(server, self.config.path, self.connecting, self.proxy) interface.blockchain = None interface.tip_header = None interface.tip = 0 @@ -1082,9 +1073,7 @@ class Network(PrintError): def _run(self): self.init_headers_file() - these = [self.maintain_sessions()] - these = [self.asyncio_loop.create_task(x) for x in these] - self.gat = asyncio.gather(*these) + self.gat = self.asyncio_loop.create_task(self.maintain_sessions()) try: self.asyncio_loop.run_until_complete(self.gat) except concurrent.futures.CancelledError: @@ -1339,17 +1328,25 @@ class Network(PrintError): while True: while self.socket_queue.qsize() > 0: server = self.socket_queue.get() - self.new_interface(server) + asyncio.get_event_loop().create_task(self.new_interface(server)) remove = [] for k, i in self.interfaces.items(): - if i.has_timed_out(): + if i.fut.done(): + if i.exception: + try: + raise i.exception + except BaseException as e: + self.print_error(i.server, "errored because", str(e), str(type(e))) + else: + assert False, "interface future should not finish without exception" remove.append(k) changed = False for k in remove: self.connection_down(k) changed = True - for i in range(self.num_server - len(self.interfaces)): - self.start_random_interface() - changed = True - if changed: self.notify('updated') + for i in range(self.num_server - len(self.interfaces) - len(self.connecting)): + if self.start_random_interface(): + changed = True + if changed: + self.notify('updated') await asyncio.sleep(1) diff --git a/electrum/util.py b/electrum/util.py index 4a3629cd4..ec4d44342 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -38,6 +38,7 @@ from locale import localeconv from .i18n import _ +from aiorpcx import ClientSession, Notification import urllib.request, urllib.parse, urllib.error import queue @@ -947,3 +948,24 @@ VerifiedTxInfo = NamedTuple("VerifiedTxInfo", [("height", int), ("timestamp", int), ("txpos", int), ("header_hash", str)]) + +from .blockchain import deserialize_header + +class NotificationSession(ClientSession): + + def __init__(self, scripthash, header, *args, **kwargs): + super(NotificationSession, self).__init__(*args, **kwargs) + self.scripthash = scripthash + self.header = header + + @aiosafe + async def handle_request(self, request): + if isinstance(request, Notification): + if request.method == 'blockchain.scripthash.subscribe' and self.scripthash is not None: + args = request.args + await self.scripthash.put((args[0], args[1])) + elif request.method == 'blockchain.headers.subscribe' and self.header is not None: + deser = deserialize_header(bfh(request.args[0]['hex']), request.args[0]['height']) + await self.header.put(deser) + else: + assert False, request.method