|
|
@ -233,11 +233,15 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.next_stale_check = 0 |
|
|
|
self.history_cache = pylru.lrucache(512) |
|
|
|
self.futures = [] |
|
|
|
self.cl_kizami = 0 |
|
|
|
self.cl_session_id = 0 |
|
|
|
env.max_send = max(350000, env.max_send) |
|
|
|
self.logger.info('session timeout: {:,d} seconds' |
|
|
|
.format(env.session_timeout)) |
|
|
|
self.logger.info('session bandwidth limit {:,d} bytes' |
|
|
|
.format(env.bandwidth_limit)) |
|
|
|
self.logger.info('new sessions limited to {:,d} every {:,d}s' |
|
|
|
.format(env.conn_limit, env.conn_limit_secs)) |
|
|
|
self.logger.info('max response size {:,d} bytes'.format(env.max_send)) |
|
|
|
self.logger.info('max subscriptions across all sessions: {:,d}' |
|
|
|
.format(self.max_subs)) |
|
|
@ -379,18 +383,30 @@ class ServerManager(util.LoggedClass): |
|
|
|
.format(len(self.sessions))) |
|
|
|
|
|
|
|
def add_session(self, session): |
|
|
|
self.clear_stale_sessions() |
|
|
|
coro = session.serve_requests() |
|
|
|
future = asyncio.ensure_future(coro) |
|
|
|
self.sessions[session] = future |
|
|
|
session.log_info('connection from {}, {:,d} total' |
|
|
|
.format(session.peername(), len(self.sessions))) |
|
|
|
# Some connections are acknowledged after the servers are closed |
|
|
|
if not self.servers: |
|
|
|
self.close_session(session) |
|
|
|
now = time.time() |
|
|
|
self.clear_stale_sessions(now) |
|
|
|
if now > self.cl_kizami: |
|
|
|
self.cl_kizami = now + self.env.conn_limit_secs |
|
|
|
self.cl_session_id = session.id_ |
|
|
|
count = session.id_ - self.cl_session_id |
|
|
|
if count > self.env.conn_limit: |
|
|
|
session.log_info('closing connection from {}: {:,d} in last {:,d}s' |
|
|
|
.format(session.peername(), count, |
|
|
|
self.env.conn_limit_secs)) |
|
|
|
session.transport.close() |
|
|
|
else: |
|
|
|
coro = session.serve_requests() |
|
|
|
future = asyncio.ensure_future(coro) |
|
|
|
self.sessions[session] = future |
|
|
|
session.log_info('connection from {}, {:,d} total' |
|
|
|
.format(session.peername(), len(self.sessions))) |
|
|
|
# Some connections are acknowledged after the servers are closed |
|
|
|
if not self.servers: |
|
|
|
self.close_session(session) |
|
|
|
|
|
|
|
def remove_session(self, session): |
|
|
|
# It might have been forcefully removed earlier by close_session() |
|
|
|
# or never added because of connection rate limiting |
|
|
|
if session in self.sessions: |
|
|
|
self.subscription_count -= session.sub_count() |
|
|
|
future = self.sessions.pop(session) |
|
|
@ -407,9 +423,8 @@ class ServerManager(util.LoggedClass): |
|
|
|
session.log_me = not session.log_me |
|
|
|
return 'log {:d}: {}'.format(session.id_, session.log_me) |
|
|
|
|
|
|
|
def clear_stale_sessions(self): |
|
|
|
def clear_stale_sessions(self, now): |
|
|
|
'''Cut off sessions that haven't done anything for 10 minutes.''' |
|
|
|
now = time.time() |
|
|
|
if now > self.next_stale_check: |
|
|
|
self.next_stale_check = now + 60 |
|
|
|
cutoff = now - self.env.session_timeout |
|
|
|