From 79a9e9199480a997b08b7233a7c6215e8ff827af Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 4 Dec 2016 11:44:25 +0900 Subject: [PATCH] Handle closing sessions a little differently Move connection type to first flag letter. All seeing eye is not stale. --- server/protocol.py | 53 +++++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index ad24954..6696c52 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -373,13 +373,8 @@ class ServerManager(util.LoggedClass): self.logger.info('server listening sockets closed, waiting ' '{:d} seconds for socket cleanup'.format(secs)) limit = time.time() + secs - while self.sessions: - if time.time() < limit: - await asyncio.sleep(4) - else: - for session in list(self.sessions): - self.close_session(session, hard=True) - await asyncio.sleep(0) + while self.sessions and time.time() < limit: + await asyncio.sleep(4) self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) @@ -401,14 +396,10 @@ class ServerManager(util.LoggedClass): future = self.sessions.pop(session) future.cancel() - def close_session(self, session, hard=False): + def close_session(self, session): '''Close the session's transport and cancel its future.''' session.transport.close() self.sessions[session].cancel() - if hard: - self.remove_session(session) - socket = session.transport.get_extra_info('socket') - socket.close() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -423,11 +414,13 @@ class ServerManager(util.LoggedClass): self.next_stale_check = now + 60 cutoff = now - self.env.session_timeout stale = [session for session in self.sessions - if session.last_recv < cutoff] + if session.last_recv < cutoff + and session.client != 'all_seeing_eye' + and not session.is_closing()] for session in stale: - self.close_session(session, hard=True) + self.close_session(session) if stale: - self.logger.info('dropped stale connections {}' + self.logger.info('closing stale connections {}' .format([session.id_ for session in stale])) def new_subscription(self): @@ -441,12 +434,13 @@ class ServerManager(util.LoggedClass): def session_count(self): '''The number of connections that we've sent something to.''' - return len([s for s in self.sessions if s.send_count]) + return len(self.sessions) def server_summary(self): '''A one-line summary of server state.''' return { 'blocks': self.bp.db_height, + 'closing': len([s for s in self.sessions if s.is_closing()]), 'errors': sum(s.error_count for s in self.sessions), 'peers': len(self.irc.peers), 'sessions': self.session_count(), @@ -465,15 +459,14 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} ' + fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs', + yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Txs', 'Time') - for (kind, id_, log_me, peer, subs, client, recv_count, recv_size, + for (id_, flags, peer, subs, client, recv_count, recv_size, send_count, send_size, txs_sent, time) in data: - yield fmt.format(kind, str(id_) + ('L' if log_me else ' '), - peer, client, + yield fmt.format(id_, flags, peer, client, '{:,d}'.format(subs), '{:,d}'.format(recv_count), '{:,d}'.format(recv_size // 1024), @@ -486,9 +479,8 @@ class ServerManager(util.LoggedClass): '''Returned to the RPC 'sessions' call.''' now = time.time() sessions = sorted(self.sessions.keys(), key=lambda s: s.start) - return [(session.kind, - session.id_, - session.log_me, + return [(session.id_, + session.flags(), session.peername(for_log=for_log), session.sub_count(), session.client, @@ -563,6 +555,19 @@ class Session(JSONRPC): self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 + def is_closing(self): + '''True if this session is closing.''' + return self.transport and self.transport.is_closing() + + def flags(self): + '''Status flags.''' + status = self.kind[0] + if self.is_closing(): + status += 'C' + if self.log_me: + status += 'L' + return status + def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport)