diff --git a/electrum/commands.py b/electrum/commands.py index 19f45d6e0..03f7d33a3 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -181,7 +181,7 @@ class Commands: walletless server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - return self.network.get_history_for_scripthash(sh) + return self.network.run_from_another_thread(self.network.get_history_for_scripthash(sh)) @command('w') def listunspent(self): @@ -199,7 +199,7 @@ class Commands: is a walletless server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - return self.network.listunspent_for_scripthash(sh) + return self.network.run_from_another_thread(self.network.listunspent_for_scripthash(sh)) @command('') def serialize(self, jsontx): @@ -322,7 +322,7 @@ class Commands: server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - out = self.network.get_balance_for_scripthash(sh) + out = self.network.run_from_another_thread(self.network.get_balance_for_scripthash(sh)) out["confirmed"] = str(Decimal(out["confirmed"])/COIN) out["unconfirmed"] = str(Decimal(out["unconfirmed"])/COIN) return out @@ -331,7 +331,7 @@ class Commands: def getmerkle(self, txid, height): """Get Merkle branch of a transaction included in a block. Electrum uses this to verify transactions (Simple Payment Verification).""" - return self.network.get_merkle_for_transaction(txid, int(height)) + return self.network.run_from_another_thread(self.network.get_merkle_for_transaction(txid, int(height))) @command('n') def getservers(self): @@ -517,7 +517,7 @@ class Commands: if self.wallet and txid in self.wallet.transactions: tx = self.wallet.transactions[txid] else: - raw = self.network.get_transaction(txid) + raw = self.network.run_from_another_thread(self.network.get_transaction(txid)) if raw: tx = Transaction(raw) else: @@ -637,6 +637,7 @@ class Commands: @command('n') def notify(self, address, URL): """Watch an address. Every time the address changes, a http POST is sent to the URL.""" + raise NotImplementedError() # TODO this method is currently broken def callback(x): import urllib.request headers = {'content-type':'application/json'} diff --git a/electrum/interface.py b/electrum/interface.py index 2b3b44540..78c114f6b 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -76,7 +76,7 @@ class NotificationSession(ClientSession): super().send_request(*args, **kwargs), timeout) except asyncio.TimeoutError as e: - raise GracefulDisconnect('request timed out: {}'.format(args)) from e + raise RequestTimedOut('request timed out: {}'.format(args)) from e async def subscribe(self, method, params, queue): # note: until the cache is written for the first time, @@ -105,6 +105,7 @@ class NotificationSession(ClientSession): class GracefulDisconnect(Exception): pass +class RequestTimedOut(GracefulDisconnect): pass class ErrorParsingSSLCert(Exception): pass class ErrorGettingSSLCertFromServer(Exception): pass @@ -140,6 +141,7 @@ class Interface(PrintError): self._requested_chunks = set() self.network = network self._set_proxy(proxy) + self.session = None self.tip_header = None self.tip = 0 diff --git a/electrum/network.py b/electrum/network.py index f54c9e062..a449b0aef 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -45,7 +45,7 @@ from .bitcoin import COIN from . import constants from . import blockchain from .blockchain import Blockchain, HEADER_SIZE -from .interface import Interface, serialize_server, deserialize_server +from .interface import Interface, serialize_server, deserialize_server, RequestTimedOut from .version import PROTOCOL_VERSION from .simple_config import SimpleConfig @@ -638,13 +638,34 @@ class Network(PrintError): with b.lock: b.update_size() - async def get_merkle_for_transaction(self, tx_hash, tx_height): + def best_effort_reliable(func): + async def make_reliable_wrapper(self, *args, **kwargs): + for i in range(10): + iface = self.interface + session = iface.session if iface else None + if not session: + # no main interface; try again + await asyncio.sleep(0.1) + continue + try: + return await func(self, *args, **kwargs) + except RequestTimedOut: + if self.interface != iface: + # main interface changed; try again + continue + raise + raise Exception('no interface to do request on... gave up.') + return make_reliable_wrapper + + @best_effort_reliable + async def get_merkle_for_transaction(self, tx_hash: str, tx_height: int) -> dict: return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height]) + @best_effort_reliable async def broadcast_transaction(self, tx, timeout=10): try: out = await self.interface.session.send_request('blockchain.transaction.broadcast', [str(tx)], timeout=timeout) - except asyncio.TimeoutError as e: + except RequestTimedOut as e: return False, "error: operation timed out" except Exception as e: return False, "error: " + str(e) @@ -653,10 +674,27 @@ class Network(PrintError): return False, "error: " + out return True, out + @best_effort_reliable async def request_chunk(self, height, tip=None, *, can_return_early=False): return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early) - def blockchain(self): + @best_effort_reliable + async def get_transaction(self, tx_hash: str) -> str: + return await self.interface.session.send_request('blockchain.transaction.get', [tx_hash]) + + @best_effort_reliable + async def get_history_for_scripthash(self, sh: str) -> List[dict]: + return await self.interface.session.send_request('blockchain.scripthash.get_history', [sh]) + + @best_effort_reliable + async def listunspent_for_scripthash(self, sh: str) -> List[dict]: + return await self.interface.session.send_request('blockchain.scripthash.listunspent', [sh]) + + @best_effort_reliable + async def get_balance_for_scripthash(self, sh: str) -> dict: + return await self.interface.session.send_request('blockchain.scripthash.get_balance', [sh]) + + def blockchain(self) -> Blockchain: interface = self.interface if interface and interface.blockchain is not None: self.blockchain_index = interface.blockchain.forkpoint diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 568d2f844..cb84f963f 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -51,6 +51,7 @@ class Synchronizer(PrintError): ''' def __init__(self, wallet): self.wallet = wallet + self.network = wallet.network self.asyncio_loop = wallet.network.asyncio_loop self.requested_tx = {} self.requested_histories = {} @@ -86,7 +87,7 @@ class Synchronizer(PrintError): # request address history self.requested_histories[addr] = status h = address_to_scripthash(addr) - result = await self.session.send_request("blockchain.scripthash.get_history", [h]) + result = await self.network.get_history_for_scripthash(h) self.print_error("receiving history", addr, len(result)) hashes = set(map(lambda item: item['tx_hash'], result)) hist = list(map(lambda item: (item['tx_hash'], item['height']), result)) @@ -125,7 +126,7 @@ class Synchronizer(PrintError): await group.spawn(self._get_transaction, tx_hash) async def _get_transaction(self, tx_hash): - result = await self.session.send_request('blockchain.transaction.get', [tx_hash]) + result = await self.network.get_transaction(tx_hash) tx = Transaction(result) try: tx.deserialize()