|
@ -316,33 +316,36 @@ class ServerManager(LoggedClass): |
|
|
if self.irc_future: |
|
|
if self.irc_future: |
|
|
self.irc_future.cancel() |
|
|
self.irc_future.cancel() |
|
|
for session in self.sessions: |
|
|
for session in self.sessions: |
|
|
session.transport.close() |
|
|
self.close_session(session) |
|
|
|
|
|
|
|
|
async def wait_shutdown(self): |
|
|
async def wait_shutdown(self): |
|
|
# Wait for servers to close |
|
|
# Wait for servers to close |
|
|
for server in self.servers: |
|
|
for server in self.servers: |
|
|
await server.wait_closed() |
|
|
await server.wait_closed() |
|
|
# Just in case a connection came in |
|
|
|
|
|
await asyncio.sleep(0) |
|
|
|
|
|
self.servers = [] |
|
|
self.servers = [] |
|
|
self.logger.info('server listening sockets closed') |
|
|
|
|
|
limit = time.time() + 15 |
|
|
secs = 60 |
|
|
|
|
|
self.logger.info('server listening sockets closed, waiting ' |
|
|
|
|
|
'{:d} seconds for socket cleanup'.format(secs)) |
|
|
|
|
|
|
|
|
|
|
|
limit = time.time() + secs |
|
|
while self.sessions and time.time() < limit: |
|
|
while self.sessions and time.time() < limit: |
|
|
|
|
|
await asyncio.sleep(4) |
|
|
self.logger.info('{:,d} sessions remaining' |
|
|
self.logger.info('{:,d} sessions remaining' |
|
|
.format(len(self.sessions))) |
|
|
.format(len(self.sessions))) |
|
|
await asyncio.sleep(3) |
|
|
|
|
|
if self.sessions: |
|
|
|
|
|
self.logger.info('forcibly closing {:,d} stragglers' |
|
|
|
|
|
.format(len(self.sessions))) |
|
|
|
|
|
for future in self.sessions.values(): |
|
|
|
|
|
future.cancel() |
|
|
|
|
|
await asyncio.sleep(1) |
|
|
|
|
|
|
|
|
|
|
|
def add_session(self, session): |
|
|
def add_session(self, session): |
|
|
assert self.servers |
|
|
|
|
|
assert session not in self.sessions |
|
|
|
|
|
coro = session.serve_requests() |
|
|
coro = session.serve_requests() |
|
|
self.sessions[session] = asyncio.ensure_future(coro) |
|
|
future = asyncio.ensure_future(coro) |
|
|
|
|
|
self.sessions[session] = future |
|
|
|
|
|
# Some connections are acknowledged after the servers are closed |
|
|
|
|
|
if not self.servers: |
|
|
|
|
|
self.close_session(session) |
|
|
|
|
|
|
|
|
|
|
|
def close_session(self, session): |
|
|
|
|
|
'''Close the session's transport and cancel its future.''' |
|
|
|
|
|
session.transport.close() |
|
|
|
|
|
self.sessions[session].cancel() |
|
|
|
|
|
|
|
|
def remove_session(self, session): |
|
|
def remove_session(self, session): |
|
|
self.subscription_count -= session.sub_count() |
|
|
self.subscription_count -= session.sub_count() |
|
|