From 26172686b87780ab5341b86d62fb97d1a74e6a08 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 7 Sep 2018 19:34:28 +0200 Subject: [PATCH] restructure synchronizer/verifier <--> interface coupling --- electrum/address_synchronizer.py | 26 ++++++++++----------- electrum/interface.py | 32 ++++++++++++++++--------- electrum/network.py | 7 ++++-- electrum/synchronizer.py | 27 +++++++++------------ electrum/verifier.py | 40 +++++++++++++++----------------- 5 files changed, 68 insertions(+), 64 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 65a754026..80708b18d 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -26,6 +26,8 @@ import asyncio import itertools from collections import defaultdict +from aiorpcx import TaskGroup + from . import bitcoin from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus @@ -59,6 +61,7 @@ class AddressSynchronizer(PrintError): # verifier (SPV) and synchronizer are started in start_threads self.synchronizer = None self.verifier = None + self.sync_restart_lock = asyncio.Lock() # locks: if you need to take multiple ones, acquire them in the order they are defined here! self.lock = threading.RLock() self.transaction_lock = threading.RLock() @@ -135,20 +138,15 @@ class AddressSynchronizer(PrintError): # add it in case it was previously unconfirmed self.add_unverified_tx(tx_hash, tx_height) - def on_default_server_changed(self, evt): - for i in self.network.futures: - if i.done() and i.exception(): - raise i.exception() - if not i.done(): - i.cancel() - self.network.futures.clear() - if self.network.interface is None: - return - # FIXME there are races here.. network.interface can become None - self.network.futures.append(asyncio.get_event_loop().create_task(self.verifier.main())) - self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.send_subscriptions())) - self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.handle_status())) - self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.main())) + async def on_default_server_changed(self, evt): + async with self.sync_restart_lock: + interface = self.network.interface + if interface is None: + return # we should get called again soon + await interface.group.spawn(self.verifier.main(interface)) + await interface.group.spawn(self.synchronizer.send_subscriptions(interface)) + await interface.group.spawn(self.synchronizer.handle_status(interface)) + await interface.group.spawn(self.synchronizer.main()) def start_threads(self, network): self.network = network diff --git a/electrum/interface.py b/electrum/interface.py index d0e63f2f9..517253abd 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -32,7 +32,7 @@ import traceback import aiorpcx import asyncio import concurrent.futures -from aiorpcx import ClientSession, Notification +from aiorpcx import ClientSession, Notification, TaskGroup import requests @@ -82,11 +82,15 @@ class Interface(PrintError): self.port = int(self.port) self.config_path = config_path self.cert_path = os.path.join(self.config_path, 'certs', self.host) - self.fut = asyncio.get_event_loop().create_task(self.run()) self.tip_header = None self.tip = 0 self.blockchain = None self.network = network + + # TODO combine? + self.fut = asyncio.get_event_loop().create_task(self.run()) + self.group = TaskGroup() + if proxy: username, pw = proxy.get('user'), proxy.get('password') if not username or not pw: @@ -231,18 +235,24 @@ class Interface(PrintError): self.tip = subscription_res['height'] self.mark_ready() copy_header_queue = asyncio.Queue() - block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue)) - while True: - try: - new_header = await asyncio.wait_for(header_queue.get(), 300) - self.tip_header = new_header - self.tip = new_header['block_height'] - await copy_header_queue.put(new_header) - except concurrent.futures.TimeoutError: - await asyncio.wait_for(session.send_request('server.ping'), 5) + async with self.group as group: + await group.spawn(self.run_fetch_blocks(subscription_res, copy_header_queue)) + await group.spawn(self.subscribe_to_headers(header_queue, copy_header_queue)) + # NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group! + + async def subscribe_to_headers(self, header_queue, copy_header_queue): + while True: + try: + new_header = await asyncio.wait_for(header_queue.get(), 300) + self.tip_header = new_header + self.tip = new_header['block_height'] + await copy_header_queue.put(new_header) + except concurrent.futures.TimeoutError: + await asyncio.wait_for(self.session.send_request('server.ping'), 5) def close(self): self.fut.cancel() + asyncio.get_event_loop().create_task(self.group.cancel_remaining()) @aiosafe async def run_fetch_blocks(self, sub_reply, replies): diff --git a/electrum/network.py b/electrum/network.py index 85fe82139..8cf94273d 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -243,7 +243,6 @@ class Network(PrintError): self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) self.asyncio_loop = asyncio.get_event_loop() - self.futures = [] self.server_info_job = asyncio.Future() # just to not trigger a warning from switch_to_interface the first time we change default_server self.server_info_job.set_result(1) @@ -278,7 +277,11 @@ class Network(PrintError): def trigger_callback(self, event, *args): with self.callback_lock: callbacks = self.callbacks[event][:] - [callback(event, *args) for callback in callbacks] + for callback in callbacks: + if asyncio.iscoroutinefunction(callback): + asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) + else: + callback(event, *args) def read_recent_servers(self): if not self.config.path: diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 3fe16a251..841303124 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -159,20 +159,16 @@ class Synchronizer(PrintError): await self.status_queue.put((h, status)) self.requested_addrs.remove(addr) - @aiosafe - async def send_subscriptions(self): - async with TaskGroup() as group: - while True: - addr = await self.add_queue.get() - await group.spawn(self.subscribe_to_address(addr)) - - @aiosafe - async def handle_status(self): - async with TaskGroup() as group: - while True: - h, status = await self.status_queue.get() - addr = self.scripthash_to_address[h] - await group.spawn(self.on_address_status(addr, status)) + async def send_subscriptions(self, interface): + while True: + addr = await self.add_queue.get() + await interface.group.spawn(self.subscribe_to_address(addr)) + + async def handle_status(self, interface): + while True: + h, status = await self.status_queue.get() + addr = self.scripthash_to_address[h] + await interface.group.spawn(self.on_address_status(addr, status)) @property def session(self): @@ -180,11 +176,10 @@ class Synchronizer(PrintError): assert s is not None return s - @aiosafe async def main(self): for addr in self.wallet.get_addresses(): self.add(addr) while True: - await asyncio.sleep(1) + await asyncio.sleep(0.1) self.wallet.synchronize() up_to_date = self.is_up_to_date() if up_to_date != self.wallet.is_up_to_date(): diff --git a/electrum/verifier.py b/electrum/verifier.py index 39ecf82af..f1bc1aad6 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -48,13 +48,12 @@ class SPV(ThreadJob): self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.requested_merkle = set() # txid set of pending requests - @aiosafe - async def main(self): + async def main(self, interface): while True: - await self._request_proofs() - await asyncio.sleep(1) + await self._request_proofs(interface) + await asyncio.sleep(0.1) - async def _request_proofs(self): + async def _request_proofs(self, interface): blockchain = self.network.blockchain() if not blockchain: self.print_error("no blockchain") @@ -63,22 +62,21 @@ class SPV(ThreadJob): local_height = self.network.get_local_height() unverified = self.wallet.get_unverified_txs() - async with TaskGroup() as group: - for tx_hash, tx_height in unverified.items(): - # do not request merkle branch before headers are available - if tx_height <= 0 or tx_height > local_height: - continue - - header = blockchain.read_header(tx_height) - if header is None: - index = tx_height // 2016 - if index < len(blockchain.checkpoints): - await group.spawn(self.network.request_chunk, tx_height, None) - elif (tx_hash not in self.requested_merkle - and tx_hash not in self.merkle_roots): - self.print_error('requested merkle', tx_hash) - self.requested_merkle.add(tx_hash) - await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) + for tx_hash, tx_height in unverified.items(): + # do not request merkle branch before headers are available + if tx_height <= 0 or tx_height > local_height: + continue + + header = blockchain.read_header(tx_height) + if header is None: + index = tx_height // 2016 + if index < len(blockchain.checkpoints): + await interface.group.spawn(self.network.request_chunk, tx_height, None) + elif (tx_hash not in self.requested_merkle + and tx_hash not in self.merkle_roots): + self.print_error('requested merkle', tx_hash) + self.requested_merkle.add(tx_hash) + await interface.group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) if self.network.blockchain() != self.blockchain: self.blockchain = self.network.blockchain()