|
|
@ -83,9 +83,16 @@ def assert_tx_hash(value): |
|
|
|
@attr.s(slots=True) |
|
|
|
class SessionGroup(object): |
|
|
|
name = attr.ib() |
|
|
|
weight = attr.ib() |
|
|
|
sessions = attr.ib() |
|
|
|
retained_cost = attr.ib() |
|
|
|
|
|
|
|
def session_cost(self): |
|
|
|
return sum(session.cost for session in self.sessions) |
|
|
|
|
|
|
|
def cost(self): |
|
|
|
return self.retained_cost + self.session_cost() |
|
|
|
|
|
|
|
|
|
|
|
class SessionManager(object): |
|
|
|
'''Holds global state about all sessions.''' |
|
|
@ -314,7 +321,7 @@ class SessionManager(object): |
|
|
|
sessions = group.sessions |
|
|
|
result.append([name, |
|
|
|
len(sessions), |
|
|
|
sum(s.cost for s in sessions), |
|
|
|
group.session_cost(), |
|
|
|
group.retained_cost, |
|
|
|
sum(s.unanswered_request_count() for s in sessions), |
|
|
|
sum(s.txs_sent for s in sessions), |
|
|
@ -506,17 +513,13 @@ class SessionManager(object): |
|
|
|
await group.spawn(session.close(force_after=1)) |
|
|
|
|
|
|
|
def extra_cost(self, session): |
|
|
|
# Add 3% of the cost of other sessions in its groups |
|
|
|
# Note there is no guarantee that session is still in self.sessions. Example traceback: |
|
|
|
# notify_sessions->notify->address_status->bump_cost->recalc_concurrency->extra_cost |
|
|
|
# during which there are many places the sesssion could be removed |
|
|
|
groups = self.sessions.get(session) |
|
|
|
if groups is None: |
|
|
|
return 0 |
|
|
|
other_sessions_cost = (sum(other.cost for group in groups for other in group.sessions) |
|
|
|
- session.cost * len(groups)) * 0.03 |
|
|
|
retained_cost = sum(group.retained_cost / len(group.sessions) for group in groups) |
|
|
|
return other_sessions_cost + retained_cost |
|
|
|
return sum((group.cost() - session.cost) * group.weight for group in groups) |
|
|
|
|
|
|
|
def session_count(self): |
|
|
|
'''The number of connections that we've sent something to.''' |
|
|
@ -586,10 +589,10 @@ class SessionManager(object): |
|
|
|
def _timeslice_name(self, session): |
|
|
|
return f't{int(session.start_time - self.start_time) // 300}' |
|
|
|
|
|
|
|
def _session_group(self, name): |
|
|
|
def _session_group(self, name, weight): |
|
|
|
group = self.session_groups.get(name) |
|
|
|
if not group: |
|
|
|
group = SessionGroup(name, set(), 0) |
|
|
|
group = SessionGroup(name, weight, set(), 0) |
|
|
|
self.session_groups[name] = group |
|
|
|
return group |
|
|
|
|
|
|
@ -597,8 +600,8 @@ class SessionManager(object): |
|
|
|
self.session_event.set() |
|
|
|
# Return the session groups |
|
|
|
groups = ( |
|
|
|
self._session_group(self._timeslice_name(session)), |
|
|
|
self._session_group(self._ip_addr_group_name(session)), |
|
|
|
self._session_group(self._timeslice_name(session), 0.03), |
|
|
|
self._session_group(self._ip_addr_group_name(session), 1.0), |
|
|
|
) |
|
|
|
self.sessions[session] = groups |
|
|
|
for group in groups: |
|
|
|