diff --git a/server/protocol.py b/server/protocol.py index 9cbb3a6..7a77b1e 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -111,7 +111,8 @@ class ServerManager(LoggedClass): cache = {} for session in self.sessions: if isinstance(session, ElectrumX): - session.notify(height, touched, cache) + # Use a tuple to distinguish from JSON + session.jobs.put_nowait((height, touched, cache)) def stop(self): '''Close listening servers.''' @@ -197,7 +198,7 @@ class Session(JSONRPC): self.coin = bp.coin self.kind = kind self.hash168s = set() - self.requests = asyncio.Queue() + self.jobs = asyncio.Queue() self.current_task = None self.client = 'unknown' @@ -223,26 +224,23 @@ class Session(JSONRPC): def on_json_request(self, request): '''Queue the request for asynchronous handling.''' - self.requests.put_nowait(request) + self.jobs.put_nowait(request) async def serve_requests(self): '''Asynchronously run through the task queue.''' while True: await asyncio.sleep(0) - request = await self.requests.get() + job = await self.jobs.get() try: - start = time.time() - await self.handle_json_request(request) - secs = time.time() - start - if secs > 1: - self.logger.warning('slow request for {} took {:.1f}s: {}' - .format(self.peername(), secs, - request)) + if isinstance(job, tuple): # Height / mempool notification + await self.notify(*job) + else: + await self.handle_json_request(job) except asyncio.CancelledError: break except Exception: # Getting here should probably be considered a bug and fixed - self.logger.error('error handling request {}'.format(request)) + self.logger.error('error handling request {}'.format(job)) traceback.print_exc() def peername(self, *, for_log=True): @@ -325,7 +323,7 @@ class ElectrumX(Session): for prefix, suffixes in rpcs for suffix in suffixes.split()} - def notify(self, height, touched, cache): + async def notify(self, height, touched, cache): '''Notify the client about changes in height and touched addresses. Cache is a shared cache for this update. @@ -349,13 +347,18 @@ class ElectrumX(Session): self.send_json(payload) hash168_to_address = self.coin.hash168_to_address - for hash168 in self.hash168s.intersection(touched): + matches = self.hash168s.intersection(touched) + for hash168 in matches: address = hash168_to_address(hash168) status = self.address_status(hash168) payload = json_notification_payload( 'blockchain.address.subscribe', (address, status)) self.send_json(payload) + if matches: + self.logger.info('notified {} of {} addresses' + .format(self.peername(), len(matches))) + def height(self): '''Return the block processor's current height.''' return self.bp.height