Browse Source

Merge branch 'develop'

master
Neil Booth 8 years ago
parent
commit
5374eccd16
  1. 12
      RELEASE-NOTES
  2. 11
      electrumx_rpc.py
  3. 63
      lib/jsonrpc.py
  4. 191
      server/protocol.py
  5. 2
      server/version.py

12
RELEASE-NOTES

@ -1,3 +1,15 @@
version 0.8.8
-------------
- put sessions in a priority queue to better prioritise serving. Low-bandwidth
sessions get served first
- new RPC command "groups" - shows information about session groups
- sessions output: session priority shown under Flags column; the lower the
number the higher the priority. txs column moved, new column reqs showns
the number of outstanding requests for that connection (includes subrequests
of batches)
- issued fixed: #67
version 0.8.7 version 0.8.7
------------- -------------

11
electrumx_rpc.py

@ -22,13 +22,20 @@ from server.protocol import ServerManager
class RPCClient(JSONRPC): class RPCClient(JSONRPC):
def __init__(self):
super().__init__()
self.queue = asyncio.Queue()
def enqueue_request(self, request):
self.queue.put_nowait(request)
async def send_and_wait(self, method, params, timeout=None): async def send_and_wait(self, method, params, timeout=None):
# Raise incoming buffer size - presumably connection is trusted # Raise incoming buffer size - presumably connection is trusted
self.max_buffer_size = 5000000 self.max_buffer_size = 5000000
payload = self.request_payload(method, id_=method, params=params) payload = self.request_payload(method, id_=method, params=params)
self.encode_and_send_payload(payload) self.encode_and_send_payload(payload)
future = asyncio.ensure_future(self.messages.get()) future = asyncio.ensure_future(self.queue.get())
for f in asyncio.as_completed([future], timeout=timeout): for f in asyncio.as_completed([future], timeout=timeout):
try: try:
request = await f request = await f
@ -36,7 +43,7 @@ class RPCClient(JSONRPC):
future.cancel() future.cancel()
print('request timed out after {}s'.format(timeout)) print('request timed out after {}s'.format(timeout))
else: else:
await request.process() await request.process(1)
async def handle_response(self, result, error, method): async def handle_response(self, result, error, method):
if result and method == 'sessions': if result and method == 'sessions':

63
lib/jsonrpc.py

@ -10,6 +10,7 @@
import asyncio import asyncio
import json import json
import numbers import numbers
import socket
import time import time
from lib.util import LoggedClass from lib.util import LoggedClass
@ -20,46 +21,61 @@ class SingleRequest(object):
def __init__(self, session, payload): def __init__(self, session, payload):
self.payload = payload self.payload = payload
self.session = session self.session = session
self.count = 1
async def process(self): def remaining(self):
return self.count
async def process(self, limit):
'''Asynchronously handle the JSON request.''' '''Asynchronously handle the JSON request.'''
binary = await self.session.process_single_payload(self.payload) binary = await self.session.process_single_payload(self.payload)
if binary: if binary:
self.session._send_bytes(binary) self.session._send_bytes(binary)
self.count = 0
return 1
def __str__(self):
return str(self.payload)
class BatchRequest(object): class BatchRequest(object):
'''An object that represents a batch request and its processing state. '''An object that represents a batch request and its processing state.
Batches are processed in parts chunks. Batches are processed in chunks.
''' '''
CUHNK_SIZE = 3
def __init__(self, session, payload): def __init__(self, session, payload):
self.session = session self.session = session
self.payload = payload self.payload = payload
self.done = 0 self.done = 0
self.parts = [] self.parts = []
async def process(self): def remaining(self):
return len(self.payload) - self.done
async def process(self, limit):
'''Asynchronously handle the JSON batch according to the JSON 2.0 '''Asynchronously handle the JSON batch according to the JSON 2.0
spec.''' spec.'''
for n in range(self.CHUNK_SIZE): count = min(limit, self.remaining())
if self.done >= len(self.payload): for n in range(count):
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return
item = self.payload[self.done] item = self.payload[self.done]
part = await self.session.process_single_payload(item) part = await self.session.process_single_payload(item)
if part: if part:
self.parts.append(part) self.parts.append(part)
self.done += 1 self.done += 1
# Re-enqueue to continue the rest later total_len = sum(len(part) + 2 for part in self.parts)
self.session.enqueue_request(self) self.session.check_oversized_request(total_len)
return b''
if not self.remaining():
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return count
def __str__(self):
return str(self.payload)
class JSONRPC(asyncio.Protocol, LoggedClass): class JSONRPC(asyncio.Protocol, LoggedClass):
@ -67,8 +83,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
Assumes JSON messages are newline-separated and that newlines Assumes JSON messages are newline-separated and that newlines
cannot appear in the JSON other than to separate lines. Incoming cannot appear in the JSON other than to separate lines. Incoming
messages are queued on the messages queue for later asynchronous requests are passed to enqueue_request(), which should arrange for
processing, and should be passed to the handle_request() function. their asynchronous handling via the request's process() method.
Derived classes may want to override connection_made() and Derived classes may want to override connection_made() and
connection_lost() but should be sure to call the implementation in connection_lost() but should be sure to call the implementation in
@ -135,6 +151,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.bandwidth_used = 0 self.bandwidth_used = 0
self.bandwidth_limit = 5000000 self.bandwidth_limit = 5000000
self.transport = None self.transport = None
self.socket = None
# Parts of an incomplete JSON line. We buffer them until # Parts of an incomplete JSON line. We buffer them until
# getting a newline. # getting a newline.
self.parts = [] self.parts = []
@ -145,7 +162,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_size = 0 self.send_size = 0
self.error_count = 0 self.error_count = 0
self.peer_info = None self.peer_info = None
self.messages = asyncio.Queue()
# Sends longer than max_send are prevented, instead returning # Sends longer than max_send are prevented, instead returning
# an oversized request error to other end of the network # an oversized request error to other end of the network
# connection. The request causing it is logged. Values under # connection. The request causing it is logged. Values under
@ -171,11 +187,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
self.transport = transport self.transport = transport
self.peer_info = transport.get_extra_info('peername') self.peer_info = transport.get_extra_info('peername')
self.socket = transport.get_extra_info('socket')
self.socket.settimeout(10)
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
pass pass
def close_connection(self):
if self.transport:
self.transport.close()
self.socket.shutdown(socket.SHUT_RDWR)
def using_bandwidth(self, amount): def using_bandwidth(self, amount):
now = time.time() now = time.time()
# Reduce the recorded usage in proportion to the elapsed time # Reduce the recorded usage in proportion to the elapsed time
@ -201,7 +224,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'byte limit, closing {}' 'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size, .format(buffer_size, self.max_buffer_size,
self.peername())) self.peername()))
self.transport.close() self.close_connection()
# Do nothing if this connection is closing # Do nothing if this connection is closing
if self.transport.is_closing(): if self.transport.is_closing():
@ -275,7 +298,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.transport.write(binary) self.transport.write(binary)
self.transport.write(b'\n') self.transport.write(b'\n')
if close or self.error_count > 10: if close or self.error_count > 10:
self.transport.close() self.close_connection()
def send_json_error(self, message, code, id_=None, close=False): def send_json_error(self, message, code, id_=None, close=False):
'''Send a JSON error and close the connection by default.''' '''Send a JSON error and close the connection by default.'''
@ -408,7 +431,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
# --- derived classes are intended to override these functions # --- derived classes are intended to override these functions
def enqueue_request(self, request): def enqueue_request(self, request):
'''Enqueue a request for later asynchronous processing.''' '''Enqueue a request for later asynchronous processing.'''
self.messages.put_nowait(request) raise NotImplementedError
async def handle_notification(self, method, params): async def handle_notification(self, method, params):
'''Handle a notification.''' '''Handle a notification.'''

191
server/protocol.py

@ -14,6 +14,7 @@ import json
import ssl import ssl
import time import time
import traceback import traceback
from bisect import bisect_left
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from functools import partial from functools import partial
@ -66,8 +67,6 @@ class MemPool(util.LoggedClass):
await asyncio.sleep(5) await asyncio.sleep(5)
except DaemonError as e: except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e)) self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
break
async def update(self): async def update(self):
'''Update state given the current mempool to the passed set of hashes. '''Update state given the current mempool to the passed set of hashes.
@ -216,11 +215,18 @@ class ServerManager(util.LoggedClass):
up with the daemon. up with the daemon.
''' '''
MgrTask = namedtuple('MgrTask', 'session task') BANDS = 5
class NotificationRequest(object): class NotificationRequest(object):
def __init__(self, fn_call): def __init__(self, fn_call):
self.process = fn_call self.fn_call = fn_call
def remaining(self):
return 0
async def process(self, limit):
await self.fn_call()
return 0
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
@ -231,6 +237,7 @@ class ServerManager(util.LoggedClass):
self.env = env self.env = env
self.servers = [] self.servers = []
self.sessions = {} self.sessions = {}
self.groups = defaultdict(set)
self.txs_sent = 0 self.txs_sent = 0
self.next_log_sessions = 0 self.next_log_sessions = 0
self.max_subs = env.max_subs self.max_subs = env.max_subs
@ -238,9 +245,13 @@ class ServerManager(util.LoggedClass):
self.next_stale_check = 0 self.next_stale_check = 0
self.history_cache = pylru.lrucache(256) self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8) self.header_cache = pylru.lrucache(8)
self.queue = asyncio.PriorityQueue()
self.delayed_sessions = []
self.next_queue_id = 0
self.height = 0 self.height = 0
self.futures = [] self.futures = []
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.setup_bands()
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
.format(env.session_timeout)) .format(env.session_timeout))
self.logger.info('session bandwidth limit {:,d} bytes' self.logger.info('session bandwidth limit {:,d} bytes'
@ -266,6 +277,60 @@ class ServerManager(util.LoggedClass):
''' '''
return self.mempool.value(hash168) return self.mempool.value(hash168)
def setup_bands(self):
bands = []
limit = self.env.bandwidth_limit
for n in range(self.BANDS):
bands.append(limit)
limit //= 4
limit = self.env.bandwidth_limit
for n in range(self.BANDS):
limit += limit // 2
bands.append(limit)
self.bands = sorted(bands)
self.logger.info('bands: {}'.format(self.bands))
def session_priority(self, session):
if isinstance(session, LocalRPC):
return 0
group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session])
return (bisect_left(self.bands, session.bandwidth_used)
+ bisect_left(self.bands, group_bandwidth) + 1) // 2
async def enqueue_delayed_sessions(self):
now = time.time()
keep = []
for pair in self.delayed_sessions:
timeout, session = pair
if timeout <= now:
self.queue.put_nowait(session)
else:
keep.append(pair)
self.delayed_sessions = keep
await asyncio.sleep(1)
def enqueue_session(self, session):
# Might have disconnected whilst waiting
if not session in self.sessions:
return
priority = self.session_priority(session)
item = (priority, self.next_queue_id, session)
self.next_queue_id += 1
secs = priority - self.BANDS
if secs >= 0:
session.log_info('delaying response {:d}s'.format(secs))
self.delayed_sessions.append((time.time() + secs, item))
else:
self.queue.put_nowait(item)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
priority_, id_, session = await self.queue.get()
if session in self.sessions:
await session.serve_requests()
async def main_loop(self): async def main_loop(self):
'''Server manager main loop.''' '''Server manager main loop.'''
def add_future(coro): def add_future(coro):
@ -277,6 +342,9 @@ class ServerManager(util.LoggedClass):
add_future(self.mempool.main_loop(self.bp.event)) add_future(self.mempool.main_loop(self.bp.event))
add_future(self.irc.start(self.bp.event)) add_future(self.irc.start(self.bp.event))
add_future(self.start_servers(self.bp.event)) add_future(self.start_servers(self.bp.event))
add_future(self.enqueue_delayed_sessions())
for n in range(4):
add_future(self.serve_requests())
for future in asyncio.as_completed(self.futures): for future in asyncio.as_completed(self.futures):
try: try:
@ -403,27 +471,24 @@ class ServerManager(util.LoggedClass):
.format(len(self.sessions))) .format(len(self.sessions)))
def add_session(self, session): def add_session(self, session):
# Some connections are acknowledged after the servers are closed
if not self.servers:
return
self.clear_stale_sessions() self.clear_stale_sessions()
coro = session.serve_requests() group = self.groups[int(session.start - self.start) // 60]
future = asyncio.ensure_future(coro) group.add(session)
self.sessions[session] = future self.sessions[session] = group
session.log_info('connection from {}, {:,d} total' session.log_info('connection from {}, {:,d} total'
.format(session.peername(), len(self.sessions))) .format(session.peername(), len(self.sessions)))
# Some connections are acknowledged after the servers are closed
if not self.servers:
self.close_session(session)
def remove_session(self, session): def remove_session(self, session):
# It might have been forcefully removed earlier by close_session() group = self.sessions.pop(session)
if session in self.sessions: group.remove(session)
self.subscription_count -= session.sub_count() self.subscription_count -= session.sub_count()
future = self.sessions.pop(session)
future.cancel()
def close_session(self, session): def close_session(self, session):
'''Close the session's transport and cancel its future.''' '''Close the session's transport and cancel its future.'''
session.transport.close() session.close_connection()
self.sessions[session].cancel()
return 'disconnected {:d}'.format(session.id_) return 'disconnected {:d}'.format(session.id_)
def toggle_logging(self, session): def toggle_logging(self, session):
@ -436,6 +501,9 @@ class ServerManager(util.LoggedClass):
now = time.time() now = time.time()
if now > self.next_stale_check: if now > self.next_stale_check:
self.next_stale_check = now + 60 self.next_stale_check = now + 60
# Clear out empty groups
for key in [k for k, v in self.groups.items() if not v]:
del self.groups[key]
cutoff = now - self.env.session_timeout cutoff = now - self.env.session_timeout
stale = [session for session in self.sessions stale = [session for session in self.sessions
if session.last_recv < cutoff if session.last_recv < cutoff
@ -465,7 +533,10 @@ class ServerManager(util.LoggedClass):
'blocks': self.bp.db_height, 'blocks': self.bp.db_height,
'closing': len([s for s in self.sessions if s.is_closing()]), 'closing': len([s for s in self.sessions if s.is_closing()]),
'errors': sum(s.error_count for s in self.sessions), 'errors': sum(s.error_count for s in self.sessions),
'groups': len(self.groups),
'logged': len([s for s in self.sessions if s.log_me]),
'peers': len(self.irc.peers), 'peers': len(self.irc.peers),
'requests': sum(s.requests_remaining() for s in self.sessions),
'sessions': self.session_count(), 'sessions': self.session_count(),
'txs_sent': self.txs_sent, 'txs_sent': self.txs_sent,
'watched': self.subscription_count, 'watched': self.subscription_count,
@ -482,34 +553,36 @@ class ServerManager(util.LoggedClass):
return ('{:3d}:{:02d}:{:02d}' return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60)) .format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} ' fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs', yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Reqs', 'Txs', 'Recv', 'Recv KB', 'Sent',
'Txs', 'Time') 'Sent KB', 'Time')
for (id_, flags, peer, subs, client, recv_count, recv_size, for (id_, flags, peer, client, subs, reqs, txs_sent,
send_count, send_size, txs_sent, time) in data: recv_count, recv_size, send_count, send_size, time) in data:
yield fmt.format(id_, flags, peer, client, yield fmt.format(id_, flags, peer, client,
'{:,d}'.format(subs), '{:,d}'.format(subs),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(recv_count), '{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024), '{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count), '{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024), '{:,d}'.format(send_size // 1024),
'{:,d}'.format(txs_sent),
time_fmt(time)) time_fmt(time))
def session_data(self, for_log): def session_data(self, for_log):
'''Returned to the RPC 'sessions' call.''' '''Returned to the RPC 'sessions' call.'''
now = time.time() now = time.time()
sessions = sorted(self.sessions.keys(), key=lambda s: s.start) sessions = sorted(self.sessions, key=lambda s: s.start)
return [(session.id_, return [(session.id_,
session.flags(), session.flags(),
session.peername(for_log=for_log), session.peername(for_log=for_log),
session.sub_count(),
session.client, session.client,
session.requests_remaining(),
session.txs_sent,
session.sub_count(),
session.recv_count, session.recv_size, session.recv_count, session.recv_size,
session.send_count, session.send_size, session.send_count, session.send_size,
session.txs_sent,
now - session.start) now - session.start)
for session in sessions] for session in sessions]
@ -543,6 +616,15 @@ class ServerManager(util.LoggedClass):
async def rpc_getinfo(self, params): async def rpc_getinfo(self, params):
return self.server_summary() return self.server_summary()
async def rpc_groups(self, params):
result = {}
msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used'
for group, sessions in self.groups.items():
bandwidth = sum(s.bandwidth_used for s in sessions)
reqs = sum(s.requests_remaining() for s in sessions)
result[group] = msg.format(len(sessions), reqs, bandwidth // 1024)
return result
async def rpc_sessions(self, params): async def rpc_sessions(self, params):
return self.session_data(for_log=False) return self.session_data(for_log=False)
@ -577,7 +659,7 @@ class Session(JSONRPC):
self.max_send = env.max_send self.max_send = env.max_send
self.bandwidth_limit = env.bandwidth_limit self.bandwidth_limit = env.bandwidth_limit
self.txs_sent = 0 self.txs_sent = 0
self.bucket = int(self.start - self.manager.start) // 60 self.requests = []
def is_closing(self): def is_closing(self):
'''True if this session is closing.''' '''True if this session is closing.'''
@ -590,8 +672,45 @@ class Session(JSONRPC):
status += 'C' status += 'C'
if self.log_me: if self.log_me:
status += 'L' status += 'L'
status += str(self.manager.session_priority(self))
return status return status
def requests_remaining(self):
return sum(request.remaining() for request in self.requests)
def enqueue_request(self, request):
'''Add a request to the session's list.'''
if not self.requests:
self.manager.enqueue_session(self)
self.requests.append(request)
async def serve_requests(self):
'''Serve requests in batches.'''
done_reqs = 0
done_jobs = 0
limit = 4
for request in self.requests:
try:
done_jobs += await request.process(limit - done_jobs)
except asyncio.CancelledError:
raise
except Exception:
# Getting here should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(request))
traceback.print_exc()
done_reqs += 1
else:
if not request.remaining():
done_reqs += 1
if done_jobs >= limit:
break
# Remove completed requests and re-enqueue ourself if any remain.
if done_reqs:
self.requests = self.requests[done_reqs:]
if self.requests:
self.manager.enqueue_session(self)
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
super().connection_made(transport) super().connection_made(transport)
@ -615,19 +734,6 @@ class Session(JSONRPC):
return await handler(params) return await handler(params)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
request = await self.messages.get()
try:
await request.process()
except asyncio.CancelledError:
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(request))
traceback.print_exc()
def sub_count(self): def sub_count(self):
return 0 return 0
@ -986,7 +1092,7 @@ class ElectrumX(Session):
async def version(self, params): async def version(self, params):
'''Return the server version as a string.''' '''Return the server version as a string.'''
if params: if params:
self.client = str(params[0]) self.client = str(params[0])[:15]
if len(params) > 1: if len(params) > 1:
self.protocol_version = params[1] self.protocol_version = params[1]
return VERSION return VERSION
@ -997,7 +1103,8 @@ class LocalRPC(Session):
def __init__(self, *args): def __init__(self, *args):
super().__init__(*args) super().__init__(*args)
cmds = ('disconnect getinfo log numpeers numsessions peers sessions' cmds = ('disconnect getinfo groups log numpeers numsessions '
'peers sessions'
.split()) .split())
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds} for cmd in cmds}

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.8.7" VERSION = "ElectrumX 0.8.8"

Loading…
Cancel
Save