From 1a9e8cdcd41891baaa2e14b519d8cc9fbc71b99b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 8 Dec 2016 22:45:00 +0900 Subject: [PATCH 1/3] More work --- lib/jsonrpc.py | 7 ++-- server/protocol.py | 102 ++++++++++++++++++++++++++++++++------------- 2 files changed, 76 insertions(+), 33 deletions(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7773c96..1ed9f8d 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -67,8 +67,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): Assumes JSON messages are newline-separated and that newlines cannot appear in the JSON other than to separate lines. Incoming - messages are queued on the messages queue for later asynchronous - processing, and should be passed to the handle_request() function. + requests are passed to enqueue_request(), which should arrange for + their asynchronous handling via the request's process() method. Derived classes may want to override connection_made() and connection_lost() but should be sure to call the implementation in @@ -145,7 +145,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_size = 0 self.error_count = 0 self.peer_info = None - self.messages = asyncio.Queue() # Sends longer than max_send are prevented, instead returning # an oversized request error to other end of the network # connection. The request causing it is logged. Values under @@ -408,7 +407,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # --- derived classes are intended to override these functions def enqueue_request(self, request): '''Enqueue a request for later asynchronous processing.''' - self.messages.put_nowait(request) + raise NotImplementedError async def handle_notification(self, method, params): '''Handle a notification.''' diff --git a/server/protocol.py b/server/protocol.py index 07a6533..740590e 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -14,6 +14,7 @@ import json import ssl import time import traceback +from bisect import bisect_left from collections import defaultdict, namedtuple from functools import partial @@ -217,6 +218,7 @@ class ServerManager(util.LoggedClass): ''' MgrTask = namedtuple('MgrTask', 'session task') + N = 5 class NotificationRequest(object): def __init__(self, fn_call): @@ -231,6 +233,7 @@ class ServerManager(util.LoggedClass): self.env = env self.servers = [] self.sessions = {} + self.groups = defaultdict(set) self.txs_sent = 0 self.next_log_sessions = 0 self.max_subs = env.max_subs @@ -238,9 +241,13 @@ class ServerManager(util.LoggedClass): self.next_stale_check = 0 self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) + self.queue = asyncio.PriorityQueue() + self.delayed_queue = [] + self.next_request_id = 0 self.height = 0 self.futures = [] env.max_send = max(350000, env.max_send) + self.setup_bands() self.logger.info('session timeout: {:,d} seconds' .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' @@ -266,6 +273,50 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + async def serve_requests(self): + '''Asynchronously run through the task queue.''' + while True: + priority_, id_, request = await self.queue.get() + try: + await request.process() + except asyncio.CancelledError: + break + except Exception: + # Getting here should probably be considered a bug and fixed + self.log_error('error handling request {}'.format(request)) + traceback.print_exc() + + def setup_bands(self): + bands = [] + limit = env.bandwidth_limit + for n in range(self.N): + bands.append(limit) + limit //= 4 + 
limit = env.bandwidth_limit + for n in range(self.N): + limit += limit // 2 + bands.append(limit) + self.bands = sorted(bands) + self.logger.info('bands: {}'.format(self.bands)) + + def session_priority(self, session): + if isinstance(session, LocalRPC): + return 0 + group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session]) + return (bisect_left(self.bands, session.bandwidth_used) + + bisect_left(self.bands, group_bandwidth) + 1) // 2 + + def enqueue_request(self, session, request): + priority = self.session_priority(session) + item = (priority, self.next_request_id, request) + self.next_request_id += 1 + + secs = priority - self.N + if secs >= 0: + self.delayed_queue.append((time.time() + secs, item)) + else: + self.queue.put_nowait(item) + async def main_loop(self): '''Server manager main loop.''' def add_future(coro): @@ -277,6 +328,8 @@ class ServerManager(util.LoggedClass): add_future(self.mempool.main_loop(self.bp.event)) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) + for n in range(4): + add_future(self.serve_requests()) for future in asyncio.as_completed(self.futures): try: @@ -403,27 +456,24 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): + # Some connections are acknowledged after the servers are closed + if not self.servers: + return self.clear_stale_sessions() - coro = session.serve_requests() - future = asyncio.ensure_future(coro) - self.sessions[session] = future + group = self.groups[int(self.start - self.manager.start) // 60] + group.add(session) + self.sessions[session] = group session.log_info('connection from {}, {:,d} total' .format(session.peername(), len(self.sessions))) - # Some connections are acknowledged after the servers are closed - if not self.servers: - self.close_session(session) def remove_session(self, session): - # It might have been forcefully removed earlier by close_session() - if session in self.sessions: - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + group = self.sessions.pop(session) + group.remove(session) + self.subscription_count -= session.sub_count() def close_session(self, session): '''Close the session's transport and cancel its future.''' session.transport.close() - self.sessions[session].cancel() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -436,6 +486,9 @@ class ServerManager(util.LoggedClass): now = time.time() if now > self.next_stale_check: self.next_stale_check = now + 60 + # Clear out empty groups + for key in [k for k, v in self.groups.items() if not v]: + del self.groups[k] cutoff = now - self.env.session_timeout stale = [session for session in self.sessions if session.last_recv < cutoff @@ -465,6 +518,7 @@ class ServerManager(util.LoggedClass): 'blocks': self.bp.db_height, 'closing': len([s for s in self.sessions if s.is_closing()]), 'errors': sum(s.error_count for s in self.sessions), + 'logged': len([s for s in self.sessions if s.log_me]), 'peers': len(self.irc.peers), 'sessions': self.session_count(), 'txs_sent': self.txs_sent, @@ -482,9 +536,9 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<6} {:<3} {:>23} {:>15} {:>7} ' + fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('ID', 'Flg', 'Peer', 'Client', 'Subs', + yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs', 'Recv', 'Recv KB', 
'Sent', 'Sent KB', 'Txs', 'Time') for (id_, flags, peer, subs, client, recv_count, recv_size, @@ -501,7 +555,7 @@ class ServerManager(util.LoggedClass): def session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' now = time.time() - sessions = sorted(self.sessions.keys(), key=lambda s: s.start) + sessions = sorted(self.sessions, key=lambda s: s.start) return [(session.id_, session.flags(), session.peername(for_log=for_log), @@ -577,7 +631,7 @@ class Session(JSONRPC): self.max_send = env.max_send self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 - self.bucket = int(self.start - self.manager.start) // 60 + self.priority = 1 def is_closing(self): '''True if this session is closing.''' @@ -592,6 +646,9 @@ class Session(JSONRPC): status += 'L' return status + def enqueue_request(self, request): + self.manager.enqueue_request(self, request) + def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) @@ -615,19 +672,6 @@ class Session(JSONRPC): return await handler(params) - async def serve_requests(self): - '''Asynchronously run through the task queue.''' - while True: - request = await self.messages.get() - try: - await request.process() - except asyncio.CancelledError: - break - except Exception: - # Getting here should probably be considered a bug and fixed - self.log_error('error handling request {}'.format(request)) - traceback.print_exc() - def sub_count(self): return 0 From 2a461bd98c46aa4fa4e2c4ae09fdca4f18a9fae5 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 9 Dec 2016 20:41:11 +0900 Subject: [PATCH 2/3] Serve sessions in a priority queue. Add new RPC command requests. Adjust sessions RPC command to show pending requests. Only keep first 15 chars in version string. 
Set socket timeout Try more forceful closing of socket Fixes #67 --- electrumx_rpc.py | 11 +++- lib/jsonrpc.py | 56 ++++++++++++----- server/protocol.py | 149 ++++++++++++++++++++++++++++++++------------- 3 files changed, 155 insertions(+), 61 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 8de320d..1a6e0e2 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -22,13 +22,20 @@ from server.protocol import ServerManager class RPCClient(JSONRPC): + def __init__(self): + super().__init__() + self.queue = asyncio.Queue() + + def enqueue_request(self, request): + self.queue.put_nowait(request) + async def send_and_wait(self, method, params, timeout=None): # Raise incoming buffer size - presumably connection is trusted self.max_buffer_size = 5000000 payload = self.request_payload(method, id_=method, params=params) self.encode_and_send_payload(payload) - future = asyncio.ensure_future(self.messages.get()) + future = asyncio.ensure_future(self.queue.get()) for f in asyncio.as_completed([future], timeout=timeout): try: request = await f @@ -36,7 +43,7 @@ class RPCClient(JSONRPC): future.cancel() print('request timed out after {}s'.format(timeout)) else: - await request.process() + await request.process(1) async def handle_response(self, result, error, method): if result and method == 'sessions': diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 1ed9f8d..4f30282 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -10,6 +10,7 @@ import asyncio import json import numbers +import socket import time from lib.util import LoggedClass @@ -20,46 +21,61 @@ class SingleRequest(object): def __init__(self, session, payload): self.payload = payload self.session = session + self.count = 1 - async def process(self): + def remaining(self): + return self.count + + async def process(self, limit): '''Asynchronously handle the JSON request.''' binary = await self.session.process_single_payload(self.payload) if binary: self.session._send_bytes(binary) + self.count = 0 + return 1 + + def __str__(self): + return str(self.payload) class BatchRequest(object): '''An object that represents a batch request and its processing state. - Batches are processed in parts chunks. + Batches are processed in chunks. 
''' - CUHNK_SIZE = 3 - def __init__(self, session, payload): self.session = session self.payload = payload self.done = 0 self.parts = [] - async def process(self): + def remaining(self): + return len(self.payload) - self.done + + async def process(self, limit): '''Asynchronously handle the JSON batch according to the JSON 2.0 spec.''' - for n in range(self.CHUNK_SIZE): - if self.done >= len(self.payload): - if self.parts: - binary = b'[' + b', '.join(self.parts) + b']' - self.session._send_bytes(binary) - return + count = min(limit, self.remaining()) + for n in range(count): item = self.payload[self.done] part = await self.session.process_single_payload(item) if part: self.parts.append(part) self.done += 1 - # Re-enqueue to continue the rest later - self.session.enqueue_request(self) - return b'' + total_len = sum(len(part) + 2 for part in self.parts) + self.session.check_oversized_request(total_len) + + if not self.remaining(): + if self.parts: + binary = b'[' + b', '.join(self.parts) + b']' + self.session._send_bytes(binary) + + return count + + def __str__(self): + return str(self.payload) class JSONRPC(asyncio.Protocol, LoggedClass): @@ -135,6 +151,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.bandwidth_used = 0 self.bandwidth_limit = 5000000 self.transport = None + self.socket = None # Parts of an incomplete JSON line. We buffer them until # getting a newline. self.parts = [] @@ -170,11 +187,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Handle an incoming client connection.''' self.transport = transport self.peer_info = transport.get_extra_info('peername') + self.socket = transport.get_extra_info('socket') + self.socket.settimeout(10) def connection_lost(self, exc): '''Handle client disconnection.''' pass + def close_connection(self): + if self.transport: + self.transport.close() + self.socket.shutdown(socket.SHUT_RDWR) + def using_bandwidth(self, amount): now = time.time() # Reduce the recorded usage in proportion to the elapsed time @@ -200,7 +224,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): 'byte limit, closing {}' .format(buffer_size, self.max_buffer_size, self.peername())) - self.transport.close() + self.close_connection() # Do nothing if this connection is closing if self.transport.is_closing(): @@ -274,7 +298,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.transport.write(binary) self.transport.write(b'\n') if close or self.error_count > 10: - self.transport.close() + self.close_connection() def send_json_error(self, message, code, id_=None, close=False): '''Send a JSON error and close the connection by default.''' diff --git a/server/protocol.py b/server/protocol.py index 740590e..1a9283f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -67,8 +67,6 @@ class MemPool(util.LoggedClass): await asyncio.sleep(5) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) - except asyncio.CancelledError: - break async def update(self): '''Update state given the current mempool to the passed set of hashes. @@ -217,12 +215,18 @@ class ServerManager(util.LoggedClass): up with the daemon. 
''' - MgrTask = namedtuple('MgrTask', 'session task') - N = 5 + BANDS = 5 class NotificationRequest(object): def __init__(self, fn_call): - self.process = fn_call + self.fn_call = fn_call + + def remaining(self): + return 0 + + async def process(self, limit): + await self.fn_call() + return 0 def __init__(self, env): super().__init__() @@ -242,8 +246,8 @@ class ServerManager(util.LoggedClass): self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) self.queue = asyncio.PriorityQueue() - self.delayed_queue = [] - self.next_request_id = 0 + self.delayed_sessions = [] + self.next_queue_id = 0 self.height = 0 self.futures = [] env.max_send = max(350000, env.max_send) @@ -273,27 +277,14 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) - async def serve_requests(self): - '''Asynchronously run through the task queue.''' - while True: - priority_, id_, request = await self.queue.get() - try: - await request.process() - except asyncio.CancelledError: - break - except Exception: - # Getting here should probably be considered a bug and fixed - self.log_error('error handling request {}'.format(request)) - traceback.print_exc() - def setup_bands(self): bands = [] - limit = env.bandwidth_limit - for n in range(self.N): + limit = self.env.bandwidth_limit + for n in range(self.BANDS): bands.append(limit) limit //= 4 - limit = env.bandwidth_limit - for n in range(self.N): + limit = self.env.bandwidth_limit + for n in range(self.BANDS): limit += limit // 2 bands.append(limit) self.bands = sorted(bands) @@ -306,17 +297,40 @@ class ServerManager(util.LoggedClass): return (bisect_left(self.bands, session.bandwidth_used) + bisect_left(self.bands, group_bandwidth) + 1) // 2 - def enqueue_request(self, session, request): + async def enqueue_delayed_sessions(self): + now = time.time() + keep = [] + for pair in self.delayed_sessions: + timeout, session = pair + if timeout <= now: + self.queue.put_nowait(session) + else: + keep.append(pair) + self.delayed_sessions = keep + await asyncio.sleep(1) + + def enqueue_session(self, session): + # Might have disconnected whilst waiting + if not session in self.sessions: + return priority = self.session_priority(session) - item = (priority, self.next_request_id, request) - self.next_request_id += 1 + item = (priority, self.next_queue_id, session) + self.next_queue_id += 1 - secs = priority - self.N + secs = priority - self.BANDS if secs >= 0: - self.delayed_queue.append((time.time() + secs, item)) + session.log_info('delaying response {:d}s'.format(secs)) + self.delayed_sessions.append((time.time() + secs, item)) else: self.queue.put_nowait(item) + async def serve_requests(self): + '''Asynchronously run through the task queue.''' + while True: + priority_, id_, session = await self.queue.get() + if session in self.sessions: + await session.serve_requests() + async def main_loop(self): '''Server manager main loop.''' def add_future(coro): @@ -328,6 +342,7 @@ class ServerManager(util.LoggedClass): add_future(self.mempool.main_loop(self.bp.event)) add_future(self.irc.start(self.bp.event)) add_future(self.start_servers(self.bp.event)) + add_future(self.enqueue_delayed_sessions()) for n in range(4): add_future(self.serve_requests()) @@ -460,7 +475,7 @@ class ServerManager(util.LoggedClass): if not self.servers: return self.clear_stale_sessions() - group = self.groups[int(self.start - self.manager.start) // 60] + group = self.groups[int(session.start - self.start) // 60] group.add(session) self.sessions[session] = group 
session.log_info('connection from {}, {:,d} total' @@ -473,7 +488,7 @@ class ServerManager(util.LoggedClass): def close_session(self, session): '''Close the session's transport and cancel its future.''' - session.transport.close() + session.close_connection() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -488,7 +503,7 @@ class ServerManager(util.LoggedClass): self.next_stale_check = now + 60 # Clear out empty groups for key in [k for k, v in self.groups.items() if not v]: - del self.groups[k] + del self.groups[key] cutoff = now - self.env.session_timeout stale = [session for session in self.sessions if session.last_recv < cutoff @@ -518,8 +533,10 @@ class ServerManager(util.LoggedClass): 'blocks': self.bp.db_height, 'closing': len([s for s in self.sessions if s.is_closing()]), 'errors': sum(s.error_count for s in self.sessions), + 'groups': len(self.groups), 'logged': len([s for s in self.sessions if s.log_me]), 'peers': len(self.irc.peers), + 'requests': sum(s.requests_remaining() for s in self.sessions), 'sessions': self.session_count(), 'txs_sent': self.txs_sent, 'watched': self.subscription_count, @@ -539,17 +556,18 @@ class ServerManager(util.LoggedClass): fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs', - 'Recv', 'Recv KB', 'Sent', 'Sent KB', - 'Txs', 'Time') - for (id_, flags, peer, subs, client, recv_count, recv_size, - send_count, send_size, txs_sent, time) in data: + 'Reqs', 'Txs', 'Recv', 'Recv KB', 'Sent', + 'Sent KB', 'Time') + for (id_, flags, peer, client, subs, reqs, txs_sent, + recv_count, recv_size, send_count, send_size, time) in data: yield fmt.format(id_, flags, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(reqs), + '{:,d}'.format(txs_sent), '{:,d}'.format(recv_count), '{:,d}'.format(recv_size // 1024), '{:,d}'.format(send_count), '{:,d}'.format(send_size // 1024), - '{:,d}'.format(txs_sent), time_fmt(time)) def session_data(self, for_log): @@ -559,11 +577,12 @@ class ServerManager(util.LoggedClass): return [(session.id_, session.flags(), session.peername(for_log=for_log), - session.sub_count(), session.client, + session.requests_remaining(), + session.txs_sent, + session.sub_count(), session.recv_count, session.recv_size, session.send_count, session.send_size, - session.txs_sent, now - session.start) for session in sessions] @@ -597,6 +616,15 @@ class ServerManager(util.LoggedClass): async def rpc_getinfo(self, params): return self.server_summary() + async def rpc_groups(self, params): + result = {} + msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used' + for group, sessions in self.groups.items(): + bandwidth = sum(s.bandwidth_used for s in sessions) + reqs = sum(s.requests_remaining() for s in sessions) + result[group] = msg.format(len(sessions), reqs, bandwidth // 1024) + return result + async def rpc_sessions(self, params): return self.session_data(for_log=False) @@ -631,7 +659,7 @@ class Session(JSONRPC): self.max_send = env.max_send self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 - self.priority = 1 + self.requests = [] def is_closing(self): '''True if this session is closing.''' @@ -644,10 +672,44 @@ class Session(JSONRPC): status += 'C' if self.log_me: status += 'L' + status += str(self.manager.session_priority(self)) return status + def requests_remaining(self): + return sum(request.remaining() for request in self.requests) + def enqueue_request(self, request): - self.manager.enqueue_request(self, 
request)
+        '''Add a request to the session's list.'''
+        if not self.requests:
+            self.manager.enqueue_session(self)
+        self.requests.append(request)
+
+    async def serve_requests(self):
+        '''Serve requests in batches.'''
+        done_reqs = 0
+        done_jobs = 0
+        limit = 4
+        for request in self.requests:
+            try:
+                done_jobs += await request.process(limit - done_jobs)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                # Getting here should probably be considered a bug and fixed
+                self.log_error('error handling request {}'.format(request))
+                traceback.print_exc()
+                done_reqs += 1
+            else:
+                if not request.remaining():
+                    done_reqs += 1
+            if done_jobs >= limit:
+                break
+
+        # Remove completed requests and re-enqueue ourself if any remain.
+        if done_reqs:
+            self.requests = self.requests[done_reqs:]
+        if self.requests:
+            self.manager.enqueue_session(self)
 
     def connection_made(self, transport):
         '''Handle an incoming client connection.'''
         super().connection_made(transport)
@@ -1030,7 +1092,7 @@ class ElectrumX(Session):
     async def version(self, params):
         '''Return the server version as a string.'''
         if params:
-            self.client = str(params[0])
+            self.client = str(params[0])[:15]
             if len(params) > 1:
                 self.protocol_version = params[1]
         return VERSION
@@ -1041,7 +1103,8 @@ class LocalRPC(Session):
 
     def __init__(self, *args):
         super().__init__(*args)
-        cmds = ('disconnect getinfo log numpeers numsessions peers sessions'
+        cmds = ('disconnect getinfo groups log numpeers numsessions '
+                'peers sessions'
                 .split())
         self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
                          for cmd in cmds}

From 5f73fa02a3250fdd45afe3ec942b8f400b25faf8 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Fri, 9 Dec 2016 22:05:43 +0900
Subject: [PATCH 3/3] Prepare 0.8.8

---
 RELEASE-NOTES     | 12 ++++++++++++
 server/version.py |  2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index fceca12..8f532cb 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,15 @@
+version 0.8.8
+-------------
+
+- put sessions in a priority queue to better prioritise serving. Low-bandwidth
+  sessions get served first
+- new RPC command "groups" - shows information about session groups
+- sessions output: session priority shown under Flags column; the lower the
+  number the higher the priority. txs column moved, new column reqs shows
+  the number of outstanding requests for that connection (includes subrequests
+  of batches)
+- issue fixed: #67
+
 version 0.8.7
 -------------
 
diff --git a/server/version.py b/server/version.py
index 5cd0726..c02dade 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.8.7"
+VERSION = "ElectrumX 0.8.8"
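
The band scheme the release notes describe (setup_bands() and session_priority() in the patches) is easiest to see in isolation. The following is a standalone sketch of the idea, not the ElectrumX code itself: the 2 MB limit and the example usage figures are invented for illustration, and only the shape of the bands mirrors the patch.

# Standalone sketch of the banding idea; not the ElectrumX implementation.
from bisect import bisect_left

BANDS = 5
BANDWIDTH_LIMIT = 2000000   # example value; the real limit comes from env

def setup_bands(limit):
    bands = []
    cut = limit
    for n in range(BANDS):      # bands below the limit, each 4x smaller
        bands.append(cut)
        cut //= 4
    cut = limit
    for n in range(BANDS):      # bands above the limit, each 1.5x larger
        cut += cut // 2
        bands.append(cut)
    return sorted(bands)

def session_priority(bands, session_bw, group_bw):
    # Average the session's own band and its group's band; lower is better.
    return (bisect_left(bands, session_bw) +
            bisect_left(bands, group_bw) + 1) // 2

bands = setup_bands(BANDWIDTH_LIMIT)
priority = session_priority(bands, session_bw=50000, group_bw=400000)
delay = max(0, priority - BANDS)   # priorities past the last band get delayed
print(bands)
print('priority {}, delayed {}s before serving'.format(priority, delay))

A light session in a light group lands in a low band and is queued immediately; once a session (or its one-minute connection group) has used several times the limit, its priority climbs past BANDS and its responses are delayed by priority minus BANDS seconds, which is what enqueue_session() does in the patch.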
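
The serving loop itself, with busy sessions re-queuing themselves until their requests are drained, can be modelled with a plain asyncio.PriorityQueue. Again a hedged sketch with toy classes that only echo the shape of ServerManager.serve_requests() and Session.serve_requests(); none of these names are the patch's real objects.

# Toy model of the priority-queue serving loop; classes are invented here.
import asyncio
import itertools

class ToySession(object):
    def __init__(self, name, jobs):
        self.name = name
        self.jobs = jobs              # pending sub-requests

    async def serve_requests(self, limit=4):
        done = min(limit, self.jobs)
        await asyncio.sleep(0)        # stand-in for real request handling
        self.jobs -= done
        print('{} handled {}, {} left'.format(self.name, done, self.jobs))
        return self.jobs              # remaining work

async def worker(queue, counter):
    while True:
        priority, _seq, session = await queue.get()
        if await session.serve_requests():
            # Unfinished sessions go to the back of their priority level;
            # the counter is the tie-breaker, like next_queue_id in the patch.
            queue.put_nowait((priority, next(counter), session))
        queue.task_done()

async def main():
    queue = asyncio.PriorityQueue()
    counter = itertools.count()
    queue.put_nowait((0, next(counter), ToySession('light', 3)))
    queue.put_nowait((3, next(counter), ToySession('heavy', 10)))
    workers = [asyncio.ensure_future(worker(queue, counter)) for n in range(2)]
    await queue.join()                # wait until all queued work is drained
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

In the patch, ServerManager.serve_requests() plays the worker role (main_loop() starts four of them), Session.serve_requests() does the bounded amount of work per turn, and Session.enqueue_request() together with ServerManager.enqueue_session() perform the queuing and re-queuing.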
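
Finally, the reworked BatchRequest.process(limit) is what lets one large JSON batch be spread across several of those turns. Below is a toy version of only the bookkeeping, with invented names and no JSON or networking; the real class builds JSON replies and sends them over the session's transport.

# Toy version of the chunked-batch bookkeeping in BatchRequest.
class ToyBatch(object):
    def __init__(self, items):
        self.items = items
        self.done = 0
        self.parts = []

    def remaining(self):
        return len(self.items) - self.done

    def process(self, limit):
        count = min(limit, self.remaining())
        for item in self.items[self.done:self.done + count]:
            self.parts.append('reply-to-{}'.format(item))
        self.done += count
        if not self.remaining():      # only send the reply once fully done
            print('send: [{}]'.format(', '.join(self.parts)))
        return count                  # jobs charged against this turn's limit

batch = ToyBatch(['a', 'b', 'c', 'd', 'e'])
turns = 0
while batch.remaining():
    batch.process(limit=4)            # same per-turn limit as the patch uses
    turns += 1
print('{} turns'.format(turns))

Because process() reports how many jobs it actually did, Session.serve_requests() can charge a batch's sub-requests against the same per-turn limit as single requests, and requests_remaining() — the reqs column in the sessions output — counts the unfinished sub-requests as well, exactly as the release notes state.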