diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt index 3860c9cd4..3536b894e 100644 --- a/contrib/requirements/requirements.txt +++ b/contrib/requirements/requirements.txt @@ -9,3 +9,5 @@ PySocks>=1.6.6 qdarkstyle<3.0 typing>=3.0.0 aiorpcx>=0.7.1 +aiohttp +aiohttp_socks diff --git a/electrum/base_crash_reporter.py b/electrum/base_crash_reporter.py index a5702e2e6..6cd94a49b 100644 --- a/electrum/base_crash_reporter.py +++ b/electrum/base_crash_reporter.py @@ -19,6 +19,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio import json import locale import traceback @@ -26,14 +27,13 @@ import subprocess import sys import os -import requests - from .version import ELECTRUM_VERSION from .import constants from .i18n import _ +from .util import make_aiohttp_session -class BaseCrashReporter(object): +class BaseCrashReporter: report_server = "https://crashhub.electrum.org" config_key = "show_crash_reporter" issue_template = """

Traceback

@@ -60,16 +60,22 @@ class BaseCrashReporter(object): def __init__(self, exctype, value, tb): self.exc_args = (exctype, value, tb) - def send_report(self, endpoint="/crash"): + def send_report(self, asyncio_loop, proxy, endpoint="/crash"): if constants.net.GENESIS[-4:] not in ["4943", "e26f"] and ".electrum.org" in BaseCrashReporter.report_server: # Gah! Some kind of altcoin wants to send us crash reports. raise Exception(_("Missing report URL.")) report = self.get_traceback_info() report.update(self.get_additional_info()) report = json.dumps(report) - response = requests.post(BaseCrashReporter.report_server + endpoint, data=report) + coro = self.do_post(proxy, BaseCrashReporter.report_server + endpoint, data=report) + response = asyncio.run_coroutine_threadsafe(coro, asyncio_loop).result(1) return response + async def do_post(self, proxy, url, data): + async with make_aiohttp_session(proxy) as session: + async with session.post(url, data=data) as resp: + return await resp.text() + def get_traceback_info(self): exc_string = str(self.exc_args[1]) stack = traceback.extract_tb(self.exc_args[2]) @@ -125,4 +131,4 @@ class BaseCrashReporter(object): raise NotImplementedError def get_os_version(self): - raise NotImplementedError + raise NotImplementedError diff --git a/electrum/daemon.py b/electrum/daemon.py index 72d6ece8b..f2a9316e0 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -22,6 +22,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio import ast import os import time @@ -126,10 +127,9 @@ class Daemon(DaemonThread): self.network = None else: self.network = Network(config) - self.network.start() self.fx = FxThread(config, self.network) - #if self.network: - # self.network.add_jobs([self.fx]) + if self.network: + self.network.start(self.fx.run()) self.gui = None self.wallets = {} # Setup JSONRPC server diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py index 1d543a95a..e94c0468e 100644 --- a/electrum/exchange_rate.py +++ b/electrum/exchange_rate.py @@ -1,18 +1,21 @@ +import asyncio +import aiohttp +from aiohttp_socks import SocksConnector, SocksVer from datetime import datetime import inspect -import requests import sys import os import json -from threading import Thread import time import csv import decimal from decimal import Decimal +import concurrent.futures from .bitcoin import COIN from .i18n import _ -from .util import PrintError, ThreadJob, make_dir +from .util import PrintError, ThreadJob, make_dir, aiosafe +from .util import make_aiohttp_session # See https://en.wikipedia.org/wiki/ISO_4217 @@ -23,6 +26,7 @@ CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0, 'RWF': 0, 'TND': 3, 'UGX': 0, 'UYI': 0, 'VND': 0, 'VUV': 0, 'XAF': 0, 'XAU': 4, 'XOF': 0, 'XPF': 0} +PROXY = None class ExchangeBase(PrintError): @@ -32,34 +36,41 @@ class ExchangeBase(PrintError): self.on_quotes = on_quotes self.on_history = on_history - def get_json(self, site, get_string): + async def get_raw(self, site, get_string): # APIs must have https url = ''.join(['https://', site, get_string]) - response = requests.request('GET', url, headers={'User-Agent' : 'Electrum'}, timeout=10) - return response.json() + async with make_aiohttp_session(PROXY) as session: + async with session.get(url) as response: + return await response.text() - def get_csv(self, site, get_string): + async def get_json(self, site, get_string): + # APIs must have https url = ''.join(['https://', site, get_string]) - response = requests.request('GET', url, headers={'User-Agent' : 'Electrum'}) - reader = csv.DictReader(response.content.decode().split('\n')) + async with make_aiohttp_session(PROXY) as session: + async with session.get(url) as response: + return await response.json() + + async def get_csv(self, site, get_string): + raw = await self.get_raw(site, get_string) + reader = csv.DictReader(raw.split('\n')) return list(reader) def name(self): return self.__class__.__name__ - def update_safe(self, ccy): + @aiosafe + async def update_safe(self, ccy): try: self.print_error("getting fx quotes for", ccy) - self.quotes = self.get_rates(ccy) + self.quotes = await self.get_rates(ccy) self.print_error("received fx quotes") except BaseException as e: self.print_error("failed fx quotes:", e) + self.quotes = {} self.on_quotes() def update(self, ccy): - t = Thread(target=self.update_safe, args=(ccy,)) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.update_safe(ccy)) def read_historical_rates(self, ccy, cache_dir): filename = os.path.join(cache_dir, self.name() + '_'+ ccy) @@ -78,13 +89,16 @@ class ExchangeBase(PrintError): self.on_history() return h - def get_historical_rates_safe(self, ccy, cache_dir): + @aiosafe + async def get_historical_rates_safe(self, ccy, cache_dir): try: self.print_error("requesting fx history for", ccy) - h = self.request_history(ccy) + h = await self.request_history(ccy) self.print_error("received fx history for", ccy) except BaseException as e: self.print_error("failed fx history:", e) + import traceback + traceback.print_exc() return filename = os.path.join(cache_dir, self.name() + '_' + ccy) with open(filename, 'w', encoding='utf-8') as f: @@ -100,9 +114,7 @@ class ExchangeBase(PrintError): if h is None: h = self.read_historical_rates(ccy, cache_dir) if h is None or h['timestamp'] < time.time() - 24*3600: - t = Thread(target=self.get_historical_rates_safe, args=(ccy, cache_dir)) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir)) def history_ccys(self): return [] @@ -116,8 +128,8 @@ class ExchangeBase(PrintError): class BitcoinAverage(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short') + async def get_rates(self, ccy): + json = await self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short') return dict([(r.replace("BTC", ""), Decimal(json[r]['last'])) for r in json if r != 'timestamp']) @@ -126,8 +138,8 @@ class BitcoinAverage(ExchangeBase): 'MXN', 'NOK', 'NZD', 'PLN', 'RON', 'RUB', 'SEK', 'SGD', 'USD', 'ZAR'] - def request_history(self, ccy): - history = self.get_csv('apiv2.bitcoinaverage.com', + async def request_history(self, ccy): + history = await self.get_csv('apiv2.bitcoinaverage.com', "/indices/global/history/BTC%s?period=alltime&format=csv" % ccy) return dict([(h['DateTime'][:10], h['Average']) for h in history]) @@ -135,8 +147,8 @@ class BitcoinAverage(ExchangeBase): class Bitcointoyou(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('bitcointoyou.com', "/API/ticker.aspx") + async def get_rates(self, ccy): + json = await self.get_json('bitcointoyou.com', "/API/ticker.aspx") return {'BRL': Decimal(json['ticker']['last'])} def history_ccys(self): @@ -145,8 +157,8 @@ class Bitcointoyou(ExchangeBase): class BitcoinVenezuela(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.bitcoinvenezuela.com', '/') + async def get_rates(self, ccy): + json = await self.get_json('api.bitcoinvenezuela.com', '/') rates = [(r, json['BTC'][r]) for r in json['BTC'] if json['BTC'][r] is not None] # Giving NULL for LTC return dict(rates) @@ -154,85 +166,86 @@ class BitcoinVenezuela(ExchangeBase): def history_ccys(self): return ['ARS', 'EUR', 'USD', 'VEF'] - def request_history(self, ccy): - return self.get_json('api.bitcoinvenezuela.com', - "/historical/index.php?coin=BTC")[ccy +'_BTC'] + async def request_history(self, ccy): + json = await self.get_json('api.bitcoinvenezuela.com', + "/historical/index.php?coin=BTC") + return json[ccy +'_BTC'] class Bitbank(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('public.bitbank.cc', '/btc_jpy/ticker') + async def get_rates(self, ccy): + json = await self.get_json('public.bitbank.cc', '/btc_jpy/ticker') return {'JPY': Decimal(json['data']['last'])} class BitFlyer(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('bitflyer.jp', '/api/echo/price') + async def get_rates(self, ccy): + json = await self.get_json('bitflyer.jp', '/api/echo/price') return {'JPY': Decimal(json['mid'])} class Bitmarket(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('www.bitmarket.pl', '/json/BTCPLN/ticker.json') + async def get_rates(self, ccy): + json = await self.get_json('www.bitmarket.pl', '/json/BTCPLN/ticker.json') return {'PLN': Decimal(json['last'])} class BitPay(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('bitpay.com', '/api/rates') + async def get_rates(self, ccy): + json = await self.get_json('bitpay.com', '/api/rates') return dict([(r['code'], Decimal(r['rate'])) for r in json]) class Bitso(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.bitso.com', '/v2/ticker') + async def get_rates(self, ccy): + json = await self.get_json('api.bitso.com', '/v2/ticker') return {'MXN': Decimal(json['last'])} class BitStamp(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('www.bitstamp.net', '/api/ticker/') + async def get_rates(self, ccy): + json = await self.get_json('www.bitstamp.net', '/api/ticker/') return {'USD': Decimal(json['last'])} class Bitvalor(ExchangeBase): - def get_rates(self,ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self,ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['total']['last'])} class BlockchainInfo(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('blockchain.info', '/ticker') + async def get_rates(self, ccy): + json = await self.get_json('blockchain.info', '/ticker') return dict([(r, Decimal(json[r]['15m'])) for r in json]) class BTCChina(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('data.btcchina.com', '/data/ticker') + async def get_rates(self, ccy): + json = await self.get_json('data.btcchina.com', '/data/ticker') return {'CNY': Decimal(json['ticker']['last'])} class BTCParalelo(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('btcparalelo.com', '/api/price') + async def get_rates(self, ccy): + json = await self.get_json('btcparalelo.com', '/api/price') return {'VEF': Decimal(json['price'])} class Coinbase(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('coinbase.com', + async def get_rates(self, ccy): + json = await self.get_json('coinbase.com', '/api/v1/currencies/exchange_rates') return dict([(r[7:].upper(), Decimal(json[r])) for r in json if r.startswith('btc_to_')]) @@ -240,13 +253,13 @@ class Coinbase(ExchangeBase): class CoinDesk(ExchangeBase): - def get_currencies(self): - dicts = self.get_json('api.coindesk.com', + async def get_currencies(self): + dicts = await self.get_json('api.coindesk.com', '/v1/bpi/supported-currencies.json') return [d['currency'] for d in dicts] - def get_rates(self, ccy): - json = self.get_json('api.coindesk.com', + async def get_rates(self, ccy): + json = await self.get_json('api.coindesk.com', '/v1/bpi/currentprice/%s.json' % ccy) result = {ccy: Decimal(json['bpi'][ccy]['rate_float'])} return result @@ -257,35 +270,35 @@ class CoinDesk(ExchangeBase): def history_ccys(self): return self.history_starts().keys() - def request_history(self, ccy): + async def request_history(self, ccy): start = self.history_starts()[ccy] end = datetime.today().strftime('%Y-%m-%d') # Note ?currency and ?index don't work as documented. Sigh. query = ('/v1/bpi/historical/close.json?start=%s&end=%s' % (start, end)) - json = self.get_json('api.coindesk.com', query) + json = await self.get_json('api.coindesk.com', query) return json['bpi'] class Coinsecure(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.coinsecure.in', '/v0/noauth/newticker') + async def get_rates(self, ccy): + json = await self.get_json('api.coinsecure.in', '/v0/noauth/newticker') return {'INR': Decimal(json['lastprice'] / 100.0 )} class Foxbit(ExchangeBase): - def get_rates(self,ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self,ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['exchanges']['FOX']['last'])} class itBit(ExchangeBase): - def get_rates(self, ccy): + async def get_rates(self, ccy): ccys = ['USD', 'EUR', 'SGD'] - json = self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy) + json = await self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy) result = dict.fromkeys(ccys) if ccy in ccys: result[ccy] = Decimal(json['lastPrice']) @@ -294,10 +307,10 @@ class itBit(ExchangeBase): class Kraken(ExchangeBase): - def get_rates(self, ccy): + async def get_rates(self, ccy): ccys = ['EUR', 'USD', 'CAD', 'GBP', 'JPY'] pairs = ['XBT%s' % c for c in ccys] - json = self.get_json('api.kraken.com', + json = await self.get_json('api.kraken.com', '/0/public/Ticker?pair=%s' % ','.join(pairs)) return dict((k[-3:], Decimal(float(v['c'][0]))) for k, v in json['result'].items()) @@ -305,45 +318,45 @@ class Kraken(ExchangeBase): class LocalBitcoins(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('localbitcoins.com', + async def get_rates(self, ccy): + json = await self.get_json('localbitcoins.com', '/bitcoinaverage/ticker-all-currencies/') return dict([(r, Decimal(json[r]['rates']['last'])) for r in json]) class MercadoBitcoin(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self, ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['exchanges']['MBT']['last'])} class NegocieCoins(ExchangeBase): - def get_rates(self,ccy): - json = self.get_json('api.bitvalor.com', '/v1/ticker.json') + async def get_rates(self,ccy): + json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') return {'BRL': Decimal(json['ticker_1h']['exchanges']['NEG']['last'])} class TheRockTrading(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.therocktrading.com', + async def get_rates(self, ccy): + json = await self.get_json('api.therocktrading.com', '/v1/funds/BTCEUR/ticker') return {'EUR': Decimal(json['last'])} class Unocoin(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('www.unocoin.com', 'trade?buy') + async def get_rates(self, ccy): + json = await self.get_json('www.unocoin.com', 'trade?buy') return {'INR': Decimal(json)} class WEX(ExchangeBase): - def get_rates(self, ccy): - json_eur = self.get_json('wex.nz', '/api/3/ticker/btc_eur') - json_rub = self.get_json('wex.nz', '/api/3/ticker/btc_rur') - json_usd = self.get_json('wex.nz', '/api/3/ticker/btc_usd') + async def get_rates(self, ccy): + json_eur = await self.get_json('wex.nz', '/api/3/ticker/btc_eur') + json_rub = await self.get_json('wex.nz', '/api/3/ticker/btc_rur') + json_usd = await self.get_json('wex.nz', '/api/3/ticker/btc_usd') return {'EUR': Decimal(json_eur['btc_eur']['last']), 'RUB': Decimal(json_rub['btc_rur']['last']), 'USD': Decimal(json_usd['btc_usd']['last'])} @@ -351,15 +364,15 @@ class WEX(ExchangeBase): class Winkdex(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('winkdex.com', '/api/v0/price') + async def get_rates(self, ccy): + json = await self.get_json('winkdex.com', '/api/v0/price') return {'USD': Decimal(json['price'] / 100.0)} def history_ccys(self): return ['USD'] - def request_history(self, ccy): - json = self.get_json('winkdex.com', + async def request_history(self, ccy): + json = await self.get_json('winkdex.com', "/api/v0/series?start_time=1342915200") history = json['series'][0]['results'] return dict([(h['timestamp'][:10], h['price'] / 100.0) @@ -367,8 +380,8 @@ class Winkdex(ExchangeBase): class Zaif(ExchangeBase): - def get_rates(self, ccy): - json = self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy') + async def get_rates(self, ccy): + json = await self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy') return {'JPY': Decimal(json['last_price'])} @@ -381,7 +394,6 @@ def dictinvert(d): return inv def get_exchanges_and_currencies(): - import os, json path = os.path.join(os.path.dirname(__file__), 'currencies.json') try: with open(path, 'r', encoding='utf-8') as f: @@ -426,13 +438,22 @@ class FxThread(ThreadJob): def __init__(self, config, network): self.config = config self.network = network + self.network.register_callback(self.set_proxy, ['proxy_set']) self.ccy = self.get_currency() self.history_used_spot = False self.ccy_combo = None self.hist_checkbox = None self.cache_dir = os.path.join(config.path, 'cache') + self.trigger = asyncio.Event() + self.trigger.set() self.set_exchange(self.config_exchange()) make_dir(self.cache_dir) + self.set_proxy('bogus', self.network.proxy) + + def set_proxy(self, trigger_name, *args): + global PROXY + PROXY = args[0] + self.trigger.set() def get_currencies(self, h): d = get_exchanges_by_ccy(h) @@ -451,13 +472,18 @@ class FxThread(ThreadJob): rounded_amount = amount return fmt_str.format(rounded_amount) - def run(self): - # This runs from the plugins thread which catches exceptions - if self.is_enabled(): - if self.timeout ==0 and self.show_history(): - self.exchange.get_historical_rates(self.ccy, self.cache_dir) - if self.timeout <= time.time(): - self.timeout = time.time() + 150 + async def run(self): + while True: + try: + await asyncio.wait_for(self.trigger.wait(), 150) + except concurrent.futures.TimeoutError: + pass + else: + self.trigger.clear() + if self.is_enabled(): + if self.show_history(): + self.exchange.get_historical_rates(self.ccy, self.cache_dir) + if self.is_enabled(): self.exchange.update(self.ccy) def is_enabled(self): @@ -497,7 +523,7 @@ class FxThread(ThreadJob): def set_currency(self, ccy): self.ccy = ccy self.config.set_key('currency', ccy, True) - self.timeout = 0 # Because self.ccy changes + self.trigger.set() # Because self.ccy changes self.on_quotes() def set_exchange(self, name): @@ -508,7 +534,7 @@ class FxThread(ThreadJob): self.exchange = class_(self.on_quotes, self.on_history) # A new exchange means new fx quotes, initially empty. Force # a quote refresh - self.timeout = 0 + self.trigger.set() self.exchange.read_historical_rates(self.ccy, self.cache_dir) def on_quotes(self): diff --git a/electrum/gui/kivy/uix/dialogs/crash_reporter.py b/electrum/gui/kivy/uix/dialogs/crash_reporter.py index 04582b953..00e96f2f0 100644 --- a/electrum/gui/kivy/uix/dialogs/crash_reporter.py +++ b/electrum/gui/kivy/uix/dialogs/crash_reporter.py @@ -1,6 +1,7 @@ import sys +import json -import requests +from aiohttp.client_exceptions import ClientError from kivy import base, utils from kivy.clock import Clock from kivy.core.window import Window @@ -102,6 +103,11 @@ class CrashReporter(BaseCrashReporter, Factory.Popup): self.ids.crash_message.text = BaseCrashReporter.CRASH_MESSAGE self.ids.request_help_message.text = BaseCrashReporter.REQUEST_HELP_MESSAGE self.ids.describe_error_message.text = BaseCrashReporter.DESCRIBE_ERROR_MESSAGE + self.proxy = self.main_window.network.proxy + self.main_window.network.register_callback(self.set_proxy, ['proxy_set']) + + def set_proxy(self, evt, proxy): + self.proxy = proxy def show_contents(self): details = CrashReportDetails(self.get_report_string()) @@ -115,8 +121,8 @@ class CrashReporter(BaseCrashReporter, Factory.Popup): def send_report(self): try: - response = BaseCrashReporter.send_report(self, "/crash.json").json() - except requests.exceptions.RequestException: + response = json.loads(BaseCrashReporter.send_report(self, self.main_window.network.asyncio_loop, self.proxy, "/crash.json")) + except (ValueError, ClientError): self.show_popup(_('Unable to send report'), _("Please check your network connection.")) else: self.show_popup(_('Report sent'), response["text"]) diff --git a/electrum/gui/qt/exception_window.py b/electrum/gui/qt/exception_window.py index 1774a043f..c4e337f1a 100644 --- a/electrum/gui/qt/exception_window.py +++ b/electrum/gui/qt/exception_window.py @@ -41,6 +41,10 @@ class Exception_Window(BaseCrashReporter, QWidget, MessageBoxMixin): def __init__(self, main_window, exctype, value, tb): BaseCrashReporter.__init__(self, exctype, value, tb) self.main_window = main_window + + self.proxy = self.main_window.network.proxy + self.main_window.network.register_callback(self.set_proxy, ['proxy_set']) + QWidget.__init__(self) self.setWindowTitle('Electrum - ' + _('An Error Occurred')) self.setMinimumSize(600, 300) @@ -88,16 +92,19 @@ class Exception_Window(BaseCrashReporter, QWidget, MessageBoxMixin): self.setLayout(main_box) self.show() + def set_proxy(self, evt, proxy): + self.proxy = proxy + def send_report(self): try: - response = BaseCrashReporter.send_report(self) + response = BaseCrashReporter.send_report(self, self.main_window.network.asyncio_loop, self.proxy) except BaseException as e: traceback.print_exc(file=sys.stderr) self.main_window.show_critical(_('There was a problem with the automatic reporting:') + '\n' + str(e) + '\n' + _("Please report this issue manually.")) return - QMessageBox.about(self, _("Crash report"), response.text) + QMessageBox.about(self, _("Crash report"), response) self.close() def on_close(self): diff --git a/electrum/network.py b/electrum/network.py index f0aa4a61f..7d6e49504 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -110,7 +110,7 @@ def pick_random_server(hostmap = None, protocol = 's', exclude_set = set()): from .simple_config import SimpleConfig -proxy_modes = ['socks4', 'socks5', 'http'] +proxy_modes = ['socks4', 'socks5'] def serialize_proxy(p): @@ -437,6 +437,7 @@ class Network(PrintError): socket.getaddrinfo = self._fast_getaddrinfo else: socket.getaddrinfo = socket._getaddrinfo + self.trigger_callback('proxy_set', self.proxy) @staticmethod def _fast_getaddrinfo(host, *args, **kwargs): @@ -710,9 +711,13 @@ class Network(PrintError): with b.lock: b.update_size() - def _run(self): + def _run(self, fx): self.init_headers_file() - self.gat = self.asyncio_loop.create_task(self.maintain_sessions()) + jobs = [self.maintain_sessions()] + if fx: + jobs.append(fx) + jobs = [self.asyncio_loop.create_task(x) for x in jobs] + self.gat = asyncio.gather(*jobs) try: self.asyncio_loop.run_until_complete(self.gat) except concurrent.futures.CancelledError: @@ -789,8 +794,8 @@ class Network(PrintError): def max_checkpoint(cls): return max(0, len(constants.net.CHECKPOINTS) * 2016 - 1) - def start(self): - self.fut = threading.Thread(target=self._run) + def start(self, fx=None): + self.fut = threading.Thread(target=self._run, args=(fx,)) self.fut.start() def stop(self): diff --git a/electrum/plugins/labels/labels.py b/electrum/plugins/labels/labels.py index d5fa50b2c..cf0833c5d 100644 --- a/electrum/plugins/labels/labels.py +++ b/electrum/plugins/labels/labels.py @@ -1,6 +1,6 @@ +import asyncio import hashlib import requests -import threading import json import sys import traceback @@ -10,7 +10,7 @@ import base64 from electrum.plugin import BasePlugin, hook from electrum.crypto import aes_encrypt_with_iv, aes_decrypt_with_iv from electrum.i18n import _ - +from electrum.util import aiosafe, make_aiohttp_session class LabelsPlugin(BasePlugin): @@ -18,11 +18,11 @@ class LabelsPlugin(BasePlugin): BasePlugin.__init__(self, parent, config, name) self.target_host = 'labels.electrum.org' self.wallets = {} + self.proxy = None def encode(self, wallet, msg): password, iv, wallet_id = self.wallets[wallet] - encrypted = aes_encrypt_with_iv(password, iv, - msg.encode('utf8')) + encrypted = aes_encrypt_with_iv(password, iv, msg.encode('utf8')) return base64.b64encode(encrypted).decode() def decode(self, wallet, message): @@ -55,37 +55,27 @@ class LabelsPlugin(BasePlugin): "walletNonce": nonce, "externalId": self.encode(wallet, item), "encryptedLabel": self.encode(wallet, label)} - t = threading.Thread(target=self.do_request_safe, - args=["POST", "/label", False, bundle]) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.do_post_safe("/label", False, bundle)) # Caller will write the wallet self.set_nonce(wallet, nonce + 1) - def do_request(self, method, url = "/labels", is_batch=False, data=None): + @aiosafe + async def do_post_safe(self, *args): + await self.do_post(*args) + + async def do_get(self, url = "/labels"): + url = 'https://' + self.target_host + url + async with make_aiohttp_session(self.proxy) as session: + async with session.get(url) as result: + return await result.json() + + async def do_post(self, url = "/labels", data=None): url = 'https://' + self.target_host + url - kwargs = {'headers': {}} - if method == 'GET' and data: - kwargs['params'] = data - elif method == 'POST' and data: - kwargs['data'] = json.dumps(data) - kwargs['headers']['Content-Type'] = 'application/json' - response = requests.request(method, url, **kwargs) - if response.status_code != 200: - raise Exception(response.status_code, response.text) - response = response.json() - if "error" in response: - raise Exception(response["error"]) - return response - - def do_request_safe(self, *args, **kwargs): - try: - self.do_request(*args, **kwargs) - except BaseException as e: - #traceback.print_exc(file=sys.stderr) - self.print_error('error doing request') - - def push_thread(self, wallet): + async with make_aiohttp_session(self.proxy) as session: + async with session.post(url, data=data) as result: + return await result.json() + + async def push_thread(self, wallet): wallet_data = self.wallets.get(wallet, None) if not wallet_data: raise Exception('Wallet {} not loaded'.format(wallet)) @@ -102,16 +92,16 @@ class LabelsPlugin(BasePlugin): continue bundle["labels"].append({'encryptedLabel': encoded_value, 'externalId': encoded_key}) - self.do_request("POST", "/labels", True, bundle) + await self.do_post("/labels", bundle) - def pull_thread(self, wallet, force): + async def pull_thread(self, wallet, force): wallet_data = self.wallets.get(wallet, None) if not wallet_data: raise Exception('Wallet {} not loaded'.format(wallet)) wallet_id = wallet_data[2] nonce = 1 if force else self.get_nonce(wallet) - 1 self.print_error("asking for labels since nonce", nonce) - response = self.do_request("GET", ("/labels/since/%d/for/%s" % (nonce, wallet_id) )) + response = await self.do_get("/labels/since/%d/for/%s" % (nonce, wallet_id)) if response["labels"] is None: self.print_error('no new labels') return @@ -140,12 +130,15 @@ class LabelsPlugin(BasePlugin): self.set_nonce(wallet, response["nonce"] + 1) self.on_pulled(wallet) - def pull_thread_safe(self, wallet, force): - try: - self.pull_thread(wallet, force) - except BaseException as e: - # traceback.print_exc(file=sys.stderr) - self.print_error('could not retrieve labels') + @aiosafe + async def pull_safe_thread(self, wallet, force): + await self.pull_thread(wallet, force) + + def pull(self, wallet, force): + return asyncio.run_coroutine_threadsafe(self.pull_thread(wallet, force), wallet.network.asyncio_loop).result() + + def push(self, wallet): + return asyncio.run_coroutine_threadsafe(self.push_thread(wallet), wallet.network.asyncio_loop).result() def start_wallet(self, wallet): nonce = self.get_nonce(wallet) @@ -159,9 +152,14 @@ class LabelsPlugin(BasePlugin): wallet_id = hashlib.sha256(mpk).hexdigest() self.wallets[wallet] = (password, iv, wallet_id) # If there is an auth token we can try to actually start syncing - t = threading.Thread(target=self.pull_thread_safe, args=(wallet, False)) - t.setDaemon(True) - t.start() + asyncio.get_event_loop().create_task(self.pull_safe_thread(wallet, False)) + self.proxy = wallet.network.proxy + wallet.network.register_callback(self.set_proxy, ['proxy_set']) def stop_wallet(self, wallet): + wallet.network.unregister_callback('proxy_set') self.wallets.pop(wallet, None) + + def set_proxy(self, evt_name, new_proxy): + self.proxy = new_proxy + self.print_error("proxy set") diff --git a/electrum/plugins/labels/qt.py b/electrum/plugins/labels/qt.py index df4ae55c2..2a66d98eb 100644 --- a/electrum/plugins/labels/qt.py +++ b/electrum/plugins/labels/qt.py @@ -38,11 +38,11 @@ class Plugin(LabelsPlugin): hbox = QHBoxLayout() hbox.addWidget(QLabel("Label sync options:")) upload = ThreadedButton("Force upload", - partial(self.push_thread, wallet), + partial(self.push, wallet), partial(self.done_processing_success, d), partial(self.done_processing_error, d)) download = ThreadedButton("Force download", - partial(self.pull_thread, wallet, True), + partial(self.pull, wallet, True), partial(self.done_processing_success, d), partial(self.done_processing_error, d)) vbox = QVBoxLayout() diff --git a/electrum/util.py b/electrum/util.py index 4a3629cd4..61f0b9f54 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -37,6 +37,8 @@ from locale import localeconv from .i18n import _ +import aiohttp +from aiohttp_socks import SocksConnector, SocksVer import urllib.request, urllib.parse, urllib.error import queue @@ -947,3 +949,17 @@ VerifiedTxInfo = NamedTuple("VerifiedTxInfo", [("height", int), ("timestamp", int), ("txpos", int), ("header_hash", str)]) + +def make_aiohttp_session(proxy): + if proxy: + connector = SocksConnector( + socks_ver=SocksVer.SOCKS5 if proxy['mode'] == 'socks5' else SocksVer.SOCKS4, + host=proxy['host'], + port=int(proxy['port']), + username=proxy.get('user', None), + password=proxy.get('password', None), + rdns=True + ) + return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10), connector=connector) + else: + return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10))