|
|
@ -262,8 +262,9 @@ class SessionManager(object): |
|
|
|
for session in stale_sessions) |
|
|
|
self.logger.info(f'closing stale connections {text}') |
|
|
|
# Give the sockets some time to close gracefully |
|
|
|
for session in stale_sessions: |
|
|
|
await session.spawn(session.close()) |
|
|
|
async with TaskGroup() as group: |
|
|
|
for session in stale_sessions: |
|
|
|
await group.spawn(session.close()) |
|
|
|
|
|
|
|
# Consolidate small groups |
|
|
|
bw_limit = self.env.bandwidth_limit |
|
|
@ -512,10 +513,9 @@ class SessionManager(object): |
|
|
|
finally: |
|
|
|
# Close servers then sessions |
|
|
|
await self._close_servers(list(self.servers.keys())) |
|
|
|
for session in list(self.sessions): |
|
|
|
await session.spawn(session.close(force_after=1)) |
|
|
|
for session in list(self.sessions): |
|
|
|
await session.closed_event.wait() |
|
|
|
async with TaskGroup() as group: |
|
|
|
for session in list(self.sessions): |
|
|
|
await group.spawn(session.close(force_after=1)) |
|
|
|
|
|
|
|
def session_count(self): |
|
|
|
'''The number of connections that we've sent something to.''' |
|
|
@ -573,8 +573,9 @@ class SessionManager(object): |
|
|
|
for hashX in set(hc).intersection(touched): |
|
|
|
del hc[hashX] |
|
|
|
|
|
|
|
for session in self.sessions: |
|
|
|
await session.spawn(session.notify, touched, height_changed) |
|
|
|
async with TaskGroup() as group: |
|
|
|
for session in self.sessions: |
|
|
|
await group.spawn(session.notify, touched, height_changed) |
|
|
|
|
|
|
|
def add_session(self, session): |
|
|
|
self.sessions.add(session) |
|
|
|