diff --git a/electrumx/lib/text.py b/electrumx/lib/text.py new file mode 100644 index 0000000..8546eb9 --- /dev/null +++ b/electrumx/lib/text.py @@ -0,0 +1,82 @@ +import time + +import electrumx.lib.util as util + + +def sessions_lines(data): + '''A generator returning lines for a list of sessions. + + data is the return value of rpc_sessions().''' + fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}') + yield fmt.format('ID', 'Flags', 'Client', 'Proto', + 'Reqs', 'Txs', 'Subs', + 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer') + for (id_, flags, peer, client, proto, reqs, txs_sent, subs, + recv_count, recv_size, send_count, send_size, time) in data: + yield fmt.format(id_, flags, client, proto, + '{:,d}'.format(reqs), + '{:,d}'.format(txs_sent), + '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,d}'.format(recv_size // 1024), + '{:,d}'.format(send_count), + '{:,d}'.format(send_size // 1024), + util.formatted_time(time, sep=''), peer) + + +def groups_lines(data): + '''A generator returning lines for a list of groups. + + data is the return value of rpc_groups().''' + + fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}' + '{:>7} {:>9} {:>7} {:>9}') + yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs', + 'Recv', 'Recv KB', 'Sent', 'Sent KB') + for (id_, session_count, bandwidth, reqs, txs_sent, subs, + recv_count, recv_size, send_count, send_size) in data: + yield fmt.format(id_, + '{:,d}'.format(session_count), + '{:,d}'.format(bandwidth // 1024), + '{:,d}'.format(reqs), + '{:,d}'.format(txs_sent), + '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,d}'.format(recv_size // 1024), + '{:,d}'.format(send_count), + '{:,d}'.format(send_size // 1024)) + + +def peers_lines(data): + '''A generator returning lines for a list of peers. + + data is the return value of rpc_peers().''' + def time_fmt(t): + if not t: + return 'Never' + return util.formatted_time(now - t) + + now = time.time() + fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>4} ' + '{:>4} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}') + yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min', + 'Max', 'Pruning', 'Last Good', 'Last Try', + 'Tries', 'Source', 'IP Address') + for item in data: + features = item['features'] + hostname = item['host'] + host = features['hosts'][hostname] + yield fmt.format(hostname[:30], + item['status'], + host.get('tcp_port') or '', + host.get('ssl_port') or '', + features['server_version'] or 'unknown', + features['protocol_min'], + features['protocol_max'], + features['pruning'] or '', + time_fmt(item['last_good']), + time_fmt(item['last_try']), + item['try_count'], + item['source'][:20], + item['ip_addr'] or '') diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 65a9063..daf67d2 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -6,38 +6,21 @@ # and warranty status of this software. import asyncio -import itertools -import json -import os -import ssl -import time import traceback -from bisect import bisect_left -from collections import defaultdict from concurrent.futures import ThreadPoolExecutor -from functools import partial import pylru from aiorpcx import RPCError, TaskSet, _version as aiorpcx_version import electrumx from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash -from electrumx.lib.peer import Peer from electrumx.lib.server_base import ServerBase import electrumx.lib.util as util from electrumx.server.daemon import DaemonError from electrumx.server.mempool import MemPool from electrumx.server.peers import PeerManager -from electrumx.server.session import (LocalRPC, BAD_REQUEST, DAEMON_ERROR, - non_negative_integer) - - -class SessionGroup(object): - - def __init__(self, gid): - self.gid = gid - # Concurrency per group - self.semaphore = asyncio.Semaphore(20) +from electrumx.server.session import (BAD_REQUEST, DAEMON_ERROR, + SessionManager, non_negative_integer) class Controller(ServerBase): @@ -47,7 +30,6 @@ class Controller(ServerBase): up with the daemon. ''' - CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) AIORPCX_MIN = (0, 5, 6) def __init__(self, env): @@ -66,69 +48,39 @@ class Controller(ServerBase): self.logger.info(f'event loop policy: {env.loop_policy}') self.coin = env.coin - self.servers = {} self.tasks = TaskSet() - self.sessions = set() - self.cur_group = SessionGroup(0) - self.txs_sent = 0 - self.next_log_sessions = 0 - self.state = self.CATCHING_UP - self.max_sessions = env.max_sessions - self.low_watermark = self.max_sessions * 19 // 20 - self.max_subs = env.max_subs - # Cache some idea of room to avoid recounting on each subscription - self.subs_room = 0 - self.next_stale_check = 0 self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) self.cache_height = 0 self.cache_mn_height = 0 self.mn_cache = pylru.lrucache(256) env.max_send = max(350000, env.max_send) - # Set up the RPC request handlers - cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' - 'reorg sessions stop'.split()) - self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} self.loop = asyncio.get_event_loop() self.executor = ThreadPoolExecutor() self.loop.set_default_executor(self.executor) # The complex objects. Note PeerManager references self.loop (ugh) + self.session_mgr = SessionManager(env, self) self.daemon = self.coin.DAEMON(env) self.bp = self.coin.BLOCK_PROCESSOR(env, self, self.daemon) self.mempool = MemPool(self.bp, self) self.peer_mgr = PeerManager(env, self) - # Event triggered when electrumx is listening for incoming requests. - self.server_listening = asyncio.Event() - async def start_servers(self): '''Start the RPC server and schedule the external servers to be started once the block processor has caught up. ''' - if self.env.rpc_port is not None: - await self.start_server('RPC', self.env.cs_host(for_rpc=True), - self.env.rpc_port) - + await self.session_mgr.start_rpc_server() self.create_task(self.bp.main_loop()) self.create_task(self.wait_for_bp_catchup()) async def shutdown(self): '''Perform the shutdown sequence.''' - self.state = self.SHUTTING_DOWN - - # Close servers and sessions, and cancel all tasks - self.close_servers(list(self.servers.keys())) - for session in self.sessions: - session.abort() + # Not certain of ordering here self.tasks.cancel_all() - - # Wait for the above to take effect + await self.session_mgr.shutdown() await self.tasks.wait() - for session in list(self.sessions): - await session.wait_closed() - # Finally shut down the block processor and executor self.bp.shutdown(self.executor) @@ -147,10 +99,6 @@ class Controller(ServerBase): ''' return self.mempool.value(hashX) - def sent_tx(self, tx_hash): - '''Call when a TX is sent.''' - self.txs_sent += 1 - async def run_in_executor(self, func, *args): '''Wait whilst running func in the executor.''' return await self.loop.run_in_executor(None, func, *args) @@ -173,30 +121,6 @@ class Controller(ServerBase): except Exception as e: self.logger.exception(f'uncaught task exception: {e}') - async def housekeeping(self): - '''Regular housekeeping checks.''' - n = 0 - while True: - n += 1 - await asyncio.sleep(15) - if n % 10 == 0: - self.clear_stale_sessions() - - # Start listening for incoming connections if paused and - # session count has fallen - if (self.state == self.PAUSED and - len(self.sessions) <= self.low_watermark): - await self.start_external_servers() - - # Periodically log sessions - if self.env.log_sessions and time.time() > self.next_log_sessions: - if self.next_log_sessions: - data = self.session_data(for_log=True) - for line in Controller.sessions_text_lines(data): - self.logger.info(line) - self.logger.info(json.dumps(self.getinfo())) - self.next_log_sessions = time.time() + self.env.log_sessions - async def wait_for_bp_catchup(self): '''Wait for the block processor to catch up, and for the mempool to synchronize, then kick off server background processes.''' @@ -204,67 +128,8 @@ class Controller(ServerBase): self.create_task(self.mempool.main_loop()) await self.mempool.synchronized_event.wait() self.create_task(self.peer_mgr.main_loop()) - self.create_task(self.log_start_external_servers()) - self.create_task(self.housekeeping()) - - def close_servers(self, kinds): - '''Close the servers of the given kinds (TCP etc.).''' - if kinds: - self.logger.info('closing down {} listening servers' - .format(', '.join(kinds))) - for kind in kinds: - server = self.servers.pop(kind, None) - if server: - server.close() - - async def start_server(self, kind, *args, **kw_args): - protocol_class = LocalRPC if kind == 'RPC' else self.coin.SESSIONCLS - protocol_factory = partial(protocol_class, self, kind) - server = self.loop.create_server(protocol_factory, *args, **kw_args) - - host, port = args[:2] - try: - self.servers[kind] = await server - except Exception as e: - self.logger.error('{} server failed to listen on {}:{:d} :{}' - .format(kind, host, port, e)) - else: - self.logger.info('{} server listening on {}:{:d}' - .format(kind, host, port)) - - async def log_start_external_servers(self): - '''Start TCP and SSL servers.''' - self.logger.info('max session count: {:,d}'.format(self.max_sessions)) - self.logger.info('session timeout: {:,d} seconds' - .format(self.env.session_timeout)) - self.logger.info('session bandwidth limit {:,d} bytes' - .format(self.env.bandwidth_limit)) - self.logger.info('max response size {:,d} bytes' - .format(self.env.max_send)) - self.logger.info('max subscriptions across all sessions: {:,d}' - .format(self.max_subs)) - self.logger.info('max subscriptions per session: {:,d}' - .format(self.env.max_session_subs)) - if self.env.drop_client is not None: - self.logger.info('drop clients matching: {}' - .format(self.env.drop_client.pattern)) - await self.start_external_servers() - - async def start_external_servers(self): - '''Start listening on TCP and SSL ports, but only if the respective - port was given in the environment. - ''' - self.state = self.LISTENING - - env = self.env - host = env.cs_host(for_rpc=False) - if env.tcp_port is not None: - await self.start_server('TCP', host, env.tcp_port) - if env.ssl_port is not None: - sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) - sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) - await self.start_server('SSL', host, env.ssl_port, ssl=sslc) - self.server_listening.set() + self.create_task(self.session_mgr.start_serving()) + self.create_task(self.session_mgr.housekeeping()) def notify_sessions(self, touched): '''Notify sessions about height changes and touched addresses.''' @@ -278,19 +143,7 @@ class Controller(ServerBase): self.cache_height = height self.header_cache.clear() - # Height notifications are synchronous. Those sessions with - # touched addresses are scheduled for asynchronous completion - for session in self.sessions: - if isinstance(session, LocalRPC): - continue - session_touched = session.notify(height, touched) - if session_touched is not None: - self.create_task(session.notify_async(session_touched)) - - def notify_peers(self, updates): - '''Notify of peer updates.''' - for session in self.sessions: - session.notify_peers(updates) + self.session_mgr.notify(height, touched) def raw_header(self, height): '''Return the binary header at the given height.''' @@ -307,299 +160,6 @@ class Controller(ServerBase): height) return self.header_cache[height] - def add_session(self, session): - self.sessions.add(session) - if (len(self.sessions) >= self.max_sessions - and self.state == self.LISTENING): - self.state = self.PAUSED - session.logger.info('maximum sessions {:,d} reached, stopping new ' - 'connections until count drops to {:,d}' - .format(self.max_sessions, self.low_watermark)) - self.close_servers(['TCP', 'SSL']) - gid = int(session.start_time - self.start_time) // 900 - if self.cur_group.gid != gid: - self.cur_group = SessionGroup(gid) - return self.cur_group - - def remove_session(self, session): - '''Remove a session from our sessions list if there.''' - self.sessions.remove(session) - - def close_session(self, session): - '''Close the session's transport.''' - session.close() - return 'disconnected {:d}'.format(session.session_id) - - def toggle_logging(self, session): - '''Toggle logging of the session.''' - session.toggle_logging() - return 'log {:d}: {}'.format(session.session_id, session.log_me) - - def _group_map(self): - group_map = defaultdict(list) - for session in self.sessions: - group_map[session.group].append(session) - return group_map - - def clear_stale_sessions(self): - '''Cut off sessions that haven't done anything for 10 minutes.''' - now = time.time() - stale_cutoff = now - self.env.session_timeout - - stale = [] - for session in self.sessions: - if session.is_closing(): - session.abort() - elif session.last_recv < stale_cutoff: - self.close_session(session) - stale.append(session.session_id) - if stale: - self.logger.info('closing stale connections {}'.format(stale)) - - # Consolidate small groups - bw_limit = self.env.bandwidth_limit - group_map = self._group_map() - groups = [group for group, sessions in group_map.items() - if len(sessions) <= 5 and - sum(s.bw_charge for s in sessions) < bw_limit] - if len(groups) > 1: - new_group = groups[-1] - for group in groups: - for session in group_map[group]: - session.group = new_group - - def session_count(self): - '''The number of connections that we've sent something to.''' - return len(self.sessions) - - def getinfo(self): - '''A one-line summary of server state.''' - group_map = self._group_map() - return { - 'version': electrumx.version, - 'daemon': self.daemon.logged_url(), - 'daemon_height': self.daemon.cached_height(), - 'db_height': self.bp.db_height, - 'closing': len([s for s in self.sessions if s.is_closing()]), - 'errors': sum(s.rpc.errors for s in self.sessions), - 'groups': len(group_map), - 'logged': len([s for s in self.sessions if s.log_me]), - 'paused': sum(s.paused for s in self.sessions), - 'pid': os.getpid(), - 'peers': self.peer_mgr.info(), - 'requests': sum(s.count_pending_items() for s in self.sessions), - 'sessions': self.session_count(), - 'subs': self.sub_count(), - 'txs_sent': self.txs_sent, - 'uptime': util.formatted_time(time.time() - self.start_time), - } - - def sub_count(self): - return sum(s.sub_count() for s in self.sessions) - - @staticmethod - def groups_text_lines(data): - '''A generator returning lines for a list of groups. - - data is the return value of rpc_groups().''' - - fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}' - '{:>7} {:>9} {:>7} {:>9}') - yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs', - 'Recv', 'Recv KB', 'Sent', 'Sent KB') - for (id_, session_count, bandwidth, reqs, txs_sent, subs, - recv_count, recv_size, send_count, send_size) in data: - yield fmt.format(id_, - '{:,d}'.format(session_count), - '{:,d}'.format(bandwidth // 1024), - '{:,d}'.format(reqs), - '{:,d}'.format(txs_sent), - '{:,d}'.format(subs), - '{:,d}'.format(recv_count), - '{:,d}'.format(recv_size // 1024), - '{:,d}'.format(send_count), - '{:,d}'.format(send_size // 1024)) - - def group_data(self): - '''Returned to the RPC 'groups' call.''' - result = [] - group_map = self._group_map() - for group, sessions in group_map.items(): - result.append([group.gid, - len(sessions), - sum(s.bw_charge for s in sessions), - sum(s.count_pending_items() for s in sessions), - sum(s.txs_sent for s in sessions), - sum(s.sub_count() for s in sessions), - sum(s.recv_count for s in sessions), - sum(s.recv_size for s in sessions), - sum(s.send_count for s in sessions), - sum(s.send_size for s in sessions), - ]) - return result - - @staticmethod - def peers_text_lines(data): - '''A generator returning lines for a list of peers. - - data is the return value of rpc_peers().''' - def time_fmt(t): - if not t: - return 'Never' - return util.formatted_time(now - t) - - now = time.time() - fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>4} ' - '{:>4} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}') - yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min', - 'Max', 'Pruning', 'Last Good', 'Last Try', - 'Tries', 'Source', 'IP Address') - for item in data: - features = item['features'] - hostname = item['host'] - host = features['hosts'][hostname] - yield fmt.format(hostname[:30], - item['status'], - host.get('tcp_port') or '', - host.get('ssl_port') or '', - features['server_version'] or 'unknown', - features['protocol_min'], - features['protocol_max'], - features['pruning'] or '', - time_fmt(item['last_good']), - time_fmt(item['last_try']), - item['try_count'], - item['source'][:20], - item['ip_addr'] or '') - - @staticmethod - def sessions_text_lines(data): - '''A generator returning lines for a list of sessions. - - data is the return value of rpc_sessions().''' - fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} ' - '{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}') - yield fmt.format('ID', 'Flags', 'Client', 'Proto', - 'Reqs', 'Txs', 'Subs', - 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer') - for (id_, flags, peer, client, proto, reqs, txs_sent, subs, - recv_count, recv_size, send_count, send_size, time) in data: - yield fmt.format(id_, flags, client, proto, - '{:,d}'.format(reqs), - '{:,d}'.format(txs_sent), - '{:,d}'.format(subs), - '{:,d}'.format(recv_count), - '{:,d}'.format(recv_size // 1024), - '{:,d}'.format(send_count), - '{:,d}'.format(send_size // 1024), - util.formatted_time(time, sep=''), peer) - - def session_data(self, for_log): - '''Returned to the RPC 'sessions' call.''' - now = time.time() - sessions = sorted(self.sessions, key=lambda s: s.start_time) - return [(session.session_id, - session.flags(), - session.peer_address_str(for_log=for_log), - session.client, - session.protocol_version_string(), - session.count_pending_items(), - session.txs_sent, - session.sub_count(), - session.recv_count, session.recv_size, - session.send_count, session.send_size, - now - session.start_time) - for session in sessions] - - def lookup_session(self, session_id): - try: - session_id = int(session_id) - except Exception: - pass - else: - for session in self.sessions: - if session.session_id == session_id: - return session - return None - - def for_each_session(self, session_ids, operation): - if not isinstance(session_ids, list): - raise RPCError(BAD_REQUEST, 'expected a list of session IDs') - - result = [] - for session_id in session_ids: - session = self.lookup_session(session_id) - if session: - result.append(operation(session)) - else: - result.append('unknown session: {}'.format(session_id)) - return result - - # Local RPC command handlers - - def rpc_add_peer(self, real_name): - '''Add a peer. - - real_name: a real name, as would appear on IRC - ''' - peer = Peer.from_real_name(real_name, 'RPC') - self.peer_mgr.add_peers([peer]) - return "peer '{}' added".format(real_name) - - def rpc_disconnect(self, session_ids): - '''Disconnect sesssions. - - session_ids: array of session IDs - ''' - return self.for_each_session(session_ids, self.close_session) - - def rpc_log(self, session_ids): - '''Toggle logging of sesssions. - - session_ids: array of session IDs - ''' - return self.for_each_session(session_ids, self.toggle_logging) - - def rpc_daemon_url(self, daemon_url=None): - '''Replace the daemon URL.''' - daemon_url = daemon_url or self.env.daemon_url - try: - self.daemon.set_urls(self.coin.daemon_urls(daemon_url)) - except Exception as e: - raise RPCError(BAD_REQUEST, f'an error occured: {e}') - return 'now using daemon at {}'.format(self.daemon.logged_url()) - - def rpc_stop(self): - '''Shut down the server cleanly.''' - self.loop.call_soon(self.shutdown_event.set) - return 'stopping' - - def rpc_getinfo(self): - '''Return summary information about the server process.''' - return self.getinfo() - - def rpc_groups(self): - '''Return statistics about the session groups.''' - return self.group_data() - - def rpc_peers(self): - '''Return a list of data about server peers.''' - return self.peer_mgr.rpc_data() - - def rpc_sessions(self): - '''Return statistics about connected sessions.''' - return self.session_data(for_log=False) - - def rpc_reorg(self, count=3): - '''Force a reorg of the given number of blocks. - - count: number of blocks to reorg (default 3) - ''' - count = non_negative_integer(count) - if not self.bp.force_chain_reorg(count): - raise RPCError(BAD_REQUEST, 'still catching up with daemon') - return 'scheduled a reorg of {:,d} blocks'.format(count) - # Helpers for RPC "blockchain" command handlers def assert_tx_hash(self, value): @@ -619,14 +179,6 @@ class Controller(ServerBase): except DaemonError as e: raise RPCError(DAEMON_ERROR, f'daemon error: {e}') - def new_subscription(self): - if self.subs_room <= 0: - self.subs_room = self.max_subs - self.sub_count() - if self.subs_room <= 0: - raise RPCError(BAD_REQUEST, f'server subscription limit ' - f'{self.max_subs:,d} reached') - self.subs_room -= 1 - async def get_history(self, hashX): '''Get history asynchronously to reduce latency.''' if hashX in self.history_cache: diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 734defc..f002ce5 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -7,18 +7,25 @@ '''Classes for local RPC server and remote client TCP/SSL servers.''' +import asyncio import codecs +import datetime import itertools +import json +import os +import ssl import time -import datetime +from collections import defaultdict from functools import partial from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError import electrumx +import electrumx.lib.text as text +import electrumx.lib.util as util from electrumx.lib.hash import (sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN) -import electrumx.lib.util as util +from electrumx.lib.peer import Peer from electrumx.server.daemon import DaemonError @@ -50,6 +57,7 @@ def non_negative_integer(value): class Semaphores(object): + '''For aiorpcX's semaphore handling.''' def __init__(self, semaphores): self.semaphores = semaphores @@ -65,6 +73,384 @@ class Semaphores(object): semaphore.release() +class SessionGroup(object): + + def __init__(self, gid): + self.gid = gid + # Concurrency per group + self.semaphore = asyncio.Semaphore(20) + + +class SessionManager(object): + '''Holds global state about all sessions.''' + + CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) + + def __init__(self, env, controller): + self.env = env + self.controller = controller + self.logger = util.class_logger(__name__, self.__class__.__name__) + self.servers = {} + self.sessions = set() + self.max_sessions = env.max_sessions + self.low_watermark = self.max_sessions * 19 // 20 + self.max_subs = env.max_subs + self.next_log_sessions = 0 + self.cur_group = SessionGroup(0) + self.state = self.CATCHING_UP + self.txs_sent = 0 + self.start_time = time.time() + # Cache some idea of room to avoid recounting on each subscription + self.subs_room = 0 + # Event triggered when electrumx is listening for incoming requests. + self.server_listening = asyncio.Event() + # Set up the RPC request handlers + cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' + 'reorg sessions stop'.split()) + self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} + + async def _start_server(self, kind, *args, **kw_args): + loop = asyncio.get_event_loop() + if kind == 'RPC': + protocol_class = LocalRPC + else: + protocol_class = self.env.coin.SESSIONCLS + protocol_factory = partial(protocol_class, self, self.controller, kind) + server = loop.create_server(protocol_factory, *args, **kw_args) + + host, port = args[:2] + try: + self.servers[kind] = await server + except Exception as e: + self.logger.error('{} server failed to listen on {}:{:d} :{}' + .format(kind, host, port, e)) + else: + self.logger.info('{} server listening on {}:{:d}' + .format(kind, host, port)) + + async def _start_external_servers(self): + '''Start listening on TCP and SSL ports, but only if the respective + port was given in the environment. + ''' + env = self.env + host = env.cs_host(for_rpc=False) + if env.tcp_port is not None: + await self._start_server('TCP', host, env.tcp_port) + if env.ssl_port is not None: + sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) + sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) + await self._start_server('SSL', host, env.ssl_port, ssl=sslc) + # Change state + self.state = self.LISTENING + self.server_listening.set() + + def _close_servers(self, kinds): + '''Close the servers of the given kinds (TCP etc.).''' + if kinds: + self.logger.info('closing down {} listening servers' + .format(', '.join(kinds))) + for kind in kinds: + server = self.servers.pop(kind, None) + if server: + server.close() + + def _group_map(self): + group_map = defaultdict(list) + for session in self.sessions: + group_map[session.group].append(session) + return group_map + + def _sub_count(self): + return sum(s.sub_count() for s in self.sessions) + + def _lookup_session(self, session_id): + try: + session_id = int(session_id) + except Exception: + pass + else: + for session in self.sessions: + if session.session_id == session_id: + return session + return None + + def _for_each_session(self, session_ids, operation): + if not isinstance(session_ids, list): + raise RPCError(BAD_REQUEST, 'expected a list of session IDs') + + result = [] + for session_id in session_ids: + session = self._lookup_session(session_id) + if session: + result.append(operation(session)) + else: + result.append('unknown session: {}'.format(session_id)) + return result + + def _close_session(self, session): + '''Close the session's transport.''' + session.close() + return 'disconnected {:d}'.format(session.session_id) + + def _clear_stale_sessions(self): + '''Cut off sessions that haven't done anything for 10 minutes.''' + now = time.time() + stale_cutoff = now - self.env.session_timeout + + stale = [] + for session in self.sessions: + if session.is_closing(): + session.abort() + elif session.last_recv < stale_cutoff: + self._close_session(session) + stale.append(session.session_id) + if stale: + self.logger.info('closing stale connections {}'.format(stale)) + + # Consolidate small groups + bw_limit = self.env.bandwidth_limit + group_map = self._group_map() + groups = [group for group, sessions in group_map.items() + if len(sessions) <= 5 and + sum(s.bw_charge for s in sessions) < bw_limit] + if len(groups) > 1: + new_group = groups[-1] + for group in groups: + for session in group_map[group]: + session.group = new_group + + def _getinfo(self): + '''A one-line summary of server state.''' + group_map = self._group_map() + daemon = self.controller.daemon + bp = self.controller.bp + peer_mgr = self.controller.peer_mgr + return { + 'version': electrumx.version, + 'daemon': daemon.logged_url(), + 'daemon_height': daemon.cached_height(), + 'db_height': bp.db_height, + 'closing': len([s for s in self.sessions if s.is_closing()]), + 'errors': sum(s.rpc.errors for s in self.sessions), + 'groups': len(group_map), + 'logged': len([s for s in self.sessions if s.log_me]), + 'paused': sum(s.paused for s in self.sessions), + 'pid': os.getpid(), + 'peers': peer_mgr.info(), + 'requests': sum(s.count_pending_items() for s in self.sessions), + 'sessions': self._session_count(), + 'subs': self._sub_count(), + 'txs_sent': self.txs_sent, + 'uptime': util.formatted_time(time.time() - self.start_time), + } + + def _session_data(self, for_log): + '''Returned to the RPC 'sessions' call.''' + now = time.time() + sessions = sorted(self.sessions, key=lambda s: s.start_time) + return [(session.session_id, + session.flags(), + session.peer_address_str(for_log=for_log), + session.client, + session.protocol_version_string(), + session.count_pending_items(), + session.txs_sent, + session.sub_count(), + session.recv_count, session.recv_size, + session.send_count, session.send_size, + now - session.start_time) + for session in sessions] + + def _group_data(self): + '''Returned to the RPC 'groups' call.''' + result = [] + group_map = self._group_map() + for group, sessions in group_map.items(): + result.append([group.gid, + len(sessions), + sum(s.bw_charge for s in sessions), + sum(s.count_pending_items() for s in sessions), + sum(s.txs_sent for s in sessions), + sum(s.sub_count() for s in sessions), + sum(s.recv_count for s in sessions), + sum(s.recv_size for s in sessions), + sum(s.send_count for s in sessions), + sum(s.send_size for s in sessions), + ]) + return result + + # --- LocalRPC command handlers + + def rpc_add_peer(self, real_name): + '''Add a peer. + + real_name: a real name, as would appear on IRC + ''' + peer = Peer.from_real_name(real_name, 'RPC') + self.controller.peer_mgr.add_peers([peer]) + return "peer '{}' added".format(real_name) + + def rpc_disconnect(self, session_ids): + '''Disconnect sesssions. + + session_ids: array of session IDs + ''' + return self._for_each_session(session_ids, self._close_session) + + def rpc_log(self, session_ids): + '''Toggle logging of sesssions. + + session_ids: array of session IDs + ''' + def toggle_logging(session): + '''Toggle logging of the session.''' + session.toggle_logging() + return 'log {:d}: {}'.format(session.session_id, session.log_me) + + return self._for_each_session(session_ids, toggle_logging) + + def rpc_daemon_url(self, daemon_url=None): + '''Replace the daemon URL.''' + daemon_url = daemon_url or self.env.daemon_url + daemon = self.controller.daemon + try: + daemon.set_urls(self.env.coin.daemon_urls(daemon_url)) + except Exception as e: + raise RPCError(BAD_REQUEST, f'an error occured: {e}') + return 'now using daemon at {}'.format(daemon.logged_url()) + + def rpc_stop(self): + '''Shut down the server cleanly.''' + loop = asyncio.get_event_loop() + loop.call_soon(self.controller.shutdown_event.set) + return 'stopping' + + def rpc_getinfo(self): + '''Return summary information about the server process.''' + return self._getinfo() + + def rpc_groups(self): + '''Return statistics about the session groups.''' + return self._group_data() + + def rpc_peers(self): + '''Return a list of data about server peers.''' + return self.controller.peer_mgr.rpc_data() + + def rpc_sessions(self): + '''Return statistics about connected sessions.''' + return self._session_data(for_log=False) + + def rpc_reorg(self, count=3): + '''Force a reorg of the given number of blocks. + + count: number of blocks to reorg (default 3) + ''' + count = non_negative_integer(count) + if not self.controller.bp.force_chain_reorg(count): + raise RPCError(BAD_REQUEST, 'still catching up with daemon') + return 'scheduled a reorg of {:,d} blocks'.format(count) + + # --- External Interface + + async def start_serving(self): + '''Start TCP and SSL servers.''' + self.logger.info('max session count: {:,d}'.format(self.max_sessions)) + self.logger.info('session timeout: {:,d} seconds' + .format(self.env.session_timeout)) + self.logger.info('session bandwidth limit {:,d} bytes' + .format(self.env.bandwidth_limit)) + self.logger.info('max response size {:,d} bytes' + .format(self.env.max_send)) + self.logger.info('max subscriptions across all sessions: {:,d}' + .format(self.max_subs)) + self.logger.info('max subscriptions per session: {:,d}' + .format(self.env.max_session_subs)) + if self.env.drop_client is not None: + self.logger.info('drop clients matching: {}' + .format(self.env.drop_client.pattern)) + await self._start_external_servers() + + async def start_rpc_server(self): + if self.env.rpc_port is not None: + await self._start_server('RPC', self.env.cs_host(for_rpc=True), + self.env.rpc_port) + + async def shutdown(self): + '''Close servers and sessions.''' + self.state = self.SHUTTING_DOWN + self._close_servers(list(self.servers.keys())) + for session in self.sessions: + session.abort() + for session in list(self.sessions): + await session.wait_closed() + + def session_count(self): + '''The number of connections that we've sent something to.''' + return len(self.sessions) + + def notify(self, height, touched): + # Height notifications are synchronous. Those sessions with + # touched addresses are scheduled for asynchronous completion + create_task = self.controller.create_task + for session in self.sessions: + if isinstance(session, LocalRPC): + continue + session_touched = session.notify(height, touched) + if session_touched is not None: + create_task(session.notify_async(session_touched)) + + async def housekeeping(self): + '''Regular housekeeping checks.''' + n = 0 + while True: + n += 1 + await asyncio.sleep(15) + if n % 10 == 0: + self._clear_stale_sessions() + + # Start listening for incoming connections if paused and + # session count has fallen + if (self.state == self.PAUSED and + len(self.sessions) <= self.low_watermark): + await self._start_external_servers() + + # Periodically log sessions + if self.env.log_sessions and time.time() > self.next_log_sessions: + if self.next_log_sessions: + data = self._session_data(for_log=True) + for line in text.sessions_lines(data): + self.logger.info(line) + self.logger.info(json.dumps(self._getinfo())) + self.next_log_sessions = time.time() + self.env.log_sessions + + def add_session(self, session): + self.sessions.add(session) + if (len(self.sessions) >= self.max_sessions + and self.state == self.LISTENING): + self.state = self.PAUSED + session.logger.info('maximum sessions {:,d} reached, stopping new ' + 'connections until count drops to {:,d}' + .format(self.max_sessions, self.low_watermark)) + self._close_servers(['TCP', 'SSL']) + gid = int(session.start_time - self.start_time) // 900 + if self.cur_group.gid != gid: + self.cur_group = SessionGroup(gid) + return self.cur_group + + def remove_session(self, session): + '''Remove a session from our sessions list if there.''' + self.sessions.remove(session) + + def new_subscription(self): + if self.subs_room <= 0: + self.subs_room = self.max_subs - self._sub_count() + if self.subs_room <= 0: + raise RPCError(BAD_REQUEST, f'server subscription limit ' + f'{self.max_subs:,d} reached') + self.subs_room -= 1 + + class SessionBase(ServerSession): '''Base class of ElectrumX JSON sessions. @@ -75,11 +461,12 @@ class SessionBase(ServerSession): MAX_CHUNK_SIZE = 2016 session_counter = itertools.count() - def __init__(self, controller, kind): + def __init__(self, session_mgr, controller, kind): super().__init__(rpc_protocol=JSONRPCAutoDetect) self.logger = util.class_logger(__name__, self.__class__.__name__) - self.kind = kind # 'RPC', 'TCP' etc. + self.session_mgr = session_mgr self.controller = controller + self.kind = kind # 'RPC', 'TCP' etc. self.bp = controller.bp self.env = controller.env self.coin = self.env.coin @@ -126,14 +513,14 @@ class SessionBase(ServerSession): context = {'conn_id': f'{self.session_id}'} self.logger = util.ConnectionLogger(self.logger, context) self.rpc.logger = self.logger - self.group = self.controller.add_session(self) + self.group = self.session_mgr.add_session(self) self.logger.info(f'{self.kind} {self.peer_address_str()}, ' - f'{len(self.controller.sessions):,d} total') + f'{self.session_mgr.session_count():,d} total') def connection_lost(self, exc): '''Handle client disconnection.''' super().connection_lost(exc) - self.controller.remove_session(self) + self.session_mgr.remove_session(self) msg = '' if self.paused: msg += ' whilst paused' @@ -349,7 +736,7 @@ class ElectrumX(SessionBase): f'{self.max_subs:,d} reached') # Now let the controller check its limit - self.controller.new_subscription() + self.session_mgr.new_subscription() self.hashX_subs[hashX] = alias return await self.address_status(hashX) @@ -621,8 +1008,8 @@ class ElectrumX(SessionBase): try: tx_hash = await self.daemon.sendrawtransaction([raw_tx]) self.txs_sent += 1 + self.session_mgr.txs_sent += 1 self.logger.info('sent tx: {}'.format(tx_hash)) - self.controller.sent_tx(tx_hash) return tx_hash except DaemonError as e: error, = e.args @@ -706,7 +1093,7 @@ class LocalRPC(SessionBase): def request_handler(self, method): '''Return the async handler for the given request method.''' - return self.controller.rpc_handlers.get(method) + return self.session_mgr.rpc_handlers.get(method) class DashElectrumX(ElectrumX): diff --git a/electrumx_rpc b/electrumx_rpc index 09d24ca..8312f82 100755 --- a/electrumx_rpc +++ b/electrumx_rpc @@ -15,9 +15,9 @@ import asyncio import json from os import environ -from aiorpcx import ClientSession +import electrumx.lib.text as text -from electrumx import Controller +from aiorpcx import ClientSession def main(): @@ -46,7 +46,7 @@ def main(): async with ClientSession('localhost', port) as session: result = await session.send_request(method, params, timeout=15) if method in ('groups', 'peers', 'sessions'): - lines_func = getattr(Controller, f'{method}_text_lines') + lines_func = getattr(text, f'{method}_lines') for line in lines_func(result): print(line) else: