
Add session ID for each session

Show it in the logs with each session-specific message.
Show the session ID in the sessions list, with an 'L' suffix if the session is being logged.
Add RPC commands to toggle logging of a session and to disconnect a session.
Closes #55
Neil Booth committed 8 years ago
Commit c4e7878407 on branch master

Changed files:
1. lib/jsonrpc.py (27 lines changed)
2. lib/util.py (10 lines changed)
3. server/protocol.py (81 lines changed)
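
The scheme the commit message describes, as a minimal standalone sketch (an illustrative class, not the real ElectrumX code): each new connection takes the next value of a class-level counter as its session ID, and the sessions list renders that ID with an 'L' suffix once logging has been toggled on for the session.

    # Hedged sketch of the session-ID scheme; DemoSession is a stand-in class.
    class DemoSession:
        NEXT_SESSION_ID = 0

        def __init__(self):
            self.id_ = DemoSession.NEXT_SESSION_ID
            DemoSession.NEXT_SESSION_ID += 1
            self.log_prefix = '[{:d}] '.format(self.id_)   # e.g. '[0] '
            self.log_me = False                            # per-session logging toggle

        def id_column(self):
            # Mirrors sessions_text_lines(): the ID plus 'L' when being logged
            return str(self.id_) + ('L' if self.log_me else ' ')

    a, b = DemoSession(), DemoSession()
    b.log_me = True
    assert (a.id_column(), b.id_column()) == ('0 ', '1L')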

lib/jsonrpc.py (27 lines changed)

@@ -64,6 +64,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
     INTERNAL_ERROR = -32603
     ID_TYPES = (type(None), str, numbers.Number)
+    NEXT_SESSION_ID = 0
 
     class RPCError(Exception):
         '''RPC handlers raise this error.'''
@@ -104,6 +105,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         # If buffered incoming data exceeds this the connection is closed
         self.max_buffer_size = 1000000
         self.anon_logs = False
+        self.id_ = JSONRPC.NEXT_SESSION_ID
+        JSONRPC.NEXT_SESSION_ID += 1
+        self.log_prefix = '[{:d}] '.format(self.id_)
+        self.log_me = False
 
     def peername(self, *, for_log=True):
         '''Return the peer name of this connection.'''
@@ -141,10 +146,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         # Close abuvsive connections where buffered data exceeds limit
         buffer_size = len(data) + sum(len(part) for part in self.parts)
         if buffer_size > self.max_buffer_size:
-            self.logger.error('read buffer of {:,d} bytes exceeds {:,d} '
-                              'byte limit, closing {}'
-                              .format(buffer_size, self.max_buffer_size,
-                                      self.peername()))
+            self.log_error('read buffer of {:,d} bytes exceeds {:,d} '
+                           'byte limit, closing {}'
+                           .format(buffer_size, self.max_buffer_size,
+                                   self.peername()))
             self.transport.close()
 
         # Do nothing if this connection is closing
@@ -186,6 +191,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         '''Queue the request for asynchronous handling.'''
         self.messages.put_nowait(message)
+        if self.log_me:
+            self.log_info('queued {}'.format(message))
 
     def send_json_notification(self, method, params):
         '''Create a json notification.'''
@@ -218,7 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             data = (json.dumps(payload) + '\n').encode()
         except TypeError:
             msg = 'JSON encoding failure: {}'.format(payload)
-            self.logger.error(msg)
+            self.log_error(msg)
             self.send_json_error(msg, self.INTERNAL_ERROR, id_)
         else:
             if len(data) > max(1000, self.max_send):
@@ -241,10 +248,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         excess = self.bandwidth_used - self.bandwidth_limit
         if excess > 0:
             secs = 1 + excess // self.bandwidth_limit
-            self.logger.warning('{} has high bandwidth use of {:,d} bytes, '
-                                'sleeping {:d}s'
-                                .format(self.peername(), self.bandwidth_used,
-                                        secs))
+            self.log_warning('high bandwidth use of {:,d} bytes, '
+                             'sleeping {:d}s'
+                             .format(self.bandwidth_used, secs))
             await asyncio.sleep(secs)
 
         if isinstance(message, list):
@@ -256,8 +262,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             try:
                 self.send_json(payload)
             except self.LargeRequestError:
-                self.logger.warning('blocked large request from {}: {}'
-                                    .format(self.peername(), message))
+                self.log_warning('blocked large request {}'.format(message))
 
     async def batch_payload(self, batch):
         '''Return the JSON payload corresponding to a batch JSON request.'''

lib/util.py (10 lines changed)

@@ -20,6 +20,16 @@ class LoggedClass(object):
     def __init__(self):
         self.logger = logging.getLogger(self.__class__.__name__)
         self.logger.setLevel(logging.INFO)
+        self.log_prefix = ''
+
+    def log_info(self, msg):
+        self.logger.info(self.log_prefix + msg)
+
+    def log_warning(self, msg):
+        self.logger.warning(self.log_prefix + msg)
+
+    def log_error(self, msg):
+        self.logger.error(self.log_prefix + msg)
 
     # Method decorator. To be used for calculations that will always
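
The new LoggedClass helpers simply prepend the instance's log_prefix to each message before handing it to the standard logging module, so every subclass that sets a prefix gets it on all of its log lines. A hedged usage sketch (SessionLike is illustrative, not a class from this diff):

    import logging

    logging.basicConfig(level=logging.INFO,
                        format='%(levelname)s:%(name)s:%(message)s')

    class LoggedClass(object):
        def __init__(self):
            self.logger = logging.getLogger(self.__class__.__name__)
            self.logger.setLevel(logging.INFO)
            self.log_prefix = ''

        def log_info(self, msg):
            self.logger.info(self.log_prefix + msg)

        def log_error(self, msg):
            self.logger.error(self.log_prefix + msg)

    class SessionLike(LoggedClass):
        def __init__(self, id_):
            super().__init__()
            self.log_prefix = '[{:d}] '.format(id_)

    SessionLike(7).log_info('connection from example peer')
    # prints: INFO:SessionLike:[7] connection from example peer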

server/protocol.py (81 lines changed)

@@ -361,21 +361,30 @@ class ServerManager(util.LoggedClass):
         coro = session.serve_requests()
         future = asyncio.ensure_future(coro)
         self.sessions[session] = future
-        self.logger.info('connection from {}, {:,d} total'
+        session.log_info('connection from {}, {:,d} total'
                          .format(session.peername(), len(self.sessions)))
         # Some connections are acknowledged after the servers are closed
         if not self.servers:
             self.close_session(session)
 
+    def remove_session(self, session):
+        self.subscription_count -= session.sub_count()
+        future = self.sessions.pop(session)
+        future.cancel()
+
     def close_session(self, session):
         '''Close the session's transport and cancel its future.'''
         session.transport.close()
         self.sessions[session].cancel()
+        return '{:d} disconnected'.format(session.id_)
 
-    def remove_session(self, session):
-        self.subscription_count -= session.sub_count()
-        future = self.sessions.pop(session)
-        future.cancel()
+    def toggle_logging(self, session):
+        '''Close the session's transport and cancel its future.'''
+        session.log_me = not session.log_me
+        if session.log_me:
+            return 'logging {:d}'.format(session.id_)
+        else:
+            return 'not logging {:d}'.format(session.id_)
 
     def clear_stale_sessions(self):
         '''Cut off sessions that haven't done anything for 10 minutes.'''
@@ -426,14 +435,15 @@ class ServerManager(util.LoggedClass):
             return ('{:3d}:{:02d}:{:02d}'
                     .format(t // 3600, (t % 3600) // 60, t % 60))
 
-        fmt = ('{:<4} {:>23} {:>15} {:>7} '
+        fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} '
               '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
-        yield fmt.format('Type', 'Peer', 'Client', 'Subs',
+        yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs',
                          'Recv', 'Recv KB', 'Sent', 'Sent KB',
                          'Txs', 'Time')
-        for (kind, peer, subs, client, recv_count, recv_size,
+        for (kind, id_, log_me, peer, subs, client, recv_count, recv_size,
              send_count, send_size, txs_sent, time) in data:
-            yield fmt.format(kind, peer, client,
+            yield fmt.format(kind, str(id_) + ('L' if log_me else ' '),
+                             peer, client,
                              '{:,d}'.format(subs),
                              '{:,d}'.format(recv_count),
                              '{:,d}'.format(recv_size // 1024),
@@ -447,6 +457,8 @@ class ServerManager(util.LoggedClass):
         now = time.time()
         sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
         return [(session.kind,
+                 session.id_,
+                 session.log_me,
                  session.peername(for_log=for_log),
                  session.sub_count(),
                  session.client,
@@ -456,6 +468,33 @@ class ServerManager(util.LoggedClass):
                  now - session.start)
                 for session in sessions]
 
+    def lookup_session(self, param):
+        try:
+            id_ = int(param)
+        except:
+            pass
+        else:
+            for session in self.sessions:
+                if session.id_ == id_:
+                    return session
+        return None
+
+    def for_each_session(self, params, operation):
+        result = []
+        for param in params:
+            session = self.lookup_session(param)
+            if session:
+                result.append(operation(session))
+            else:
+                result.append('unknown session: {}'.format(param))
+        return result
+
+    async def rpc_disconnect(self, params):
+        return self.for_each_session(params, self.close_session)
+
+    async def rpc_log(self, params):
+        return self.for_each_session(params, self.toggle_logging)
+
     async def rpc_getinfo(self, params):
         return self.server_summary()
@@ -503,10 +542,10 @@ class Session(JSONRPC):
         '''Handle client disconnection.'''
         super().connection_lost(exc)
         if self.error_count or self.send_size >= 1024*1024:
-            self.logger.info('{} disconnected. '
-                             'Sent {:,d} bytes in {:,d} messages {:,d} errors'
-                             .format(self.peername(), self.send_size,
-                                     self.send_count, self.error_count))
+            self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages '
+                          '{:,d} errors'
+                          .format(self.send_size, self.send_count,
+                                  self.error_count))
         self.manager.remove_session(self)
 
     async def handle_request(self, method, params):
@@ -532,7 +571,7 @@ class Session(JSONRPC):
                 break
             except Exception:
                 # Getting here should probably be considered a bug and fixed
-                self.logger.error('error handling request {}'.format(message))
+                self.log_error('error handling request {}'.format(message))
                 traceback.print_exc()
 
     def sub_count(self):
@@ -656,8 +695,7 @@ class ElectrumX(Session):
                 self.send_json(payload)
 
         if matches:
-            self.logger.info('notified {} of {} addresses'
-                             .format(self.peername(), len(matches)))
+            self.log_info('notified of {:,d} addresses'.format(len(matches)))
 
     def height(self):
         '''Return the current flushed database height.'''
@@ -843,12 +881,12 @@ class ElectrumX(Session):
             tx_hash = await self.daemon.sendrawtransaction(params)
             self.txs_sent += 1
             self.manager.txs_sent += 1
-            self.logger.info('sent tx: {}'.format(tx_hash))
+            self.log_info('sent tx: {}'.format(tx_hash))
             return tx_hash
         except DaemonError as e:
             error = e.args[0]
             message = error['message']
-            self.logger.info('sendrawtransaction: {}'.format(message))
+            self.log_info('sendrawtransaction: {}'.format(message))
             if 'non-mandatory-script-verify-flag' in message:
                 return (
                     'Your client produced a transaction that is not accepted '
@@ -904,8 +942,8 @@ class ElectrumX(Session):
             with codecs.open(self.env.banner_file, 'r', 'utf-8') as f:
                 banner = f.read()
         except Exception as e:
-            self.logger.error('reading banner file {}: {}'
-                              .format(self.env.banner_file, e))
+            self.log_error('reading banner file {}: {}'
+                           .format(self.env.banner_file, e))
         else:
             network_info = await self.daemon.getnetworkinfo()
             version = network_info['version']
@@ -950,7 +988,8 @@ class LocalRPC(Session):
     def __init__(self, *args):
         super().__init__(*args)
-        cmds = 'getinfo sessions numsessions peers numpeers'.split()
+        cmds = ('disconnect getinfo log numpeers numsessions peers sessions'
+                .split())
         self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
                          for cmd in cmds}
         self.client = 'RPC'
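
Taken together, the new ServerManager plumbing maps each local-RPC argument to a session by numeric ID and returns one human-readable string per argument, reporting unknown IDs instead of raising. A condensed, hedged sketch of that dispatch path (Sess and ManagerSketch are stand-ins, not the real Session/ServerManager classes):

    class Sess:
        def __init__(self, id_):
            self.id_ = id_
            self.log_me = False

    class ManagerSketch:
        def __init__(self, sessions):
            # the real manager keeps a dict of session -> future
            self.sessions = sessions

        def lookup_session(self, param):
            try:
                id_ = int(param)
            except ValueError:
                return None
            for session in self.sessions:
                if session.id_ == id_:
                    return session
            return None

        def for_each_session(self, params, operation):
            result = []
            for param in params:
                session = self.lookup_session(param)
                if session:
                    result.append(operation(session))
                else:
                    result.append('unknown session: {}'.format(param))
            return result

        def toggle_logging(self, session):
            session.log_me = not session.log_me
            return ('logging {:d}' if session.log_me
                    else 'not logging {:d}').format(session.id_)

    mgr = ManagerSketch([Sess(0), Sess(1)])
    # Roughly what an RPC "log" command for IDs 1 and 42 would return:
    assert mgr.for_each_session(['1', '42'], mgr.toggle_logging) == \
        ['logging 1', 'unknown session: 42']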
