Browse Source

restructure synchronizer/verifier <--> interface coupling

3.3.3.1
SomberNight 7 years ago
parent
commit
26172686b8
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 26
      electrum/address_synchronizer.py
  2. 32
      electrum/interface.py
  3. 7
      electrum/network.py
  4. 27
      electrum/synchronizer.py
  5. 40
      electrum/verifier.py

26
electrum/address_synchronizer.py

@ -26,6 +26,8 @@ import asyncio
import itertools import itertools
from collections import defaultdict from collections import defaultdict
from aiorpcx import TaskGroup
from . import bitcoin from . import bitcoin
from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY
from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus
@ -59,6 +61,7 @@ class AddressSynchronizer(PrintError):
# verifier (SPV) and synchronizer are started in start_threads # verifier (SPV) and synchronizer are started in start_threads
self.synchronizer = None self.synchronizer = None
self.verifier = None self.verifier = None
self.sync_restart_lock = asyncio.Lock()
# locks: if you need to take multiple ones, acquire them in the order they are defined here! # locks: if you need to take multiple ones, acquire them in the order they are defined here!
self.lock = threading.RLock() self.lock = threading.RLock()
self.transaction_lock = threading.RLock() self.transaction_lock = threading.RLock()
@ -135,20 +138,15 @@ class AddressSynchronizer(PrintError):
# add it in case it was previously unconfirmed # add it in case it was previously unconfirmed
self.add_unverified_tx(tx_hash, tx_height) self.add_unverified_tx(tx_hash, tx_height)
def on_default_server_changed(self, evt): async def on_default_server_changed(self, evt):
for i in self.network.futures: async with self.sync_restart_lock:
if i.done() and i.exception(): interface = self.network.interface
raise i.exception() if interface is None:
if not i.done(): return # we should get called again soon
i.cancel() await interface.group.spawn(self.verifier.main(interface))
self.network.futures.clear() await interface.group.spawn(self.synchronizer.send_subscriptions(interface))
if self.network.interface is None: await interface.group.spawn(self.synchronizer.handle_status(interface))
return await interface.group.spawn(self.synchronizer.main())
# FIXME there are races here.. network.interface can become None
self.network.futures.append(asyncio.get_event_loop().create_task(self.verifier.main()))
self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.send_subscriptions()))
self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.handle_status()))
self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.main()))
def start_threads(self, network): def start_threads(self, network):
self.network = network self.network = network

32
electrum/interface.py

@ -32,7 +32,7 @@ import traceback
import aiorpcx import aiorpcx
import asyncio import asyncio
import concurrent.futures import concurrent.futures
from aiorpcx import ClientSession, Notification from aiorpcx import ClientSession, Notification, TaskGroup
import requests import requests
@ -82,11 +82,15 @@ class Interface(PrintError):
self.port = int(self.port) self.port = int(self.port)
self.config_path = config_path self.config_path = config_path
self.cert_path = os.path.join(self.config_path, 'certs', self.host) self.cert_path = os.path.join(self.config_path, 'certs', self.host)
self.fut = asyncio.get_event_loop().create_task(self.run())
self.tip_header = None self.tip_header = None
self.tip = 0 self.tip = 0
self.blockchain = None self.blockchain = None
self.network = network self.network = network
# TODO combine?
self.fut = asyncio.get_event_loop().create_task(self.run())
self.group = TaskGroup()
if proxy: if proxy:
username, pw = proxy.get('user'), proxy.get('password') username, pw = proxy.get('user'), proxy.get('password')
if not username or not pw: if not username or not pw:
@ -231,18 +235,24 @@ class Interface(PrintError):
self.tip = subscription_res['height'] self.tip = subscription_res['height']
self.mark_ready() self.mark_ready()
copy_header_queue = asyncio.Queue() copy_header_queue = asyncio.Queue()
block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue)) async with self.group as group:
while True: await group.spawn(self.run_fetch_blocks(subscription_res, copy_header_queue))
try: await group.spawn(self.subscribe_to_headers(header_queue, copy_header_queue))
new_header = await asyncio.wait_for(header_queue.get(), 300) # NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group!
self.tip_header = new_header
self.tip = new_header['block_height'] async def subscribe_to_headers(self, header_queue, copy_header_queue):
await copy_header_queue.put(new_header) while True:
except concurrent.futures.TimeoutError: try:
await asyncio.wait_for(session.send_request('server.ping'), 5) new_header = await asyncio.wait_for(header_queue.get(), 300)
self.tip_header = new_header
self.tip = new_header['block_height']
await copy_header_queue.put(new_header)
except concurrent.futures.TimeoutError:
await asyncio.wait_for(self.session.send_request('server.ping'), 5)
def close(self): def close(self):
self.fut.cancel() self.fut.cancel()
asyncio.get_event_loop().create_task(self.group.cancel_remaining())
@aiosafe @aiosafe
async def run_fetch_blocks(self, sub_reply, replies): async def run_fetch_blocks(self, sub_reply, replies):

7
electrum/network.py

@ -243,7 +243,6 @@ class Network(PrintError):
self.start_network(deserialize_server(self.default_server)[2], self.start_network(deserialize_server(self.default_server)[2],
deserialize_proxy(self.config.get('proxy'))) deserialize_proxy(self.config.get('proxy')))
self.asyncio_loop = asyncio.get_event_loop() self.asyncio_loop = asyncio.get_event_loop()
self.futures = []
self.server_info_job = asyncio.Future() self.server_info_job = asyncio.Future()
# just to not trigger a warning from switch_to_interface the first time we change default_server # just to not trigger a warning from switch_to_interface the first time we change default_server
self.server_info_job.set_result(1) self.server_info_job.set_result(1)
@ -278,7 +277,11 @@ class Network(PrintError):
def trigger_callback(self, event, *args): def trigger_callback(self, event, *args):
with self.callback_lock: with self.callback_lock:
callbacks = self.callbacks[event][:] callbacks = self.callbacks[event][:]
[callback(event, *args) for callback in callbacks] for callback in callbacks:
if asyncio.iscoroutinefunction(callback):
asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop)
else:
callback(event, *args)
def read_recent_servers(self): def read_recent_servers(self):
if not self.config.path: if not self.config.path:

27
electrum/synchronizer.py

@ -159,20 +159,16 @@ class Synchronizer(PrintError):
await self.status_queue.put((h, status)) await self.status_queue.put((h, status))
self.requested_addrs.remove(addr) self.requested_addrs.remove(addr)
@aiosafe async def send_subscriptions(self, interface):
async def send_subscriptions(self): while True:
async with TaskGroup() as group: addr = await self.add_queue.get()
while True: await interface.group.spawn(self.subscribe_to_address(addr))
addr = await self.add_queue.get()
await group.spawn(self.subscribe_to_address(addr)) async def handle_status(self, interface):
while True:
@aiosafe h, status = await self.status_queue.get()
async def handle_status(self): addr = self.scripthash_to_address[h]
async with TaskGroup() as group: await interface.group.spawn(self.on_address_status(addr, status))
while True:
h, status = await self.status_queue.get()
addr = self.scripthash_to_address[h]
await group.spawn(self.on_address_status(addr, status))
@property @property
def session(self): def session(self):
@ -180,11 +176,10 @@ class Synchronizer(PrintError):
assert s is not None assert s is not None
return s return s
@aiosafe
async def main(self): async def main(self):
for addr in self.wallet.get_addresses(): self.add(addr) for addr in self.wallet.get_addresses(): self.add(addr)
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(0.1)
self.wallet.synchronize() self.wallet.synchronize()
up_to_date = self.is_up_to_date() up_to_date = self.is_up_to_date()
if up_to_date != self.wallet.is_up_to_date(): if up_to_date != self.wallet.is_up_to_date():

40
electrum/verifier.py

@ -48,13 +48,12 @@ class SPV(ThreadJob):
self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.merkle_roots = {} # txid -> merkle root (once it has been verified)
self.requested_merkle = set() # txid set of pending requests self.requested_merkle = set() # txid set of pending requests
@aiosafe async def main(self, interface):
async def main(self):
while True: while True:
await self._request_proofs() await self._request_proofs(interface)
await asyncio.sleep(1) await asyncio.sleep(0.1)
async def _request_proofs(self): async def _request_proofs(self, interface):
blockchain = self.network.blockchain() blockchain = self.network.blockchain()
if not blockchain: if not blockchain:
self.print_error("no blockchain") self.print_error("no blockchain")
@ -63,22 +62,21 @@ class SPV(ThreadJob):
local_height = self.network.get_local_height() local_height = self.network.get_local_height()
unverified = self.wallet.get_unverified_txs() unverified = self.wallet.get_unverified_txs()
async with TaskGroup() as group: for tx_hash, tx_height in unverified.items():
for tx_hash, tx_height in unverified.items(): # do not request merkle branch before headers are available
# do not request merkle branch before headers are available if tx_height <= 0 or tx_height > local_height:
if tx_height <= 0 or tx_height > local_height: continue
continue
header = blockchain.read_header(tx_height)
header = blockchain.read_header(tx_height) if header is None:
if header is None: index = tx_height // 2016
index = tx_height // 2016 if index < len(blockchain.checkpoints):
if index < len(blockchain.checkpoints): await interface.group.spawn(self.network.request_chunk, tx_height, None)
await group.spawn(self.network.request_chunk, tx_height, None) elif (tx_hash not in self.requested_merkle
elif (tx_hash not in self.requested_merkle and tx_hash not in self.merkle_roots):
and tx_hash not in self.merkle_roots): self.print_error('requested merkle', tx_hash)
self.print_error('requested merkle', tx_hash) self.requested_merkle.add(tx_hash)
self.requested_merkle.add(tx_hash) await interface.group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height)
await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height)
if self.network.blockchain() != self.blockchain: if self.network.blockchain() != self.blockchain:
self.blockchain = self.network.blockchain() self.blockchain = self.network.blockchain()

Loading…
Cancel
Save