|
|
@ -373,13 +373,8 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.logger.info('server listening sockets closed, waiting ' |
|
|
|
'{:d} seconds for socket cleanup'.format(secs)) |
|
|
|
limit = time.time() + secs |
|
|
|
while self.sessions: |
|
|
|
if time.time() < limit: |
|
|
|
await asyncio.sleep(4) |
|
|
|
else: |
|
|
|
for session in list(self.sessions): |
|
|
|
self.close_session(session, hard=True) |
|
|
|
await asyncio.sleep(0) |
|
|
|
while self.sessions and time.time() < limit: |
|
|
|
await asyncio.sleep(4) |
|
|
|
self.logger.info('{:,d} sessions remaining' |
|
|
|
.format(len(self.sessions))) |
|
|
|
|
|
|
@ -401,14 +396,10 @@ class ServerManager(util.LoggedClass): |
|
|
|
future = self.sessions.pop(session) |
|
|
|
future.cancel() |
|
|
|
|
|
|
|
def close_session(self, session, hard=False): |
|
|
|
def close_session(self, session): |
|
|
|
'''Close the session's transport and cancel its future.''' |
|
|
|
session.transport.close() |
|
|
|
self.sessions[session].cancel() |
|
|
|
if hard: |
|
|
|
self.remove_session(session) |
|
|
|
socket = session.transport.get_extra_info('socket') |
|
|
|
socket.close() |
|
|
|
return 'disconnected {:d}'.format(session.id_) |
|
|
|
|
|
|
|
def toggle_logging(self, session): |
|
|
@ -423,11 +414,13 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.next_stale_check = now + 60 |
|
|
|
cutoff = now - self.env.session_timeout |
|
|
|
stale = [session for session in self.sessions |
|
|
|
if session.last_recv < cutoff] |
|
|
|
if session.last_recv < cutoff |
|
|
|
and session.client != 'all_seeing_eye' |
|
|
|
and not session.is_closing()] |
|
|
|
for session in stale: |
|
|
|
self.close_session(session, hard=True) |
|
|
|
self.close_session(session) |
|
|
|
if stale: |
|
|
|
self.logger.info('dropped stale connections {}' |
|
|
|
self.logger.info('closing stale connections {}' |
|
|
|
.format([session.id_ for session in stale])) |
|
|
|
|
|
|
|
def new_subscription(self): |
|
|
@ -441,12 +434,13 @@ class ServerManager(util.LoggedClass): |
|
|
|
|
|
|
|
def session_count(self): |
|
|
|
'''The number of connections that we've sent something to.''' |
|
|
|
return len([s for s in self.sessions if s.send_count]) |
|
|
|
return len(self.sessions) |
|
|
|
|
|
|
|
def server_summary(self): |
|
|
|
'''A one-line summary of server state.''' |
|
|
|
return { |
|
|
|
'blocks': self.bp.db_height, |
|
|
|
'closing': len([s for s in self.sessions if s.is_closing()]), |
|
|
|
'errors': sum(s.error_count for s in self.sessions), |
|
|
|
'peers': len(self.irc.peers), |
|
|
|
'sessions': self.session_count(), |
|
|
@ -465,15 +459,14 @@ class ServerManager(util.LoggedClass): |
|
|
|
return ('{:3d}:{:02d}:{:02d}' |
|
|
|
.format(t // 3600, (t % 3600) // 60, t % 60)) |
|
|
|
|
|
|
|
fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} ' |
|
|
|
fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} ' |
|
|
|
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') |
|
|
|
yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs', |
|
|
|
yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs', |
|
|
|
'Recv', 'Recv KB', 'Sent', 'Sent KB', |
|
|
|
'Txs', 'Time') |
|
|
|
for (kind, id_, log_me, peer, subs, client, recv_count, recv_size, |
|
|
|
for (id_, flags, peer, subs, client, recv_count, recv_size, |
|
|
|
send_count, send_size, txs_sent, time) in data: |
|
|
|
yield fmt.format(kind, str(id_) + ('L' if log_me else ' '), |
|
|
|
peer, client, |
|
|
|
yield fmt.format(id_, flags, peer, client, |
|
|
|
'{:,d}'.format(subs), |
|
|
|
'{:,d}'.format(recv_count), |
|
|
|
'{:,d}'.format(recv_size // 1024), |
|
|
@ -486,9 +479,8 @@ class ServerManager(util.LoggedClass): |
|
|
|
'''Returned to the RPC 'sessions' call.''' |
|
|
|
now = time.time() |
|
|
|
sessions = sorted(self.sessions.keys(), key=lambda s: s.start) |
|
|
|
return [(session.kind, |
|
|
|
session.id_, |
|
|
|
session.log_me, |
|
|
|
return [(session.id_, |
|
|
|
session.flags(), |
|
|
|
session.peername(for_log=for_log), |
|
|
|
session.sub_count(), |
|
|
|
session.client, |
|
|
@ -563,6 +555,19 @@ class Session(JSONRPC): |
|
|
|
self.bandwidth_limit = env.bandwidth_limit |
|
|
|
self.txs_sent = 0 |
|
|
|
|
|
|
|
def is_closing(self): |
|
|
|
'''True if this session is closing.''' |
|
|
|
return self.transport and self.transport.is_closing() |
|
|
|
|
|
|
|
def flags(self): |
|
|
|
'''Status flags.''' |
|
|
|
status = self.kind[0] |
|
|
|
if self.is_closing(): |
|
|
|
status += 'C' |
|
|
|
if self.log_me: |
|
|
|
status += 'L' |
|
|
|
return status |
|
|
|
|
|
|
|
def connection_made(self, transport): |
|
|
|
'''Handle an incoming client connection.''' |
|
|
|
super().connection_made(transport) |
|
|
@ -993,8 +998,9 @@ class ElectrumX(Session): |
|
|
|
|
|
|
|
async def version(self, params): |
|
|
|
'''Return the server version as a string.''' |
|
|
|
if len(params) == 2: |
|
|
|
if params: |
|
|
|
self.client = str(params[0]) |
|
|
|
if len(params) > 1: |
|
|
|
self.protocol_version = params[1] |
|
|
|
return VERSION |
|
|
|
|
|
|
|