From 18af57059f1d7606e735d62e864f6bd3ff513f88 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 7 Jan 2017 15:11:39 +0900 Subject: [PATCH] Improve group handling. I believe this fixes #94 --- server/controller.py | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/server/controller.py b/server/controller.py index 3febb59..ce8bef4 100644 --- a/server/controller.py +++ b/server/controller.py @@ -53,8 +53,9 @@ class Controller(util.LoggedClass): self.irc = IRC(env) self.env = env self.servers = {} + # Map of session to the key of its list in self.groups self.sessions = {} - self.groups = defaultdict(set) + self.groups = defaultdict(list) self.txs_sent = 0 self.next_log_sessions = 0 self.state = self.CATCHING_UP @@ -108,9 +109,10 @@ class Controller(util.LoggedClass): def session_priority(self, session): if isinstance(session, LocalRPC): return 0 - group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session]) + gid = self.sessions[session] + group_bandwidth = sum(s.bandwidth_used for s in self.groups[gid]) return 1 + (bisect_left(self.bands, session.bandwidth_used) - + bisect_left(self.bands, group_bandwidth) + 1) // 2 + + bisect_left(self.bands, group_bandwidth)) // 2 def is_deprioritized(self, session): return self.session_priority(session) > self.BANDS @@ -333,9 +335,9 @@ class Controller(util.LoggedClass): if now > self.next_stale_check: self.next_stale_check = now + 300 self.clear_stale_sessions() - group = self.groups[int(session.start - self.start) // 900] - group.add(session) - self.sessions[session] = group + gid = int(session.start - self.start) // 900 + self.groups[gid].append(session) + self.sessions[session] = gid session.log_info('{} {}, {:,d} total' .format(session.kind, session.peername(), len(self.sessions))) @@ -350,8 +352,9 @@ class Controller(util.LoggedClass): def remove_session(self, session): '''Remove a session from our sessions list if there.''' if session in self.sessions: - group = self.sessions.pop(session) - group.remove(session) + gid = self.sessions.pop(session) + assert gid in self.groups + self.groups[gid].remove(session) self.subscription_count -= session.sub_count() def close_session(self, session): @@ -385,13 +388,16 @@ class Controller(util.LoggedClass): self.logger.info('closing stale connections {}'.format(stale)) # Consolidate small groups - keys = [k for k, v in self.groups.items() if len(v) <= 4 - and sum(session.bandwidth_used for session in v) < 10000] - if len(keys) > 1: - group = set.union(*(self.groups[key] for key in keys)) - for key in keys: - del self.groups[key] - self.groups[max(keys)] = group + gids = [gid for gid, l in self.groups.items() if len(l) <= 4 + and sum(session.bandwidth_used for session in l) < 10000] + if len(gids) > 1: + sessions = sum([self.groups[gid] for gid in gids], []) + new_gid = max(gids) + for gid in gids: + del self.groups[gid] + for session in sessions: + self.sessions[session] = new_gid + self.groups[new_gid] = sessions def new_subscription(self): if self.subscription_count >= self.max_subs: @@ -457,9 +463,9 @@ class Controller(util.LoggedClass): def group_data(self): '''Returned to the RPC 'groups' call.''' result = [] - for group_id in sorted(self.groups.keys()): - sessions = self.groups[group_id] - result.append([group_id, + for gid in sorted(self.groups.keys()): + sessions = self.groups[gid] + result.append([gid, len(sessions), sum(s.bandwidth_used for s in sessions), sum(s.requests_remaining() for s in sessions),