Browse Source

Serve sessions in a priority queue.

Add new RPC command requests.
Adjust sessions RPC command to show pending requests.
Only keep first 15 chars in version string.
Set socket timeout
Try more forceful closing of socket
Fixes #67
master
Neil Booth 8 years ago
parent
commit
2a461bd98c
  1. 11
      electrumx_rpc.py
  2. 56
      lib/jsonrpc.py
  3. 149
      server/protocol.py

11
electrumx_rpc.py

@ -22,13 +22,20 @@ from server.protocol import ServerManager
class RPCClient(JSONRPC):
def __init__(self):
super().__init__()
self.queue = asyncio.Queue()
def enqueue_request(self, request):
self.queue.put_nowait(request)
async def send_and_wait(self, method, params, timeout=None):
# Raise incoming buffer size - presumably connection is trusted
self.max_buffer_size = 5000000
payload = self.request_payload(method, id_=method, params=params)
self.encode_and_send_payload(payload)
future = asyncio.ensure_future(self.messages.get())
future = asyncio.ensure_future(self.queue.get())
for f in asyncio.as_completed([future], timeout=timeout):
try:
request = await f
@ -36,7 +43,7 @@ class RPCClient(JSONRPC):
future.cancel()
print('request timed out after {}s'.format(timeout))
else:
await request.process()
await request.process(1)
async def handle_response(self, result, error, method):
if result and method == 'sessions':

56
lib/jsonrpc.py

@ -10,6 +10,7 @@
import asyncio
import json
import numbers
import socket
import time
from lib.util import LoggedClass
@ -20,46 +21,61 @@ class SingleRequest(object):
def __init__(self, session, payload):
self.payload = payload
self.session = session
self.count = 1
async def process(self):
def remaining(self):
return self.count
async def process(self, limit):
'''Asynchronously handle the JSON request.'''
binary = await self.session.process_single_payload(self.payload)
if binary:
self.session._send_bytes(binary)
self.count = 0
return 1
def __str__(self):
return str(self.payload)
class BatchRequest(object):
'''An object that represents a batch request and its processing state.
Batches are processed in parts chunks.
Batches are processed in chunks.
'''
CUHNK_SIZE = 3
def __init__(self, session, payload):
self.session = session
self.payload = payload
self.done = 0
self.parts = []
async def process(self):
def remaining(self):
return len(self.payload) - self.done
async def process(self, limit):
'''Asynchronously handle the JSON batch according to the JSON 2.0
spec.'''
for n in range(self.CHUNK_SIZE):
if self.done >= len(self.payload):
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return
count = min(limit, self.remaining())
for n in range(count):
item = self.payload[self.done]
part = await self.session.process_single_payload(item)
if part:
self.parts.append(part)
self.done += 1
# Re-enqueue to continue the rest later
self.session.enqueue_request(self)
return b''
total_len = sum(len(part) + 2 for part in self.parts)
self.session.check_oversized_request(total_len)
if not self.remaining():
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return count
def __str__(self):
return str(self.payload)
class JSONRPC(asyncio.Protocol, LoggedClass):
@ -135,6 +151,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.bandwidth_used = 0
self.bandwidth_limit = 5000000
self.transport = None
self.socket = None
# Parts of an incomplete JSON line. We buffer them until
# getting a newline.
self.parts = []
@ -170,11 +187,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Handle an incoming client connection.'''
self.transport = transport
self.peer_info = transport.get_extra_info('peername')
self.socket = transport.get_extra_info('socket')
self.socket.settimeout(10)
def connection_lost(self, exc):
'''Handle client disconnection.'''
pass
def close_connection(self):
if self.transport:
self.transport.close()
self.socket.shutdown(socket.SHUT_RDWR)
def using_bandwidth(self, amount):
now = time.time()
# Reduce the recorded usage in proportion to the elapsed time
@ -200,7 +224,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size,
self.peername()))
self.transport.close()
self.close_connection()
# Do nothing if this connection is closing
if self.transport.is_closing():
@ -274,7 +298,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.transport.write(binary)
self.transport.write(b'\n')
if close or self.error_count > 10:
self.transport.close()
self.close_connection()
def send_json_error(self, message, code, id_=None, close=False):
'''Send a JSON error and close the connection by default.'''

149
server/protocol.py

@ -67,8 +67,6 @@ class MemPool(util.LoggedClass):
await asyncio.sleep(5)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
break
async def update(self):
'''Update state given the current mempool to the passed set of hashes.
@ -217,12 +215,18 @@ class ServerManager(util.LoggedClass):
up with the daemon.
'''
MgrTask = namedtuple('MgrTask', 'session task')
N = 5
BANDS = 5
class NotificationRequest(object):
def __init__(self, fn_call):
self.process = fn_call
self.fn_call = fn_call
def remaining(self):
return 0
async def process(self, limit):
await self.fn_call()
return 0
def __init__(self, env):
super().__init__()
@ -242,8 +246,8 @@ class ServerManager(util.LoggedClass):
self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.queue = asyncio.PriorityQueue()
self.delayed_queue = []
self.next_request_id = 0
self.delayed_sessions = []
self.next_queue_id = 0
self.height = 0
self.futures = []
env.max_send = max(350000, env.max_send)
@ -273,27 +277,14 @@ class ServerManager(util.LoggedClass):
'''
return self.mempool.value(hash168)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
priority_, id_, request = await self.queue.get()
try:
await request.process()
except asyncio.CancelledError:
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(request))
traceback.print_exc()
def setup_bands(self):
bands = []
limit = env.bandwidth_limit
for n in range(self.N):
limit = self.env.bandwidth_limit
for n in range(self.BANDS):
bands.append(limit)
limit //= 4
limit = env.bandwidth_limit
for n in range(self.N):
limit = self.env.bandwidth_limit
for n in range(self.BANDS):
limit += limit // 2
bands.append(limit)
self.bands = sorted(bands)
@ -306,17 +297,40 @@ class ServerManager(util.LoggedClass):
return (bisect_left(self.bands, session.bandwidth_used)
+ bisect_left(self.bands, group_bandwidth) + 1) // 2
def enqueue_request(self, session, request):
async def enqueue_delayed_sessions(self):
now = time.time()
keep = []
for pair in self.delayed_sessions:
timeout, session = pair
if timeout <= now:
self.queue.put_nowait(session)
else:
keep.append(pair)
self.delayed_sessions = keep
await asyncio.sleep(1)
def enqueue_session(self, session):
# Might have disconnected whilst waiting
if not session in self.sessions:
return
priority = self.session_priority(session)
item = (priority, self.next_request_id, request)
self.next_request_id += 1
item = (priority, self.next_queue_id, session)
self.next_queue_id += 1
secs = priority - self.N
secs = priority - self.BANDS
if secs >= 0:
self.delayed_queue.append((time.time() + secs, item))
session.log_info('delaying response {:d}s'.format(secs))
self.delayed_sessions.append((time.time() + secs, item))
else:
self.queue.put_nowait(item)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
priority_, id_, session = await self.queue.get()
if session in self.sessions:
await session.serve_requests()
async def main_loop(self):
'''Server manager main loop.'''
def add_future(coro):
@ -328,6 +342,7 @@ class ServerManager(util.LoggedClass):
add_future(self.mempool.main_loop(self.bp.event))
add_future(self.irc.start(self.bp.event))
add_future(self.start_servers(self.bp.event))
add_future(self.enqueue_delayed_sessions())
for n in range(4):
add_future(self.serve_requests())
@ -460,7 +475,7 @@ class ServerManager(util.LoggedClass):
if not self.servers:
return
self.clear_stale_sessions()
group = self.groups[int(self.start - self.manager.start) // 60]
group = self.groups[int(session.start - self.start) // 60]
group.add(session)
self.sessions[session] = group
session.log_info('connection from {}, {:,d} total'
@ -473,7 +488,7 @@ class ServerManager(util.LoggedClass):
def close_session(self, session):
'''Close the session's transport and cancel its future.'''
session.transport.close()
session.close_connection()
return 'disconnected {:d}'.format(session.id_)
def toggle_logging(self, session):
@ -488,7 +503,7 @@ class ServerManager(util.LoggedClass):
self.next_stale_check = now + 60
# Clear out empty groups
for key in [k for k, v in self.groups.items() if not v]:
del self.groups[k]
del self.groups[key]
cutoff = now - self.env.session_timeout
stale = [session for session in self.sessions
if session.last_recv < cutoff
@ -518,8 +533,10 @@ class ServerManager(util.LoggedClass):
'blocks': self.bp.db_height,
'closing': len([s for s in self.sessions if s.is_closing()]),
'errors': sum(s.error_count for s in self.sessions),
'groups': len(self.groups),
'logged': len([s for s in self.sessions if s.log_me]),
'peers': len(self.irc.peers),
'requests': sum(s.requests_remaining() for s in self.sessions),
'sessions': self.session_count(),
'txs_sent': self.txs_sent,
'watched': self.subscription_count,
@ -539,17 +556,18 @@ class ServerManager(util.LoggedClass):
fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB',
'Txs', 'Time')
for (id_, flags, peer, subs, client, recv_count, recv_size,
send_count, send_size, txs_sent, time) in data:
'Reqs', 'Txs', 'Recv', 'Recv KB', 'Sent',
'Sent KB', 'Time')
for (id_, flags, peer, client, subs, reqs, txs_sent,
recv_count, recv_size, send_count, send_size, time) in data:
yield fmt.format(id_, flags, peer, client,
'{:,d}'.format(subs),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024),
'{:,d}'.format(txs_sent),
time_fmt(time))
def session_data(self, for_log):
@ -559,11 +577,12 @@ class ServerManager(util.LoggedClass):
return [(session.id_,
session.flags(),
session.peername(for_log=for_log),
session.sub_count(),
session.client,
session.requests_remaining(),
session.txs_sent,
session.sub_count(),
session.recv_count, session.recv_size,
session.send_count, session.send_size,
session.txs_sent,
now - session.start)
for session in sessions]
@ -597,6 +616,15 @@ class ServerManager(util.LoggedClass):
async def rpc_getinfo(self, params):
return self.server_summary()
async def rpc_groups(self, params):
result = {}
msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used'
for group, sessions in self.groups.items():
bandwidth = sum(s.bandwidth_used for s in sessions)
reqs = sum(s.requests_remaining() for s in sessions)
result[group] = msg.format(len(sessions), reqs, bandwidth // 1024)
return result
async def rpc_sessions(self, params):
return self.session_data(for_log=False)
@ -631,7 +659,7 @@ class Session(JSONRPC):
self.max_send = env.max_send
self.bandwidth_limit = env.bandwidth_limit
self.txs_sent = 0
self.priority = 1
self.requests = []
def is_closing(self):
'''True if this session is closing.'''
@ -644,10 +672,44 @@ class Session(JSONRPC):
status += 'C'
if self.log_me:
status += 'L'
status += str(self.manager.session_priority(self))
return status
def requests_remaining(self):
return sum(request.remaining() for request in self.requests)
def enqueue_request(self, request):
self.manager.enqueue_request(self, request)
'''Add a request to the session's list.'''
if not self.requests:
self.manager.enqueue_session(self)
self.requests.append(request)
async def serve_requests(self):
'''Serve requests in batches.'''
done_reqs = 0
done_jobs = 0
limit = 4
for request in self.requests:
try:
done_jobs += await request.process(limit - done_jobs)
except asyncio.CancelledError:
raise
except Exception:
# Getting here should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(request))
traceback.print_exc()
done_reqs += 1
else:
if not request.remaining():
done_reqs += 1
if done_jobs >= limit:
break
# Remove completed requests and re-enqueue ourself if any remain.
if done_reqs:
self.requests = self.requests[done_reqs:]
if self.requests:
self.manager.enqueue_session(self)
def connection_made(self, transport):
'''Handle an incoming client connection.'''
@ -1030,7 +1092,7 @@ class ElectrumX(Session):
async def version(self, params):
'''Return the server version as a string.'''
if params:
self.client = str(params[0])
self.client = str(params[0])[:15]
if len(params) > 1:
self.protocol_version = params[1]
return VERSION
@ -1041,7 +1103,8 @@ class LocalRPC(Session):
def __init__(self, *args):
super().__init__(*args)
cmds = ('disconnect getinfo log numpeers numsessions peers sessions'
cmds = ('disconnect getinfo groups log numpeers numsessions '
'peers sessions'
.split())
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds}

Loading…
Cancel
Save