From 8f36c9167de1bfd2505ef5516cd43661962b9c27 Mon Sep 17 00:00:00 2001 From: Janus Date: Wed, 29 Aug 2018 18:41:51 +0200 Subject: [PATCH] aiorpcx: remove callback based code, add session to Interface --- electrum/address_synchronizer.py | 18 +- electrum/gui/kivy/main_window.py | 5 +- electrum/gui/qt/network_dialog.py | 4 +- electrum/interface.py | 41 +- electrum/network.py | 607 ++---------------------------- electrum/verifier.py | 36 +- electrum/version.py | 2 +- 7 files changed, 102 insertions(+), 611 deletions(-) diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index ed9f190e3..07b0f9588 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -135,15 +135,25 @@ class AddressSynchronizer(PrintError): # add it in case it was previously unconfirmed self.add_unverified_tx(tx_hash, tx_height) + def on_default_server_changed(self, evt): + for i in self.network.futures: + if i.done() and i.exception(): + raise i.exception() + if not i.done(): + i.cancel() + self.network.futures.clear() + self.network.futures.append(asyncio.get_event_loop().create_task(self.verifier.main())) + self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.send_subscriptions())) + self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.handle_status())) + assert self.network.interface.session is not None + self.network.futures.append(asyncio.get_event_loop().create_task(self.synchronizer.main())) + def start_threads(self, network): self.network = network if self.network is not None: self.verifier = SPV(self.network, self) self.synchronizer = Synchronizer(self) - #network.add_jobs([self.verifier]) - self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.send_subscriptions(), self.network.asyncio_loop)) - self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.handle_status(), self.network.asyncio_loop)) - self.network.futures.append(asyncio.run_coroutine_threadsafe(self.synchronizer.main(), self.network.asyncio_loop)) + self.network.register_callback(self.on_default_server_changed, ['default_server_changed']) else: self.verifier = None self.synchronizer = None diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py index b5468bb7f..2ea871e22 100644 --- a/electrum/gui/kivy/main_window.py +++ b/electrum/gui/kivy/main_window.py @@ -15,6 +15,7 @@ from electrum.util import profiler, InvalidPassword from electrum.plugin import run_hook from electrum.util import format_satoshis, format_satoshis_plain from electrum.paymentrequest import PR_UNPAID, PR_PAID, PR_UNKNOWN, PR_EXPIRED +from electrum import blockchain from .i18n import _ from kivy.app import App @@ -114,10 +115,10 @@ class ElectrumWindow(App): from .uix.dialogs.choice_dialog import ChoiceDialog chains = self.network.get_blockchains() def cb(name): - for index, b in self.network.blockchains.items(): + for index, b in blockchain.blockchains.items(): if name == b.get_name(): self.network.follow_chain(index) - names = [self.network.blockchains[b].get_name() for b in chains] + names = [blockchain.blockchains[b].get_name() for b in chains] if len(names) > 1: cur_chain = self.network.blockchain().get_name() ChoiceDialog(_('Choose your chain'), names, cur_chain, cb).open() diff --git a/electrum/gui/qt/network_dialog.py b/electrum/gui/qt/network_dialog.py index 0acdbcd25..268e01b7f 100644 --- a/electrum/gui/qt/network_dialog.py +++ b/electrum/gui/qt/network_dialog.py @@ -31,7 +31,7 @@ from PyQt5.QtWidgets import * import PyQt5.QtCore as QtCore from electrum.i18n import _ -from electrum import constants +from electrum import constants, blockchain from electrum.util import print_error from electrum.network import serialize_server, deserialize_server @@ -103,7 +103,7 @@ class NodesListWidget(QTreeWidget): chains = network.get_blockchains() n_chains = len(chains) for k, items in chains.items(): - b = network.blockchains[k] + b = blockchain.blockchains[k] name = b.get_name() if n_chains >1: x = QTreeWidgetItem([name + '@%d'%b.get_forkpoint(), '%d'%b.height()]) diff --git a/electrum/interface.py b/electrum/interface.py index 5dd9c60e3..5ed8aa714 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -31,6 +31,7 @@ import threading import traceback import aiorpcx import asyncio +import concurrent.futures import requests @@ -43,18 +44,23 @@ from . import x509 from . import pem from .version import ELECTRUM_VERSION, PROTOCOL_VERSION from .util import NotificationSession +from . import blockchain class Interface(PrintError): - def __init__(self, server, config_path, connecting, proxy): + def __init__(self, network, server, config_path, proxy): self.exception = None - self.connecting = connecting + self.ready = asyncio.Future() self.server = server self.host, self.port, self.protocol = self.server.split(':') self.port = int(self.port) self.config_path = config_path self.cert_path = os.path.join(self.config_path, 'certs', self.host) self.fut = asyncio.get_event_loop().create_task(self.run()) + self.tip_header = None + self.tip = 0 + self.blockchain = None + self.network = network if proxy: proxy['user'] = proxy.get('user', '') if proxy['user'] == '': @@ -71,7 +77,7 @@ class Interface(PrintError): elif proxy['mode'] == "socks5": self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth) else: - raise NotImplementedError + raise NotImplementedError # http proxy not available with aiorpcx else: self.proxy = None @@ -128,7 +134,17 @@ class Interface(PrintError): assert False def mark_ready(self): - self.connecting.remove(self.server) + assert self.tip_header + chain = blockchain.check_header(self.tip_header) + if not chain: + self.blockchain = blockchain.blockchains[0] + else: + self.blockchain = chain + + self.print_error("set blockchain with height", self.blockchain.height()) + + if not self.ready.done(): + self.ready.set_result(1) async def save_certificate(self): if not os.path.exists(self.cert_path): @@ -161,16 +177,25 @@ class Interface(PrintError): return None async def open_session(self, sslc, do_sleep=True, execute_after_connect=lambda: None): - async with NotificationSession(None, None, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: + q = asyncio.Queue() + async with NotificationSession(None, q, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) - print(ver) + self.print_error(ver, do_sleep, self.host) connect_hook_executed = False while do_sleep: if not connect_hook_executed: connect_hook_executed = True + res = await session.send_request('blockchain.headers.subscribe') + self.tip_header = blockchain.deserialize_header(bfh(res['hex']), res['height']) + self.tip = res['height'] execute_after_connect() - await asyncio.wait_for(session.send_request('server.ping'), 5) - await asyncio.sleep(300) + self.session = session + try: + new_header = await asyncio.wait_for(q.get(), 300) + self.tip_header = new_header + self.tip = new_header['block_height'] + except concurrent.futures.TimeoutError: + await asyncio.wait_for(session.send_request('server.ping'), 5) def queue_request(self, method, params, msg_id): pass diff --git a/electrum/network.py b/electrum/network.py index e00a9c057..4509b6382 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -20,14 +20,12 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import asyncio +import concurrent.futures import time import queue import os -import errno import random import re -import select from collections import defaultdict import threading import socket @@ -39,18 +37,15 @@ import dns import dns.resolver from . import util -from .util import print_error, PrintError -from . import bitcoin +from .util import PrintError, print_error, bfh from .bitcoin import COIN from . import constants from . import blockchain -from .version import ELECTRUM_VERSION, PROTOCOL_VERSION -from .i18n import _ -from .blockchain import InvalidHeader from .interface import Interface import asyncio import concurrent.futures +from .version import PROTOCOL_VERSION NODES_RETRY_INTERVAL = 60 SERVER_RETRY_INTERVAL = 10 @@ -163,7 +158,6 @@ def deserialize_server(server_str): def serialize_server(host, port, protocol): return str(':'.join([host, port, protocol])) - class Network(PrintError): """The Network class manages a set of connections to remote electrum servers, each connected socket is handled by an Interface() object. @@ -183,10 +177,10 @@ class Network(PrintError): config = {} # Do not use mutables as default values! self.config = SimpleConfig(config) if isinstance(config, dict) else config self.num_server = 10 if not self.config.get('oneserver') else 0 - self.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock - self.print_error("blockchains", self.blockchains.keys()) + blockchain.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock + self.print_error("blockchains", list(blockchain.blockchains.keys())) self.blockchain_index = config.get('blockchain_index', 0) - if self.blockchain_index not in self.blockchains.keys(): + if self.blockchain_index not in blockchain.blockchains.keys(): self.blockchain_index = 0 # Server for addresses and transactions self.default_server = self.config.get('server', None) @@ -201,6 +195,7 @@ class Network(PrintError): self.default_server = pick_random_server() # locks: if you need to take multiple ones, acquire them in the order they are defined here! + self.bhi_lock = asyncio.Lock() self.interface_lock = threading.RLock() # <- re-entrant self.callback_lock = threading.Lock() self.pending_sends_lock = threading.Lock() @@ -322,29 +317,6 @@ class Network(PrintError): def is_connecting(self): return self.connection_status == 'connecting' - @with_interface_lock - def queue_request(self, method, params, interface=None): - # If you want to queue a request on any interface it must go - # through this function so message ids are properly tracked - if interface is None: - interface = self.interface - if interface is None: - self.print_error('warning: dropping request', method, params) - return - message_id = self.message_id - self.message_id += 1 - if self.debug: - self.print_error(interface.host, "-->", method, params, message_id) - interface.queue_request(method, params, message_id) - return message_id - - def request_fee_estimates(self): - from .simple_config import FEE_ETA_TARGETS - self.config.requested_fee_estimates() - self.queue_request('mempool.get_fee_histogram', []) - for i in FEE_ETA_TARGETS: - self.queue_request('blockchain.estimatefee', [i]) - def get_status_value(self, key): if key == 'status': value = self.connection_status @@ -415,11 +387,6 @@ class Network(PrintError): self.start_interface(server) return server - def start_interfaces(self): - self.start_interface(self.default_server) - for i in range(self.num_server - 1): - self.start_random_interface() - def set_proxy(self, proxy): self.proxy = proxy # Store these somewhere so we can un-monkey-patch @@ -475,7 +442,6 @@ class Network(PrintError): self.disconnected_servers = set([]) # note: needs self.interface_lock self.protocol = protocol self.set_proxy(proxy) - self.start_interfaces() @with_interface_lock def stop_network(self): @@ -558,6 +524,7 @@ class Network(PrintError): # fixme: we don't want to close headers sub #self.close_interface(self.interface) self.interface = i + self.trigger_callback('default_server_changed') self.set_status('connected') self.notify('updated') self.notify('interfaces') @@ -638,91 +605,6 @@ class Network(PrintError): """ hashable index for subscriptions and cache""" return str(method) + (':' + str(params[0]) if params else '') - def process_responses(self, interface): - responses = interface.get_responses() - for request, response in responses: - if request: - method, params, message_id = request - k = self.get_index(method, params) - # client requests go through self.send() with a - # callback, are only sent to the current interface, - # and are placed in the unanswered_requests dictionary - client_req = self.unanswered_requests.pop(message_id, None) - if client_req: - if interface != self.interface: - # we probably changed the current interface - # in the meantime; drop this. - return - callbacks = [client_req[2]] - else: - # fixme: will only work for subscriptions - k = self.get_index(method, params) - callbacks = list(self.subscriptions.get(k, [])) - - # Copy the request method and params to the response - response['method'] = method - response['params'] = params - else: - if not response: # Closed remotely / misbehaving - self.connection_down(interface.server) - break - # Rewrite response shape to match subscription request response - method = response.get('method') - params = response.get('params') - k = self.get_index(method, params) - if method == 'blockchain.headers.subscribe': - response['result'] = params[0] - response['params'] = [] - elif method == 'blockchain.scripthash.subscribe': - response['params'] = [params[0]] # addr - response['result'] = params[1] - callbacks = list(self.subscriptions.get(k, [])) - - # update cache if it's a subscription - if method.endswith('.subscribe'): - with self.interface_lock: - self.sub_cache[k] = response - # Response is now in canonical form - self.process_response(interface, response, callbacks) - - def send(self, messages, callback): - '''Messages is a list of (method, params) tuples''' - messages = list(messages) - with self.pending_sends_lock: - self.pending_sends.append((messages, callback)) - - @with_interface_lock - def process_pending_sends(self): - # Requests needs connectivity. If we don't have an interface, - # we cannot process them. - if not self.interface: - return - - with self.pending_sends_lock: - sends = self.pending_sends - self.pending_sends = [] - - for messages, callback in sends: - for method, params in messages: - r = None - if method.endswith('.subscribe'): - k = self.get_index(method, params) - # add callback to list - l = list(self.subscriptions.get(k, [])) - if callback not in l: - l.append(callback) - with self.callback_lock: - self.subscriptions[k] = l - # check cached response for subscriptions - r = self.sub_cache.get(k) - - if r is not None: - self.print_error("cache hit", k) - callback(r) - else: - message_id = self.queue_request(method, params) - self.unanswered_requests[message_id] = method, params, callback - def unsubscribe(self, callback): '''Unsubscribe a callback to free object references to enable GC.''' # Note: we can't unsubscribe from the server, so if we receive @@ -744,297 +626,37 @@ class Network(PrintError): self.close_interface(self.interfaces[server]) self.notify('interfaces') with self.blockchains_lock: - for b in self.blockchains.values(): + for b in blockchain.blockchains.values(): if b.catch_up == server: b.catch_up = None + @util.aiosafe async def new_interface(self, server): # todo: get tip first, then decide which checkpoint to use. self.add_recent_server(server) - interface = Interface(server, self.config.path, self.connecting, self.proxy) - interface.blockchain = None - interface.tip_header = None - interface.tip = 0 - interface.mode = 'default' - interface.request = None - with self.interface_lock: - self.interfaces[server] = interface - # server.version should be the first message - params = [ELECTRUM_VERSION, PROTOCOL_VERSION] - self.queue_request('server.version', params, interface) - self.queue_request('blockchain.headers.subscribe', [True], interface) - if server == self.default_server: - self.switch_to_interface(server) - #self.notify('interfaces') - - def maintain_sockets(self): - '''Socket maintenance.''' - # Responses to connection attempts? - while not self.socket_queue.empty(): - server = self.socket_queue.get() - if server in self.connecting: - self.connecting.remove(server) - - if socket: - self.new_interface(server) - else: - self.connection_down(server) - - # Send pings and shut down stale interfaces - # must use copy of values - with self.interface_lock: - interfaces = list(self.interfaces.values()) - for interface in interfaces: - if interface.has_timed_out(): - self.connection_down(interface.server) - elif interface.ping_required(): - self.queue_request('server.ping', [], interface) - - now = time.time() - # nodes - with self.interface_lock: - if len(self.interfaces) + len(self.connecting) < self.num_server: - self.start_random_interface() - if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: - self.print_error('network: retrying connections') - self.disconnected_servers = set([]) - self.nodes_retry_time = now - - # main interface - with self.interface_lock: - if not self.is_connected(): - if self.auto_connect: - if not self.is_connecting(): - self.switch_to_random_interface() - else: - if self.default_server in self.disconnected_servers: - if now - self.server_retry_time > SERVER_RETRY_INTERVAL: - self.disconnected_servers.remove(self.default_server) - self.server_retry_time = now - else: - self.switch_to_interface(self.default_server) - else: - if self.config.is_fee_estimates_update_required(): - self.request_fee_estimates() - def request_chunk(self, interface, index): - if index in self.requested_chunks: - return - interface.print_error("requesting chunk %d" % index) - self.requested_chunks.add(index) - height = index * 2016 - self.queue_request('blockchain.block.headers', [height, 2016], - interface) - - def on_block_headers(self, interface, response): - '''Handle receiving a chunk of block headers''' - error = response.get('error') - result = response.get('result') - params = response.get('params') - blockchain = interface.blockchain - if result is None or params is None or error is not None: - interface.print_error(error or 'bad response') - return - # Ignore unsolicited chunks - height = params[0] - index = height // 2016 - if index * 2016 != height or index not in self.requested_chunks: - interface.print_error("received chunk %d (unsolicited)" % index) - return - else: - interface.print_error("received chunk %d" % index) - self.requested_chunks.remove(index) - hexdata = result['hex'] - connect = blockchain.connect_chunk(index, hexdata) - if not connect: - self.connection_down(interface.server) - return - if index >= len(blockchain.checkpoints): - # If not finished, get the next chunk - if blockchain.height() < interface.tip: - self.request_chunk(interface, index+1) - else: - interface.mode = 'default' - interface.print_error('catch up done', blockchain.height()) - blockchain.catch_up = None - else: - # the verifier must have asked for this chunk - pass - self.notify('updated') - - def on_get_header(self, interface, response): - '''Handle receiving a single block header''' - header = response.get('result') - if not header: - interface.print_error(response) - self.connection_down(interface.server) - return - height = header.get('block_height') - #interface.print_error('got header', height, blockchain.hash_header(header)) - if interface.request != height: - interface.print_error("unsolicited header",interface.request, height) + interface = Interface(self, server, self.config.path, self.proxy) + try: + await asyncio.wait_for(interface.ready, 5) + except BaseException as e: + import traceback + traceback.print_exc() + self.print_error(interface.server, "couldn't launch because", str(e), str(type(e))) self.connection_down(interface.server) return - chain = blockchain.check_header(header) - if interface.mode == 'backward': - can_connect = blockchain.can_connect(header) - if can_connect and can_connect.catch_up is None: - interface.mode = 'catch_up' - interface.blockchain = can_connect - interface.blockchain.save_header(header) - next_height = height + 1 - interface.blockchain.catch_up = interface.server - elif chain: - # FIXME should await "initial chunk download". - # binary search will NOT do the correct thing if we don't yet - # have all headers up to the fork height - interface.print_error("binary search") - interface.mode = 'binary' - interface.blockchain = chain - interface.good = height - next_height = (interface.bad + interface.good) // 2 - assert next_height >= self.max_checkpoint(), (interface.bad, interface.good) - else: - if height == 0: - self.connection_down(interface.server) - next_height = None - else: - interface.bad = height - interface.bad_header = header - delta = interface.tip - height - next_height = max(self.max_checkpoint(), interface.tip - 2 * delta) - if height == next_height: - self.connection_down(interface.server) - next_height = None - - elif interface.mode == 'binary': - if chain: - interface.good = height - interface.blockchain = chain - else: - interface.bad = height - interface.bad_header = header - if interface.bad != interface.good + 1: - next_height = (interface.bad + interface.good) // 2 - assert next_height >= self.max_checkpoint() - elif not interface.blockchain.can_connect(interface.bad_header, check_height=False): - self.connection_down(interface.server) - next_height = None - else: - branch = self.blockchains.get(interface.bad) - if branch is not None: - if branch.check_header(interface.bad_header): - interface.print_error('joining chain', interface.bad) - next_height = None - elif branch.parent().check_header(header): - interface.print_error('reorg', interface.bad, interface.tip) - interface.blockchain = branch.parent() - next_height = interface.bad - else: - interface.print_error('forkpoint conflicts with existing fork', branch.path()) - branch.write(b'', 0) - branch.save_header(interface.bad_header) - interface.mode = 'catch_up' - interface.blockchain = branch - next_height = interface.bad + 1 - interface.blockchain.catch_up = interface.server - else: - bh = interface.blockchain.height() - next_height = None - if bh > interface.good: - if not interface.blockchain.check_header(interface.bad_header): - b = interface.blockchain.fork(interface.bad_header) - with self.blockchains_lock: - self.blockchains[interface.bad] = b - interface.blockchain = b - interface.print_error("new chain", b.forkpoint) - interface.mode = 'catch_up' - maybe_next_height = interface.bad + 1 - if maybe_next_height <= interface.tip: - next_height = maybe_next_height - interface.blockchain.catch_up = interface.server - else: - assert bh == interface.good - if interface.blockchain.catch_up is None and bh < interface.tip: - interface.print_error("catching up from %d"% (bh + 1)) - interface.mode = 'catch_up' - next_height = bh + 1 - interface.blockchain.catch_up = interface.server - - self.notify('updated') - - elif interface.mode == 'catch_up': - can_connect = interface.blockchain.can_connect(header) - if can_connect: - interface.blockchain.save_header(header) - next_height = height + 1 if height < interface.tip else None - else: - # go back - interface.print_error("cannot connect", height) - interface.mode = 'backward' - interface.bad = height - interface.bad_header = header - next_height = height - 1 - - if next_height is None: - # exit catch_up state - interface.print_error('catch up done', interface.blockchain.height()) - interface.blockchain.catch_up = None - self.switch_lagging_interface() - self.notify('updated') - - else: - raise Exception(interface.mode) - # If not finished, get the next header - if next_height is not None: - if next_height < 0: - self.connection_down(interface.server) - next_height = None - elif interface.mode == 'catch_up' and interface.tip > next_height + 50: - self.request_chunk(interface, next_height // 2016) - else: - self.request_header(interface, next_height) - if next_height is None: - interface.mode = 'default' - interface.request = None - self.notify('updated') + finally: + self.connecting.remove(server) - # refresh network dialog - self.notify('interfaces') - - def maintain_requests(self): with self.interface_lock: - interfaces = list(self.interfaces.values()) - for interface in interfaces: - if interface.request and time.time() - interface.request_time > 20: - interface.print_error("blockchain request timed out") - self.connection_down(interface.server) - continue + self.interfaces[server] = interface - def wait_on_sockets(self): - # Python docs say Windows doesn't like empty selects. - # Sleep to prevent busy looping - if not self.interfaces: - time.sleep(0.1) - return - with self.interface_lock: - interfaces = list(self.interfaces.values()) - rin = [i for i in interfaces] - win = [i for i in interfaces if i.num_requests()] - try: - rout, wout, xout = select.select(rin, win, [], 0.1) - except socket.error as e: - if e.errno == errno.EINTR: - return - raise - assert not xout - for interface in wout: - interface.send_requests() - for interface in rout: - self.process_responses(interface) + if server == self.default_server: + self.switch_to_interface(server) + + #self.notify('interfaces') def init_headers_file(self): - b = self.blockchains[0] + b = blockchain.blockchains[0] filename = b.path() length = 80 * len(constants.net.CHECKPOINTS) * 2016 if not os.path.exists(filename) or os.path.getsize(filename) < length: @@ -1052,73 +674,18 @@ class Network(PrintError): self.asyncio_loop.run_until_complete(self.gat) except concurrent.futures.CancelledError: pass - [f.cancel() for f in self.futures] - - def on_notify_header(self, interface, header_dict): - try: - header_hex, height = header_dict['hex'], header_dict['height'] - except KeyError: - # no point in keeping this connection without headers sub - self.connection_down(interface.server) - return - try: - header = blockchain.deserialize_header(util.bfh(header_hex), height) - except InvalidHeader: - self.connection_down(interface.server) - return - #interface.print_error('notified of header', height, blockchain.hash_header(header)) - if height < self.max_checkpoint(): - self.connection_down(interface.server) - return - interface.tip_header = header - interface.tip = height - if interface.mode != 'default': - return - b = blockchain.check_header(header) - if b: - interface.blockchain = b - self.switch_lagging_interface() - self.notify('updated') - self.notify('interfaces') - return - b = blockchain.can_connect(header) - if b: - interface.blockchain = b - b.save_header(header) - self.switch_lagging_interface() - self.notify('updated') - self.notify('interfaces') - return - with self.blockchains_lock: - tip = max([x.height() for x in self.blockchains.values()]) - if tip >=0: - interface.mode = 'backward' - interface.bad = height - interface.bad_header = header - self.request_header(interface, min(tip +1, height - 1)) - else: - chain = self.blockchains[0] - if chain.catch_up is None: - chain.catch_up = interface - interface.mode = 'catch_up' - interface.blockchain = chain - with self.blockchains_lock: - self.print_error("switching to catchup mode", tip, self.blockchains) - self.request_header(interface, 0) - else: - self.print_error("chain already catching up with", chain.catch_up.server) @with_interface_lock def blockchain(self): if self.interface and self.interface.blockchain is not None: self.blockchain_index = self.interface.blockchain.forkpoint - return self.blockchains[self.blockchain_index] + return blockchain.blockchains[self.blockchain_index] @with_interface_lock def get_blockchains(self): out = {} with self.blockchains_lock: - blockchain_items = list(self.blockchains.items()) + blockchain_items = list(blockchain.blockchains.items()) for k, b in blockchain_items: r = list(filter(lambda i: i.blockchain==b, list(self.interfaces.values()))) if r: @@ -1126,14 +693,14 @@ class Network(PrintError): return out def follow_chain(self, index): - blockchain = self.blockchains.get(index) - if blockchain: + bc = blockchain.blockchains.get(index) + if bc: self.blockchain_index = index self.config.set_key('blockchain_index', index) with self.interface_lock: interfaces = list(self.interfaces.values()) for i in interfaces: - if i.blockchain == blockchain: + if i.blockchain == bc: self.switch_to_interface(i.server) break else: @@ -1149,119 +716,6 @@ class Network(PrintError): def get_local_height(self): return self.blockchain().height() - @staticmethod - def __wait_for(it): - """Wait for the result of calling lambda `it`.""" - q = queue.Queue() - it(q.put) - try: - result = q.get(block=True, timeout=30) - except queue.Empty: - raise util.TimeoutException(_('Server did not answer')) - - if result.get('error'): - raise Exception(result.get('error')) - - return result.get('result') - - @staticmethod - def __with_default_synchronous_callback(invocation, callback): - """ Use this method if you want to make the network request - synchronous. """ - if not callback: - return Network.__wait_for(invocation) - - invocation(callback) - - def request_header(self, interface, height): - self.queue_request('blockchain.block.get_header', [height], interface) - interface.request = height - interface.req_time = time.time() - - def map_scripthash_to_address(self, callback): - def cb2(x): - x2 = x.copy() - p = x2.pop('params') - addr = self.h2addr[p[0]] - x2['params'] = [addr] - callback(x2) - return cb2 - - # NOTE this method handles exceptions and a special edge case, counter to - # what the other ElectrumX methods do. This is unexpected. - def broadcast_transaction(self, transaction, callback=None): - command = 'blockchain.transaction.broadcast' - invocation = lambda c: self.send([(command, [str(transaction)])], c) - - if callback: - invocation(callback) - return - - try: - out = Network.__wait_for(invocation) - except BaseException as e: - return False, "error: " + str(e) - - if out != transaction.txid(): - return False, "error: " + out - - return True, out - - def get_history_for_scripthash(self, hash, callback=None): - command = 'blockchain.scripthash.get_history' - invocation = lambda c: self.send([(command, [hash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def subscribe_to_headers(self, callback=None): - command = 'blockchain.headers.subscribe' - invocation = lambda c: self.send([(command, [True])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def subscribe_to_address(self, address, callback=None): - command = 'blockchain.address.subscribe' - invocation = lambda c: self.send([(command, [address])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_merkle_for_transaction(self, tx_hash, tx_height, callback=None): - command = 'blockchain.transaction.get_merkle' - invocation = lambda c: self.send([(command, [tx_hash, tx_height])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def subscribe_to_scripthash(self, scripthash, callback=None): - command = 'blockchain.scripthash.subscribe' - invocation = lambda c: self.send([(command, [scripthash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_transaction(self, transaction_hash, callback=None): - command = 'blockchain.transaction.get' - invocation = lambda c: self.send([(command, [transaction_hash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_transactions(self, transaction_hashes, callback=None): - command = 'blockchain.transaction.get' - messages = [(command, [tx_hash]) for tx_hash in transaction_hashes] - invocation = lambda c: self.send(messages, c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def listunspent_for_scripthash(self, scripthash, callback=None): - command = 'blockchain.scripthash.listunspent' - invocation = lambda c: self.send([(command, [scripthash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - - def get_balance_for_scripthash(self, scripthash, callback=None): - command = 'blockchain.scripthash.get_balance' - invocation = lambda c: self.send([(command, [scripthash])], c) - - return Network.__with_default_synchronous_callback(invocation, callback) - def export_checkpoints(self, path): # run manually from the console to generate checkpoints cp = self.blockchain().get_checkpoints() @@ -1310,3 +764,4 @@ class Network(PrintError): if changed: self.notify('updated') await asyncio.sleep(1) + diff --git a/electrum/verifier.py b/electrum/verifier.py index 4a0d82ec4..f64e7fa47 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -21,9 +21,10 @@ # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio from typing import Sequence, Optional -from .util import ThreadJob, bh2u, VerifiedTxInfo +from .util import ThreadJob, bh2u, VerifiedTxInfo, aiosafe from .bitcoin import Hash, hash_decode, hash_encode from .transaction import Transaction from .blockchain import hash_header @@ -45,17 +46,21 @@ class SPV(ThreadJob): self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.requested_merkle = set() # txid set of pending requests - def run(self): - interface = self.network.interface - if not interface: - return + @aiosafe + async def main(self): + while True: + await self.run() + await asyncio.sleep(1) - blockchain = interface.blockchain + async def run(self): + blockchain = self.network.blockchain() if not blockchain: + self.print_error("no blockchain") return local_height = self.network.get_local_height() unverified = self.wallet.get_unverified_txs() + #print("verifier run", len(unverified)) for tx_hash, tx_height in unverified.items(): # do not request merkle branch before headers are available if tx_height <= 0 or tx_height > local_height: @@ -65,31 +70,26 @@ class SPV(ThreadJob): if header is None: index = tx_height // 2016 if index < len(blockchain.checkpoints): - self.network.request_chunk(interface, index) + # FIXME disabled until async block header download has been merged + pass #await self.network.request_chunk(tx_height, None) elif (tx_hash not in self.requested_merkle and tx_hash not in self.merkle_roots): - self.network.get_merkle_for_transaction( - tx_hash, - tx_height, - self.verify_merkle) self.print_error('requested merkle', tx_hash) self.requested_merkle.add(tx_hash) + self.verify_merkle(tx_hash, await self.network.get_merkle_for_transaction( + tx_hash, + tx_height + )) if self.network.blockchain() != self.blockchain: self.blockchain = self.network.blockchain() self.undo_verifications() - def verify_merkle(self, response): + def verify_merkle(self, tx_hash, merkle): if self.wallet.verifier is None: return # we have been killed, this was just an orphan callback - if response.get('error'): - self.print_error('received an error:', response) - return - params = response['params'] - merkle = response['result'] # Verify the hash of the server-provided merkle branch to a # transaction matches the merkle root of its block - tx_hash = params[0] tx_height = merkle.get('block_height') pos = merkle.get('pos') merkle_branch = merkle.get('merkle') diff --git a/electrum/version.py b/electrum/version.py index cf185907d..53387af39 100644 --- a/electrum/version.py +++ b/electrum/version.py @@ -1,7 +1,7 @@ ELECTRUM_VERSION = '3.2.3' # version of the client package APK_VERSION = '3.2.3.1' # read by buildozer.spec -PROTOCOL_VERSION = '1.2' # protocol version requested +PROTOCOL_VERSION = '1.4' # protocol version requested # The hash of the mnemonic seed must begin with this SEED_PREFIX = '01' # Standard wallet