Browse Source

session.py: groups retain cost of closed sessions

Based on issues pointed out by and discussions with SomberNight
patch-2
Neil Booth 6 years ago
parent
commit
18c5fcad26
  1. 7
      electrumx/lib/text.py
  2. 74
      electrumx/server/session.py

7
electrumx/lib/text.py

@ -33,15 +33,16 @@ def groups_lines(data):
data is the return value of rpc_groups().'''
fmt = ('{:<14} {:>9} {:>8} {:>6} {:>6} {:>8}'
fmt = ('{:<14} {:>9} {:>8} {:>8} {:>6} {:>6} {:>8}'
'{:>7} {:>9} {:>7} {:>9}')
yield fmt.format('Name', 'Sessions', 'Cost', 'Reqs', 'Txs', 'Subs',
yield fmt.format('Name', 'Sessions', 'Cost', 'Retained', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB')
for (name, session_count, cost, reqs, txs_sent, subs,
for (name, session_count, cost, retained_cost, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size) in data:
yield fmt.format(name,
'{:,d}'.format(session_count),
'{:,d}'.format(int(cost)),
'{:,d}'.format(int(retained_cost)),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),

74
electrumx/server/session.py

@ -16,10 +16,10 @@ import os
import pylru
import ssl
import time
from collections import defaultdict
from functools import partial
from ipaddress import ip_address
import attr
from aiorpcx import (
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
TaskGroup, handler_invocation, RPCError, Request, sleep, Event, FinalRPCError
@ -79,6 +79,13 @@ def assert_tx_hash(value):
raise RPCError(BAD_REQUEST, f'{value} should be a transaction hash')
@attr.s(slots=True)
class SessionGroup(object):
name = attr.ib()
sessions = attr.ib()
retained_cost = attr.ib()
class SessionManager(object):
'''Holds global state about all sessions.'''
@ -93,8 +100,8 @@ class SessionManager(object):
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {}
self.sessions = {} # Map of session to iterable of its groups
self.groups = defaultdict(set) # Map of groups to set of sessions
self.sessions = {} # session->iterable of its SessionGroups
self.session_groups = {} # group name->SessionGroup instance
self.txs_sent = 0
self.start_time = time.time()
self.history_cache = pylru.lrucache(256)
@ -233,13 +240,26 @@ class SessionManager(object):
async def _recalc_concurrency(self):
'''Periodically recalculate session concurrency.'''
session_class = self.env.coin.SESSIONCLS
period = 100
while True:
await sleep(100)
await sleep(period)
hard_limit = session_class.cost_hard_limit
# Reduce retained group cost
refund = period * hard_limit / 5000
dead_groups = []
for group in self.session_groups.values():
group.retained_cost = max(0.0, group.retained_cost - refund)
if group.retained_cost == 0 and not group.sessions:
dead_groups.append(group)
# Remove dead groups
for group in dead_groups:
self.session_groups.pop(group.name)
async with TaskGroup() as group:
for session in sorted(self.sessions, key=lambda s: s.cost, reverse=True):
# Subs have an on-going cost so decay more slowly with more subs
session.cost_decay_per_sec = (
session_class.cost_hard_limit / (10000 + 5 * session.sub_count()))
session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count())
try:
session.recalc_concurrency()
except FinalRPCError:
@ -253,7 +273,7 @@ class SessionManager(object):
'daemon_height': self.daemon.cached_height(),
'db_height': self.db.db_height,
'errors': sum(s.errors for s in self.sessions),
'groups': len(self.groups),
'groups': len(self.session_groups),
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(not s._can_send.is_set() for s in self.sessions),
'pid': os.getpid(),
@ -289,10 +309,12 @@ class SessionManager(object):
def _group_data(self):
'''Returned to the RPC 'groups' call.'''
result = []
for group, sessions in self.groups.items():
result.append([group,
for name, group in self.session_groups.items():
sessions = group.sessions
result.append([name,
len(sessions),
sum(s.cost for s in sessions),
group.retained_cost,
sum(s.count_pending_items() for s in sessions),
sum(s.txs_sent for s in sessions),
sum(s.sub_count() for s in sessions),
@ -495,8 +517,10 @@ class SessionManager(object):
groups = self.sessions.get(session)
if groups is None:
return 0
groups_cost = sum(other.cost for group in groups for other in self.groups[group])
return (groups_cost - session.cost * len(groups)) * 0.03
other_sessions_cost = (sum(other.cost for group in groups for other in group.sessions)
- session.cost * len(groups))
retained_cost = sum(group.retained_cost for group in groups)
return (other_sessions_cost + retained_cost) * 0.02
def session_count(self):
'''The number of connections that we've sent something to.'''
@ -558,34 +582,44 @@ class SessionManager(object):
for session in self.sessions:
await group.spawn(session.notify, touched, height_changed)
def ip_addr_bucket(self, session):
def _ip_addr_group_name(self, session):
ip_addr = session._address
if not ip_addr:
return 'unknown_ip_addr'
return 'unknown_ip'
ip_addr = ip_addr[0]
if ':' in ip_addr:
ip_addr = ip_address(ip_addr)
return ':'.join(ip_addr.exploded.split(':')[:3])
return '.'.join(ip_addr.split('.')[:3])
def _timeslice_name(self, session):
return f't{int(session.start_time - self.start_time) // 300}'
def _session_group(self, name):
group = self.session_groups.get(name)
if not group:
group = SessionGroup(name, set(), 0)
self.session_groups[name] = group
return group
def add_session(self, session):
self.session_event.set()
# Return the session groups
time_slot = int(session.start_time - self.start_time) // 900
groups = (f't{time_slot}', self.ip_addr_bucket(session))
groups = (
self._session_group(self._timeslice_name(session)),
self._session_group(self._ip_addr_group_name(session)),
)
self.sessions[session] = groups
for group in groups:
self.groups[group].add(session)
group.sessions.add(session)
def remove_session(self, session):
'''Remove a session from our sessions list if there.'''
self.session_event.set()
groups = self.sessions.pop(session)
for group in groups:
group_set = self.groups[group]
group_set.remove(session)
if not group_set:
self.groups.pop(group)
group.retained_cost += session.cost
group.sessions.remove(session)
class SessionBase(RPCSession):

Loading…
Cancel
Save