diff --git a/electrum/bitcoin.py b/electrum/bitcoin.py index abaa5ed24..6bebf498b 100644 --- a/electrum/bitcoin.py +++ b/electrum/bitcoin.py @@ -284,13 +284,15 @@ def script_to_address(script, *, net=None): assert t == TYPE_ADDRESS return addr -def address_to_script(addr, *, net=None): +def address_to_script(addr: str, *, net=None) -> str: if net is None: net = constants.net + if not is_address(addr, net=net): + raise BitcoinException(f"invalid bitcoin address: {addr}") witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr) if witprog is not None: if not (0 <= witver <= 16): - raise BitcoinException('impossible witness version: {}'.format(witver)) + raise BitcoinException(f'impossible witness version: {witver}') OP_n = witver + 0x50 if witver > 0 else 0 script = bh2u(bytes([OP_n])) script += push_script(bh2u(bytes(witprog))) @@ -305,7 +307,7 @@ def address_to_script(addr, *, net=None): script += push_script(bh2u(hash_160_)) script += '87' # op_equal else: - raise BitcoinException('unknown address type: {}'.format(addrtype)) + raise BitcoinException(f'unknown address type: {addrtype}') return script def address_to_scripthash(addr): @@ -491,24 +493,28 @@ def address_from_private_key(sec): public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed) return pubkey_to_address(txin_type, public_key) -def is_segwit_address(addr): +def is_segwit_address(addr, *, net=None): + if net is None: net = constants.net try: - witver, witprog = segwit_addr.decode(constants.net.SEGWIT_HRP, addr) + witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr) except Exception as e: return False return witprog is not None -def is_b58_address(addr): +def is_b58_address(addr, *, net=None): + if net is None: net = constants.net try: addrtype, h = b58_address_to_hash160(addr) except Exception as e: return False - if addrtype not in [constants.net.ADDRTYPE_P2PKH, constants.net.ADDRTYPE_P2SH]: + if addrtype not in [net.ADDRTYPE_P2PKH, net.ADDRTYPE_P2SH]: return False return addr == hash160_to_b58_address(h, addrtype) -def is_address(addr): - return is_segwit_address(addr) or is_b58_address(addr) +def is_address(addr, *, net=None): + if net is None: net = constants.net + return is_segwit_address(addr, net=net) \ + or is_b58_address(addr, net=net) def is_private_key(key): diff --git a/electrum/blockchain.py b/electrum/blockchain.py index 0f8749056..2c72f6b37 100644 --- a/electrum/blockchain.py +++ b/electrum/blockchain.py @@ -72,7 +72,11 @@ def hash_header(header: dict) -> str: return '0' * 64 if header.get('prev_block_hash') is None: header['prev_block_hash'] = '00'*32 - return hash_encode(sha256d(bfh(serialize_header(header)))) + return hash_raw_header(serialize_header(header)) + + +def hash_raw_header(header: str) -> str: + return hash_encode(sha256d(bfh(header))) blockchains = {} # type: Dict[int, Blockchain] diff --git a/electrum/daemon.py b/electrum/daemon.py index 029231c97..b811905f3 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -30,15 +30,14 @@ import traceback import sys import threading from typing import Dict, Optional, Tuple -import re import jsonrpclib from .jsonrpc import VerifyingJSONRPCServer from .version import ELECTRUM_VERSION from .network import Network -from .util import json_decode, DaemonThread -from .util import print_error, to_string +from .util import (json_decode, DaemonThread, print_error, to_string, + create_and_start_event_loop) from .wallet import Wallet, Abstract_Wallet from .storage import WalletStorage from .commands import known_commands, Commands @@ -128,7 +127,7 @@ class Daemon(DaemonThread): if fd is None and listen_jsonrpc: fd, server = get_fd_or_server(config) if fd is None: raise Exception('failed to lock daemon; already running?') - self.create_and_start_event_loop() + self.asyncio_loop, self._stop_loop, self._loop_thread = create_and_start_event_loop() if config.get('offline'): self.network = None else: @@ -330,22 +329,3 @@ class Daemon(DaemonThread): except BaseException as e: traceback.print_exc(file=sys.stdout) # app will exit now - - def create_and_start_event_loop(self): - def on_exception(loop, context): - """Suppress spurious messages it appears we cannot control.""" - SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' - 'SSL error in data received') - message = context.get('message') - if message and SUPPRESS_MESSAGE_REGEX.match(message): - return - loop.default_exception_handler(context) - - self.asyncio_loop = asyncio.get_event_loop() - self.asyncio_loop.set_exception_handler(on_exception) - # self.asyncio_loop.set_debug(1) - self._stop_loop = asyncio.Future() - self._loop_thread = threading.Thread(target=self.asyncio_loop.run_until_complete, - args=(self._stop_loop,), - name='EventLoop') - self._loop_thread.start() diff --git a/electrum/network.py b/electrum/network.py index cb34d638f..36c957d8b 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -32,7 +32,7 @@ import json import sys import ipaddress import asyncio -from typing import NamedTuple, Optional, Sequence, List, Dict +from typing import NamedTuple, Optional, Sequence, List, Dict, Tuple import traceback import dns @@ -53,7 +53,7 @@ NODES_RETRY_INTERVAL = 60 SERVER_RETRY_INTERVAL = 10 -def parse_servers(result): +def parse_servers(result: Sequence[Tuple[str, str, List[str]]]) -> Dict[str, dict]: """ parse servers list into dict format""" servers = {} for item in result: @@ -170,6 +170,7 @@ class Network(PrintError): INSTANCE = self self.asyncio_loop = asyncio.get_event_loop() + assert self.asyncio_loop.is_running(), "event loop not running" self._loop_thread = None # type: threading.Thread # set by caller; only used for sanity checks if config is None: @@ -225,6 +226,8 @@ class Network(PrintError): self.server_queue = None self.proxy = None + self._set_status('disconnected') + def run_from_another_thread(self, coro): assert self._loop_thread != threading.current_thread(), 'must not be called from network thread' fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop) @@ -411,10 +414,10 @@ class Network(PrintError): out = filter_noonion(out) return out - def _start_interface(self, server): + def _start_interface(self, server: str): if server not in self.interfaces and server not in self.connecting: if server == self.default_server: - self.print_error("connecting to %s as new interface" % server) + self.print_error(f"connecting to {server} as new interface") self._set_status('connecting') self.connecting.add(server) self.server_queue.put(server) diff --git a/electrum/scripts/bip70.py b/electrum/scripts/bip70.py index 2e04bfe76..7b3d0de2c 100755 --- a/electrum/scripts/bip70.py +++ b/electrum/scripts/bip70.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 # create a BIP70 payment request signed with a certificate +# FIXME: the code here is outdated, and no longer working import tlslite diff --git a/electrum/scripts/block_headers.py b/electrum/scripts/block_headers.py index 649a0493e..abf6a2595 100755 --- a/electrum/scripts/block_headers.py +++ b/electrum/scripts/block_headers.py @@ -3,29 +3,33 @@ # A simple script that connects to a server and displays block headers import time -import sys +import asyncio -from .. import SimpleConfig, Network -from electrum.util import print_msg, json_encode +from electrum.network import Network +from electrum.util import print_msg, json_encode, create_and_start_event_loop, log_exceptions # start network -c = SimpleConfig() -network = Network(c) +loop, stopping_fut, loop_thread = create_and_start_event_loop() +network = Network() network.start() # wait until connected -while network.is_connecting(): - time.sleep(0.1) +while not network.is_connected(): + time.sleep(1) + print_msg("waiting for network to get connected...") -if not network.is_connected(): - print_msg("daemon is not connected") - sys.exit(1) +header_queue = asyncio.Queue() -# 2. send the subscription -callback = lambda response: print_msg(json_encode(response.get('result'))) -network.send([('server.version',["block_headers script", "1.2"])], callback) -network.subscribe_to_headers(callback) +@log_exceptions +async def f(): + try: + await network.interface.session.subscribe('blockchain.headers.subscribe', [], header_queue) + # 3. wait for results + while network.is_connected(): + header = await header_queue.get() + print_msg(json_encode(header)) + finally: + stopping_fut.set_result(1) -# 3. wait for results -while network.is_connected(): - time.sleep(1) +# 2. send the subscription +asyncio.run_coroutine_threadsafe(f(), loop) diff --git a/electrum/scripts/estimate_fee.py b/electrum/scripts/estimate_fee.py index 85f63cef0..bcb8c4978 100755 --- a/electrum/scripts/estimate_fee.py +++ b/electrum/scripts/estimate_fee.py @@ -1,7 +1,29 @@ #!/usr/bin/env python3 -from . import util import json -from electrum.network import filter_protocol -peers = filter_protocol(util.get_peers()) -results = util.send_request(peers, 'blockchain.estimatefee', [2]) -print(json.dumps(results, indent=4)) +import asyncio +from statistics import median +from numbers import Number + +from electrum.network import filter_protocol, Network +from electrum.util import create_and_start_event_loop, log_exceptions + +import util + + +loop, stopping_fut, loop_thread = create_and_start_event_loop() +network = Network() +network.start() + +@log_exceptions +async def f(): + try: + peers = await util.get_peers(network) + peers = filter_protocol(peers) + results = await util.send_request(network, peers, 'blockchain.estimatefee', [2]) + print(json.dumps(results, indent=4)) + feerate_estimates = filter(lambda x: isinstance(x, Number), results.values()) + print(f"median feerate: {median(feerate_estimates)}") + finally: + stopping_fut.set_result(1) + +asyncio.run_coroutine_threadsafe(f(), loop) diff --git a/electrum/scripts/get_history.py b/electrum/scripts/get_history.py index c83f99d91..a97698288 100755 --- a/electrum/scripts/get_history.py +++ b/electrum/scripts/get_history.py @@ -1,9 +1,12 @@ #!/usr/bin/env python3 import sys -from .. import Network -from electrum.util import json_encode, print_msg +import asyncio + from electrum import bitcoin +from electrum.network import Network +from electrum.util import json_encode, print_msg, create_and_start_event_loop, log_exceptions + try: addr = sys.argv[1] @@ -11,8 +14,17 @@ except Exception: print("usage: get_history ") sys.exit(1) -n = Network() -n.start() -_hash = bitcoin.address_to_scripthash(addr) -h = n.get_history_for_scripthash(_hash) -print_msg(json_encode(h)) +loop, stopping_fut, loop_thread = create_and_start_event_loop() +network = Network() +network.start() + +@log_exceptions +async def f(): + try: + sh = bitcoin.address_to_scripthash(addr) + hist = await network.get_history_for_scripthash(sh) + print_msg(json_encode(hist)) + finally: + stopping_fut.set_result(1) + +asyncio.run_coroutine_threadsafe(f(), loop) diff --git a/electrum/scripts/peers.py b/electrum/scripts/peers.py index a887f0795..e8ec5b033 100755 --- a/electrum/scripts/peers.py +++ b/electrum/scripts/peers.py @@ -1,14 +1,28 @@ #!/usr/bin/env python3 +import asyncio -from . import util +from electrum.network import filter_protocol, Network +from electrum.util import create_and_start_event_loop, log_exceptions +from electrum.blockchain import hash_raw_header -from electrum.network import filter_protocol -from electrum.blockchain import hash_header +import util -peers = util.get_peers() -peers = filter_protocol(peers, 's') -results = util.send_request(peers, 'blockchain.headers.subscribe', []) +loop, stopping_fut, loop_thread = create_and_start_event_loop() +network = Network() +network.start() -for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')): - print("%60s"%n, v.get('block_height'), hash_header(v)) +@log_exceptions +async def f(): + try: + peers = await util.get_peers(network) + peers = filter_protocol(peers, 's') + results = await util.send_request(network, peers, 'blockchain.headers.subscribe', []) + for server, header in sorted(results.items(), key=lambda x: x[1].get('height')): + height = header.get('height') + blockhash = hash_raw_header(header.get('hex')) + print("%60s" % server, height, blockhash) + finally: + stopping_fut.set_result(1) + +asyncio.run_coroutine_threadsafe(f(), loop) diff --git a/electrum/scripts/servers.py b/electrum/scripts/servers.py index c7201bc38..0c104f43b 100755 --- a/electrum/scripts/servers.py +++ b/electrum/scripts/servers.py @@ -1,10 +1,27 @@ #!/usr/bin/env python3 - -from .. import set_verbosity -from electrum.network import filter_version -from . import util import json -set_verbosity(False) +import asyncio + +from electrum.network import filter_version, Network +from electrum.util import create_and_start_event_loop, log_exceptions +from electrum import constants + +import util + + +#constants.set_testnet() + +loop, stopping_fut, loop_thread = create_and_start_event_loop() +network = Network() +network.start() + +@log_exceptions +async def f(): + try: + peers = await util.get_peers(network) + peers = filter_version(peers) + print(json.dumps(peers, sort_keys=True, indent=4)) + finally: + stopping_fut.set_result(1) -servers = filter_version(util.get_peers()) -print(json.dumps(servers, sort_keys = True, indent = 4)) +asyncio.run_coroutine_threadsafe(f(), loop) diff --git a/electrum/scripts/txradar.py b/electrum/scripts/txradar.py index dda732274..8c150d8fe 100755 --- a/electrum/scripts/txradar.py +++ b/electrum/scripts/txradar.py @@ -1,20 +1,38 @@ #!/usr/bin/env python3 -from . import util import sys +import asyncio + +from electrum.network import filter_protocol, Network +from electrum.util import create_and_start_event_loop, log_exceptions + +import util + + try: - tx = sys.argv[1] + txid = sys.argv[1] except: print("usage: txradar txid") sys.exit(1) -peers = util.get_peers() -results = util.send_request(peers, 'blockchain.transaction.get', [tx]) -r1 = [] -r2 = [] +loop, stopping_fut, loop_thread = create_and_start_event_loop() +network = Network() +network.start() -for k, v in results.items(): - (r1 if v else r2).append(k) +@log_exceptions +async def f(): + try: + peers = await util.get_peers(network) + peers = filter_protocol(peers, 's') + results = await util.send_request(network, peers, 'blockchain.transaction.get', [txid]) + r1, r2 = [], [] + for k, v in results.items(): + (r1 if not isinstance(v, Exception) else r2).append(k) + print(f"Received {len(results)} answers") + try: propagation = len(r1) * 100. / (len(r1) + len(r2)) + except ZeroDivisionError: propagation = 0 + print(f"Propagation rate: {propagation:.1f} percent") + finally: + stopping_fut.set_result(1) -print("Received %d answers"%len(results)) -print("Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2)))) +asyncio.run_coroutine_threadsafe(f(), loop) diff --git a/electrum/scripts/util.py b/electrum/scripts/util.py index 266348f75..43a95d7d2 100644 --- a/electrum/scripts/util.py +++ b/electrum/scripts/util.py @@ -1,87 +1,46 @@ -import select, time, queue -# import electrum -from .. import Connection, Interface, SimpleConfig +import asyncio +from typing import List, Sequence -from electrum.network import parse_servers -from collections import defaultdict +from aiorpcx import TaskGroup + +from electrum.network import parse_servers, Network +from electrum.interface import Interface -# electrum.util.set_verbosity(1) -def get_interfaces(servers, timeout=10): - '''Returns a map of servers to connected interfaces. If any - connections fail or timeout, they will be missing from the map. - ''' - assert type(servers) is list - socket_queue = queue.Queue() - config = SimpleConfig() - connecting = {} - for server in servers: - if server not in connecting: - connecting[server] = Connection(server, socket_queue, config.path) - interfaces = {} - timeout = time.time() + timeout - count = 0 - while time.time() < timeout and count < len(servers): - try: - server, socket = socket_queue.get(True, 0.3) - except queue.Empty: - continue - if socket: - interfaces[server] = Interface(server, socket) - count += 1 - return interfaces -def wait_on_interfaces(interfaces, timeout=10): - '''Return a map of servers to a list of (request, response) tuples. - Waits timeout seconds, or until each interface has a response''' - result = defaultdict(list) - timeout = time.time() + timeout - while len(result) < len(interfaces) and time.time() < timeout: - rin = [i for i in interfaces.values()] - win = [i for i in interfaces.values() if i.unsent_requests] - rout, wout, xout = select.select(rin, win, [], 1) - for interface in wout: - interface.send_requests() - for interface in rout: - responses = interface.get_responses() - if responses: - result[interface.server].extend(responses) - return result +#electrum.util.set_verbosity(True) -def get_peers(): - config = SimpleConfig() - peers = {} - # 1. get connected interfaces - server = config.get('server') - if server is None: - print("You need to set a secure server, for example (for mainnet): 'electrum setconfig server helicarrier.bauerj.eu:50002:s'") - return [] - interfaces = get_interfaces([server]) - if not interfaces: - print("No connection to", server) - return [] - # 2. get list of peers - interface = interfaces[server] - interface.queue_request('server.peers.subscribe', [], 0) - responses = wait_on_interfaces(interfaces).get(server) - if responses: - response = responses[0][1] # One response, (req, response) tuple - peers = parse_servers(response.get('result')) +async def get_peers(network: Network): + while not network.is_connected(): + await asyncio.sleep(1) + interface = network.interface + session = interface.session + print(f"asking server {interface.server} for its peers") + peers = parse_servers(await session.send_request('server.peers.subscribe')) + print(f"got {len(peers)} servers") return peers -def send_request(peers, method, params): - print("Contacting %d servers"%len(peers)) - interfaces = get_interfaces(peers) - print("%d servers could be reached" % len(interfaces)) - for peer in peers: - if not peer in interfaces: - print("Connection failed:", peer) - for msg_id, i in enumerate(interfaces.values()): - i.queue_request(method, params, msg_id) - responses = wait_on_interfaces(interfaces) - for peer in interfaces: - if not peer in responses: - print(peer, "did not answer") - results = dict(zip(responses.keys(), [t[0][1].get('result') for t in responses.values()])) - print("%d answers"%len(results)) - return results +async def send_request(network: Network, servers: List[str], method: str, params: Sequence): + print(f"contacting {len(servers)} servers") + num_connecting = len(network.connecting) + for server in servers: + network._start_interface(server) + # sleep a bit + for _ in range(10): + if len(network.connecting) < num_connecting: + break + await asyncio.sleep(1) + print(f"connected to {len(network.interfaces)} servers. sending request to all.") + responses = dict() + async def get_response(iface: Interface): + try: + res = await iface.session.send_request(method, params, timeout=10) + except Exception as e: + print(f"server {iface.server} errored or timed out: ({repr(e)})") + res = e + responses[iface.server] = res + async with TaskGroup() as group: + for interface in network.interfaces.values(): + await group.spawn(get_response(interface)) + print("%d answers" % len(responses)) + return responses diff --git a/electrum/scripts/watch_address.py b/electrum/scripts/watch_address.py index 8fd5d4914..851160de5 100755 --- a/electrum/scripts/watch_address.py +++ b/electrum/scripts/watch_address.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 import sys -import time -from electrum import bitcoin -from .. import SimpleConfig, Network -from electrum.util import print_msg, json_encode +import asyncio + +from electrum.network import Network +from electrum.util import print_msg, create_and_start_event_loop +from electrum.synchronizer import SynchronizerBase + try: addr = sys.argv[1] @@ -12,25 +14,31 @@ except Exception: print("usage: watch_address ") sys.exit(1) -sh = bitcoin.address_to_scripthash(addr) - # start network -c = SimpleConfig() -network = Network(c) +loop = create_and_start_event_loop()[0] +network = Network() network.start() -# wait until connected -while network.is_connecting(): - time.sleep(0.1) -if not network.is_connected(): - print_msg("daemon is not connected") - sys.exit(1) +class Notifier(SynchronizerBase): + def __init__(self, network): + SynchronizerBase.__init__(self, network) + self.watched_addresses = set() + self.watch_queue = asyncio.Queue() + + async def main(self): + # resend existing subscriptions if we were restarted + for addr in self.watched_addresses: + await self._add_address(addr) + # main loop + while True: + addr = await self.watch_queue.get() + self.watched_addresses.add(addr) + await self._add_address(addr) + + async def _on_address_status(self, addr, status): + print_msg(f"addr {addr}, status {status}") -# 2. send the subscription -callback = lambda response: print_msg(json_encode(response.get('result'))) -network.subscribe_to_address(addr, callback) -# 3. wait for results -while network.is_connected(): - time.sleep(1) +notifier = Notifier(network) +asyncio.run_coroutine_threadsafe(notifier.watch_queue.put(addr), loop) diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 0a6ed687e..cdde6d248 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -31,7 +31,7 @@ from aiorpcx import TaskGroup, run_in_thread from .transaction import Transaction from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer -from .bitcoin import address_to_scripthash +from .bitcoin import address_to_scripthash, is_address if TYPE_CHECKING: from .network import Network @@ -77,7 +77,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer): def add(self, addr): asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) - async def _add_address(self, addr): + async def _add_address(self, addr: str): + if not is_address(addr): raise ValueError(f"invalid bitcoin address {addr}") if addr in self.requested_addrs: return self.requested_addrs.add(addr) await self.add_queue.put(addr) diff --git a/electrum/util.py b/electrum/util.py index f4cb8ddd6..08ea73d40 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -278,7 +278,7 @@ class DaemonThread(threading.Thread, PrintError): self.print_error("stopped") -verbosity = '*' +verbosity = '' def set_verbosity(filters: Union[str, bool]): global verbosity if type(filters) is bool: # backwards compat @@ -983,3 +983,26 @@ class NetworkJobOnDefaultServer(PrintError): s = self.interface.session assert s is not None return s + + +def create_and_start_event_loop() -> Tuple[asyncio.AbstractEventLoop, + asyncio.Future, + threading.Thread]: + def on_exception(loop, context): + """Suppress spurious messages it appears we cannot control.""" + SUPPRESS_MESSAGE_REGEX = re.compile('SSL handshake|Fatal read error on|' + 'SSL error in data received') + message = context.get('message') + if message and SUPPRESS_MESSAGE_REGEX.match(message): + return + loop.default_exception_handler(context) + + loop = asyncio.get_event_loop() + loop.set_exception_handler(on_exception) + # loop.set_debug(1) + stopping_fut = asyncio.Future() + loop_thread = threading.Thread(target=loop.run_until_complete, + args=(stopping_fut,), + name='EventLoop') + loop_thread.start() + return loop, stopping_fut, loop_thread