|
|
@ -53,8 +53,9 @@ class Controller(util.LoggedClass): |
|
|
|
self.irc = IRC(env) |
|
|
|
self.env = env |
|
|
|
self.servers = {} |
|
|
|
# Map of session to the key of its list in self.groups |
|
|
|
self.sessions = {} |
|
|
|
self.groups = defaultdict(set) |
|
|
|
self.groups = defaultdict(list) |
|
|
|
self.txs_sent = 0 |
|
|
|
self.next_log_sessions = 0 |
|
|
|
self.state = self.CATCHING_UP |
|
|
@ -108,9 +109,10 @@ class Controller(util.LoggedClass): |
|
|
|
def session_priority(self, session): |
|
|
|
if isinstance(session, LocalRPC): |
|
|
|
return 0 |
|
|
|
group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session]) |
|
|
|
gid = self.sessions[session] |
|
|
|
group_bandwidth = sum(s.bandwidth_used for s in self.groups[gid]) |
|
|
|
return 1 + (bisect_left(self.bands, session.bandwidth_used) |
|
|
|
+ bisect_left(self.bands, group_bandwidth) + 1) // 2 |
|
|
|
+ bisect_left(self.bands, group_bandwidth)) // 2 |
|
|
|
|
|
|
|
def is_deprioritized(self, session): |
|
|
|
return self.session_priority(session) > self.BANDS |
|
|
@ -333,9 +335,9 @@ class Controller(util.LoggedClass): |
|
|
|
if now > self.next_stale_check: |
|
|
|
self.next_stale_check = now + 300 |
|
|
|
self.clear_stale_sessions() |
|
|
|
group = self.groups[int(session.start - self.start) // 900] |
|
|
|
group.add(session) |
|
|
|
self.sessions[session] = group |
|
|
|
gid = int(session.start - self.start) // 900 |
|
|
|
self.groups[gid].append(session) |
|
|
|
self.sessions[session] = gid |
|
|
|
session.log_info('{} {}, {:,d} total' |
|
|
|
.format(session.kind, session.peername(), |
|
|
|
len(self.sessions))) |
|
|
@ -350,8 +352,9 @@ class Controller(util.LoggedClass): |
|
|
|
def remove_session(self, session): |
|
|
|
'''Remove a session from our sessions list if there.''' |
|
|
|
if session in self.sessions: |
|
|
|
group = self.sessions.pop(session) |
|
|
|
group.remove(session) |
|
|
|
gid = self.sessions.pop(session) |
|
|
|
assert gid in self.groups |
|
|
|
self.groups[gid].remove(session) |
|
|
|
self.subscription_count -= session.sub_count() |
|
|
|
|
|
|
|
def close_session(self, session): |
|
|
@ -385,13 +388,16 @@ class Controller(util.LoggedClass): |
|
|
|
self.logger.info('closing stale connections {}'.format(stale)) |
|
|
|
|
|
|
|
# Consolidate small groups |
|
|
|
keys = [k for k, v in self.groups.items() if len(v) <= 4 |
|
|
|
and sum(session.bandwidth_used for session in v) < 10000] |
|
|
|
if len(keys) > 1: |
|
|
|
group = set.union(*(self.groups[key] for key in keys)) |
|
|
|
for key in keys: |
|
|
|
del self.groups[key] |
|
|
|
self.groups[max(keys)] = group |
|
|
|
gids = [gid for gid, l in self.groups.items() if len(l) <= 4 |
|
|
|
and sum(session.bandwidth_used for session in l) < 10000] |
|
|
|
if len(gids) > 1: |
|
|
|
sessions = sum([self.groups[gid] for gid in gids], []) |
|
|
|
new_gid = max(gids) |
|
|
|
for gid in gids: |
|
|
|
del self.groups[gid] |
|
|
|
for session in sessions: |
|
|
|
self.sessions[session] = new_gid |
|
|
|
self.groups[new_gid] = sessions |
|
|
|
|
|
|
|
def new_subscription(self): |
|
|
|
if self.subscription_count >= self.max_subs: |
|
|
@ -457,9 +463,9 @@ class Controller(util.LoggedClass): |
|
|
|
def group_data(self): |
|
|
|
'''Returned to the RPC 'groups' call.''' |
|
|
|
result = [] |
|
|
|
for group_id in sorted(self.groups.keys()): |
|
|
|
sessions = self.groups[group_id] |
|
|
|
result.append([group_id, |
|
|
|
for gid in sorted(self.groups.keys()): |
|
|
|
sessions = self.groups[gid] |
|
|
|
result.append([gid, |
|
|
|
len(sessions), |
|
|
|
sum(s.bandwidth_used for s in sessions), |
|
|
|
sum(s.requests_remaining() for s in sessions), |
|
|
|