diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt index 99b859c30..3860c9cd4 100644 --- a/contrib/requirements/requirements.txt +++ b/contrib/requirements/requirements.txt @@ -8,3 +8,4 @@ jsonrpclib-pelix PySocks>=1.6.6 qdarkstyle<3.0 typing>=3.0.0 +aiorpcx>=0.7.1 diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index d0e8dff1f..ed9f190e3 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -22,6 +22,7 @@ # SOFTWARE. import threading +import asyncio import itertools from collections import defaultdict @@ -138,16 +139,18 @@ class AddressSynchronizer(PrintError): self.network = network if self.network is not None: self.verifier = SPV(self.network, self) - self.synchronizer = Synchronizer(self, network) - #network.add_jobs([self.verifier, self.synchronizer]) + self.synchronizer = Synchronizer(self) + #network.add_jobs([self.verifier]) + self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.send_subscriptions(), self.network.asyncio_loop)) + self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.handle_status(), self.network.asyncio_loop)) + self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.main(), self.network.asyncio_loop)) else: self.verifier = None self.synchronizer = None def stop_threads(self): if self.network: - #self.network.remove_jobs([self.synchronizer, self.verifier]) - self.synchronizer.release() + #self.network.remove_jobs([self.verifier]) self.synchronizer = None self.verifier = None # Now no references to the synchronizer or verifier diff --git a/electrum/network.py b/electrum/network.py index de1b7b8cf..e00a9c057 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -20,6 +20,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio import time import queue import os @@ -204,7 +205,6 @@ class Network(PrintError): self.callback_lock = threading.Lock() self.pending_sends_lock = threading.Lock() self.recent_servers_lock = threading.RLock() # <- re-entrant - self.subscribed_addresses_lock = threading.Lock() self.blockchains_lock = threading.Lock() self.pending_sends = [] @@ -226,7 +226,6 @@ class Network(PrintError): util.make_dir(dir_path) # subscriptions and requests - self.subscribed_addresses = set() # note: needs self.subscribed_addresses_lock self.h2addr = {} # Requests from client we've not seen a response to self.unanswered_requests = {} @@ -245,6 +244,7 @@ class Network(PrintError): self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) self.asyncio_loop = asyncio.get_event_loop() + self.futures = [] def with_interface_lock(func): def func_wrapper(self, *args, **kwargs): @@ -338,26 +338,6 @@ class Network(PrintError): interface.queue_request(method, params, message_id) return message_id - @with_interface_lock - def send_subscriptions(self): - assert self.interface - self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses)) - self.sub_cache.clear() - # Resend unanswered requests - requests = self.unanswered_requests.values() - self.unanswered_requests = {} - for request in requests: - message_id = self.queue_request(request[0], request[1]) - self.unanswered_requests[message_id] = request - self.queue_request('server.banner', []) - self.queue_request('server.donation_address', []) - self.queue_request('server.peers.subscribe', []) - self.request_fee_estimates() - self.queue_request('blockchain.relayfee', []) - with self.subscribed_addresses_lock: - for h in self.subscribed_addresses: - self.queue_request('blockchain.scripthash.subscribe', [h]) - def request_fee_estimates(self): from .simple_config import FEE_ETA_TARGETS self.config.requested_fee_estimates() @@ -578,7 +558,6 @@ class Network(PrintError): # fixme: we don't want to close headers sub #self.close_interface(self.interface) self.interface = i - self.send_subscriptions() self.set_status('connected') self.notify('updated') self.notify('interfaces') @@ -683,11 +662,6 @@ class Network(PrintError): # Copy the request method and params to the response response['method'] = method response['params'] = params - # Only once we've received a response to an addr subscription - # add it to the list; avoids double-sends on reconnection - if method == 'blockchain.scripthash.subscribe': - with self.subscribed_addresses_lock: - self.subscribed_addresses.add(params[0]) else: if not response: # Closed remotely / misbehaving self.connection_down(interface.server) @@ -1078,6 +1052,7 @@ class Network(PrintError): self.asyncio_loop.run_until_complete(self.gat) except concurrent.futures.CancelledError: pass + [f.cancel() for f in self.futures] def on_notify_header(self, interface, header_dict): try: @@ -1212,21 +1187,6 @@ class Network(PrintError): callback(x2) return cb2 - def subscribe_to_addresses(self, addresses, callback): - hash2address = { - bitcoin.address_to_scripthash(address): address - for address in addresses} - self.h2addr.update(hash2address) - msgs = [ - ('blockchain.scripthash.subscribe', [x]) - for x in hash2address.keys()] - self.send(msgs, self.map_scripthash_to_address(callback)) - - def request_address_history(self, address, callback): - h = bitcoin.address_to_scripthash(address) - self.h2addr.update({h: address}) - self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback)) - # NOTE this method handles exceptions and a special edge case, counter to # what the other ElectrumX methods do. This is unexpected. def broadcast_transaction(self, transaction, callback=None): diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 48635df81..6cc3b7bdf 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -22,102 +22,86 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import traceback +import ssl +import asyncio +from aiorpcx import ClientSession, Request, Notification, TaskGroup from threading import Lock import hashlib +import concurrent.futures # from .bitcoin import Hash, hash_encode from .transaction import Transaction -from .util import ThreadJob, bh2u +from .util import ThreadJob, bh2u, PrintError, aiosafe +from .bitcoin import address_to_scripthash +from .version import ELECTRUM_VERSION, PROTOCOL_VERSION -class Synchronizer(ThreadJob): +def history_status(h): + if not h: + return None + status = '' + for tx_hash, height in h: + status += tx_hash + ':%d:' % height + return bh2u(hashlib.sha256(status.encode('ascii')).digest()) + + + +class NotificationSession(ClientSession): + + def __init__(self, queue, *args, **kwargs): + super(NotificationSession, self).__init__(*args, **kwargs) + self.queue = queue + + @aiosafe + async def handle_request(self, request): + if isinstance(request, Notification): + if request.method == 'blockchain.scripthash.subscribe': + args = request.args + await self.queue.put((args[0], args[1])) + + +class Synchronizer(PrintError): '''The synchronizer keeps the wallet up-to-date with its set of addresses and their transactions. It subscribes over the network to wallet addresses, gets the wallet to generate new addresses when necessary, requests the transaction history of any addresses we don't have the full history of, and requests binary transaction data of any transactions the wallet doesn't have. - - External interface: __init__() and add() member functions. ''' - - def __init__(self, wallet, network): + def __init__(self, wallet): self.wallet = wallet - self.network = network - self.new_addresses = set() - # Entries are (tx_hash, tx_height) tuples self.requested_tx = {} self.requested_histories = {} self.requested_addrs = set() - self.lock = Lock() + self.scripthash_to_address = {} + # Queues + self.add_queue = asyncio.Queue() + self.status_queue = asyncio.Queue() - self.initialized = False - self.initialize() - - def parse_response(self, response): - if response.get('error'): - self.print_error("response error:", response) - return None, None - return response['params'], response['result'] + async def send_version(self): + r = await self.session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) def is_up_to_date(self): - return (not self.requested_tx and not self.requested_histories - and not self.requested_addrs) - - def release(self): - self.network.unsubscribe(self.on_address_status) - - def add(self, address): - '''This can be called from the proxy or GUI threads.''' - with self.lock: - self.new_addresses.add(address) - - def subscribe_to_addresses(self, addresses): - if addresses: - self.requested_addrs |= addresses - self.network.subscribe_to_addresses(addresses, self.on_address_status) - - def get_status(self, h): - if not h: - return None - status = '' - for tx_hash, height in h: - status += tx_hash + ':%d:' % height - return bh2u(hashlib.sha256(status.encode('ascii')).digest()) - - def on_address_status(self, response): - if self.wallet.synchronizer is None and self.initialized: - return # we have been killed, this was just an orphan callback - params, result = self.parse_response(response) - if not params: - return - addr = params[0] + return (not self.requested_addrs and not self.requested_histories) + + def add(self, addr): + self.requested_addrs.add(addr) + self.add_queue.put_nowait(addr) + + async def on_address_status(self, addr, status): history = self.wallet.history.get(addr, []) - if self.get_status(history) != result: - # note that at this point 'result' can be None; - # if we had a history for addr but now the server is telling us - # there is no history - if addr not in self.requested_histories: - self.requested_histories[addr] = result - self.network.request_address_history(addr, self.on_address_history) - # remove addr from list only after it is added to requested_histories - if addr in self.requested_addrs: # Notifications won't be in - self.requested_addrs.remove(addr) - - def on_address_history(self, response): - if self.wallet.synchronizer is None and self.initialized: - return # we have been killed, this was just an orphan callback - params, result = self.parse_response(response) - if not params: + if history_status(history) == status: return - addr = params[0] - try: - server_status = self.requested_histories[addr] - except KeyError: - # note: server_status can be None even if we asked for the history, - # so it is not sufficient to test that - self.print_error("receiving history (unsolicited)", addr, len(result)) + # note that at this point 'result' can be None; + # if we had a history for addr but now the server is telling us + # there is no history + if addr in self.requested_histories: return + # request address history + self.requested_histories[addr] = status + h = address_to_scripthash(addr) + result = await self.session.send_request("blockchain.scripthash.get_history", [h]) self.print_error("receiving history", addr, len(result)) hashes = set(map(lambda item: item['tx_hash'], result)) hist = list(map(lambda item: (item['tx_hash'], item['height']), result)) @@ -128,23 +112,44 @@ class Synchronizer(ThreadJob): if len(hashes) != len(result): self.print_error("error: server history has non-unique txids: %s"% addr) # Check that the status corresponds to what was announced - elif self.get_status(hist) != server_status: + elif history_status(hist) != status: self.print_error("error: status mismatch: %s" % addr) else: # Store received history self.wallet.receive_history_callback(addr, hist, tx_fees) # Request transactions we don't have - self.request_missing_txs(hist) + # "hist" is a list of [tx_hash, tx_height] lists + transaction_hashes = [] + for tx_hash, tx_height in hist: + if tx_hash in self.requested_tx: + continue + if tx_hash in self.wallet.transactions: + continue + transaction_hashes.append(tx_hash) + self.requested_tx[tx_hash] = tx_height + + for tx_hash in transaction_hashes: + await self.get_transaction(tx_hash) + # Remove request; this allows up_to_date to be True self.requested_histories.pop(addr) - def on_tx_response(self, response): - if self.wallet.synchronizer is None and self.initialized: - return # we have been killed, this was just an orphan callback - params, result = self.parse_response(response) - if not params: - return - tx_hash = params[0] + async def request_missing_txs(self, hist): + # "hist" is a list of [tx_hash, tx_height] lists + transaction_hashes = [] + for tx_hash, tx_height in hist: + if tx_hash in self.requested_tx: + continue + if tx_hash in self.wallet.transactions: + continue + transaction_hashes.append(tx_hash) + self.requested_tx[tx_hash] = tx_height + + for tx_hash in transaction_hashes: + await self.get_transaction(tx_hash) + + async def get_transaction(self, tx_hash): + result = await self.session.send_request('blockchain.transaction.get', [tx_hash]) tx = Transaction(result) try: tx.deserialize() @@ -160,54 +165,44 @@ class Synchronizer(ThreadJob): self.print_error("received tx %s height: %d bytes: %d" % (tx_hash, tx_height, len(tx.raw))) # callbacks - self.network.trigger_callback('new_transaction', tx) - if not self.requested_tx: - self.network.trigger_callback('updated') - - def request_missing_txs(self, hist): - # "hist" is a list of [tx_hash, tx_height] lists - transaction_hashes = [] - for tx_hash, tx_height in hist: - if tx_hash in self.requested_tx: - continue - if tx_hash in self.wallet.transactions: - continue - transaction_hashes.append(tx_hash) - self.requested_tx[tx_hash] = tx_height - - self.network.get_transactions(transaction_hashes, self.on_tx_response) - - def initialize(self): - '''Check the initial state of the wallet. Subscribe to all its - addresses, and request any transactions in its address history - we don't have. - ''' - for history in self.wallet.history.values(): - # Old electrum servers returned ['*'] when all history for - # the address was pruned. This no longer happens but may - # remain in old wallets. - if history == ['*']: - continue - self.request_missing_txs(history) - - if self.requested_tx: - self.print_error("missing tx", self.requested_tx) - self.subscribe_to_addresses(set(self.wallet.get_addresses())) - self.initialized = True - - def run(self): - '''Called from the network proxy thread main loop.''' - # 1. Create new addresses - self.wallet.synchronize() - - # 2. Subscribe to new addresses - with self.lock: - addresses = self.new_addresses - self.new_addresses = set() - self.subscribe_to_addresses(addresses) - - # 3. Detect if situation has changed - up_to_date = self.is_up_to_date() - if up_to_date != self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(up_to_date) - self.network.trigger_callback('updated') + self.wallet.network.trigger_callback('new_transaction', tx) + + async def subscribe_to_address(self, addr): + h = address_to_scripthash(addr) + self.scripthash_to_address[h] = addr + status = await self.session.send_request('blockchain.scripthash.subscribe', [h]) + await self.status_queue.put((h, status)) + self.requested_addrs.remove(addr) + + @aiosafe + async def send_subscriptions(self): + async with TaskGroup() as group: + while True: + addr = await self.add_queue.get() + await group.spawn(self.subscribe_to_address(addr)) + + @aiosafe + async def handle_status(self): + async with TaskGroup() as group: + while True: + h, status = await self.status_queue.get() + addr = self.scripthash_to_address[h] + await group.spawn(self.on_address_status(addr, status)) + + @aiosafe + async def main(self): + conn = self.wallet.network.default_server + host, port, protocol = conn.split(':') + sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) if protocol == 's' else None + async with NotificationSession(self.status_queue, host, int(port), ssl=sslc) as session: + self.session = session + await self.send_version() + self.wallet.synchronizer = self + for addr in self.wallet.get_addresses(): self.add(addr) + while True: + await asyncio.sleep(1) + self.wallet.synchronize() + up_to_date = self.is_up_to_date() + if up_to_date != self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(up_to_date) + self.wallet.network.trigger_callback('updated') diff --git a/run_electrum b/run_electrum index dd35c3516..3db004e22 100755 --- a/run_electrum +++ b/run_electrum @@ -48,6 +48,7 @@ def check_imports(): import qrcode import google.protobuf import jsonrpclib + import aiorpcx except ImportError as e: sys.exit("Error: %s. Try 'sudo pip install '"%str(e)) # the following imports are for pyinstaller