Browse Source

Minor improvements to client handling

Don't create a task until ready to execute it
Log expensive tasks
Assume peers are for logging unless explicitly not stated
master
Neil Booth 8 years ago
parent
commit
be45a9e228
  1. 41
      server/protocol.py

41
server/protocol.py

@ -50,7 +50,7 @@ class BlockServer(BlockProcessor):
class ServerManager(LoggedClass): class ServerManager(LoggedClass):
'''Manages the servers.''' '''Manages the servers.'''
AsyncTask = namedtuple('AsyncTask', 'session job') MgrTask = namedtuple('MgrTask', 'session task')
def __init__(self, bp, env): def __init__(self, bp, env):
super().__init__() super().__init__()
@ -59,7 +59,7 @@ class ServerManager(LoggedClass):
self.servers = [] self.servers = []
self.irc = IRC(env) self.irc = IRC(env)
self.sessions = set() self.sessions = set()
self.tasks = asyncio.Queue() self.queue = asyncio.Queue()
self.current_task = None self.current_task = None
async def start_server(self, kind, *args, **kw_args): async def start_server(self, kind, *args, **kw_args):
@ -127,27 +127,34 @@ class ServerManager(LoggedClass):
self.sessions.remove(session) self.sessions.remove(session)
if self.current_task and session == self.current_task.session: if self.current_task and session == self.current_task.session:
self.logger.info('cancelling running task') self.logger.info('cancelling running task')
self.current_task.job.cancel() self.current_task.task.cancel()
def add_task(self, session, job): def add_task(self, session, request):
assert session in self.sessions assert session in self.sessions
task = asyncio.ensure_future(job) self.queue.put_nowait((session, request))
self.tasks.put_nowait(self.AsyncTask(session, task))
async def run_tasks(self): async def run_tasks(self):
'''Asynchronously run through the task queue.''' '''Asynchronously run through the task queue.'''
while True: while True:
task = await self.tasks.get() session, request = await self.queue.get()
if not session in self.sessions:
continue
coro = session.handle_json_request(request)
task = asyncio.ensure_future(coro)
try: try:
if not task.session in self.sessions: self.current_task = self.MgrTask(session, task)
self.logger.info('cancelling task of gone session') start = time.time()
task.job.cancel() await task
self.current_task = task secs = time.time() - start
await task.job if secs > 1:
self.logger.warning('slow request for {} took {:.1f}s: {}'
.format(session.peername(), secs,
request))
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.info('cancelled task noted') self.logger.info('cancelled task noted')
except Exception: except Exception:
# Getting here should probably be considered a bug and fixed # Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(request))
traceback.print_exc() traceback.print_exc()
finally: finally:
self.current_task = None self.current_task = None
@ -179,7 +186,7 @@ class ServerManager(LoggedClass):
'''Returned to the RPC 'sessions' call.''' '''Returned to the RPC 'sessions' call.'''
now = time.time() now = time.time()
return [(session.kind, return [(session.kind,
session.peername(), session.peername(for_log=False),
len(session.hash168s), len(session.hash168s),
'RPC' if isinstance(session, LocalRPC) else session.client, 'RPC' if isinstance(session, LocalRPC) else session.client,
session.recv_count, session.recv_size, session.recv_count, session.recv_size,
@ -215,7 +222,7 @@ class Session(JSONRPC):
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
super().connection_made(transport) super().connection_made(transport)
self.logger.info('connection from {}'.format(self.peername(True))) self.logger.info('connection from {}'.format(self.peername()))
self.manager.add_session(self) self.manager.add_session(self)
def connection_lost(self, exc): def connection_lost(self, exc):
@ -224,7 +231,7 @@ class Session(JSONRPC):
if self.error_count or self.send_size >= 250000: if self.error_count or self.send_size >= 250000:
self.logger.info('{} disconnected. ' self.logger.info('{} disconnected. '
'Sent {:,d} bytes in {:,d} messages {:,d} errors' 'Sent {:,d} bytes in {:,d} messages {:,d} errors'
.format(self.peername(True), self.send_size, .format(self.peername(), self.send_size,
self.send_count, self.error_count)) self.send_count, self.error_count))
self.manager.remove_session(self) self.manager.remove_session(self)
@ -234,9 +241,9 @@ class Session(JSONRPC):
def on_json_request(self, request): def on_json_request(self, request):
'''Queue the request for asynchronous handling.''' '''Queue the request for asynchronous handling.'''
self.manager.add_task(self, self.handle_json_request(request)) self.manager.add_task(self, request)
def peername(self, for_log=False): def peername(self, *, for_log=True):
# Anonymi{z, s}e all IP addresses that will be stored in a log # Anonymi{z, s}e all IP addresses that will be stored in a log
if for_log and self.env.anon_logs and self.peer_info: if for_log and self.env.anon_logs and self.peer_info:
info = ["XX.XX.XX.XX", "XX"] info = ["XX.XX.XX.XX", "XX"]

Loading…
Cancel
Save