diff --git a/docs/rpc-interface.rst b/docs/rpc-interface.rst index 12a9116..ecdcd12 100644 --- a/docs/rpc-interface.rst +++ b/docs/rpc-interface.rst @@ -45,8 +45,14 @@ have that effect. disconnect ---------- -Disconnect the given session IDs. Session IDs can be seen in the logs -or with the `sessions`_ RPC command:: +Disonnect the given session IDs or group names. + +Session IDs can be obtained in the logs or with the `sessions`_ RPC command. Group +names can be optained with the `groups`_ RPC command. + +The special string :const:`all` disconnects all sessions. + +Example:: $ electrumx_rpc disconnect 2 3 [ @@ -54,10 +60,6 @@ or with the `sessions`_ RPC command:: "disconnected 3" ] -ElectrumX initiates the socket close process for the passed sessions. -Whilst most connections close quickly, it can take several minutes for -Python to shut some SSL connections down. - getinfo ------- @@ -66,46 +68,72 @@ A typical result is as follows (with annotated comments):: $ electrumx_rpc getinfo { - "closing": 1, # The number of sessions being closed down - "daemon": "192.168.0.2:8332/", # The daemon URL without auth info - "daemon_height": 520527, # The daemon's height when last queried - "db_height": 520527, # The height to which the DB is flushed - "errors": 0, # Errors across current sessions - "groups": 7, # The number of session groups - "logged": 0, # The number of sessions being logged - "paused": 0, # The number of paused sessions - "peers": { # Various categories of server peers - "bad": 0, # Not responding or invalid height etc. - "good": 28, # Responding with good data - "never": 0, # Never managed to connect - "stale": 0, # Was "good" but not recently connected - "total": 28 # Sum of the above - }, - "pid": 85861, # Server's process ID - "requests": 0, # Unprocessed requests across all sessions - "sessions": 43, # Total number of sessions - "sessions_with_subs": 4, # Number of sessions with history subscriptions - "subs": 84, # Script hash subscriptions across all sessions - "txs_sent": 4, # Transactions sent since server started - "uptime": "06h 48m 00s" # Time since server started + "coin": "BitcoinSegwit", + "daemon": "127.0.0.1:9334/", + "daemon height": 572154, # The daemon's height when last queried + "db height": 572154, # The height to which the DB is flushed + "groups": 586, # The number of session groups + "history cache": "185,014 lookups 9,756 hits 1,000 entries", + "merkle cache": "280 lookups 54 hits 213 entries", + "peers": { # Peer information + "bad": 1, + "good": 51, + "never": 2, + "stale": 0, + "total": 54 + }, + "pid": 11804, # Process ID + "request counts": { # Count of RPC requests by method name + "blockchain.block.header": 245, + "blockchain.block.headers": 70, + "blockchain.estimatefee": 12776, + "blockchain.headers.subscribe": 2825, + "blockchain.relayfee": 740, + "blockchain.scripthash.get_history": 196, + "blockchain.scripthash.subscribe": 184626, + "blockchain.transaction.broadcast": 19, + "blockchain.transaction.get": 213, + "blockchain.transaction.get_merkle": 289, + "getinfo": 3, + "groups": 1, + "mempool.get_fee_histogram": 3194, + "server.add_peer": 9, + "server.banner": 740, + "server.donation_address": 754, + "server.features": 50, + "server.peers.subscribe": 792, + "server.ping": 6412, + "server.version": 2866 + }, + "request total": 216820, # Total requests served + "sessions": { # Live session stats + "count": 670, + "count with subs": 45, + "errors": 0, + "logged": 0, + "paused": 0, + "pending requests": 79, # Number of requests currently being processed + "subs": 36292 # Total subscriptions + }, + "tx hashes cache": "289 lookups 38 hits 213 entries", + "txs sent": 19, # Transactions broadcast + "uptime": "01h 39m 04s", + "version": "ElectrumX 1.10.1" } Each ill-formed request, or one that does not follow the Electrum -protocol, increments the error count of the session that sent it. If -the error count reaches a certain level (currently ``10``) that client -is disconnected. +protocol, increments the error count of the session that sent it. :ref:`logging ` of sessions can be enabled by RPC. For more information on peers see :ref:`here `. -Clients that are slow to consume data sent to them are :dfn:`paused` -until their socket buffer drains sufficiently, at which point -processing of requests resumes. +Clients that are slow to consume data sent to them are :dfn:`paused` until their socket +buffer drains sufficiently, at which point processing of requests resumes. -Apart from very short intervals, typically after a new block or when -a client has just connected, the number of unprocessed requests -should normally be zero. +Apart from very short intervals, typically after a new block or when a client has just +connected, the number of unprocessed requests should be low, say 250 or fewer. If it is +over 1,000 the server is overloaded. Sessions are put into groups, primarily as an anti-DoS measure. Currently each session goes into two groups: one for an IP subnet, and one based on the timeslice it connected @@ -122,11 +150,22 @@ The output is quite similar to the `sessions`_ command. log --- -Toggle logging of the given session IDs. All incoming requests for a -logged session are written to the server log. Session IDs can be seen -in the logs or with the `sessions`_ RPC command:: +Toggle logging of the given session IDs or group names. All incoming requests for a +logged session are written to the server log. The arguments are case-insensitive. + +When a group is specified, logging is toggled for its current members only; there is no +effect on future group members. + +Session IDs can be obtained in the logs or with the `sessions`_ RPC command. Group +names can be optained with the `groups`_ RPC command. + +The special string :const:`all` turns on logging of all current and future sessions, +:const:`none` turns off logging of all current and future sessions, and :const:`new` +toggles logging of future sessions. - $ electrumx_rpc log 0 1 2 3 4 5 +Example:: + + $ electrumx_rpc log new 0 1 2 3 4 5 [ "log 0: False", "log 1: False", @@ -136,20 +175,8 @@ in the logs or with the `sessions`_ RPC command:: "unknown session: 5" ] -The return value shows this command turned off logging for sesssions -0, 1 and 2. It was turned on for sessions 3 and 4, and there was no -session 5. - -lognew ------- - -Toggle logging of the new sessions. Their requests are written to the -server log:: - - $ electrumx_rpc lognew - "lognew: True" - -The return value shows that new session logging is now on. +The return value shows this command turned off logging for sesssions 0, 1 and 2. It was +turned on for sessions 3 and 4, and there was no session 5. .. _peers: diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 954969c..4446dea 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -100,6 +100,15 @@ class SessionGroup(object): return self.retained_cost + self.session_cost() +@attr.s(slots=True) +class SessionReferences(object): + # All attributes are sets but groups is a list + sessions = attr.ib() + groups = attr.ib() + specials = attr.ib() # Lower-case strings: 'none', 'new' and 'all'. + unknown = attr.ib() # Strings + + class SessionManager(object): '''Holds global state about all sessions.''' @@ -135,12 +144,13 @@ class SessionManager(object): if issubclass(env.coin.SESSIONCLS, DashElectrumX): self.mn_cache_height = 0 self.mn_cache = [] + self._task_group = TaskGroup() # Event triggered when electrumx is listening for incoming requests. self.server_listening = Event() self.session_event = Event() # Set up the RPC request handlers - cmds = ('add_peer daemon_url disconnect getinfo groups log lognew peers ' + cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' 'query reorg sessions stop'.split()) LocalRPC.request_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} @@ -221,29 +231,12 @@ class SessionManager(object): self.logger.info(line) self.logger.info(json.dumps(self._get_info())) - def _lookup_session(self, session_id): - try: - session_id = int(session_id) - except (ValueError, TypeError): - pass - else: - for session in self.sessions: - if session.session_id == session_id: - return session - return None - - async def _for_each_session(self, session_ids, operation): - if not isinstance(session_ids, list): - raise RPCError(BAD_REQUEST, 'expected a list of session IDs') - - result = [] - for session_id in session_ids: - session = self._lookup_session(session_id) - if session: - result.append(await operation(session)) - else: - result.append(f'unknown session: {session_id}') - return result + async def _disconnect_sessions(self, sessions, reason, *, force_after=1.0): + if sessions: + session_ids = ', '.join(str(session.session_id) for session in sessions) + self.logger.info(f'{reason} session ids {session_ids}') + for session in sessions: + await self._task_group.spawn(session.close(force_after=force_after)) async def _clear_stale_sessions(self): '''Cut off sessions that haven't done anything for 10 minutes.''' @@ -252,13 +245,8 @@ class SessionManager(object): stale_cutoff = time.time() - self.env.session_timeout stale_sessions = [session for session in self.sessions if session.last_recv < stale_cutoff] - if stale_sessions: - text = ', '.join(str(session.session_id) - for session in stale_sessions) - self.logger.info(f'closing stale connections {text}') - async with TaskGroup() as group: - for session in stale_sessions: - await group.spawn(session.close()) + await self._disconnect_sessions(stale_sessions, 'closing stale') + del stale_sessions async def _recalc_concurrency(self): '''Periodically recalculate session concurrency.''' @@ -279,14 +267,16 @@ class SessionManager(object): for group in dead_groups: self.session_groups.pop(group.name) - async with TaskGroup() as group: - for session in list(self.sessions): - # Subs have an on-going cost so decay more slowly with more subs - session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count()) - try: - session.recalc_concurrency() - except ExcessiveSessionCostError: - await group.spawn(session.close()) + sessions_to_close = [] + for session in self.sessions: + # Subs have an on-going cost so decay more slowly with more subs + session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count()) + try: + session.recalc_concurrency() + except ExcessiveSessionCostError: + sessions_to_close.append(session) + await self._disconnect_sessions(sessions_to_close, 'closing expensive') + del sessions_to_close def _get_info(self): '''A summary of server state.''' @@ -370,6 +360,39 @@ class SessionManager(object): self.hsub_results = {'hex': raw.hex(), 'height': height} self.notified_height = height + def _session_references(self, items, special_strings): + '''Return a SessionReferences object.''' + if not isinstance(items, list) or not all(isinstance(item, str) for item in items): + raise RPCError(BAD_REQUEST, 'expected a list of session IDs') + + sessions_by_id = {session.session_id: session for session in self.sessions} + groups_by_name = self.session_groups + + sessions = set() + groups = set() # Names as groups are not hashable + specials = set() + unknown = set() + + for item in items: + if item.isdigit(): + session = sessions_by_id.get(int(item)) + if session: + sessions.add(session) + else: + unknown.add(item) + else: + lc_item = item.lower() + if lc_item in special_strings: + specials.add(lc_item) + else: + if lc_item in groups_by_name: + groups.add(lc_item) + else: + unknown.add(item) + + groups = [groups_by_name[group] for group in groups] + return SessionReferences(sessions, groups, specials, unknown) + # --- LocalRPC command handlers async def rpc_add_peer(self, real_name): @@ -385,29 +408,61 @@ class SessionManager(object): session_ids: array of session IDs ''' - async def close(session): - '''Close the session's transport.''' - await session.close(force_after=2) - return f'disconnected {session.session_id}' + refs = self._session_references(session_ids, {'all'}) + result = [] - return await self._for_each_session(session_ids, close) + if 'all' in refs.specials: + sessions = self.sessions + result.append('disconnecting all sessions') + else: + sessions = refs.sessions + result.extend(f'disconnecting session {session.session_id}' for session in sessions) + for group in refs.groups: + result.append(f'disconnecting group {group.name}') + sessions.update(group.sessions) + result.extend(f'unknown: {item}' for item in refs.unknown) + + await self._disconnect_sessions(sessions, 'local RPC request to disconnect') + return result async def rpc_log(self, session_ids): '''Toggle logging of sesssions. - session_ids: array of session IDs + session_ids: array of session or group IDs, or 'all', 'none', 'new' ''' - async def toggle_logging(session): - '''Toggle logging of the session.''' - session.toggle_logging() - return f'log {session.session_id}: {session.log_me}' + refs = self._session_references(session_ids, {'all', 'none', 'new'}) - return await self._for_each_session(session_ids, toggle_logging) - - async def rpc_lognew(self): - '''Toggle logging of new sesssions.''' - SessionBase.log_new = not SessionBase.log_new - return f'lognew: {SessionBase.log_new}' + result = [] + if 'all' in refs.specials: + for session in self.sessions: + session.log_me = True + SessionBase.log_new = True + result.append('logging all sessions') + elif 'none' in refs.specials: + for session in self.sessions: + session.log_me = False + SessionBase.log_new = False + result.append('logging no sessions') + + def add_result(text, value): + result.append(f'logging {text}' if value else f'not logging {text}') + + if not result: + sessions = refs.sessions + if 'new' in refs.specials: + SessionBase.log_new = not SessionBase.log_new + add_result('new sessions', SessionBase.log_new) + for session in sessions: + session.log_me = not session.log_me + add_result(f'session {session.session_id}', session.log_me) + for group in refs.groups: + for session in group.sessions.difference(sessions): + sessions.add(session) + session.log_me = not session.log_me + add_result(f'session {session.session_id}', session.log_me) + + result.extend(f'unknown: {item}' for item in refs.unknown) + return result async def rpc_daemon_url(self, daemon_url): '''Replace the daemon URL.''' @@ -538,7 +593,7 @@ class SessionManager(object): await self._start_external_servers() # Peer discovery should start after the external servers # because we connect to ourself - async with TaskGroup() as group: + async with self._task_group as group: await group.spawn(self.peer_mgr.discover_peers()) await group.spawn(self._clear_stale_sessions()) await group.spawn(self._recalc_concurrency()) @@ -690,9 +745,8 @@ class SessionManager(object): for hashX in set(cache).intersection(touched): del cache[hashX] - async with TaskGroup() as group: - for session in self.sessions: - await group.spawn(session.notify, touched, height_changed) + for session in self.sessions: + await self._task_group.spawn(session.notify, touched, height_changed) def _ip_addr_group_name(self, session): ip_addr = session.peer_address() @@ -773,9 +827,6 @@ class SessionBase(RPCSession): return 'xx.xx.xx.xx:xx' return super().peer_address_str() - def toggle_logging(self): - self.log_me = not self.log_me - def flags(self): '''Status flags.''' status = self.kind[0] diff --git a/electrumx_rpc b/electrumx_rpc index 4de53a5..e21c917 100755 --- a/electrumx_rpc +++ b/electrumx_rpc @@ -22,7 +22,6 @@ import electrumx.lib.text as text simple_commands = { 'getinfo': 'Print a summary of server state', 'groups': 'Print current session groups', - 'lognew': 'Toggle logging of new sessions', 'peers': 'Print information about peer servers for the same coin', 'sessions': 'Print information about client sessions', 'stop': 'Shut down the server cleanly', @@ -30,7 +29,7 @@ simple_commands = { session_commands = { 'disconnect': 'Disconnect sessions', - 'log': 'Toggle logging of sessions', + 'log': 'Control logging of sessions', } other_commands = { @@ -94,7 +93,7 @@ def main(): for command, help in session_commands.items(): parser = subparsers.add_parser(command, help=help) - parser.add_argument('session_ids', nargs='+', type=int, + parser.add_argument('session_ids', nargs='+', type=str, help='list of session ids') for command, data in other_commands.items():