Browse Source

Move job handling out of the controller

Controller on its last legs...
master
Neil Booth 8 years ago
parent
commit
7f03b0fa73
  1. 22
      server/controller.py
  2. 28
      server/protocol.py

22
server/controller.py

@ -14,7 +14,6 @@ client-serving data such as histories.
import asyncio import asyncio
import signal import signal
import ssl import ssl
import traceback
from functools import partial from functools import partial
from server.daemon import Daemon from server.daemon import Daemon
@ -37,15 +36,12 @@ class Controller(LoggedClass):
self.daemon = Daemon(env.daemon_url, env.debug) self.daemon = Daemon(env.daemon_url, env.debug)
self.block_processor = BlockProcessor(env, self.daemon, self.block_processor = BlockProcessor(env, self.daemon,
on_update=self.on_update) on_update=self.on_update)
JSONRPC.init(self.block_processor, self.daemon, self.coin, JSONRPC.init(self.block_processor, self.daemon, self.coin)
self.add_job)
self.servers = [] self.servers = []
self.jobs = asyncio.Queue()
def start(self): def start(self):
'''Prime the event loop with asynchronous jobs.''' '''Prime the event loop with asynchronous jobs.'''
coros = self.block_processor.coros() coros = self.block_processor.coros()
coros.append(self.run_jobs())
for coro in coros: for coro in coros:
asyncio.ensure_future(coro) asyncio.ensure_future(coro)
@ -109,19 +105,3 @@ class Controller(LoggedClass):
.format(signame)) .format(signame))
for task in asyncio.Task.all_tasks(self.loop): for task in asyncio.Task.all_tasks(self.loop):
task.cancel() task.cancel()
def add_job(self, coro):
'''Queue a job for asynchronous processing.'''
self.jobs.put_nowait(coro)
async def run_jobs(self):
'''Asynchronously run through the job queue.'''
while True:
job = await self.jobs.get()
try:
await job
except asyncio.CancelledError:
raise
except Exception:
# Getting here should probably be considered a bug and fixed
traceback.print_exc()

28
server/protocol.py

@ -11,7 +11,6 @@
import asyncio import asyncio
import codecs import codecs
import json import json
import struct
import traceback import traceback
from functools import partial from functools import partial
@ -33,6 +32,8 @@ def json_notification(method, params):
class JSONRPC(asyncio.Protocol, LoggedClass): class JSONRPC(asyncio.Protocol, LoggedClass):
'''Base class that manages a JSONRPC connection.''' '''Base class that manages a JSONRPC connection.'''
SESSIONS = set() SESSIONS = set()
# Queue for aynchronous job processing.
JOBS = None
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@ -40,6 +41,26 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_count = 0 self.send_count = 0
self.send_size = 0 self.send_size = 0
self.error_count = 0 self.error_count = 0
self.init_jobs()
@classmethod
def init_jobs(cls):
if not cls.JOBS:
cls.JOBS = asyncio.Queue()
asyncio.ensure_future(cls.run_jobs())
@classmethod
async def run_jobs(cls):
'''Asynchronously run through the job queue.'''
while True:
job = await cls.JOBS.get()
try:
await job
except asyncio.CancelledError:
raise
except Exception:
# Getting here should probably be considered a bug and fixed
traceback.print_exc()
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
@ -79,7 +100,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except Exception as e: except Exception as e:
self.logger.info('error decoding JSON message: {}'.format(e)) self.logger.info('error decoding JSON message: {}'.format(e))
else: else:
self.ADD_JOB(self.request_handler(message)) self.JOBS.put_nowait(self.request_handler(message))
async def request_handler(self, request): async def request_handler(self, request):
'''Called asynchronously.''' '''Called asynchronously.'''
@ -168,11 +189,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('params should be empty: {}'.format(params)) raise RPCError('params should be empty: {}'.format(params))
@classmethod @classmethod
def init(cls, block_processor, daemon, coin, add_job): def init(cls, block_processor, daemon, coin):
cls.BLOCK_PROCESSOR = block_processor cls.BLOCK_PROCESSOR = block_processor
cls.DAEMON = daemon cls.DAEMON = daemon
cls.COIN = coin cls.COIN = coin
cls.ADD_JOB = add_job
@classmethod @classmethod
def height(cls): def height(cls):

Loading…
Cancel
Save