From 4b3a285871161916708f31b0d43c25c13a1b5d7d Mon Sep 17 00:00:00 2001 From: SomberNight Date: Wed, 27 Feb 2019 21:48:33 +0100 Subject: [PATCH] exchange_rate: some clean-up and fixes - generation of currencies.json was broken - removed some dead exchanges --- electrum/exchange_rate.py | 103 ++++++++++++++------------------------ electrum/network.py | 24 +++++---- electrum/util.py | 2 +- 3 files changed, 53 insertions(+), 76 deletions(-) diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py index 28fdd3738..f24230e05 100644 --- a/electrum/exchange_rate.py +++ b/electrum/exchange_rate.py @@ -8,10 +8,11 @@ import time import csv import decimal from decimal import Decimal -import concurrent.futures import traceback from typing import Sequence +from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup + from .bitcoin import COIN from .i18n import _ from .util import (PrintError, ThreadJob, make_dir, log_exceptions, @@ -40,14 +41,18 @@ class ExchangeBase(PrintError): async def get_raw(self, site, get_string): # APIs must have https url = ''.join(['https://', site, get_string]) - async with make_aiohttp_session(Network.get_instance().proxy) as session: + network = Network.get_instance() + proxy = network.proxy if network else None + async with make_aiohttp_session(proxy) as session: async with session.get(url) as response: return await response.text() async def get_json(self, site, get_string): # APIs must have https url = ''.join(['https://', site, get_string]) - async with make_aiohttp_session(Network.get_instance().proxy) as session: + network = Network.get_instance() + proxy = network.proxy if network else None + async with make_aiohttp_session(proxy) as session: async with session.get(url) as response: # set content_type to None to disable checking MIME type return await response.json(content_type=None) @@ -60,7 +65,6 @@ class ExchangeBase(PrintError): def name(self): return self.__class__.__name__ - @log_exceptions async def update_safe(self, ccy): try: self.print_error("getting fx quotes for", ccy) @@ -71,9 +75,6 @@ class ExchangeBase(PrintError): self.quotes = {} self.on_quotes() - def update(self, ccy): - asyncio.get_event_loop().create_task(self.update_safe(ccy)) - def read_historical_rates(self, ccy, cache_dir): filename = os.path.join(cache_dir, self.name() + '_'+ ccy) if os.path.exists(filename): @@ -123,8 +124,8 @@ class ExchangeBase(PrintError): def historical_rate(self, ccy, d_t): return self.history.get(ccy, {}).get(d_t.strftime('%Y-%m-%d'), 'NaN') - def get_currencies(self): - rates = self.get_rates('') + async def get_currencies(self): + rates = await self.get_rates('') return sorted([str(a) for (a, b) in rates.items() if b is not None and len(a)==3]) class BitcoinAverage(ExchangeBase): @@ -229,20 +230,6 @@ class BlockchainInfo(ExchangeBase): return dict([(r, Decimal(json[r]['15m'])) for r in json]) -class BTCChina(ExchangeBase): - - async def get_rates(self, ccy): - json = await self.get_json('data.btcchina.com', '/data/ticker') - return {'CNY': Decimal(json['ticker']['last'])} - - -class BTCParalelo(ExchangeBase): - - async def get_rates(self, ccy): - json = await self.get_json('btcparalelo.com', '/api/price') - return {'VEF': Decimal(json['price'])} - - class Coinbase(ExchangeBase): async def get_rates(self, ccy): @@ -280,20 +267,6 @@ class CoinDesk(ExchangeBase): return json['bpi'] -class Coinsecure(ExchangeBase): - - async def get_rates(self, ccy): - json = await self.get_json('api.coinsecure.in', '/v0/noauth/newticker') - return {'INR': Decimal(json['lastprice'] / 100.0 )} - - -class Foxbit(ExchangeBase): - - async def get_rates(self,ccy): - json = await self.get_json('api.bitvalor.com', '/v1/ticker.json') - return {'BRL': Decimal(json['ticker_1h']['exchanges']['FOX']['last'])} - - class itBit(ExchangeBase): async def get_rates(self, ccy): @@ -344,23 +317,6 @@ class TheRockTrading(ExchangeBase): '/v1/funds/BTCEUR/ticker') return {'EUR': Decimal(json['last'])} -class Unocoin(ExchangeBase): - - async def get_rates(self, ccy): - json = await self.get_json('www.unocoin.com', 'trade?buy') - return {'INR': Decimal(json)} - - -class WEX(ExchangeBase): - - async def get_rates(self, ccy): - json_eur = await self.get_json('wex.nz', '/api/3/ticker/btc_eur') - json_rub = await self.get_json('wex.nz', '/api/3/ticker/btc_rur') - json_usd = await self.get_json('wex.nz', '/api/3/ticker/btc_usd') - return {'EUR': Decimal(json_eur['btc_eur']['last']), - 'RUB': Decimal(json_rub['btc_rur']['last']), - 'USD': Decimal(json_usd['btc_usd']['last'])} - class Winkdex(ExchangeBase): @@ -394,25 +350,39 @@ def dictinvert(d): return inv def get_exchanges_and_currencies(): + # load currencies.json from disk path = resource_path('currencies.json') try: with open(path, 'r', encoding='utf-8') as f: return json.loads(f.read()) except: pass + # or if not present, generate it now. + print("cannot find currencies.json. will regenerate it now.") d = {} is_exchange = lambda obj: (inspect.isclass(obj) and issubclass(obj, ExchangeBase) and obj != ExchangeBase) exchanges = dict(inspect.getmembers(sys.modules[__name__], is_exchange)) - for name, klass in exchanges.items(): - exchange = klass(None, None) + + async def get_currencies_safe(name, exchange): try: - d[name] = exchange.get_currencies() + d[name] = await exchange.get_currencies() print(name, "ok") except: print(name, "error") - continue + + async def query_all_exchanges_for_their_ccys_over_network(): + async with timeout_after(10): + async with TaskGroup() as group: + for name, klass in exchanges.items(): + exchange = klass(None, None) + await group.spawn(get_currencies_safe(name, exchange)) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network()) + except Exception as e: + pass with open(path, 'w', encoding='utf-8') as f: f.write(json.dumps(d, indent=4, sort_keys=True)) return d @@ -478,17 +448,18 @@ class FxThread(ThreadJob): async def run(self): while True: + # approx. every 2.5 minutes, refresh spot price try: - await asyncio.wait_for(self._trigger.wait(), 150) - except concurrent.futures.TimeoutError: + async with timeout_after(150): + await self._trigger.wait() + self._trigger.clear() + # we were manually triggered, so get historical rates + if self.is_enabled() and self.show_history(): + self.exchange.get_historical_rates(self.ccy, self.cache_dir) + except TaskTimeout: pass - else: - self._trigger.clear() - if self.is_enabled(): - if self.show_history(): - self.exchange.get_historical_rates(self.ccy, self.cache_dir) if self.is_enabled(): - self.exchange.update(self.ccy) + await self.exchange.update_safe(self.ccy) def is_enabled(self): return bool(self.config.get('use_exchange_rate')) diff --git a/electrum/network.py b/electrum/network.py index 3a290536a..81b4bc4c9 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -287,7 +287,7 @@ class Network(PrintError): return fut.result() @staticmethod - def get_instance(): + def get_instance() -> Optional["Network"]: return INSTANCE def with_recent_servers_lock(func): @@ -1147,8 +1147,8 @@ class Network(PrintError): raise await asyncio.sleep(0.1) - - async def _send_http_on_proxy(self, method: str, url: str, params: str = None, body: bytes = None, json: dict = None, headers=None, on_finish=None): + @classmethod + async def _send_http_on_proxy(cls, method: str, url: str, params: str = None, body: bytes = None, json: dict = None, headers=None, on_finish=None): async def default_on_finish(resp: ClientResponse): resp.raise_for_status() return await resp.text() @@ -1156,7 +1156,9 @@ class Network(PrintError): headers = {} if on_finish is None: on_finish = default_on_finish - async with make_aiohttp_session(self.proxy) as session: + network = cls.get_instance() + proxy = network.proxy if network else None + async with make_aiohttp_session(proxy) as session: if method == 'get': async with session.get(url, params=params, headers=headers) as resp: return await on_finish(resp) @@ -1171,11 +1173,15 @@ class Network(PrintError): else: assert False - @staticmethod - def send_http_on_proxy(method, url, **kwargs): - network = Network.get_instance() - assert network._loop_thread is not threading.currentThread() - coro = asyncio.run_coroutine_threadsafe(network._send_http_on_proxy(method, url, **kwargs), network.asyncio_loop) + @classmethod + def send_http_on_proxy(cls, method, url, **kwargs): + network = cls.get_instance() + if network: + assert network._loop_thread is not threading.currentThread() + loop = network.asyncio_loop + else: + loop = asyncio.get_event_loop() + coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop) return coro.result(5) diff --git a/electrum/util.py b/electrum/util.py index 22d8ca9ae..ab3c29552 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -948,7 +948,7 @@ class TxMinedInfo(NamedTuple): header_hash: Optional[str] = None # hash of block that mined tx -def make_aiohttp_session(proxy: dict, headers=None, timeout=None): +def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None): if headers is None: headers = {'User-Agent': 'Electrum'} if timeout is None: