
Improve disconnect and log local RPC commands.

Remove lognew.  Update docs.
patch-2
Neil Booth 6 years ago
parent commit 6030252e89

docs/rpc-interface.rst (135)
electrumx/server/session.py (159)
electrumx_rpc (5)

docs/rpc-interface.rst (135)

@@ -45,8 +45,14 @@ have that effect.

disconnect
----------

-Disconnect the given session IDs.  Session IDs can be seen in the logs
-or with the `sessions`_ RPC command::
+Disconnect the given session IDs or group names.
+
+Session IDs can be obtained in the logs or with the `sessions`_ RPC command.  Group
+names can be obtained with the `groups`_ RPC command.
+
+The special string :const:`all` disconnects all sessions.
+
+Example::

  $ electrumx_rpc disconnect 2 3
  [
@@ -54,10 +60,6 @@ or with the `sessions`_ RPC command::
    "disconnected 3"
  ]

-ElectrumX initiates the socket close process for the passed sessions.
-Whilst most connections close quickly, it can take several minutes for
-Python to shut some SSL connections down.
-
getinfo
-------
@@ -66,46 +68,72 @@ A typical result is as follows (with annotated comments)::

  $ electrumx_rpc getinfo
  {
-     "closing": 1,             # The number of sessions being closed down
-     "daemon": "192.168.0.2:8332/",  # The daemon URL without auth info
-     "daemon_height": 520527,  # The daemon's height when last queried
-     "db_height": 520527,      # The height to which the DB is flushed
-     "errors": 0,              # Errors across current sessions
-     "groups": 7,              # The number of session groups
-     "logged": 0,              # The number of sessions being logged
-     "paused": 0,              # The number of paused sessions
-     "peers": {                # Various categories of server peers
-         "bad": 0,             # Not responding or invalid height etc.
-         "good": 28,           # Responding with good data
-         "never": 0,           # Never managed to connect
-         "stale": 0,           # Was "good" but not recently connected
-         "total": 28           # Sum of the above
+     "coin": "BitcoinSegwit",
+     "daemon": "127.0.0.1:9334/",
+     "daemon height": 572154,  # The daemon's height when last queried
+     "db height": 572154,      # The height to which the DB is flushed
+     "groups": 586,            # The number of session groups
+     "history cache": "185,014 lookups 9,756 hits 1,000 entries",
+     "merkle cache": "280 lookups 54 hits 213 entries",
+     "peers": {                # Peer information
+         "bad": 1,
+         "good": 51,
+         "never": 2,
+         "stale": 0,
+         "total": 54
      },
-     "pid": 85861,             # Server's process ID
-     "requests": 0,            # Unprocessed requests across all sessions
-     "sessions": 43,           # Total number of sessions
-     "sessions_with_subs": 4,  # Number of sessions with history subscriptions
-     "subs": 84,               # Script hash subscriptions across all sessions
-     "txs_sent": 4,            # Transactions sent since server started
-     "uptime": "06h 48m 00s"   # Time since server started
+     "pid": 11804,             # Process ID
+     "request counts": {       # Count of RPC requests by method name
+         "blockchain.block.header": 245,
+         "blockchain.block.headers": 70,
+         "blockchain.estimatefee": 12776,
+         "blockchain.headers.subscribe": 2825,
+         "blockchain.relayfee": 740,
+         "blockchain.scripthash.get_history": 196,
+         "blockchain.scripthash.subscribe": 184626,
+         "blockchain.transaction.broadcast": 19,
+         "blockchain.transaction.get": 213,
+         "blockchain.transaction.get_merkle": 289,
+         "getinfo": 3,
+         "groups": 1,
+         "mempool.get_fee_histogram": 3194,
+         "server.add_peer": 9,
+         "server.banner": 740,
+         "server.donation_address": 754,
+         "server.features": 50,
+         "server.peers.subscribe": 792,
+         "server.ping": 6412,
+         "server.version": 2866
+     },
+     "request total": 216820,  # Total requests served
+     "sessions": {             # Live session stats
+         "count": 670,
+         "count with subs": 45,
+         "errors": 0,
+         "logged": 0,
+         "paused": 0,
+         "pending requests": 79,  # Number of requests currently being processed
+         "subs": 36292         # Total subscriptions
+     },
+     "tx hashes cache": "289 lookups 38 hits 213 entries",
+     "txs sent": 19,           # Transactions broadcast
+     "uptime": "01h 39m 04s",
+     "version": "ElectrumX 1.10.1"
  }
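The three cache statistics above are reported as human-readable strings rather than numbers. A minimal sketch for turning one back into integers for monitoring (`parse_cache_stats` is a hypothetical helper by this editor, not part of ElectrumX):

```python
import re

def parse_cache_stats(text):
    # Turn a string like "185,014 lookups 9,756 hits 1,000 entries"
    # into {"lookups": 185014, "hits": 9756, "entries": 1000}.
    return {key: int(num.replace(',', ''))
            for num, key in re.findall(r'([\d,]+)\s+(\w+)', text)}

stats = parse_cache_stats("185,014 lookups 9,756 hits 1,000 entries")
hit_rate = stats["hits"] / stats["lookups"]  # roughly 0.053
```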
-Each ill-formed request, or one that does not follow the Electrum
-protocol, increments the error count of the session that sent it.  If
-the error count reaches a certain level (currently ``10``) that client
-is disconnected.
+Each ill-formed request, or one that does not follow the Electrum
+protocol, increments the error count of the session that sent it.
:ref:`logging <session logging>` of sessions can be enabled by RPC.

For more information on peers see :ref:`here <peers>`.
-Clients that are slow to consume data sent to them are :dfn:`paused`
-until their socket buffer drains sufficiently, at which point
-processing of requests resumes.
+Clients that are slow to consume data sent to them are :dfn:`paused` until their socket
+buffer drains sufficiently, at which point processing of requests resumes.
-Apart from very short intervals, typically after a new block or when
-a client has just connected, the number of unprocessed requests
-should normally be zero.
+Apart from very short intervals, typically after a new block or when a client has just
+connected, the number of unprocessed requests should be low, say 250 or fewer.  If it is
+over 1,000 the server is overloaded.
Sessions are put into groups, primarily as an anti-DoS measure.  Currently each session
goes into two groups: one for an IP subnet, and one based on the timeslice it connected
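The two group names can be sketched as follows. This is the editor's illustration only: the subnet prefixes (/24 for IPv4, /64 for IPv6) and the 600-second timeslice are assumed values, not necessarily what ElectrumX uses:

```python
import ipaddress
import time

def group_names(ip_str, connect_time, slice_secs=600):
    # Derive two hypothetical group names for a session: one from its
    # IP subnet, one from the timeslice in which it connected.
    ip = ipaddress.ip_address(ip_str)
    prefix = 24 if ip.version == 4 else 64
    subnet = ipaddress.ip_network(f'{ip_str}/{prefix}', strict=False)
    return [str(subnet), f'timeslice {int(connect_time) // slice_secs}']

names = group_names('203.0.113.45', time.time())
```

Because every session lands in both groups, a burst of connections from one subnet (or one moment in time) shares a cost pool, which is what makes the grouping useful against DoS.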
@@ -122,11 +150,22 @@ The output is quite similar to the `sessions`_ command.

log
---

-Toggle logging of the given session IDs.  All incoming requests for a
-logged session are written to the server log.  Session IDs can be seen
-in the logs or with the `sessions`_ RPC command::
+Toggle logging of the given session IDs or group names.  All incoming requests for a
+logged session are written to the server log.  The arguments are case-insensitive.
+
+When a group is specified, logging is toggled for its current members only; there is no
+effect on future group members.
+
+Session IDs can be obtained in the logs or with the `sessions`_ RPC command.  Group
+names can be obtained with the `groups`_ RPC command.
+
+The special string :const:`all` turns on logging of all current and future sessions,
+:const:`none` turns off logging of all current and future sessions, and :const:`new`
+toggles logging of future sessions.
+
+Example::

-  $ electrumx_rpc log 0 1 2 3 4 5
+  $ electrumx_rpc log new 0 1 2 3 4 5
  [
    "log 0: False",
    "log 1: False",
@@ -136,20 +175,8 @@ in the logs or with the `sessions`_ RPC command::
    "unknown session: 5"
  ]

-The return value shows this command turned off logging for sesssions
-0, 1 and 2.  It was turned on for sessions 3 and 4, and there was no
-session 5.
+The return value shows this command turned off logging for sessions 0, 1 and 2.  It was
+turned on for sessions 3 and 4, and there was no session 5.
-lognew
-------
-
-Toggle logging of the new sessions.  Their requests are written to the
-server log::
-
-  $ electrumx_rpc lognew
-
-  "lognew: True"
-
-The return value shows that new session logging is now on.
-
.. _peers:

electrumx/server/session.py (159)

@@ -100,6 +100,15 @@ class SessionGroup(object):
        return self.retained_cost + self.session_cost()

+@attr.s(slots=True)
+class SessionReferences(object):
+    # All attributes are sets but groups is a list
+    sessions = attr.ib()
+    groups = attr.ib()
+    specials = attr.ib()    # Lower-case strings: 'none', 'new' and 'all'.
+    unknown = attr.ib()     # Strings
+
+
class SessionManager(object):
    '''Holds global state about all sessions.'''
@@ -135,12 +144,13 @@ class SessionManager(object):
        if issubclass(env.coin.SESSIONCLS, DashElectrumX):
            self.mn_cache_height = 0
            self.mn_cache = []
+        self._task_group = TaskGroup()
        # Event triggered when electrumx is listening for incoming requests.
        self.server_listening = Event()
        self.session_event = Event()

        # Set up the RPC request handlers
-        cmds = ('add_peer daemon_url disconnect getinfo groups log lognew peers '
+        cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
                'query reorg sessions stop'.split())
        LocalRPC.request_handlers = {cmd: getattr(self, 'rpc_' + cmd)
                                     for cmd in cmds}
@@ -221,29 +231,12 @@ class SessionManager(object):
            self.logger.info(line)
        self.logger.info(json.dumps(self._get_info()))

-    def _lookup_session(self, session_id):
-        try:
-            session_id = int(session_id)
-        except (ValueError, TypeError):
-            pass
-        else:
-            for session in self.sessions:
-                if session.session_id == session_id:
-                    return session
-        return None
-
-    async def _for_each_session(self, session_ids, operation):
-        if not isinstance(session_ids, list):
-            raise RPCError(BAD_REQUEST, 'expected a list of session IDs')
-        result = []
-        for session_id in session_ids:
-            session = self._lookup_session(session_id)
-            if session:
-                result.append(await operation(session))
-            else:
-                result.append(f'unknown session: {session_id}')
-        return result
+    async def _disconnect_sessions(self, sessions, reason, *, force_after=1.0):
+        if sessions:
+            session_ids = ', '.join(str(session.session_id) for session in sessions)
+            self.logger.info(f'{reason} session ids {session_ids}')
+            for session in sessions:
+                await self._task_group.spawn(session.close(force_after=force_after))

    async def _clear_stale_sessions(self):
        '''Cut off sessions that haven't done anything for 10 minutes.'''
@@ -252,13 +245,8 @@ class SessionManager(object):
            stale_cutoff = time.time() - self.env.session_timeout
            stale_sessions = [session for session in self.sessions
                              if session.last_recv < stale_cutoff]
-            if stale_sessions:
-                text = ', '.join(str(session.session_id)
-                                 for session in stale_sessions)
-                self.logger.info(f'closing stale connections {text}')
-                async with TaskGroup() as group:
-                    for session in stale_sessions:
-                        await group.spawn(session.close())
+            await self._disconnect_sessions(stale_sessions, 'closing stale')
+            del stale_sessions

    async def _recalc_concurrency(self):
        '''Periodically recalculate session concurrency.'''
@@ -279,14 +267,16 @@ class SessionManager(object):
            for group in dead_groups:
                self.session_groups.pop(group.name)

-            async with TaskGroup() as group:
-                for session in list(self.sessions):
-                    # Subs have an on-going cost so decay more slowly with more subs
-                    session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count())
-                    try:
-                        session.recalc_concurrency()
-                    except ExcessiveSessionCostError:
-                        await group.spawn(session.close())
+            sessions_to_close = []
+            for session in self.sessions:
+                # Subs have an on-going cost so decay more slowly with more subs
+                session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count())
+                try:
+                    session.recalc_concurrency()
+                except ExcessiveSessionCostError:
+                    sessions_to_close.append(session)
+            await self._disconnect_sessions(sessions_to_close, 'closing expensive')
+            del sessions_to_close

    def _get_info(self):
        '''A summary of server state.'''
@@ -370,6 +360,39 @@ class SessionManager(object):
        self.hsub_results = {'hex': raw.hex(), 'height': height}
        self.notified_height = height

+    def _session_references(self, items, special_strings):
+        '''Return a SessionReferences object.'''
+        if not isinstance(items, list) or not all(isinstance(item, str) for item in items):
+            raise RPCError(BAD_REQUEST, 'expected a list of session IDs')
+
+        sessions_by_id = {session.session_id: session for session in self.sessions}
+        groups_by_name = self.session_groups
+
+        sessions = set()
+        groups = set()    # Names as groups are not hashable
+        specials = set()
+        unknown = set()
+
+        for item in items:
+            if item.isdigit():
+                session = sessions_by_id.get(int(item))
+                if session:
+                    sessions.add(session)
+                else:
+                    unknown.add(item)
+            else:
+                lc_item = item.lower()
+                if lc_item in special_strings:
+                    specials.add(lc_item)
+                else:
+                    if lc_item in groups_by_name:
+                        groups.add(lc_item)
+                    else:
+                        unknown.add(item)
+
+        groups = [groups_by_name[group] for group in groups]
+        return SessionReferences(sessions, groups, specials, unknown)
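The classification performed by `_session_references` can be exercised standalone. A minimal sketch with plain sets standing in for the server's session and group tables (`classify` is the editor's stub, not part of the patch):

```python
def classify(items, session_ids, group_names, special_strings):
    # Mirror of _session_references' classification: numeric strings
    # are session IDs, known lower-cased names are groups or special
    # keywords, everything else is unknown.
    sessions, groups, specials, unknown = set(), set(), set(), set()
    for item in items:
        if item.isdigit():
            (sessions if int(item) in session_ids else unknown).add(item)
        else:
            lc_item = item.lower()
            if lc_item in special_strings:
                specials.add(lc_item)
            elif lc_item in group_names:
                groups.add(lc_item)
            else:
                unknown.add(item)
    return sessions, groups, specials, unknown

result = classify(['2', 'ALL', 'a1', '99'], {2, 3}, {'a1'}, {'all'})
# → ({'2'}, {'a1'}, {'all'}, {'99'})
```

Note how case-insensitivity falls out of lower-casing before the lookups, which is why the docs can say the arguments are case-insensitive.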
    # --- LocalRPC command handlers

    async def rpc_add_peer(self, real_name):
@@ -385,29 +408,61 @@ class SessionManager(object):

        session_ids: array of session IDs
        '''
-        async def close(session):
-            '''Close the session's transport.'''
-            await session.close(force_after=2)
-            return f'disconnected {session.session_id}'
-
-        return await self._for_each_session(session_ids, close)
+        refs = self._session_references(session_ids, {'all'})
+        result = []
+
+        if 'all' in refs.specials:
+            sessions = self.sessions
+            result.append('disconnecting all sessions')
+        else:
+            sessions = refs.sessions
+            result.extend(f'disconnecting session {session.session_id}'
+                          for session in sessions)
+            for group in refs.groups:
+                result.append(f'disconnecting group {group.name}')
+                sessions.update(group.sessions)
+        result.extend(f'unknown: {item}' for item in refs.unknown)
+
+        await self._disconnect_sessions(sessions, 'local RPC request to disconnect')
+        return result
    async def rpc_log(self, session_ids):
        '''Toggle logging of sesssions.

-        session_ids: array of session IDs
+        session_ids: array of session or group IDs, or 'all', 'none', 'new'
        '''
-        async def toggle_logging(session):
-            '''Toggle logging of the session.'''
-            session.toggle_logging()
-            return f'log {session.session_id}: {session.log_me}'
-
-        return await self._for_each_session(session_ids, toggle_logging)
-
-    async def rpc_lognew(self):
-        '''Toggle logging of new sesssions.'''
-        SessionBase.log_new = not SessionBase.log_new
-        return f'lognew: {SessionBase.log_new}'
+        refs = self._session_references(session_ids, {'all', 'none', 'new'})
+        result = []
+
+        if 'all' in refs.specials:
+            for session in self.sessions:
+                session.log_me = True
+            SessionBase.log_new = True
+            result.append('logging all sessions')
+        elif 'none' in refs.specials:
+            for session in self.sessions:
+                session.log_me = False
+            SessionBase.log_new = False
+            result.append('logging no sessions')
+
+        def add_result(text, value):
+            result.append(f'logging {text}' if value else f'not logging {text}')
+
+        if not result:
+            sessions = refs.sessions
+            if 'new' in refs.specials:
+                SessionBase.log_new = not SessionBase.log_new
+                add_result('new sessions', SessionBase.log_new)
+            for session in sessions:
+                session.log_me = not session.log_me
+                add_result(f'session {session.session_id}', session.log_me)
+            for group in refs.groups:
+                for session in group.sessions.difference(sessions):
+                    sessions.add(session)
+                    session.log_me = not session.log_me
+                    add_result(f'session {session.session_id}', session.log_me)
+
+        result.extend(f'unknown: {item}' for item in refs.unknown)
+        return result
    async def rpc_daemon_url(self, daemon_url):
        '''Replace the daemon URL.'''

@@ -538,7 +593,7 @@ class SessionManager(object):
        await self._start_external_servers()
        # Peer discovery should start after the external servers
        # because we connect to ourself
-        async with TaskGroup() as group:
+        async with self._task_group as group:
            await group.spawn(self.peer_mgr.discover_peers())
            await group.spawn(self._clear_stale_sessions())
            await group.spawn(self._recalc_concurrency())
@@ -690,9 +745,8 @@ class SessionManager(object):
        for hashX in set(cache).intersection(touched):
            del cache[hashX]

-        async with TaskGroup() as group:
-            for session in self.sessions:
-                await group.spawn(session.notify, touched, height_changed)
+        for session in self.sessions:
+            await self._task_group.spawn(session.notify, touched, height_changed)

    def _ip_addr_group_name(self, session):
        ip_addr = session.peer_address()
@@ -773,9 +827,6 @@ class SessionBase(RPCSession):
            return 'xx.xx.xx.xx:xx'
        return super().peer_address_str()

-    def toggle_logging(self):
-        self.log_me = not self.log_me
-
    def flags(self):
        '''Status flags.'''
        status = self.kind[0]

electrumx_rpc (5)

@@ -22,7 +22,6 @@ import electrumx.lib.text as text

simple_commands = {
    'getinfo': 'Print a summary of server state',
    'groups': 'Print current session groups',
-    'lognew': 'Toggle logging of new sessions',
    'peers': 'Print information about peer servers for the same coin',
    'sessions': 'Print information about client sessions',
    'stop': 'Shut down the server cleanly',
@@ -30,7 +29,7 @@ simple_commands = {

session_commands = {
    'disconnect': 'Disconnect sessions',
-    'log': 'Toggle logging of sessions',
+    'log': 'Control logging of sessions',
}

other_commands = {
@@ -94,7 +93,7 @@ def main():

    for command, help in session_commands.items():
        parser = subparsers.add_parser(command, help=help)
-        parser.add_argument('session_ids', nargs='+', type=int,
+        parser.add_argument('session_ids', nargs='+', type=str,
                            help='list of session ids')

    for command, data in other_commands.items():
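The switch from `type=int` to `type=str` is what lets `disconnect` and `log` accept group names and the special strings alongside numeric session IDs. A minimal argparse sketch of the effect (a hypothetical stand-alone parser, not the real `electrumx_rpc` script):

```python
import argparse

parser = argparse.ArgumentParser(prog='electrumx_rpc')
subparsers = parser.add_subparsers(dest='command')
for command in ('disconnect', 'log'):
    subparser = subparsers.add_parser(command)
    # Plain strings: numeric IDs, group names and the special
    # keywords 'all', 'none' and 'new' all parse unchanged.
    subparser.add_argument('session_ids', nargs='+', type=str,
                           help='list of session ids')

args = parser.parse_args(['log', 'new', '3', '4'])
# → args.session_ids == ['new', '3', '4']
```

Since validation now happens server-side in `_session_references`, the client no longer rejects non-numeric arguments before they reach the server.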
