
More work

master
Neil Booth 8 years ago
parent commit 1a9e8cdcd4
  1. lib/jsonrpc.py (7 changed lines)
  2. server/protocol.py (102 changed lines)

lib/jsonrpc.py (7 changed lines)

@@ -67,8 +67,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
     Assumes JSON messages are newline-separated and that newlines
     cannot appear in the JSON other than to separate lines.  Incoming
-    messages are queued on the messages queue for later asynchronous
-    processing, and should be passed to the handle_request() function.
+    requests are passed to enqueue_request(), which should arrange for
+    their asynchronous handling via the request's process() method.
     Derived classes may want to override connection_made() and
     connection_lost() but should be sure to call the implementation in
@@ -145,7 +145,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         self.send_size = 0
         self.error_count = 0
         self.peer_info = None
-        self.messages = asyncio.Queue()
         # Sends longer than max_send are prevented, instead returning
         # an oversized request error to other end of the network
         # connection.  The request causing it is logged.  Values under
@@ -408,7 +407,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
     # --- derived classes are intended to override these functions
     def enqueue_request(self, request):
         '''Enqueue a request for later asynchronous processing.'''
-        self.messages.put_nowait(request)
+        raise NotImplementedError

     async def handle_notification(self, method, params):
         '''Handle a notification.'''
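
The net effect of these three hunks is that JSONRPC no longer owns a message queue: each parsed incoming request is handed straight to enqueue_request(), which the base class now leaves abstract. A minimal sketch of how a subclass might recover the old queue-and-worker behaviour is below, assuming lib.jsonrpc.JSONRPC is importable as shown; the class and attribute names other than enqueue_request() and process() are illustrative and not part of this commit.

    import asyncio

    from lib.jsonrpc import JSONRPC

    class QueueingSession(JSONRPC):
        '''Queues incoming requests locally and drains them in one task,
        roughly what the base class did before this commit.'''

        def __init__(self):
            super().__init__()
            self.requests = asyncio.Queue()
            self.worker = asyncio.ensure_future(self.process_requests())

        def enqueue_request(self, request):
            # Called by the base class for each incoming request.
            self.requests.put_nowait(request)

        async def process_requests(self):
            # The worker should be cancelled in connection_lost().
            while True:
                request = await self.requests.get()
                await request.process()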

server/protocol.py (102 changed lines)

@@ -14,6 +14,7 @@ import json
 import ssl
 import time
 import traceback
+from bisect import bisect_left
 from collections import defaultdict, namedtuple
 from functools import partial
@@ -217,6 +218,7 @@ class ServerManager(util.LoggedClass):
     '''
     MgrTask = namedtuple('MgrTask', 'session task')
+    N = 5

     class NotificationRequest(object):
         def __init__(self, fn_call):
@@ -231,6 +233,7 @@ class ServerManager(util.LoggedClass):
         self.env = env
         self.servers = []
         self.sessions = {}
+        self.groups = defaultdict(set)
         self.txs_sent = 0
         self.next_log_sessions = 0
         self.max_subs = env.max_subs
@@ -238,9 +241,13 @@ class ServerManager(util.LoggedClass):
         self.next_stale_check = 0
         self.history_cache = pylru.lrucache(256)
         self.header_cache = pylru.lrucache(8)
+        self.queue = asyncio.PriorityQueue()
+        self.delayed_queue = []
+        self.next_request_id = 0
         self.height = 0
         self.futures = []
         env.max_send = max(350000, env.max_send)
+        self.setup_bands()
         self.logger.info('session timeout: {:,d} seconds'
                          .format(env.session_timeout))
         self.logger.info('session bandwidth limit {:,d} bytes'
@@ -266,6 +273,50 @@ class ServerManager(util.LoggedClass):
         '''
         return self.mempool.value(hash168)

+    async def serve_requests(self):
+        '''Asynchronously run through the task queue.'''
+        while True:
+            priority_, id_, request = await self.queue.get()
+            try:
+                await request.process()
+            except asyncio.CancelledError:
+                break
+            except Exception:
+                # Getting here should probably be considered a bug and fixed
+                self.log_error('error handling request {}'.format(request))
+                traceback.print_exc()
+
+    def setup_bands(self):
+        bands = []
+        limit = env.bandwidth_limit
+        for n in range(self.N):
+            bands.append(limit)
+            limit //= 4
+        limit = env.bandwidth_limit
+        for n in range(self.N):
+            limit += limit // 2
+            bands.append(limit)
+        self.bands = sorted(bands)
+        self.logger.info('bands: {}'.format(self.bands))
+
+    def session_priority(self, session):
+        if isinstance(session, LocalRPC):
+            return 0
+        group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session])
+        return (bisect_left(self.bands, session.bandwidth_used)
+                + bisect_left(self.bands, group_bandwidth) + 1) // 2
+
+    def enqueue_request(self, session, request):
+        priority = self.session_priority(session)
+        item = (priority, self.next_request_id, request)
+        self.next_request_id += 1
+        secs = priority - self.N
+        if secs >= 0:
+            self.delayed_queue.append((time.time() + secs, item))
+        else:
+            self.queue.put_nowait(item)
+
     async def main_loop(self):
         '''Server manager main loop.'''
         def add_future(coro):
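
Taken together, setup_bands(), session_priority() and the manager-level enqueue_request() implement a simple bandwidth-based scheduler: ten thresholds are derived from the configured bandwidth limit (five shrinking by a factor of four below it, five growing by half steps above it), a session's priority is the rounded-up average of where its own usage and its group's combined usage fall among those thresholds, and any request whose priority reaches N (5) is parked on delayed_queue for priority - N seconds instead of being queued immediately. The standalone sketch below reproduces that arithmetic; the 2,000,000-byte limit is an arbitrary assumption, and it reads the limit from a plain variable since the hunk's bare env presumably stands for the manager's stored self.env.

    # Standalone sketch of the band/priority arithmetic above.
    # N matches the class attribute added earlier; LIMIT is assumed.
    from bisect import bisect_left

    N = 5
    LIMIT = 2000000

    bands = []
    band = LIMIT
    for _ in range(N):            # limit, limit/4, limit/16, ...
        bands.append(band)
        band //= 4
    band = LIMIT
    for _ in range(N):            # 1.5x, 2.25x, ... the limit
        band += band // 2
        bands.append(band)
    bands.sort()

    def priority(session_bw, group_bw):
        # Rounded-up mean of the two band indices, as in session_priority().
        return (bisect_left(bands, session_bw)
                + bisect_left(bands, group_bw) + 1) // 2

    print(bands)
    print(priority(10000, 50000))        # 2: below N, queued immediately
    print(priority(3000000, 9000000))    # 7: >= N, delayed by 2 seconds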
@@ -277,6 +328,8 @@ class ServerManager(util.LoggedClass):
         add_future(self.mempool.main_loop(self.bp.event))
         add_future(self.irc.start(self.bp.event))
         add_future(self.start_servers(self.bp.event))
+        for n in range(4):
+            add_future(self.serve_requests())

         for future in asyncio.as_completed(self.futures):
             try:
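
With the per-session serving loop gone (see the final hunk below), the main loop now starts four manager-level serve_requests() workers that drain the shared asyncio.PriorityQueue. Each queued item is a (priority, next_request_id, request) tuple: the monotonically increasing id keeps requests of equal priority in arrival order and guarantees the heap never compares two request objects directly, which would raise TypeError since they define no ordering. A tiny illustration, using a stand-in Request class rather than the server's own:

    import asyncio

    class Request:
        '''Stand-in with no ordering defined, like the real request objects.'''

    async def main():
        queue = asyncio.PriorityQueue()
        next_id = 0
        for prio in (3, 1, 1):
            queue.put_nowait((prio, next_id, Request()))
            next_id += 1
        while not queue.empty():
            prio, id_, request = queue.get_nowait()
            # Prints 1 1, 1 2, 3 0: lowest priority value first, and the
            # two equal-priority items keep their insertion order.
            print(prio, id_)

    asyncio.run(main())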
@@ -403,27 +456,24 @@ class ServerManager(util.LoggedClass):
                          .format(len(self.sessions)))

     def add_session(self, session):
-        # Some connections are acknowledged after the servers are closed
-        if not self.servers:
-            return
         self.clear_stale_sessions()
-        coro = session.serve_requests()
-        future = asyncio.ensure_future(coro)
-        self.sessions[session] = future
+        group = self.groups[int(self.start - self.manager.start) // 60]
+        group.add(session)
+        self.sessions[session] = group
         session.log_info('connection from {}, {:,d} total'
                          .format(session.peername(), len(self.sessions)))
+        # Some connections are acknowledged after the servers are closed
+        if not self.servers:
+            self.close_session(session)

     def remove_session(self, session):
-        # It might have been forcefully removed earlier by close_session()
-        if session in self.sessions:
-            self.subscription_count -= session.sub_count()
-            future = self.sessions.pop(session)
-            future.cancel()
+        group = self.sessions.pop(session)
+        group.remove(session)
+        self.subscription_count -= session.sub_count()

     def close_session(self, session):
         '''Close the session's transport and cancel its future.'''
         session.transport.close()
-        self.sessions[session].cancel()
         return 'disconnected {:d}'.format(session.id_)

     def toggle_logging(self, session):
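
add_session() now files every connection into a group keyed by the minute in which it arrived, and session_priority() above charges a session for its whole group's bandwidth, so bursts of connections arriving together share one budget. Below is a minimal sketch of that bookkeeping with stand-in Manager and Session classes; note the committed key expression reads self.start - self.manager.start inside ServerManager, which the sketch takes to mean the session's start time relative to the manager's.

    import time
    from collections import defaultdict

    class Session:
        def __init__(self):
            self.start = time.time()
            self.bandwidth_used = 0

    class Manager:
        def __init__(self):
            self.start = time.time()
            self.groups = defaultdict(set)   # minute bucket -> sessions
            self.sessions = {}               # session -> its group

        def add_session(self, session):
            group = self.groups[int(session.start - self.start) // 60]
            group.add(session)
            self.sessions[session] = group

        def remove_session(self, session):
            group = self.sessions.pop(session)
            group.remove(session)

        def group_bandwidth(self, session):
            # What session_priority() sums for the group half of the score.
            return sum(s.bandwidth_used for s in self.sessions[session])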
@@ -436,6 +486,9 @@ class ServerManager(util.LoggedClass):
         now = time.time()
         if now > self.next_stale_check:
             self.next_stale_check = now + 60
+            # Clear out empty groups
+            for key in [k for k, v in self.groups.items() if not v]:
+                del self.groups[key]
             cutoff = now - self.env.session_timeout
             stale = [session for session in self.sessions
                      if session.last_recv < cutoff
@@ -465,6 +518,7 @@ class ServerManager(util.LoggedClass):
             'blocks': self.bp.db_height,
             'closing': len([s for s in self.sessions if s.is_closing()]),
             'errors': sum(s.error_count for s in self.sessions),
+            'logged': len([s for s in self.sessions if s.log_me]),
             'peers': len(self.irc.peers),
             'sessions': self.session_count(),
             'txs_sent': self.txs_sent,
@@ -482,9 +536,9 @@ class ServerManager(util.LoggedClass):
             return ('{:3d}:{:02d}:{:02d}'
                     .format(t // 3600, (t % 3600) // 60, t % 60))

-        fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} '
+        fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} '
                '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
-        yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs',
+        yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs',
                          'Recv', 'Recv KB', 'Sent', 'Sent KB',
                          'Txs', 'Time')
         for (id_, flags, peer, subs, client, recv_count, recv_size,
@@ -501,7 +555,7 @@ class ServerManager(util.LoggedClass):
     def session_data(self, for_log):
         '''Returned to the RPC 'sessions' call.'''
         now = time.time()
-        sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
+        sessions = sorted(self.sessions, key=lambda s: s.start)
         return [(session.id_,
                  session.flags(),
                  session.peername(for_log=for_log),
@@ -577,7 +631,7 @@ class Session(JSONRPC):
         self.max_send = env.max_send
         self.bandwidth_limit = env.bandwidth_limit
         self.txs_sent = 0
-        self.bucket = int(self.start - self.manager.start) // 60
+        self.priority = 1

     def is_closing(self):
         '''True if this session is closing.'''
@@ -592,6 +646,9 @@ class Session(JSONRPC):
             status += 'L'
         return status

+    def enqueue_request(self, request):
+        self.manager.enqueue_request(self, request)
+
     def connection_made(self, transport):
         '''Handle an incoming client connection.'''
         super().connection_made(transport)
@@ -615,19 +672,6 @@ class Session(JSONRPC):
         return await handler(params)

-    async def serve_requests(self):
-        '''Asynchronously run through the task queue.'''
-        while True:
-            request = await self.messages.get()
-            try:
-                await request.process()
-            except asyncio.CancelledError:
-                break
-            except Exception:
-                # Getting here should probably be considered a bug and fixed
-                self.log_error('error handling request {}'.format(request))
-                traceback.print_exc()
-
     def sub_count(self):
         return 0
