Browse Source

Add session logging facility

Move session logging code to protocol.py from electrum_rpc.py
Use it for periodic logging controlled by envvar LOG_SESSIONS
For each session, track sent transaction stats and show that
per-session instead of errors
master
Neil Booth 8 years ago
parent
commit
c08ade5861
  1. 3
      docs/ENV-NOTES
  2. 26
      electrumx_rpc.py
  3. 1
      server/env.py
  4. 66
      server/protocol.py

3
docs/ENV-NOTES

@ -36,6 +36,9 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client. The string $VERSION in your is re-read for each new client. The string $VERSION in your
banner file will be replaced with the ElectrumX version you banner file will be replaced with the ElectrumX version you
are runnning, such as 'ElectrumX 0.7.11'. are runnning, such as 'ElectrumX 0.7.11'.
LOG_SESSIONS - the number of seconds between printing session statistics to
the log. Defaults to 3600. Set to zero to suppress this
logging.
ANON_LOGS - set to anything non-empty to remove IP addresses from ANON_LOGS - set to anything non-empty to remove IP addresses from
logs. By default IP addresses will be logged. logs. By default IP addresses will be logged.
DONATION_ADDRESS - server donation address. Defaults to none. DONATION_ADDRESS - server donation address. Defaults to none.

26
electrumx_rpc.py

@ -17,6 +17,7 @@ from functools import partial
from os import environ from os import environ
from lib.jsonrpc import JSONRPC from lib.jsonrpc import JSONRPC
from server.protocol import ServerManager
class RPCClient(JSONRPC): class RPCClient(JSONRPC):
@ -36,33 +37,12 @@ class RPCClient(JSONRPC):
async def handle_response(self, result, error, method): async def handle_response(self, result, error, method):
if result and method == 'sessions': if result and method == 'sessions':
self.print_sessions(result) for line in ServerManager.sessions_text_lines(result):
print(line)
else: else:
value = {'error': error} if error else result value = {'error': error} if error else result
print(json.dumps(value, indent=4, sort_keys=True)) print(json.dumps(value, indent=4, sort_keys=True))
def print_sessions(self, result):
def data_fmt(count, size):
return '{:,d}/{:,d}KB'.format(count, size // 1024)
def time_fmt(t):
t = int(t)
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<4} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
print(fmt.format('Type', 'Peer', 'Client', 'Subs',
'Recv #', 'Recv KB', 'Sent #', 'Sent KB',
'Errs', 'Time'))
for (kind, peer, subs, client, recv_count, recv_size,
send_count, send_size, error_count, time) in result:
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024),
'{:,d}'.format(error_count),
time_fmt(time)))
def main(): def main():
'''Send the RPC command to the server and print the result.''' '''Send the RPC command to the server and print the result.'''

1
server/env.py

@ -41,6 +41,7 @@ class Env(LoggedClass):
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None) self.banner_file = self.default('BANNER_FILE', None)
self.anon_logs = self.default('ANON_LOGS', False) self.anon_logs = self.default('ANON_LOGS', False)
self.log_sessions = self.default('LOG_SESSIONS', 3600)
# The electrum client takes the empty string as unspecified # The electrum client takes the empty string as unspecified
self.donation_address = self.default('DONATION_ADDRESS', '') self.donation_address = self.default('DONATION_ADDRESS', '')
self.db_engine = self.default('DB_ENGINE', 'leveldb') self.db_engine = self.default('DB_ENGINE', 'leveldb')

66
server/protocol.py

@ -224,6 +224,8 @@ class ServerManager(util.LoggedClass):
self.env = env self.env = env
self.servers = [] self.servers = []
self.sessions = {} self.sessions = {}
self.txs_sent = 0
self.next_log_sessions = 0
self.max_subs = env.max_subs self.max_subs = env.max_subs
self.subscription_count = 0 self.subscription_count = 0
self.futures = [] self.futures = []
@ -314,6 +316,13 @@ class ServerManager(util.LoggedClass):
# Use a tuple to distinguish from JSON # Use a tuple to distinguish from JSON
triple = (self.bp.db_height, touched, cache) triple = (self.bp.db_height, touched, cache)
session.messages.put_nowait(triple) session.messages.put_nowait(triple)
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
data = self.session_data(for_log=True)
for line in ServerManager.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.server_summary()))
self.next_log_sessions = time.time() + self.env.log_sessions
async def shutdown(self): async def shutdown(self):
'''Call to shutdown the servers. Returns when done.''' '''Call to shutdown the servers. Returns when done.'''
@ -372,32 +381,66 @@ class ServerManager(util.LoggedClass):
return self.irc.peers return self.irc.peers
def session_count(self): def session_count(self):
'''Returns a dictionary.''' '''The number of connections that we've sent something to.'''
active = len([s for s in self.sessions if s.send_count]) return len([s for s in self.sessions if s.send_count])
total = len(self.sessions)
return {'active': active, 'inert': total - active, 'total': total}
async def rpc_getinfo(self, params): def server_summary(self):
'''The RPC 'getinfo' call.''' '''A one-line summary of server state.'''
return { return {
'blocks': self.bp.db_height, 'blocks': self.bp.db_height,
'errors': sum(s.error_count for s in self.sessions),
'peers': len(self.irc.peers), 'peers': len(self.irc.peers),
'sessions': self.session_count(), 'sessions': self.session_count(),
'txs_sent': self.txs_sent,
'watched': self.subscription_count, 'watched': self.subscription_count,
} }
async def rpc_sessions(self, params): @staticmethod
def sessions_text_lines(data):
'''A generator returning lines for a list of sessions.
data is the return value of rpc_sessions().'''
def time_fmt(t):
t = int(t)
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<4} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
yield fmt.format('Type', 'Peer', 'Client', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB',
'Txs', 'Time')
for (kind, peer, subs, client, recv_count, recv_size,
send_count, send_size, txs_sent, time) in data:
yield fmt.format(kind, peer, client,
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024),
'{:,d}'.format(txs_sent),
time_fmt(time))
def session_data(self, for_log):
'''Returned to the RPC 'sessions' call.''' '''Returned to the RPC 'sessions' call.'''
now = time.time() now = time.time()
sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
return [(session.kind, return [(session.kind,
session.peername(for_log=False), session.peername(for_log=for_log),
session.sub_count(), session.sub_count(),
session.client, session.client,
session.recv_count, session.recv_size, session.recv_count, session.recv_size,
session.send_count, session.send_size, session.send_count, session.send_size,
session.error_count, session.txs_sent,
now - session.start) now - session.start)
for session in self.sessions] for session in sessions]
async def rpc_getinfo(self, params):
return self.server_summary()
async def rpc_sessions(self, params):
return self.session_data(for_log=False)
async def rpc_numsessions(self, params): async def rpc_numsessions(self, params):
return self.session_count() return self.session_count()
@ -428,6 +471,7 @@ class Session(JSONRPC):
self.client = 'unknown' self.client = 'unknown'
self.anon_logs = env.anon_logs self.anon_logs = env.anon_logs
self.max_send = env.max_send self.max_send = env.max_send
self.txs_sent = 0
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
@ -776,6 +820,8 @@ class ElectrumX(Session):
''' '''
try: try:
tx_hash = await self.daemon.sendrawtransaction(params) tx_hash = await self.daemon.sendrawtransaction(params)
self.txs_sent += 1
self.manager.txs_sent += 1
self.logger.info('sent tx: {}'.format(tx_hash)) self.logger.info('sent tx: {}'.format(tx_hash))
return tx_hash return tx_hash
except DaemonError as e: except DaemonError as e:

Loading…
Cancel
Save