diff --git a/server/protocol.py b/server/protocol.py index 0711c05..cf6eb35 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -50,7 +50,7 @@ class BlockServer(BlockProcessor): class ServerManager(LoggedClass): '''Manages the servers.''' - AsyncTask = namedtuple('AsyncTask', 'session job') + MgrTask = namedtuple('MgrTask', 'session task') def __init__(self, bp, env): super().__init__() @@ -59,7 +59,7 @@ class ServerManager(LoggedClass): self.servers = [] self.irc = IRC(env) self.sessions = set() - self.tasks = asyncio.Queue() + self.queue = asyncio.Queue() self.current_task = None async def start_server(self, kind, *args, **kw_args): @@ -127,27 +127,34 @@ class ServerManager(LoggedClass): self.sessions.remove(session) if self.current_task and session == self.current_task.session: self.logger.info('cancelling running task') - self.current_task.job.cancel() + self.current_task.task.cancel() - def add_task(self, session, job): + def add_task(self, session, request): assert session in self.sessions - task = asyncio.ensure_future(job) - self.tasks.put_nowait(self.AsyncTask(session, task)) + self.queue.put_nowait((session, request)) async def run_tasks(self): '''Asynchronously run through the task queue.''' while True: - task = await self.tasks.get() + session, request = await self.queue.get() + if not session in self.sessions: + continue + coro = session.handle_json_request(request) + task = asyncio.ensure_future(coro) try: - if not task.session in self.sessions: - self.logger.info('cancelling task of gone session') - task.job.cancel() - self.current_task = task - await task.job + self.current_task = self.MgrTask(session, task) + start = time.time() + await task + secs = time.time() - start + if secs > 1: + self.logger.warning('slow request for {} took {:.1f}s: {}' + .format(session.peername(), secs, + request)) except asyncio.CancelledError: self.logger.info('cancelled task noted') except Exception: # Getting here should probably be considered a bug and fixed + self.logger.error('error handling request {}'.format(request)) traceback.print_exc() finally: self.current_task = None @@ -179,7 +186,7 @@ class ServerManager(LoggedClass): '''Returned to the RPC 'sessions' call.''' now = time.time() return [(session.kind, - session.peername(), + session.peername(for_log=False), len(session.hash168s), 'RPC' if isinstance(session, LocalRPC) else session.client, session.recv_count, session.recv_size, @@ -215,7 +222,7 @@ class Session(JSONRPC): def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) - self.logger.info('connection from {}'.format(self.peername(True))) + self.logger.info('connection from {}'.format(self.peername())) self.manager.add_session(self) def connection_lost(self, exc): @@ -224,7 +231,7 @@ class Session(JSONRPC): if self.error_count or self.send_size >= 250000: self.logger.info('{} disconnected. ' 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername(True), self.send_size, + .format(self.peername(), self.send_size, self.send_count, self.error_count)) self.manager.remove_session(self) @@ -234,9 +241,9 @@ class Session(JSONRPC): def on_json_request(self, request): '''Queue the request for asynchronous handling.''' - self.manager.add_task(self, self.handle_json_request(request)) + self.manager.add_task(self, request) - def peername(self, for_log=False): + def peername(self, *, for_log=True): # Anonymi{z, s}e all IP addresses that will be stored in a log if for_log and self.env.anon_logs and self.peer_info: info = ["XX.XX.XX.XX", "XX"]