diff --git a/server/controller.py b/server/controller.py index cd159b3..6cfd030 100644 --- a/server/controller.py +++ b/server/controller.py @@ -14,7 +14,6 @@ client-serving data such as histories. import asyncio import signal import ssl -import traceback from functools import partial from server.daemon import Daemon @@ -37,15 +36,12 @@ class Controller(LoggedClass): self.daemon = Daemon(env.daemon_url, env.debug) self.block_processor = BlockProcessor(env, self.daemon, on_update=self.on_update) - JSONRPC.init(self.block_processor, self.daemon, self.coin, - self.add_job) + JSONRPC.init(self.block_processor, self.daemon, self.coin) self.servers = [] - self.jobs = asyncio.Queue() def start(self): '''Prime the event loop with asynchronous jobs.''' coros = self.block_processor.coros() - coros.append(self.run_jobs()) for coro in coros: asyncio.ensure_future(coro) @@ -109,19 +105,3 @@ class Controller(LoggedClass): .format(signame)) for task in asyncio.Task.all_tasks(self.loop): task.cancel() - - def add_job(self, coro): - '''Queue a job for asynchronous processing.''' - self.jobs.put_nowait(coro) - - async def run_jobs(self): - '''Asynchronously run through the job queue.''' - while True: - job = await self.jobs.get() - try: - await job - except asyncio.CancelledError: - raise - except Exception: - # Getting here should probably be considered a bug and fixed - traceback.print_exc() diff --git a/server/protocol.py b/server/protocol.py index a2ec055..4efad5b 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -11,7 +11,6 @@ import asyncio import codecs import json -import struct import traceback from functools import partial @@ -33,6 +32,8 @@ def json_notification(method, params): class JSONRPC(asyncio.Protocol, LoggedClass): '''Base class that manages a JSONRPC connection.''' SESSIONS = set() + # Queue for aynchronous job processing. + JOBS = None def __init__(self): super().__init__() @@ -40,6 +41,26 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_count = 0 self.send_size = 0 self.error_count = 0 + self.init_jobs() + + @classmethod + def init_jobs(cls): + if not cls.JOBS: + cls.JOBS = asyncio.Queue() + asyncio.ensure_future(cls.run_jobs()) + + @classmethod + async def run_jobs(cls): + '''Asynchronously run through the job queue.''' + while True: + job = await cls.JOBS.get() + try: + await job + except asyncio.CancelledError: + raise + except Exception: + # Getting here should probably be considered a bug and fixed + traceback.print_exc() def connection_made(self, transport): '''Handle an incoming client connection.''' @@ -79,7 +100,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except Exception as e: self.logger.info('error decoding JSON message: {}'.format(e)) else: - self.ADD_JOB(self.request_handler(message)) + self.JOBS.put_nowait(self.request_handler(message)) async def request_handler(self, request): '''Called asynchronously.''' @@ -168,11 +189,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): raise RPCError('params should be empty: {}'.format(params)) @classmethod - def init(cls, block_processor, daemon, coin, add_job): + def init(cls, block_processor, daemon, coin): cls.BLOCK_PROCESSOR = block_processor cls.DAEMON = daemon cls.COIN = coin - cls.ADD_JOB = add_job @classmethod def height(cls):