diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7aa1764..f58cf6c 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -64,6 +64,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): INTERNAL_ERROR = -32603 ID_TYPES = (type(None), str, numbers.Number) + NEXT_SESSION_ID = 0 class RPCError(Exception): '''RPC handlers raise this error.''' @@ -104,6 +105,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # If buffered incoming data exceeds this the connection is closed self.max_buffer_size = 1000000 self.anon_logs = False + self.id_ = JSONRPC.NEXT_SESSION_ID + JSONRPC.NEXT_SESSION_ID += 1 + self.log_prefix = '[{:d}] '.format(self.id_) + self.log_me = False def peername(self, *, for_log=True): '''Return the peer name of this connection.''' @@ -141,10 +146,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # Close abuvsive connections where buffered data exceeds limit buffer_size = len(data) + sum(len(part) for part in self.parts) if buffer_size > self.max_buffer_size: - self.logger.error('read buffer of {:,d} bytes exceeds {:,d} ' - 'byte limit, closing {}' - .format(buffer_size, self.max_buffer_size, - self.peername())) + self.log_error('read buffer of {:,d} bytes exceeds {:,d} ' + 'byte limit, closing {}' + .format(buffer_size, self.max_buffer_size, + self.peername())) self.transport.close() # Do nothing if this connection is closing @@ -186,6 +191,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Queue the request for asynchronous handling.''' self.messages.put_nowait(message) + if self.log_me: + self.log_info('queued {}'.format(message)) def send_json_notification(self, method, params): '''Create a json notification.''' @@ -218,7 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): data = (json.dumps(payload) + '\n').encode() except TypeError: msg = 'JSON encoding failure: {}'.format(payload) - self.logger.error(msg) + self.log_error(msg) self.send_json_error(msg, self.INTERNAL_ERROR, id_) else: if len(data) > max(1000, self.max_send): @@ -241,10 +248,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass): excess = self.bandwidth_used - self.bandwidth_limit if excess > 0: secs = 1 + excess // self.bandwidth_limit - self.logger.warning('{} has high bandwidth use of {:,d} bytes, ' - 'sleeping {:d}s' - .format(self.peername(), self.bandwidth_used, - secs)) + self.log_warning('high bandwidth use of {:,d} bytes, ' + 'sleeping {:d}s' + .format(self.bandwidth_used, secs)) await asyncio.sleep(secs) if isinstance(message, list): @@ -256,8 +262,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): try: self.send_json(payload) except self.LargeRequestError: - self.logger.warning('blocked large request from {}: {}' - .format(self.peername(), message)) + self.log_warning('blocked large request {}'.format(message)) async def batch_payload(self, batch): '''Return the JSON payload corresponding to a batch JSON request.''' diff --git a/lib/util.py b/lib/util.py index 3feab9d..de0d2f7 100644 --- a/lib/util.py +++ b/lib/util.py @@ -20,6 +20,16 @@ class LoggedClass(object): def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) self.logger.setLevel(logging.INFO) + self.log_prefix = '' + + def log_info(self, msg): + self.logger.info(self.log_prefix + msg) + + def log_warning(self, msg): + self.logger.warning(self.log_prefix + msg) + + def log_error(self, msg): + self.logger.error(self.log_prefix + msg) # Method decorator. To be used for calculations that will always diff --git a/server/protocol.py b/server/protocol.py index 2ebc82d..531b3e0 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -361,21 +361,30 @@ class ServerManager(util.LoggedClass): coro = session.serve_requests() future = asyncio.ensure_future(coro) self.sessions[session] = future - self.logger.info('connection from {}, {:,d} total' + session.log_info('connection from {}, {:,d} total' .format(session.peername(), len(self.sessions))) # Some connections are acknowledged after the servers are closed if not self.servers: self.close_session(session) + def remove_session(self, session): + self.subscription_count -= session.sub_count() + future = self.sessions.pop(session) + future.cancel() + def close_session(self, session): '''Close the session's transport and cancel its future.''' session.transport.close() self.sessions[session].cancel() + return '{:d} disconnected'.format(session.id_) - def remove_session(self, session): - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + def toggle_logging(self, session): + '''Close the session's transport and cancel its future.''' + session.log_me = not session.log_me + if session.log_me: + return 'logging {:d}'.format(session.id_) + else: + return 'not logging {:d}'.format(session.id_) def clear_stale_sessions(self): '''Cut off sessions that haven't done anything for 10 minutes.''' @@ -426,14 +435,15 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<4} {:>23} {:>15} {:>7} ' + fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('Type', 'Peer', 'Client', 'Subs', + yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Txs', 'Time') - for (kind, peer, subs, client, recv_count, recv_size, + for (kind, id_, log_me, peer, subs, client, recv_count, recv_size, send_count, send_size, txs_sent, time) in data: - yield fmt.format(kind, peer, client, + yield fmt.format(kind, str(id_) + ('L' if log_me else ' '), + peer, client, '{:,d}'.format(subs), '{:,d}'.format(recv_count), '{:,d}'.format(recv_size // 1024), @@ -447,6 +457,8 @@ class ServerManager(util.LoggedClass): now = time.time() sessions = sorted(self.sessions.keys(), key=lambda s: s.start) return [(session.kind, + session.id_, + session.log_me, session.peername(for_log=for_log), session.sub_count(), session.client, @@ -456,6 +468,33 @@ class ServerManager(util.LoggedClass): now - session.start) for session in sessions] + def lookup_session(self, param): + try: + id_ = int(param) + except: + pass + else: + for session in self.sessions: + if session.id_ == id_: + return session + return None + + def for_each_session(self, params, operation): + result = [] + for param in params: + session = self.lookup_session(param) + if session: + result.append(operation(session)) + else: + result.append('unknown session: {}'.format(param)) + return result + + async def rpc_disconnect(self, params): + return self.for_each_session(params, self.close_session) + + async def rpc_log(self, params): + return self.for_each_session(params, self.toggle_logging) + async def rpc_getinfo(self, params): return self.server_summary() @@ -503,10 +542,10 @@ class Session(JSONRPC): '''Handle client disconnection.''' super().connection_lost(exc) if self.error_count or self.send_size >= 1024*1024: - self.logger.info('{} disconnected. ' - 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername(), self.send_size, - self.send_count, self.error_count)) + self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages ' + '{:,d} errors' + .format(self.send_size, self.send_count, + self.error_count)) self.manager.remove_session(self) async def handle_request(self, method, params): @@ -532,7 +571,7 @@ class Session(JSONRPC): break except Exception: # Getting here should probably be considered a bug and fixed - self.logger.error('error handling request {}'.format(message)) + self.log_error('error handling request {}'.format(message)) traceback.print_exc() def sub_count(self): @@ -656,8 +695,7 @@ class ElectrumX(Session): self.send_json(payload) if matches: - self.logger.info('notified {} of {} addresses' - .format(self.peername(), len(matches))) + self.log_info('notified of {:,d} addresses'.format(len(matches))) def height(self): '''Return the current flushed database height.''' @@ -843,12 +881,12 @@ class ElectrumX(Session): tx_hash = await self.daemon.sendrawtransaction(params) self.txs_sent += 1 self.manager.txs_sent += 1 - self.logger.info('sent tx: {}'.format(tx_hash)) + self.log_info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: error = e.args[0] message = error['message'] - self.logger.info('sendrawtransaction: {}'.format(message)) + self.log_info('sendrawtransaction: {}'.format(message)) if 'non-mandatory-script-verify-flag' in message: return ( 'Your client produced a transaction that is not accepted ' @@ -904,8 +942,8 @@ class ElectrumX(Session): with codecs.open(self.env.banner_file, 'r', 'utf-8') as f: banner = f.read() except Exception as e: - self.logger.error('reading banner file {}: {}' - .format(self.env.banner_file, e)) + self.log_error('reading banner file {}: {}' + .format(self.env.banner_file, e)) else: network_info = await self.daemon.getnetworkinfo() version = network_info['version'] @@ -950,7 +988,8 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) - cmds = 'getinfo sessions numsessions peers numpeers'.split() + cmds = ('disconnect getinfo log numpeers numsessions peers sessions' + .split()) self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) for cmd in cmds} self.client = 'RPC'