|
|
@ -462,11 +462,12 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.logger.info('cleanly closing client sessions, please wait...') |
|
|
|
for session in self.sessions: |
|
|
|
self.close_session(session) |
|
|
|
self.logger.info('server listening sockets closed, waiting ' |
|
|
|
self.logger.info('listening sockets closed, waiting up to ' |
|
|
|
'{:d} seconds for socket cleanup'.format(secs)) |
|
|
|
limit = time.time() + secs |
|
|
|
while self.sessions and time.time() < limit: |
|
|
|
await asyncio.sleep(4) |
|
|
|
self.clear_stale_sessions(grace=secs//2) |
|
|
|
await asyncio.sleep(2) |
|
|
|
self.logger.info('{:,d} sessions remaining' |
|
|
|
.format(len(self.sessions))) |
|
|
|
|
|
|
@ -474,7 +475,10 @@ class ServerManager(util.LoggedClass): |
|
|
|
# Some connections are acknowledged after the servers are closed |
|
|
|
if not self.servers: |
|
|
|
return |
|
|
|
self.clear_stale_sessions() |
|
|
|
now = time.time() |
|
|
|
if now > self.next_stale_check: |
|
|
|
self.next_stale_check = now + 60 |
|
|
|
self.clear_stale_sessions() |
|
|
|
group = self.groups[int(session.start - self.start) // 60] |
|
|
|
group.add(session) |
|
|
|
self.sessions[session] = group |
|
|
@ -496,23 +500,30 @@ class ServerManager(util.LoggedClass): |
|
|
|
session.log_me = not session.log_me |
|
|
|
return 'log {:d}: {}'.format(session.id_, session.log_me) |
|
|
|
|
|
|
|
def clear_stale_sessions(self): |
|
|
|
'''Cut off sessions that haven't done anything for 10 minutes.''' |
|
|
|
def clear_stale_sessions(self, grace=15): |
|
|
|
'''Cut off sessions that haven't done anything for 10 minutes. Force |
|
|
|
close stubborn connections that won't close cleanly after a |
|
|
|
short grace period. |
|
|
|
''' |
|
|
|
now = time.time() |
|
|
|
if now > self.next_stale_check: |
|
|
|
self.next_stale_check = now + 60 |
|
|
|
# Clear out empty groups |
|
|
|
for key in [k for k, v in self.groups.items() if not v]: |
|
|
|
del self.groups[key] |
|
|
|
cutoff = now - self.env.session_timeout |
|
|
|
stale = [session for session in self.sessions |
|
|
|
if session.last_recv < cutoff |
|
|
|
and not session.is_closing()] |
|
|
|
for session in stale: |
|
|
|
self.close_session(session) |
|
|
|
if stale: |
|
|
|
self.logger.info('closing stale connections {}' |
|
|
|
.format([session.id_ for session in stale])) |
|
|
|
shutdown_cutoff = now - grace |
|
|
|
stale_cutoff = now - self.env.session_timeout |
|
|
|
|
|
|
|
stale = [] |
|
|
|
for session in self.sessions: |
|
|
|
if session.is_closing(): |
|
|
|
if session.stop <= shutdown_cutoff and session.socket: |
|
|
|
# Should trigger a call to connection_lost very soon |
|
|
|
self.socket.shutdown(socket.SHUT_RDWR) |
|
|
|
else: |
|
|
|
if session.last_recv < stale_cutoff: |
|
|
|
self.close_session(session) |
|
|
|
stale.append(session.id_) |
|
|
|
if stale: |
|
|
|
self.logger.info('closing stale connections {}'.format(stale)) |
|
|
|
# Clear out empty groups |
|
|
|
for key in [k for k, v in self.groups.items() if not v]: |
|
|
|
del self.groups[key] |
|
|
|
|
|
|
|
def new_subscription(self): |
|
|
|
if self.subscription_count >= self.max_subs: |
|
|
@ -542,6 +553,52 @@ class ServerManager(util.LoggedClass): |
|
|
|
'watched': self.subscription_count, |
|
|
|
} |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def text_lines(method, data): |
|
|
|
if method == 'sessions': |
|
|
|
return ServerManager.sessions_text_lines(data) |
|
|
|
else: |
|
|
|
return ServerManager.groups_text_lines(data) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def groups_text_lines(data): |
|
|
|
'''A generator returning lines for a list of groups. |
|
|
|
|
|
|
|
data is the return value of rpc_groups().''' |
|
|
|
|
|
|
|
fmt = ('{:<6} {:>9} {:>6} {:>6} {:>8}' |
|
|
|
'{:>7} {:>9} {:>7} {:>9}') |
|
|
|
yield fmt.format('ID', 'Bw Qta KB', 'Reqs', 'Txs', 'Subs', |
|
|
|
'Recv', 'Recv KB', 'Sent', 'Sent KB') |
|
|
|
for (id_, bandwidth, reqs, txs_sent, subs, |
|
|
|
recv_count, recv_size, send_count, send_size) in data: |
|
|
|
yield fmt.format(id_, |
|
|
|
'{:,d}'.format(bandwidth // 1024), |
|
|
|
'{:,d}'.format(reqs), |
|
|
|
'{:,d}'.format(txs_sent), |
|
|
|
'{:,d}'.format(subs), |
|
|
|
'{:,d}'.format(recv_count), |
|
|
|
'{:,d}'.format(recv_size // 1024), |
|
|
|
'{:,d}'.format(send_count), |
|
|
|
'{:,d}'.format(send_size // 1024)) |
|
|
|
|
|
|
|
def group_data(self): |
|
|
|
'''Returned to the RPC 'groups' call.''' |
|
|
|
result = [] |
|
|
|
for group_id in sorted(self.groups.keys()): |
|
|
|
sessions = self.groups[group_id] |
|
|
|
result.append([group_id, |
|
|
|
sum(s.bandwidth_used for s in sessions), |
|
|
|
sum(s.requests_remaining() for s in sessions), |
|
|
|
sum(s.txs_sent for s in sessions), |
|
|
|
sum(s.sub_count() for s in sessions), |
|
|
|
sum(s.recv_count for s in sessions), |
|
|
|
sum(s.recv_size for s in sessions), |
|
|
|
sum(s.send_count for s in sessions), |
|
|
|
sum(s.send_size for s in sessions), |
|
|
|
]) |
|
|
|
return result |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def sessions_text_lines(data): |
|
|
|
'''A generator returning lines for a list of sessions. |
|
|
@ -553,11 +610,10 @@ class ServerManager(util.LoggedClass): |
|
|
|
return ('{:3d}:{:02d}:{:02d}' |
|
|
|
.format(t // 3600, (t % 3600) // 60, t % 60)) |
|
|
|
|
|
|
|
fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} ' |
|
|
|
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') |
|
|
|
yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Reqs', |
|
|
|
'Txs', 'Subs', 'Recv', 'Recv KB', 'Sent', |
|
|
|
'Sent KB', 'Time') |
|
|
|
fmt = ('{:<6} {:<5} {:>23} {:>15} {:>5} {:>5} ' |
|
|
|
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9}') |
|
|
|
yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Reqs', 'Txs', |
|
|
|
'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time') |
|
|
|
for (id_, flags, peer, client, reqs, txs_sent, subs, |
|
|
|
recv_count, recv_size, send_count, send_size, time) in data: |
|
|
|
yield fmt.format(id_, flags, peer, client, |
|
|
@ -617,13 +673,7 @@ class ServerManager(util.LoggedClass): |
|
|
|
return self.server_summary() |
|
|
|
|
|
|
|
async def rpc_groups(self, params): |
|
|
|
result = {} |
|
|
|
msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used' |
|
|
|
for group, sessions in self.groups.items(): |
|
|
|
bandwidth = sum(s.bandwidth_used for s in sessions) |
|
|
|
reqs = sum(s.requests_remaining() for s in sessions) |
|
|
|
result[group] = msg.format(len(sessions), reqs, bandwidth // 1024) |
|
|
|
return result |
|
|
|
return self.group_data() |
|
|
|
|
|
|
|
async def rpc_sessions(self, params): |
|
|
|
return self.session_data(for_log=False) |
|
|
|