Browse Source

asyncio: labels, crash_reporter, fx: migrate requests use to aiohttp

3.3.3.1
Janus 6 years ago
parent
commit
6e80ba7b4f
  1. 2
      contrib/requirements/requirements.txt
  2. 18
      electrum/base_crash_reporter.py
  3. 6
      electrum/daemon.py
  4. 218
      electrum/exchange_rate.py
  5. 12
      electrum/gui/kivy/uix/dialogs/crash_reporter.py
  6. 11
      electrum/gui/qt/exception_window.py
  7. 15
      electrum/network.py
  8. 84
      electrum/plugins/labels/labels.py
  9. 4
      electrum/plugins/labels/qt.py
  10. 16
      electrum/util.py

2
contrib/requirements/requirements.txt

@ -9,3 +9,5 @@ PySocks>=1.6.6
qdarkstyle<3.0
typing>=3.0.0
aiorpcx>=0.7.1
aiohttp
aiohttp_socks

18
electrum/base_crash_reporter.py

@ -19,6 +19,7 @@
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import json
import locale
import traceback
@ -26,14 +27,13 @@ import subprocess
import sys
import os
import requests
from .version import ELECTRUM_VERSION
from .import constants
from .i18n import _
from .util import make_aiohttp_session
class BaseCrashReporter(object):
class BaseCrashReporter:
report_server = "https://crashhub.electrum.org"
config_key = "show_crash_reporter"
issue_template = """<h2>Traceback</h2>
@ -60,16 +60,22 @@ class BaseCrashReporter(object):
def __init__(self, exctype, value, tb):
self.exc_args = (exctype, value, tb)
def send_report(self, endpoint="/crash"):
def send_report(self, asyncio_loop, proxy, endpoint="/crash"):
if constants.net.GENESIS[-4:] not in ["4943", "e26f"] and ".electrum.org" in BaseCrashReporter.report_server:
# Gah! Some kind of altcoin wants to send us crash reports.
raise Exception(_("Missing report URL."))
report = self.get_traceback_info()
report.update(self.get_additional_info())
report = json.dumps(report)
response = requests.post(BaseCrashReporter.report_server + endpoint, data=report)
coro = self.do_post(proxy, BaseCrashReporter.report_server + endpoint, data=report)
response = asyncio.run_coroutine_threadsafe(coro, asyncio_loop).result(1)
return response
async def do_post(self, proxy, url, data):
async with make_aiohttp_session(proxy) as session:
async with session.post(url, data=data) as resp:
return await resp.text()
def get_traceback_info(self):
exc_string = str(self.exc_args[1])
stack = traceback.extract_tb(self.exc_args[2])
@ -125,4 +131,4 @@ class BaseCrashReporter(object):
raise NotImplementedError
def get_os_version(self):
raise NotImplementedError
raise NotImplementedError

6
electrum/daemon.py

@ -22,6 +22,7 @@
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import ast
import os
import time
@ -126,10 +127,9 @@ class Daemon(DaemonThread):
self.network = None
else:
self.network = Network(config)
self.network.start()
self.fx = FxThread(config, self.network)
#if self.network:
# self.network.add_jobs([self.fx])
if self.network:
self.network.start(self.fx.run())
self.gui = None
self.wallets = {}
# Setup JSONRPC server

218
electrum/exchange_rate.py

@ -1,18 +1,21 @@
import asyncio
import aiohttp
from aiohttp_socks import SocksConnector, SocksVer
from datetime import datetime
import inspect
import requests
import sys
import os
import json
from threading import Thread
import time
import csv
import decimal
from decimal import Decimal
import concurrent.futures
from .bitcoin import COIN
from .i18n import _
from .util import PrintError, ThreadJob, make_dir
from .util import PrintError, ThreadJob, make_dir, aiosafe
from .util import make_aiohttp_session
# See https://en.wikipedia.org/wiki/ISO_4217
@ -23,6 +26,7 @@ CCY_PRECISIONS = {'BHD': 3, 'BIF': 0, 'BYR': 0, 'CLF': 4, 'CLP': 0,
'RWF': 0, 'TND': 3, 'UGX': 0, 'UYI': 0, 'VND': 0,
'VUV': 0, 'XAF': 0, 'XAU': 4, 'XOF': 0, 'XPF': 0}
PROXY = None
class ExchangeBase(PrintError):
@ -32,34 +36,41 @@ class ExchangeBase(PrintError):
self.on_quotes = on_quotes
self.on_history = on_history
def get_json(self, site, get_string):
async def get_raw(self, site, get_string):
# APIs must have https
url = ''.join(['https://', site, get_string])
response = requests.request('GET', url, headers={'User-Agent' : 'Electrum'}, timeout=10)
return response.json()
async with make_aiohttp_session(PROXY) as session:
async with session.get(url) as response:
return await response.text()
def get_csv(self, site, get_string):
async def get_json(self, site, get_string):
# APIs must have https
url = ''.join(['https://', site, get_string])
response = requests.request('GET', url, headers={'User-Agent' : 'Electrum'})
reader = csv.DictReader(response.content.decode().split('\n'))
async with make_aiohttp_session(PROXY) as session:
async with session.get(url) as response:
return await response.json()
async def get_csv(self, site, get_string):
raw = await self.get_raw(site, get_string)
reader = csv.DictReader(raw.split('\n'))
return list(reader)
def name(self):
return self.__class__.__name__
def update_safe(self, ccy):
@aiosafe
async def update_safe(self, ccy):
try:
self.print_error("getting fx quotes for", ccy)
self.quotes = self.get_rates(ccy)
self.quotes = await self.get_rates(ccy)
self.print_error("received fx quotes")
except BaseException as e:
self.print_error("failed fx quotes:", e)
self.quotes = {}
self.on_quotes()
def update(self, ccy):
t = Thread(target=self.update_safe, args=(ccy,))
t.setDaemon(True)
t.start()
asyncio.get_event_loop().create_task(self.update_safe(ccy))
def read_historical_rates(self, ccy, cache_dir):
filename = os.path.join(cache_dir, self.name() + '_'+ ccy)
@ -78,13 +89,16 @@ class ExchangeBase(PrintError):
self.on_history()
return h
def get_historical_rates_safe(self, ccy, cache_dir):
@aiosafe
async def get_historical_rates_safe(self, ccy, cache_dir):
try:
self.print_error("requesting fx history for", ccy)
h = self.request_history(ccy)
h = await self.request_history(ccy)
self.print_error("received fx history for", ccy)
except BaseException as e:
self.print_error("failed fx history:", e)
import traceback
traceback.print_exc()
return
filename = os.path.join(cache_dir, self.name() + '_' + ccy)
with open(filename, 'w', encoding='utf-8') as f:
@ -100,9 +114,7 @@ class ExchangeBase(PrintError):
if h is None:
h = self.read_historical_rates(ccy, cache_dir)
if h is None or h['timestamp'] < time.time() - 24*3600:
t = Thread(target=self.get_historical_rates_safe, args=(ccy, cache_dir))
t.setDaemon(True)
t.start()
asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
def history_ccys(self):
return []
@ -116,8 +128,8 @@ class ExchangeBase(PrintError):
class BitcoinAverage(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short')
async def get_rates(self, ccy):
json = await self.get_json('apiv2.bitcoinaverage.com', '/indices/global/ticker/short')
return dict([(r.replace("BTC", ""), Decimal(json[r]['last']))
for r in json if r != 'timestamp'])
@ -126,8 +138,8 @@ class BitcoinAverage(ExchangeBase):
'MXN', 'NOK', 'NZD', 'PLN', 'RON', 'RUB', 'SEK', 'SGD', 'USD',
'ZAR']
def request_history(self, ccy):
history = self.get_csv('apiv2.bitcoinaverage.com',
async def request_history(self, ccy):
history = await self.get_csv('apiv2.bitcoinaverage.com',
"/indices/global/history/BTC%s?period=alltime&format=csv" % ccy)
return dict([(h['DateTime'][:10], h['Average'])
for h in history])
@ -135,8 +147,8 @@ class BitcoinAverage(ExchangeBase):
class Bitcointoyou(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('bitcointoyou.com', "/API/ticker.aspx")
async def get_rates(self, ccy):
json = await self.get_json('bitcointoyou.com', "/API/ticker.aspx")
return {'BRL': Decimal(json['ticker']['last'])}
def history_ccys(self):
@ -145,8 +157,8 @@ class Bitcointoyou(ExchangeBase):
class BitcoinVenezuela(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('api.bitcoinvenezuela.com', '/')
async def get_rates(self, ccy):
json = await self.get_json('api.bitcoinvenezuela.com', '/')
rates = [(r, json['BTC'][r]) for r in json['BTC']
if json['BTC'][r] is not None] # Giving NULL for LTC
return dict(rates)
@ -154,85 +166,86 @@ class BitcoinVenezuela(ExchangeBase):
def history_ccys(self):
return ['ARS', 'EUR', 'USD', 'VEF']
def request_history(self, ccy):
return self.get_json('api.bitcoinvenezuela.com',
"/historical/index.php?coin=BTC")[ccy +'_BTC']
async def request_history(self, ccy):
json = await self.get_json('api.bitcoinvenezuela.com',
"/historical/index.php?coin=BTC")
return json[ccy +'_BTC']
class Bitbank(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('public.bitbank.cc', '/btc_jpy/ticker')
async def get_rates(self, ccy):
json = await self.get_json('public.bitbank.cc', '/btc_jpy/ticker')
return {'JPY': Decimal(json['data']['last'])}
class BitFlyer(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('bitflyer.jp', '/api/echo/price')
async def get_rates(self, ccy):
json = await self.get_json('bitflyer.jp', '/api/echo/price')
return {'JPY': Decimal(json['mid'])}
class Bitmarket(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('www.bitmarket.pl', '/json/BTCPLN/ticker.json')
async def get_rates(self, ccy):
json = await self.get_json('www.bitmarket.pl', '/json/BTCPLN/ticker.json')
return {'PLN': Decimal(json['last'])}
class BitPay(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('bitpay.com', '/api/rates')
async def get_rates(self, ccy):
json = await self.get_json('bitpay.com', '/api/rates')
return dict([(r['code'], Decimal(r['rate'])) for r in json])
class Bitso(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('api.bitso.com', '/v2/ticker')
async def get_rates(self, ccy):
json = await self.get_json('api.bitso.com', '/v2/ticker')
return {'MXN': Decimal(json['last'])}
class BitStamp(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('www.bitstamp.net', '/api/ticker/')
async def get_rates(self, ccy):
json = await self.get_json('www.bitstamp.net', '/api/ticker/')
return {'USD': Decimal(json['last'])}
class Bitvalor(ExchangeBase):
def get_rates(self,ccy):
json = self.get_json('api.bitvalor.com', '/v1/ticker.json')
async def get_rates(self,ccy):
json = await self.get_json('api.bitvalor.com', '/v1/ticker.json')
return {'BRL': Decimal(json['ticker_1h']['total']['last'])}
class BlockchainInfo(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('blockchain.info', '/ticker')
async def get_rates(self, ccy):
json = await self.get_json('blockchain.info', '/ticker')
return dict([(r, Decimal(json[r]['15m'])) for r in json])
class BTCChina(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('data.btcchina.com', '/data/ticker')
async def get_rates(self, ccy):
json = await self.get_json('data.btcchina.com', '/data/ticker')
return {'CNY': Decimal(json['ticker']['last'])}
class BTCParalelo(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('btcparalelo.com', '/api/price')
async def get_rates(self, ccy):
json = await self.get_json('btcparalelo.com', '/api/price')
return {'VEF': Decimal(json['price'])}
class Coinbase(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('coinbase.com',
async def get_rates(self, ccy):
json = await self.get_json('coinbase.com',
'/api/v1/currencies/exchange_rates')
return dict([(r[7:].upper(), Decimal(json[r]))
for r in json if r.startswith('btc_to_')])
@ -240,13 +253,13 @@ class Coinbase(ExchangeBase):
class CoinDesk(ExchangeBase):
def get_currencies(self):
dicts = self.get_json('api.coindesk.com',
async def get_currencies(self):
dicts = await self.get_json('api.coindesk.com',
'/v1/bpi/supported-currencies.json')
return [d['currency'] for d in dicts]
def get_rates(self, ccy):
json = self.get_json('api.coindesk.com',
async def get_rates(self, ccy):
json = await self.get_json('api.coindesk.com',
'/v1/bpi/currentprice/%s.json' % ccy)
result = {ccy: Decimal(json['bpi'][ccy]['rate_float'])}
return result
@ -257,35 +270,35 @@ class CoinDesk(ExchangeBase):
def history_ccys(self):
return self.history_starts().keys()
def request_history(self, ccy):
async def request_history(self, ccy):
start = self.history_starts()[ccy]
end = datetime.today().strftime('%Y-%m-%d')
# Note ?currency and ?index don't work as documented. Sigh.
query = ('/v1/bpi/historical/close.json?start=%s&end=%s'
% (start, end))
json = self.get_json('api.coindesk.com', query)
json = await self.get_json('api.coindesk.com', query)
return json['bpi']
class Coinsecure(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('api.coinsecure.in', '/v0/noauth/newticker')
async def get_rates(self, ccy):
json = await self.get_json('api.coinsecure.in', '/v0/noauth/newticker')
return {'INR': Decimal(json['lastprice'] / 100.0 )}
class Foxbit(ExchangeBase):
def get_rates(self,ccy):
json = self.get_json('api.bitvalor.com', '/v1/ticker.json')
async def get_rates(self,ccy):
json = await self.get_json('api.bitvalor.com', '/v1/ticker.json')
return {'BRL': Decimal(json['ticker_1h']['exchanges']['FOX']['last'])}
class itBit(ExchangeBase):
def get_rates(self, ccy):
async def get_rates(self, ccy):
ccys = ['USD', 'EUR', 'SGD']
json = self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy)
json = await self.get_json('api.itbit.com', '/v1/markets/XBT%s/ticker' % ccy)
result = dict.fromkeys(ccys)
if ccy in ccys:
result[ccy] = Decimal(json['lastPrice'])
@ -294,10 +307,10 @@ class itBit(ExchangeBase):
class Kraken(ExchangeBase):
def get_rates(self, ccy):
async def get_rates(self, ccy):
ccys = ['EUR', 'USD', 'CAD', 'GBP', 'JPY']
pairs = ['XBT%s' % c for c in ccys]
json = self.get_json('api.kraken.com',
json = await self.get_json('api.kraken.com',
'/0/public/Ticker?pair=%s' % ','.join(pairs))
return dict((k[-3:], Decimal(float(v['c'][0])))
for k, v in json['result'].items())
@ -305,45 +318,45 @@ class Kraken(ExchangeBase):
class LocalBitcoins(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('localbitcoins.com',
async def get_rates(self, ccy):
json = await self.get_json('localbitcoins.com',
'/bitcoinaverage/ticker-all-currencies/')
return dict([(r, Decimal(json[r]['rates']['last'])) for r in json])
class MercadoBitcoin(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('api.bitvalor.com', '/v1/ticker.json')
async def get_rates(self, ccy):
json = await self.get_json('api.bitvalor.com', '/v1/ticker.json')
return {'BRL': Decimal(json['ticker_1h']['exchanges']['MBT']['last'])}
class NegocieCoins(ExchangeBase):
def get_rates(self,ccy):
json = self.get_json('api.bitvalor.com', '/v1/ticker.json')
async def get_rates(self,ccy):
json = await self.get_json('api.bitvalor.com', '/v1/ticker.json')
return {'BRL': Decimal(json['ticker_1h']['exchanges']['NEG']['last'])}
class TheRockTrading(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('api.therocktrading.com',
async def get_rates(self, ccy):
json = await self.get_json('api.therocktrading.com',
'/v1/funds/BTCEUR/ticker')
return {'EUR': Decimal(json['last'])}
class Unocoin(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('www.unocoin.com', 'trade?buy')
async def get_rates(self, ccy):
json = await self.get_json('www.unocoin.com', 'trade?buy')
return {'INR': Decimal(json)}
class WEX(ExchangeBase):
def get_rates(self, ccy):
json_eur = self.get_json('wex.nz', '/api/3/ticker/btc_eur')
json_rub = self.get_json('wex.nz', '/api/3/ticker/btc_rur')
json_usd = self.get_json('wex.nz', '/api/3/ticker/btc_usd')
async def get_rates(self, ccy):
json_eur = await self.get_json('wex.nz', '/api/3/ticker/btc_eur')
json_rub = await self.get_json('wex.nz', '/api/3/ticker/btc_rur')
json_usd = await self.get_json('wex.nz', '/api/3/ticker/btc_usd')
return {'EUR': Decimal(json_eur['btc_eur']['last']),
'RUB': Decimal(json_rub['btc_rur']['last']),
'USD': Decimal(json_usd['btc_usd']['last'])}
@ -351,15 +364,15 @@ class WEX(ExchangeBase):
class Winkdex(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('winkdex.com', '/api/v0/price')
async def get_rates(self, ccy):
json = await self.get_json('winkdex.com', '/api/v0/price')
return {'USD': Decimal(json['price'] / 100.0)}
def history_ccys(self):
return ['USD']
def request_history(self, ccy):
json = self.get_json('winkdex.com',
async def request_history(self, ccy):
json = await self.get_json('winkdex.com',
"/api/v0/series?start_time=1342915200")
history = json['series'][0]['results']
return dict([(h['timestamp'][:10], h['price'] / 100.0)
@ -367,8 +380,8 @@ class Winkdex(ExchangeBase):
class Zaif(ExchangeBase):
def get_rates(self, ccy):
json = self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy')
async def get_rates(self, ccy):
json = await self.get_json('api.zaif.jp', '/api/1/last_price/btc_jpy')
return {'JPY': Decimal(json['last_price'])}
@ -381,7 +394,6 @@ def dictinvert(d):
return inv
def get_exchanges_and_currencies():
import os, json
path = os.path.join(os.path.dirname(__file__), 'currencies.json')
try:
with open(path, 'r', encoding='utf-8') as f:
@ -426,13 +438,22 @@ class FxThread(ThreadJob):
def __init__(self, config, network):
self.config = config
self.network = network
self.network.register_callback(self.set_proxy, ['proxy_set'])
self.ccy = self.get_currency()
self.history_used_spot = False
self.ccy_combo = None
self.hist_checkbox = None
self.cache_dir = os.path.join(config.path, 'cache')
self.trigger = asyncio.Event()
self.trigger.set()
self.set_exchange(self.config_exchange())
make_dir(self.cache_dir)
self.set_proxy('bogus', self.network.proxy)
def set_proxy(self, trigger_name, *args):
global PROXY
PROXY = args[0]
self.trigger.set()
def get_currencies(self, h):
d = get_exchanges_by_ccy(h)
@ -451,13 +472,18 @@ class FxThread(ThreadJob):
rounded_amount = amount
return fmt_str.format(rounded_amount)
def run(self):
# This runs from the plugins thread which catches exceptions
if self.is_enabled():
if self.timeout ==0 and self.show_history():
self.exchange.get_historical_rates(self.ccy, self.cache_dir)
if self.timeout <= time.time():
self.timeout = time.time() + 150
async def run(self):
while True:
try:
await asyncio.wait_for(self.trigger.wait(), 150)
except concurrent.futures.TimeoutError:
pass
else:
self.trigger.clear()
if self.is_enabled():
if self.show_history():
self.exchange.get_historical_rates(self.ccy, self.cache_dir)
if self.is_enabled():
self.exchange.update(self.ccy)
def is_enabled(self):
@ -497,7 +523,7 @@ class FxThread(ThreadJob):
def set_currency(self, ccy):
self.ccy = ccy
self.config.set_key('currency', ccy, True)
self.timeout = 0 # Because self.ccy changes
self.trigger.set() # Because self.ccy changes
self.on_quotes()
def set_exchange(self, name):
@ -508,7 +534,7 @@ class FxThread(ThreadJob):
self.exchange = class_(self.on_quotes, self.on_history)
# A new exchange means new fx quotes, initially empty. Force
# a quote refresh
self.timeout = 0
self.trigger.set()
self.exchange.read_historical_rates(self.ccy, self.cache_dir)
def on_quotes(self):

12
electrum/gui/kivy/uix/dialogs/crash_reporter.py

@ -1,6 +1,7 @@
import sys
import json
import requests
from aiohttp.client_exceptions import ClientError
from kivy import base, utils
from kivy.clock import Clock
from kivy.core.window import Window
@ -102,6 +103,11 @@ class CrashReporter(BaseCrashReporter, Factory.Popup):
self.ids.crash_message.text = BaseCrashReporter.CRASH_MESSAGE
self.ids.request_help_message.text = BaseCrashReporter.REQUEST_HELP_MESSAGE
self.ids.describe_error_message.text = BaseCrashReporter.DESCRIBE_ERROR_MESSAGE
self.proxy = self.main_window.network.proxy
self.main_window.network.register_callback(self.set_proxy, ['proxy_set'])
def set_proxy(self, evt, proxy):
self.proxy = proxy
def show_contents(self):
details = CrashReportDetails(self.get_report_string())
@ -115,8 +121,8 @@ class CrashReporter(BaseCrashReporter, Factory.Popup):
def send_report(self):
try:
response = BaseCrashReporter.send_report(self, "/crash.json").json()
except requests.exceptions.RequestException:
response = json.loads(BaseCrashReporter.send_report(self, self.main_window.network.asyncio_loop, self.proxy, "/crash.json"))
except (ValueError, ClientError):
self.show_popup(_('Unable to send report'), _("Please check your network connection."))
else:
self.show_popup(_('Report sent'), response["text"])

11
electrum/gui/qt/exception_window.py

@ -41,6 +41,10 @@ class Exception_Window(BaseCrashReporter, QWidget, MessageBoxMixin):
def __init__(self, main_window, exctype, value, tb):
BaseCrashReporter.__init__(self, exctype, value, tb)
self.main_window = main_window
self.proxy = self.main_window.network.proxy
self.main_window.network.register_callback(self.set_proxy, ['proxy_set'])
QWidget.__init__(self)
self.setWindowTitle('Electrum - ' + _('An Error Occurred'))
self.setMinimumSize(600, 300)
@ -88,16 +92,19 @@ class Exception_Window(BaseCrashReporter, QWidget, MessageBoxMixin):
self.setLayout(main_box)
self.show()
def set_proxy(self, evt, proxy):
self.proxy = proxy
def send_report(self):
try:
response = BaseCrashReporter.send_report(self)
response = BaseCrashReporter.send_report(self, self.main_window.network.asyncio_loop, self.proxy)
except BaseException as e:
traceback.print_exc(file=sys.stderr)
self.main_window.show_critical(_('There was a problem with the automatic reporting:') + '\n' +
str(e) + '\n' +
_("Please report this issue manually."))
return
QMessageBox.about(self, _("Crash report"), response.text)
QMessageBox.about(self, _("Crash report"), response)
self.close()
def on_close(self):

15
electrum/network.py

@ -110,7 +110,7 @@ def pick_random_server(hostmap = None, protocol = 's', exclude_set = set()):
from .simple_config import SimpleConfig
proxy_modes = ['socks4', 'socks5', 'http']
proxy_modes = ['socks4', 'socks5']
def serialize_proxy(p):
@ -437,6 +437,7 @@ class Network(PrintError):
socket.getaddrinfo = self._fast_getaddrinfo
else:
socket.getaddrinfo = socket._getaddrinfo
self.trigger_callback('proxy_set', self.proxy)
@staticmethod
def _fast_getaddrinfo(host, *args, **kwargs):
@ -710,9 +711,13 @@ class Network(PrintError):
with b.lock:
b.update_size()
def _run(self):
def _run(self, fx):
self.init_headers_file()
self.gat = self.asyncio_loop.create_task(self.maintain_sessions())
jobs = [self.maintain_sessions()]
if fx:
jobs.append(fx)
jobs = [self.asyncio_loop.create_task(x) for x in jobs]
self.gat = asyncio.gather(*jobs)
try:
self.asyncio_loop.run_until_complete(self.gat)
except concurrent.futures.CancelledError:
@ -789,8 +794,8 @@ class Network(PrintError):
def max_checkpoint(cls):
return max(0, len(constants.net.CHECKPOINTS) * 2016 - 1)
def start(self):
self.fut = threading.Thread(target=self._run)
def start(self, fx=None):
self.fut = threading.Thread(target=self._run, args=(fx,))
self.fut.start()
def stop(self):

84
electrum/plugins/labels/labels.py

@ -1,6 +1,6 @@
import asyncio
import hashlib
import requests
import threading
import json
import sys
import traceback
@ -10,7 +10,7 @@ import base64
from electrum.plugin import BasePlugin, hook
from electrum.crypto import aes_encrypt_with_iv, aes_decrypt_with_iv
from electrum.i18n import _
from electrum.util import aiosafe, make_aiohttp_session
class LabelsPlugin(BasePlugin):
@ -18,11 +18,11 @@ class LabelsPlugin(BasePlugin):
BasePlugin.__init__(self, parent, config, name)
self.target_host = 'labels.electrum.org'
self.wallets = {}
self.proxy = None
def encode(self, wallet, msg):
password, iv, wallet_id = self.wallets[wallet]
encrypted = aes_encrypt_with_iv(password, iv,
msg.encode('utf8'))
encrypted = aes_encrypt_with_iv(password, iv, msg.encode('utf8'))
return base64.b64encode(encrypted).decode()
def decode(self, wallet, message):
@ -55,37 +55,27 @@ class LabelsPlugin(BasePlugin):
"walletNonce": nonce,
"externalId": self.encode(wallet, item),
"encryptedLabel": self.encode(wallet, label)}
t = threading.Thread(target=self.do_request_safe,
args=["POST", "/label", False, bundle])
t.setDaemon(True)
t.start()
asyncio.get_event_loop().create_task(self.do_post_safe("/label", False, bundle))
# Caller will write the wallet
self.set_nonce(wallet, nonce + 1)
def do_request(self, method, url = "/labels", is_batch=False, data=None):
@aiosafe
async def do_post_safe(self, *args):
await self.do_post(*args)
async def do_get(self, url = "/labels"):
url = 'https://' + self.target_host + url
async with make_aiohttp_session(self.proxy) as session:
async with session.get(url) as result:
return await result.json()
async def do_post(self, url = "/labels", data=None):
url = 'https://' + self.target_host + url
kwargs = {'headers': {}}
if method == 'GET' and data:
kwargs['params'] = data
elif method == 'POST' and data:
kwargs['data'] = json.dumps(data)
kwargs['headers']['Content-Type'] = 'application/json'
response = requests.request(method, url, **kwargs)
if response.status_code != 200:
raise Exception(response.status_code, response.text)
response = response.json()
if "error" in response:
raise Exception(response["error"])
return response
def do_request_safe(self, *args, **kwargs):
try:
self.do_request(*args, **kwargs)
except BaseException as e:
#traceback.print_exc(file=sys.stderr)
self.print_error('error doing request')
def push_thread(self, wallet):
async with make_aiohttp_session(self.proxy) as session:
async with session.post(url, data=data) as result:
return await result.json()
async def push_thread(self, wallet):
wallet_data = self.wallets.get(wallet, None)
if not wallet_data:
raise Exception('Wallet {} not loaded'.format(wallet))
@ -102,16 +92,16 @@ class LabelsPlugin(BasePlugin):
continue
bundle["labels"].append({'encryptedLabel': encoded_value,
'externalId': encoded_key})
self.do_request("POST", "/labels", True, bundle)
await self.do_post("/labels", bundle)
def pull_thread(self, wallet, force):
async def pull_thread(self, wallet, force):
wallet_data = self.wallets.get(wallet, None)
if not wallet_data:
raise Exception('Wallet {} not loaded'.format(wallet))
wallet_id = wallet_data[2]
nonce = 1 if force else self.get_nonce(wallet) - 1
self.print_error("asking for labels since nonce", nonce)
response = self.do_request("GET", ("/labels/since/%d/for/%s" % (nonce, wallet_id) ))
response = await self.do_get("/labels/since/%d/for/%s" % (nonce, wallet_id))
if response["labels"] is None:
self.print_error('no new labels')
return
@ -140,12 +130,15 @@ class LabelsPlugin(BasePlugin):
self.set_nonce(wallet, response["nonce"] + 1)
self.on_pulled(wallet)
def pull_thread_safe(self, wallet, force):
try:
self.pull_thread(wallet, force)
except BaseException as e:
# traceback.print_exc(file=sys.stderr)
self.print_error('could not retrieve labels')
@aiosafe
async def pull_safe_thread(self, wallet, force):
await self.pull_thread(wallet, force)
def pull(self, wallet, force):
return asyncio.run_coroutine_threadsafe(self.pull_thread(wallet, force), wallet.network.asyncio_loop).result()
def push(self, wallet):
return asyncio.run_coroutine_threadsafe(self.push_thread(wallet), wallet.network.asyncio_loop).result()
def start_wallet(self, wallet):
nonce = self.get_nonce(wallet)
@ -159,9 +152,14 @@ class LabelsPlugin(BasePlugin):
wallet_id = hashlib.sha256(mpk).hexdigest()
self.wallets[wallet] = (password, iv, wallet_id)
# If there is an auth token we can try to actually start syncing
t = threading.Thread(target=self.pull_thread_safe, args=(wallet, False))
t.setDaemon(True)
t.start()
asyncio.get_event_loop().create_task(self.pull_safe_thread(wallet, False))
self.proxy = wallet.network.proxy
wallet.network.register_callback(self.set_proxy, ['proxy_set'])
def stop_wallet(self, wallet):
wallet.network.unregister_callback('proxy_set')
self.wallets.pop(wallet, None)
def set_proxy(self, evt_name, new_proxy):
self.proxy = new_proxy
self.print_error("proxy set")

4
electrum/plugins/labels/qt.py

@ -38,11 +38,11 @@ class Plugin(LabelsPlugin):
hbox = QHBoxLayout()
hbox.addWidget(QLabel("Label sync options:"))
upload = ThreadedButton("Force upload",
partial(self.push_thread, wallet),
partial(self.push, wallet),
partial(self.done_processing_success, d),
partial(self.done_processing_error, d))
download = ThreadedButton("Force download",
partial(self.pull_thread, wallet, True),
partial(self.pull, wallet, True),
partial(self.done_processing_success, d),
partial(self.done_processing_error, d))
vbox = QVBoxLayout()

16
electrum/util.py

@ -37,6 +37,8 @@ from locale import localeconv
from .i18n import _
import aiohttp
from aiohttp_socks import SocksConnector, SocksVer
import urllib.request, urllib.parse, urllib.error
import queue
@ -947,3 +949,17 @@ VerifiedTxInfo = NamedTuple("VerifiedTxInfo", [("height", int),
("timestamp", int),
("txpos", int),
("header_hash", str)])
def make_aiohttp_session(proxy):
if proxy:
connector = SocksConnector(
socks_ver=SocksVer.SOCKS5 if proxy['mode'] == 'socks5' else SocksVer.SOCKS4,
host=proxy['host'],
port=int(proxy['port']),
username=proxy.get('user', None),
password=proxy.get('password', None),
rdns=True
)
return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10), connector=connector)
else:
return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10))

Loading…
Cancel
Save