From 831225492eeafb8cd7a9e768be7f25d28c57046c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 14:54:02 +0900 Subject: [PATCH] Better RPC sessions stats --- electrumx_rpc.py | 26 +++++++++++++++++++++----- server/protocol.py | 24 +++++++++++++----------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 0bc7a0e..f7f05e1 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -44,12 +44,28 @@ class RPCClient(asyncio.Protocol): if error: print("ERROR: {}".format(error)) else: + def data_fmt(count, size): + return '{:,d}/{:,d}KB'.format(count, size // 1024) + def time_fmt(t): + t = int(t) + return ('{:3d}:{:02d}:{:02d}' + .format(t // 3600, (t % 3600) // 60, t % 60)) + if self.method == 'sessions': - fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}' - print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time')) - for kind, peer, subs, client, time in result: - print(fmt.format(kind, peer, '{:,d}'.format(subs), - client, '{:,d}'.format(int(time)))) + fmt = ('{:<4} {:>23} {:>15} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') + print(fmt.format('Type', 'Peer', 'Client', 'Subs', + 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB', + 'Errs', 'Time')) + for (kind, peer, subs, client, recv_count, recv_size, + send_count, send_size, error_count, time) in result: + print(fmt.format(kind, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,.1f}'.format(recv_size / 1048576), + '{:,d}'.format(send_count), + '{:,.1f}'.format(send_size / 1048576), + '{:,d}'.format(error_count), + time_fmt(time))) else: pprint.pprint(result, indent=4) diff --git a/server/protocol.py b/server/protocol.py index 5a17257..fb55466 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -33,9 +33,13 @@ class BlockServer(BlockProcessor): def __init__(self, env): super().__init__(env) self.server_mgr = ServerManager(self, env) + self.bs_caught_up = False async def caught_up(self, mempool_hashes): await super().caught_up(mempool_hashes) + if not self.bs_caught_up: + await self.server_mgr.start_servers() + self.bs_caught_up = True self.server_mgr.notify(self.height, self.touched) def stop(self): @@ -64,6 +68,7 @@ class ServerManager(LoggedClass): protocol = partial(protocol_class, self, self.bp, self.env, kind) server = loop.create_server(protocol, *args, **kw_args) + host, port = args[:2] try: self.servers.append(await server) except asyncio.CancelledError: @@ -103,17 +108,11 @@ class ServerManager(LoggedClass): else: self.logger.info('IRC disabled') - async def notify(self, height, touched): - '''Notify electrum clients about height changes and touched addresses. - - Start listening if not yet listening. - ''' - if not self.servers: - await self.start_servers() - + def notify(self, height, touched): + '''Notify sessions about height changes and touched addresses.''' sessions = [session for session in self.sessions if isinstance(session, ElectrumX)] - self.ElectrumX.notify(sessions, height, touched) + ElectrumX.notify(sessions, height, touched) def stop(self): '''Close the listening servers.''' @@ -157,7 +156,7 @@ class ServerManager(LoggedClass): return self.irc.peers def session_count(self): - return len(self.manager.sessions) + return len(self.sessions) def info(self): '''Returned in the RPC 'getinfo' call.''' @@ -179,6 +178,9 @@ class ServerManager(LoggedClass): session.peername(), len(session.hash168s), 'RPC' if isinstance(session, LocalRPC) else session.client, + session.recv_count, session.recv_size, + session.send_count, session.send_size, + session.error_count, now - session.start) for session in self.sessions] @@ -211,7 +213,7 @@ class Session(JSONRPC): 'Sent {:,d} bytes in {:,d} messages {:,d} errors' .format(self.peername(), self.send_size, self.send_count, self.error_count)) - self.maanger.remove_session(self) + self.manager.remove_session(self) def method_handler(self, method): '''Return the handler that will handle the RPC method.'''