diff --git a/RELEASE-NOTES b/RELEASE-NOTES index fceca12..8f532cb 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,15 @@ +version 0.8.8 +------------- + +- put sessions in a priority queue to better prioritise serving. Low-bandwidth + sessions get served first +- new RPC command "groups" - shows information about session groups +- sessions output: session priority shown under Flags column; the lower the + number the higher the priority. txs column moved, new column reqs showns + the number of outstanding requests for that connection (includes subrequests + of batches) +- issued fixed: #67 + version 0.8.7 ------------- diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 8de320d..1a6e0e2 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -22,13 +22,20 @@ from server.protocol import ServerManager class RPCClient(JSONRPC): + def __init__(self): + super().__init__() + self.queue = asyncio.Queue() + + def enqueue_request(self, request): + self.queue.put_nowait(request) + async def send_and_wait(self, method, params, timeout=None): # Raise incoming buffer size - presumably connection is trusted self.max_buffer_size = 5000000 payload = self.request_payload(method, id_=method, params=params) self.encode_and_send_payload(payload) - future = asyncio.ensure_future(self.messages.get()) + future = asyncio.ensure_future(self.queue.get()) for f in asyncio.as_completed([future], timeout=timeout): try: request = await f @@ -36,7 +43,7 @@ class RPCClient(JSONRPC): future.cancel() print('request timed out after {}s'.format(timeout)) else: - await request.process() + await request.process(1) async def handle_response(self, result, error, method): if result and method == 'sessions': diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7773c96..4f30282 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -10,6 +10,7 @@ import asyncio import json import numbers +import socket import time from lib.util import LoggedClass @@ -20,46 +21,61 @@ class SingleRequest(object): def __init__(self, session, payload): self.payload = payload self.session = session + self.count = 1 - async def process(self): + def remaining(self): + return self.count + + async def process(self, limit): '''Asynchronously handle the JSON request.''' binary = await self.session.process_single_payload(self.payload) if binary: self.session._send_bytes(binary) + self.count = 0 + return 1 + + def __str__(self): + return str(self.payload) class BatchRequest(object): '''An object that represents a batch request and its processing state. - Batches are processed in parts chunks. + Batches are processed in chunks. ''' - CUHNK_SIZE = 3 - def __init__(self, session, payload): self.session = session self.payload = payload self.done = 0 self.parts = [] - async def process(self): + def remaining(self): + return len(self.payload) - self.done + + async def process(self, limit): '''Asynchronously handle the JSON batch according to the JSON 2.0 spec.''' - for n in range(self.CHUNK_SIZE): - if self.done >= len(self.payload): - if self.parts: - binary = b'[' + b', '.join(self.parts) + b']' - self.session._send_bytes(binary) - return + count = min(limit, self.remaining()) + for n in range(count): item = self.payload[self.done] part = await self.session.process_single_payload(item) if part: self.parts.append(part) self.done += 1 - # Re-enqueue to continue the rest later - self.session.enqueue_request(self) - return b'' + total_len = sum(len(part) + 2 for part in self.parts) + self.session.check_oversized_request(total_len) + + if not self.remaining(): + if self.parts: + binary = b'[' + b', '.join(self.parts) + b']' + self.session._send_bytes(binary) + + return count + + def __str__(self): + return str(self.payload) class JSONRPC(asyncio.Protocol, LoggedClass): @@ -67,8 +83,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): Assumes JSON messages are newline-separated and that newlines cannot appear in the JSON other than to separate lines. Incoming - messages are queued on the messages queue for later asynchronous - processing, and should be passed to the handle_request() function. + requests are passed to enqueue_request(), which should arrange for + their asynchronous handling via the request's process() method. Derived classes may want to override connection_made() and connection_lost() but should be sure to call the implementation in @@ -135,6 +151,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.bandwidth_used = 0 self.bandwidth_limit = 5000000 self.transport = None + self.socket = None # Parts of an incomplete JSON line. We buffer them until # getting a newline. self.parts = [] @@ -145,7 +162,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_size = 0 self.error_count = 0 self.peer_info = None - self.messages = asyncio.Queue() # Sends longer than max_send are prevented, instead returning # an oversized request error to other end of the network # connection. The request causing it is logged. Values under @@ -171,11 +187,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Handle an incoming client connection.''' self.transport = transport self.peer_info = transport.get_extra_info('peername') + self.socket = transport.get_extra_info('socket') + self.socket.settimeout(10) def connection_lost(self, exc): '''Handle client disconnection.''' pass + def close_connection(self): + if self.transport: + self.transport.close() + self.socket.shutdown(socket.SHUT_RDWR) + def using_bandwidth(self, amount): now = time.time() # Reduce the recorded usage in proportion to the elapsed time @@ -201,7 +224,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): 'byte limit, closing {}' .format(buffer_size, self.max_buffer_size, self.peername())) - self.transport.close() + self.close_connection() # Do nothing if this connection is closing if self.transport.is_closing(): @@ -275,7 +298,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.transport.write(binary) self.transport.write(b'\n') if close or self.error_count > 10: - self.transport.close() + self.close_connection() def send_json_error(self, message, code, id_=None, close=False): '''Send a JSON error and close the connection by default.''' @@ -408,7 +431,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # --- derived classes are intended to override these functions def enqueue_request(self, request): '''Enqueue a request for later asynchronous processing.''' - self.messages.put_nowait(request) + raise NotImplementedError async def handle_notification(self, method, params): '''Handle a notification.''' diff --git a/server/protocol.py b/server/protocol.py index 07a6533..1a9283f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -14,6 +14,7 @@ import json import ssl import time import traceback +from bisect import bisect_left from collections import defaultdict, namedtuple from functools import partial @@ -66,8 +67,6 @@ class MemPool(util.LoggedClass): await asyncio.sleep(5) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) - except asyncio.CancelledError: - break async def update(self): '''Update state given the current mempool to the passed set of hashes. @@ -216,11 +215,18 @@ class ServerManager(util.LoggedClass): up with the daemon. ''' - MgrTask = namedtuple('MgrTask', 'session task') + BANDS = 5 class NotificationRequest(object): def __init__(self, fn_call): - self.process = fn_call + self.fn_call = fn_call + + def remaining(self): + return 0 + + async def process(self, limit): + await self.fn_call() + return 0 def __init__(self, env): super().__init__() @@ -231,6 +237,7 @@ class ServerManager(util.LoggedClass): self.env = env self.servers = [] self.sessions = {} + self.groups = defaultdict(set) self.txs_sent = 0 self.next_log_sessions = 0 self.max_subs = env.max_subs @@ -238,9 +245,13 @@ class ServerManager(util.LoggedClass): self.next_stale_check = 0 self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) + self.queue = asyncio.PriorityQueue() + self.delayed_sessions = [] + self.next_queue_id = 0 self.height = 0 self.futures = [] env.max_send = max(350000, env.max_send) + self.setup_bands() self.logger.info('session timeout: {:,d} seconds' .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' @@ -266,6 +277,60 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + def setup_bands(self): + bands = [] + limit = self.env.bandwidth_limit + for n in range(self.BANDS): + bands.append(limit) + limit //= 4 + limit = self.env.bandwidth_limit + for n in range(self.BANDS): + limit += limit // 2 + bands.append(limit) + self.bands = sorted(bands) + self.logger.info('bands: {}'.format(self.bands)) + + def session_priority(self, session): + if isinstance(session, LocalRPC): + return 0 + group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session]) + return (bisect_left(self.bands, session.bandwidth_used) + + bisect_left(self.bands, group_bandwidth) + 1) // 2 + + async def enqueue_delayed_sessions(self): + now = time.time() + keep = [] + for pair in self.delayed_sessions: + timeout, session = pair + if timeout <= now: + self.queue.put_nowait(session) + else: + keep.append(pair) + self.delayed_sessions = keep + await asyncio.sleep(1) + + def enqueue_session(self, session): + # Might have disconnected whilst waiting + if not session in self.sessions: + return + priority = self.session_priority(session) + item = (priority, self.next_queue_id, session) + self.next_queue_id += 1 + + secs = priority - self.BANDS + if secs >= 0: + session.log_info('delaying response {:d}s'.format(secs)) + self.delayed_sessions.append((time.time() + secs, item)) + else: + self.queue.put_nowait(item) + + async def serve_requests(self): + '''Asynchronously run through the task queue.''' + while True: + priority_, id_, session = await self.queue.get() + if session in self.sessions: + await session.serve_requests() + async def main_loop(self): '''Server manager main loop.''' def add_future(coro): @@ -277,6 +342,9 @@ class ServerManager(util.LoggedClass): add_future(self.mempool.main_loop(self.bp.event)) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) + add_future(self.enqueue_delayed_sessions()) + for n in range(4): + add_future(self.serve_requests()) for future in asyncio.as_completed(self.futures): try: @@ -403,27 +471,24 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): + # Some connections are acknowledged after the servers are closed + if not self.servers: + return self.clear_stale_sessions() - coro = session.serve_requests() - future = asyncio.ensure_future(coro) - self.sessions[session] = future + group = self.groups[int(session.start - self.start) // 60] + group.add(session) + self.sessions[session] = group session.log_info('connection from {}, {:,d} total' .format(session.peername(), len(self.sessions))) - # Some connections are acknowledged after the servers are closed - if not self.servers: - self.close_session(session) def remove_session(self, session): - # It might have been forcefully removed earlier by close_session() - if session in self.sessions: - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + group = self.sessions.pop(session) + group.remove(session) + self.subscription_count -= session.sub_count() def close_session(self, session): '''Close the session's transport and cancel its future.''' - session.transport.close() - self.sessions[session].cancel() + session.close_connection() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -436,6 +501,9 @@ class ServerManager(util.LoggedClass): now = time.time() if now > self.next_stale_check: self.next_stale_check = now + 60 + # Clear out empty groups + for key in [k for k, v in self.groups.items() if not v]: + del self.groups[key] cutoff = now - self.env.session_timeout stale = [session for session in self.sessions if session.last_recv < cutoff @@ -465,7 +533,10 @@ class ServerManager(util.LoggedClass): 'blocks': self.bp.db_height, 'closing': len([s for s in self.sessions if s.is_closing()]), 'errors': sum(s.error_count for s in self.sessions), + 'groups': len(self.groups), + 'logged': len([s for s in self.sessions if s.log_me]), 'peers': len(self.irc.peers), + 'requests': sum(s.requests_remaining() for s in self.sessions), 'sessions': self.session_count(), 'txs_sent': self.txs_sent, 'watched': self.subscription_count, @@ -482,34 +553,36 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} ' + fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs', - 'Recv', 'Recv KB', 'Sent', 'Sent KB', - 'Txs', 'Time') - for (id_, flags, peer, subs, client, recv_count, recv_size, - send_count, send_size, txs_sent, time) in data: + yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs', + 'Reqs', 'Txs', 'Recv', 'Recv KB', 'Sent', + 'Sent KB', 'Time') + for (id_, flags, peer, client, subs, reqs, txs_sent, + recv_count, recv_size, send_count, send_size, time) in data: yield fmt.format(id_, flags, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(reqs), + '{:,d}'.format(txs_sent), '{:,d}'.format(recv_count), '{:,d}'.format(recv_size // 1024), '{:,d}'.format(send_count), '{:,d}'.format(send_size // 1024), - '{:,d}'.format(txs_sent), time_fmt(time)) def session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' now = time.time() - sessions = sorted(self.sessions.keys(), key=lambda s: s.start) + sessions = sorted(self.sessions, key=lambda s: s.start) return [(session.id_, session.flags(), session.peername(for_log=for_log), - session.sub_count(), session.client, + session.requests_remaining(), + session.txs_sent, + session.sub_count(), session.recv_count, session.recv_size, session.send_count, session.send_size, - session.txs_sent, now - session.start) for session in sessions] @@ -543,6 +616,15 @@ class ServerManager(util.LoggedClass): async def rpc_getinfo(self, params): return self.server_summary() + async def rpc_groups(self, params): + result = {} + msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used' + for group, sessions in self.groups.items(): + bandwidth = sum(s.bandwidth_used for s in sessions) + reqs = sum(s.requests_remaining() for s in sessions) + result[group] = msg.format(len(sessions), reqs, bandwidth // 1024) + return result + async def rpc_sessions(self, params): return self.session_data(for_log=False) @@ -577,7 +659,7 @@ class Session(JSONRPC): self.max_send = env.max_send self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 - self.bucket = int(self.start - self.manager.start) // 60 + self.requests = [] def is_closing(self): '''True if this session is closing.''' @@ -590,8 +672,45 @@ class Session(JSONRPC): status += 'C' if self.log_me: status += 'L' + status += str(self.manager.session_priority(self)) return status + def requests_remaining(self): + return sum(request.remaining() for request in self.requests) + + def enqueue_request(self, request): + '''Add a request to the session's list.''' + if not self.requests: + self.manager.enqueue_session(self) + self.requests.append(request) + + async def serve_requests(self): + '''Serve requests in batches.''' + done_reqs = 0 + done_jobs = 0 + limit = 4 + for request in self.requests: + try: + done_jobs += await request.process(limit - done_jobs) + except asyncio.CancelledError: + raise + except Exception: + # Getting here should probably be considered a bug and fixed + self.log_error('error handling request {}'.format(request)) + traceback.print_exc() + done_reqs += 1 + else: + if not request.remaining(): + done_reqs += 1 + if done_jobs >= limit: + break + + # Remove completed requests and re-enqueue ourself if any remain. + if done_reqs: + self.requests = self.requests[done_reqs:] + if self.requests: + self.manager.enqueue_session(self) + def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) @@ -615,19 +734,6 @@ class Session(JSONRPC): return await handler(params) - async def serve_requests(self): - '''Asynchronously run through the task queue.''' - while True: - request = await self.messages.get() - try: - await request.process() - except asyncio.CancelledError: - break - except Exception: - # Getting here should probably be considered a bug and fixed - self.log_error('error handling request {}'.format(request)) - traceback.print_exc() - def sub_count(self): return 0 @@ -986,7 +1092,7 @@ class ElectrumX(Session): async def version(self, params): '''Return the server version as a string.''' if params: - self.client = str(params[0]) + self.client = str(params[0])[:15] if len(params) > 1: self.protocol_version = params[1] return VERSION @@ -997,7 +1103,8 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) - cmds = ('disconnect getinfo log numpeers numsessions peers sessions' + cmds = ('disconnect getinfo groups log numpeers numsessions ' + 'peers sessions' .split()) self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) for cmd in cmds} diff --git a/server/version.py b/server/version.py index 5cd0726..c02dade 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.8.7" +VERSION = "ElectrumX 0.8.8"