diff --git a/README.rst b/README.rst index c39fdf0..0d49516 100644 --- a/README.rst +++ b/README.rst @@ -135,6 +135,14 @@ version prior to the release of 1.0. ChangeLog ========= +Version 0.10.13 +--------------- + +* worked around asyncio issue to suppress the annoying log spew on shutdown + that makes it look like a bomb hit +* implement peer subscriptions as real subscriptions with incremental updates +* misc cleanups + Version 0.10.12 --------------- diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 41f4284..0835acd 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -89,7 +89,7 @@ These environment variables are optional: * **SSL_PORT** If set ElectrumX will serve SSL clients on **HOST**:**SSL_PORT**. - If set SSL_CERTFILE and SSL_KEYFILE must be defined and be + If set then SSL_CERTFILE and SSL_KEYFILE must be defined and be filesystem paths to those SSL files. * **RPC_PORT** diff --git a/lib/script.py b/lib/script.py index d29ce1a..fbb2337 100644 --- a/lib/script.py +++ b/lib/script.py @@ -209,7 +209,7 @@ class Script(object): n += dlen ops.append(op) - except: + except Exception: # Truncated script; e.g. tx_hash # ebc9fa1196a59e192352d76c0f6e73167046b9d37b8302b6bb6968dfd279b767 raise ScriptError('truncated script') diff --git a/query.py b/query.py index 3824d71..283f33d 100755 --- a/query.py +++ b/query.py @@ -50,7 +50,7 @@ def main(): try: limit = int(sys.argv[argc]) argc += 1 - except: + except Exception: limit = 10 for addr in sys.argv[argc:]: print('Address: ', addr) diff --git a/server/block_processor.py b/server/block_processor.py index ea8fb8c..74ae878 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -138,9 +138,10 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon): + def __init__(self, env, controller, daemon): super().__init__(env) self.daemon = daemon + self.controller = controller # These are our state as we move ahead of DB state self.fs_height = self.db_height @@ -190,6 +191,7 @@ class BlockProcessor(server.db.DB): async def main_loop(self): '''Main loop for block processing.''' + self.controller.ensure_future(self.prefetcher.main_loop()) await self.prefetcher.reset_height() while True: @@ -205,16 +207,11 @@ class BlockProcessor(server.db.DB): self.logger.info('flushing state to DB for a clean shutdown...') self.flush(True) - async def executor(self, func, *args, **kwargs): - '''Run func taking args in the executor.''' - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, partial(func, *args, **kwargs)) - async def first_caught_up(self): '''Called when first caught up to daemon after starting.''' # Flush everything with updated first_sync->False state. self.first_sync = False - await self.executor(self.flush, True) + await self.controller.run_in_executor(self.flush, True) if self.utxo_db.for_sync: self.logger.info('{} synced to height {:,d}' .format(VERSION, self.height)) @@ -240,7 +237,8 @@ class BlockProcessor(server.db.DB): if hprevs == chain: start = time.time() - await self.executor(self.advance_blocks, blocks, headers) + await self.controller.run_in_executor(self.advance_blocks, + blocks, headers) if not self.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s' @@ -277,14 +275,14 @@ class BlockProcessor(server.db.DB): self.logger.info('chain reorg detected') else: self.logger.info('faking a reorg of {:,d} blocks'.format(count)) - await self.executor(self.flush, True) + await self.controller.run_in_executor(self.flush, True) hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) - await self.executor(self.backup_blocks, blocks) + await self.controller.run_in_executor(self.backup_blocks, blocks) await self.prefetcher.reset_height() async def reorg_hashes(self, count): diff --git a/server/controller.py b/server/controller.py index 803d557..334608b 100644 --- a/server/controller.py +++ b/server/controller.py @@ -11,6 +11,8 @@ import json import os import ssl import time +import traceback +import warnings from bisect import bisect_left from collections import defaultdict from concurrent.futures import ThreadPoolExecutor @@ -49,9 +51,9 @@ class Controller(util.LoggedClass): self.start_time = time.time() self.coin = env.coin self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) - self.bp = BlockProcessor(env, self.daemon) - self.mempool = MemPool(self.bp) - self.peers = PeerManager(env) + self.bp = BlockProcessor(env, self, self.daemon) + self.mempool = MemPool(self.bp, self) + self.peers = PeerManager(env, self) self.env = env self.servers = {} # Map of session to the key of its list in self.groups @@ -63,6 +65,7 @@ class Controller(util.LoggedClass): self.max_sessions = env.max_sessions self.low_watermark = self.max_sessions * 19 // 20 self.max_subs = env.max_subs + self.futures = set() # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 self.next_stale_check = 0 @@ -88,12 +91,10 @@ class Controller(util.LoggedClass): ('server', 'banner donation_address'), ] - handlers = {'.'.join([prefix, suffix]): - getattr(self, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} - handlers['server.peers.subscribe'] = self.peers.subscribe - self.electrumx_handlers = handlers + self.electrumx_handlers = {'.'.join([prefix, suffix]): + getattr(self, suffix.replace('.', '_')) + for prefix, suffixes in rpcs + for suffix in suffixes.split()} async def mempool_transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -199,43 +200,64 @@ class Controller(util.LoggedClass): if session.items: self.enqueue_session(session) - def initiate_shutdown(self): - '''Call this function to start the shutdown process.''' - self.shutdown_event.set() + async def run_in_executor(self, func, *args): + '''Wait whilst running func in the executor.''' + return await self.loop.run_in_executor(None, func, *args) + + def schedule_executor(self, func, *args): + '''Schedule running func in the executor, return a task.''' + return self.ensure_future(self.run_in_executor(func, *args)) + + def ensure_future(self, coro): + '''Schedule the coro to be run.''' + future = asyncio.ensure_future(coro) + future.add_done_callback(self.on_future_done) + self.futures.add(future) + return future + + def on_future_done(self, future): + '''Collect the result of a future after removing it from our set.''' + self.futures.remove(future) + try: + future.result() + except asyncio.CancelledError: + pass + except Exception: + self.log_error(traceback.format_exc()) + + async def wait_for_bp_catchup(self): + '''Called when the block processor catches up.''' + await self.bp.caught_up_event.wait() + self.logger.info('block processor has caught up') + self.ensure_future(self.peers.main_loop()) + self.ensure_future(self.start_servers()) + self.ensure_future(self.mempool.main_loop()) + self.ensure_future(self.enqueue_delayed_sessions()) + self.ensure_future(self.notify()) + for n in range(4): + self.ensure_future(self.serve_requests()) async def main_loop(self): '''Controller main loop.''' - def add_future(coro): - futures.append(asyncio.ensure_future(coro)) - - async def await_bp_catchup(): - '''Wait for the block processor to catch up. - - Then start the servers and the peer manager. - ''' - await self.bp.caught_up_event.wait() - self.logger.info('block processor has caught up') - add_future(self.peers.main_loop()) - add_future(self.start_servers()) - add_future(self.mempool.main_loop()) - add_future(self.enqueue_delayed_sessions()) - add_future(self.notify()) - for n in range(4): - add_future(self.serve_requests()) - - futures = [] - add_future(self.bp.main_loop()) - add_future(self.bp.prefetcher.main_loop()) - add_future(await_bp_catchup()) - - # Perform a clean shutdown when this event is signalled. - await self.shutdown_event.wait() + self.ensure_future(self.bp.main_loop()) + self.ensure_future(self.wait_for_bp_catchup()) + # Shut down cleanly after waiting for shutdown to be signalled + await self.shutdown_event.wait() self.logger.info('shutting down') - await self.shutdown(futures) + await self.shutdown() + # Avoid log spew on shutdown for partially opened SSL sockets + try: + del asyncio.sslproto._SSLProtocolTransport.__del__ + except Exception: + pass self.logger.info('shutdown complete') - async def shutdown(self, futures): + def initiate_shutdown(self): + '''Call this function to start the shutdown process.''' + self.shutdown_event.set() + + async def shutdown(self): '''Perform the shutdown sequence.''' self.state = self.SHUTTING_DOWN @@ -244,13 +266,13 @@ class Controller(util.LoggedClass): for session in self.sessions: self.close_session(session) - # Cancel the futures - for future in futures: + # Cancel pending futures + for future in self.futures: future.cancel() # Wait for all futures to finish - while any(not future.done() for future in futures): - await asyncio.sleep(1) + while not all (future.done() for future in self.futures): + await asyncio.sleep(0.1) # Finally shut down the block processor and executor self.bp.shutdown(self.executor) @@ -334,6 +356,11 @@ class Controller(util.LoggedClass): for session in sessions: await session.notify(self.bp.db_height, touched) + def notify_peers(self, updates): + '''Notify of peer updates.''' + for session in self.sessions: + session.notify_peers(updates) + def electrum_header(self, height): '''Return the binary header at the given height.''' if not 0 <= height <= self.bp.db_height: @@ -525,7 +552,7 @@ class Controller(util.LoggedClass): def lookup_session(self, session_id): try: session_id = int(session_id) - except: + except Exception: pass else: for session in self.sessions: @@ -581,7 +608,7 @@ class Controller(util.LoggedClass): def rpc_peers(self): '''Return a list of server peers, currently taken from IRC.''' - return self.peers.peer_list() + return self.peers.peer_dict() def rpc_reorg(self, count=3): '''Force a reorg of the given number of blocks. @@ -599,7 +626,7 @@ class Controller(util.LoggedClass): if isinstance(address, str): try: return self.coin.address_to_hashX(address) - except: + except Exception: pass raise RPCError('{} is not a valid address'.format(address)) @@ -694,8 +721,7 @@ class Controller(util.LoggedClass): limit = self.env.max_send // 97 return list(self.bp.get_history(hashX, limit=limit)) - loop = asyncio.get_event_loop() - history = await loop.run_in_executor(None, job) + history = await self.run_in_executor(job) self.history_cache[hashX] = history return history @@ -725,8 +751,8 @@ class Controller(util.LoggedClass): '''Get UTXOs asynchronously to reduce latency.''' def job(): return list(self.bp.get_utxos(hashX, limit=None)) - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, job) + + return await self.run_in_executor(job) def get_chunk(self, index): '''Return header chunk as hex. Index is a non-negative integer.''' diff --git a/server/env.py b/server/env.py index 2304284..167b2aa 100644 --- a/server/env.py +++ b/server/env.py @@ -83,7 +83,7 @@ class Env(LoggedClass): return default try: return int(value) - except: + except Exception: raise self.Error('cannot convert envvar {} value {} to an integer' .format(envvar, value)) diff --git a/server/mempool.py b/server/mempool.py index 0a0a952..387b12c 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -31,9 +31,10 @@ class MemPool(util.LoggedClass): A pair is a (hashX, value) tuple. tx hashes are hex strings. ''' - def __init__(self, bp): + def __init__(self, bp, controller): super().__init__() self.daemon = bp.daemon + self.controller = controller self.coin = bp.coin self.db = bp self.touched = bp.touched @@ -139,7 +140,6 @@ class MemPool(util.LoggedClass): break def async_process_some(self, unfetched, limit): - loop = asyncio.get_event_loop() pending = [] txs = self.txs @@ -162,9 +162,8 @@ class MemPool(util.LoggedClass): deferred = pending pending = [] - def job(): - return self.process_raw_txs(raw_txs, deferred) - result, deferred = await loop.run_in_executor(None, job) + result, deferred = await self.controller.run_in_executor \ + (self.process_raw_txs, raw_txs, deferred) pending.extend(deferred) hashXs = self.hashXs diff --git a/server/peers.py b/server/peers.py index 31519a3..7017f04 100644 --- a/server/peers.py +++ b/server/peers.py @@ -8,10 +8,9 @@ '''Peer management.''' import asyncio +import itertools import socket -import traceback from collections import namedtuple -from functools import partial import lib.util as util from server.irc import IRC @@ -30,30 +29,32 @@ class PeerManager(util.LoggedClass): VERSION = '1.0' DEFAULT_PORTS = {'t': 50001, 's': 50002} - def __init__(self, env): + def __init__(self, env, controller): super().__init__() self.env = env - self.loop = asyncio.get_event_loop() + self.controller = controller self.irc = IRC(env, self) - self.futures = set() - self.identities = [] + self.pruning = None + self._identities = [] # Keyed by nick self.irc_peers = {} + self.updated_nicks = set() # We can have a Tor identity inaddition to a normal one - self.identities.append(NetIdentity(env.report_host, - env.report_tcp_port, - env.report_ssl_port, - '')) + self._identities.append(self.identity(env.report_host, + env.report_tcp_port, + env.report_ssl_port, + '')) if env.report_host_tor.endswith('.onion'): - self.identities.append(NetIdentity(env.report_host_tor, - env.report_tcp_port_tor, - env.report_ssl_port_tor, - '_tor')) + self._identities.append(self.identity(env.report_host_tor, + env.report_tcp_port_tor, + env.report_ssl_port_tor, + '_tor')) - async def executor(self, func, *args, **kwargs): - '''Run func taking args in the executor.''' - await self.loop.run_in_executor(None, partial(func, *args, **kwargs)) + @classmethod + def identity(self, host, tcp_port, ssl_port, suffix): + '''Returns a NetIdentity object. Unpublished ports are None.''' + return NetIdentity(host, tcp_port or None, ssl_port or None, suffix) @classmethod def real_name(cls, identity): @@ -70,38 +71,29 @@ class PeerManager(util.LoggedClass): ssl = port_text('s', identity.ssl_port) return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl) - def ensure_future(self, coro): - '''Convert a coro into a future and add it to our pending list - to be waited for.''' - self.futures.add(asyncio.ensure_future(coro)) + def identities(self): + '''Return a list of network identities of this server.''' + return self._identities + + async def refresh_peer_subs(self): + for n in itertools.count(): + await asyncio.sleep(60) + updates = [self.irc_peers[nick] for nick in self.updated_nicks + if nick in self.irc_peers] + if updates: + self.controller.notify_peers(updates) + self.updated_nicks.clear() - def start_irc(self): - '''Start up the IRC connections if enabled.''' + async def main_loop(self): + '''Not a loop for now...''' + self.controller.ensure_future(self.refresh_peer_subs()) if self.env.irc: name_pairs = [(self.real_name(identity), identity.nick_suffix) - for identity in self.identities] - self.ensure_future(self.irc.start(name_pairs)) + for identity in self._identities] + self.controller.ensure_future(self.irc.start(name_pairs)) else: self.logger.info('IRC is disabled') - async def main_loop(self): - '''Start and then enter the main loop.''' - self.start_irc() - - try: - while True: - await asyncio.sleep(10) - done = [future for future in self.futures if future.done()] - self.futures.difference_update(done) - for future in done: - try: - future.result() - except: - self.log_error(traceback.format_exc()) - finally: - for future in self.futures: - future.cancel() - def dns_lookup_peer(self, nick, hostname, details): try: ip_addr = None @@ -110,6 +102,7 @@ class PeerManager(util.LoggedClass): except socket.error: pass # IPv6? ip_addr = ip_addr or hostname + self.updated_nicks.add(nick) self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) self.logger.info('new IRC peer {} at {} ({})' .format(nick, hostname, details)) @@ -119,7 +112,7 @@ class PeerManager(util.LoggedClass): def add_irc_peer(self, *args): '''Schedule DNS lookup of peer.''' - self.ensure_future(self.executor(self.dns_lookup_peer, *args)) + self.controller.schedule_executor(self.dns_lookup_peer, *args) def remove_irc_peer(self, nick): '''Remove a peer from our IRC peers map.''' @@ -129,11 +122,9 @@ class PeerManager(util.LoggedClass): def count(self): return len(self.irc_peers) - def peer_list(self): + def peer_dict(self): return self.irc_peers - def subscribe(self): - '''Returns the server peers as a list of (ip, host, details) tuples. - - Despite the name this is not currently treated as a subscription.''' + def peer_list(self): + '''Returns the server peers as a list of (ip, host, details) tuples.''' return list(self.irc_peers.values()) diff --git a/server/session.py b/server/session.py index 8916f49..a4e7fd0 100644 --- a/server/session.py +++ b/server/session.py @@ -7,8 +7,6 @@ '''Classes for local RPC server and remote client TCP/SSL servers.''' - -import asyncio import time import traceback from functools import partial @@ -106,6 +104,7 @@ class ElectrumX(SessionBase): super().__init__(*args, **kwargs) self.subscribe_headers = False self.subscribe_height = False + self.subscribe_peers = False self.notified_height = None self.max_send = self.env.max_send self.max_subs = self.env.max_session_subs @@ -115,6 +114,8 @@ class ElectrumX(SessionBase): 'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.transaction.broadcast': self.transaction_broadcast, + 'server.features': self.server_features, + 'server.peers.subscribe': self.peers_subscribe, 'server.version': self.server_version, } @@ -168,6 +169,23 @@ class ElectrumX(SessionBase): self.subscribe_height = True return self.height() + def peers_subscribe(self, incremental=False): + '''Returns the server peers as a list of (ip, host, details) tuples. + + If incremental is False there is no subscription. If True the + remote side will receive notifications of new or modified + peers (peers that disappeared are not notified). + ''' + self.subscribe_peers = incremental + return self.controller.peers.peer_list() + + def notify_peers(self, updates): + '''Notify of peer updates. Updates are sent as a list in the same + format as the subscription reply, as the first parameter. + ''' + if self.subscribe_peers: + self.send_notification('server.peers.subscribe', [updates]) + async def address_subscribe(self, address): '''Subscribe to an address. @@ -181,6 +199,20 @@ class ElectrumX(SessionBase): self.hashX_subs[hashX] = address return status + def server_features(self): + '''Returns a dictionary of server features.''' + peers = self.controller.peers + hosts = {identity.host: { + 'tcp_port': identity.tcp_port, + 'ssl_port': identity.ssl_port, + 'pruning': peers.pruning, + 'version': peers.VERSION, + } for identity in self.controller.peers.identities()} + + return { + 'hosts': hosts, + } + def server_version(self, client_name=None, protocol_version=None): '''Returns the server version as a string. diff --git a/server/version.py b/server/version.py index ff7b40c..7c310f2 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.10.12" +VERSION = "ElectrumX 0.10.13"