diff --git a/electrumx_rpc.py b/electrumx_rpc.py
index 8de320d..1a6e0e2 100755
--- a/electrumx_rpc.py
+++ b/electrumx_rpc.py
@@ -22,13 +22,20 @@ from server.protocol import ServerManager
 
 
 class RPCClient(JSONRPC):
 
+    def __init__(self):
+        super().__init__()
+        self.queue = asyncio.Queue()
+
+    def enqueue_request(self, request):
+        self.queue.put_nowait(request)
+
     async def send_and_wait(self, method, params, timeout=None):
         # Raise incoming buffer size - presumably connection is trusted
         self.max_buffer_size = 5000000
         payload = self.request_payload(method, id_=method, params=params)
         self.encode_and_send_payload(payload)
-        future = asyncio.ensure_future(self.messages.get())
+        future = asyncio.ensure_future(self.queue.get())
         for f in asyncio.as_completed([future], timeout=timeout):
             try:
                 request = await f
@@ -36,7 +43,7 @@ class RPCClient(JSONRPC):
                 future.cancel()
                 print('request timed out after {}s'.format(timeout))
             else:
-                await request.process()
+                await request.process(1)
 
     async def handle_response(self, result, error, method):
         if result and method == 'sessions':
diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py
index 1ed9f8d..4f30282 100644
--- a/lib/jsonrpc.py
+++ b/lib/jsonrpc.py
@@ -10,6 +10,7 @@
 import asyncio
 import json
 import numbers
+import socket
 import time
 
 from lib.util import LoggedClass
@@ -20,46 +21,61 @@ class SingleRequest(object):
     def __init__(self, session, payload):
         self.payload = payload
         self.session = session
+        self.count = 1
 
-    async def process(self):
+    def remaining(self):
+        return self.count
+
+    async def process(self, limit):
         '''Asynchronously handle the JSON request.'''
         binary = await self.session.process_single_payload(self.payload)
         if binary:
             self.session._send_bytes(binary)
+        self.count = 0
+        return 1
+
+    def __str__(self):
+        return str(self.payload)
 
 
 class BatchRequest(object):
     '''An object that represents a batch request and its processing state.
 
-    Batches are processed in parts chunks.
+    Batches are processed in chunks.
     '''
 
-    CUHNK_SIZE = 3
-
     def __init__(self, session, payload):
         self.session = session
         self.payload = payload
        self.done = 0
         self.parts = []
 
-    async def process(self):
+    def remaining(self):
+        return len(self.payload) - self.done
+
+    async def process(self, limit):
         '''Asynchronously handle the JSON batch according to the
         JSON 2.0 spec.'''
-        for n in range(self.CHUNK_SIZE):
-            if self.done >= len(self.payload):
-                if self.parts:
-                    binary = b'[' + b', '.join(self.parts) + b']'
-                    self.session._send_bytes(binary)
-                return
+        count = min(limit, self.remaining())
+        for n in range(count):
             item = self.payload[self.done]
             part = await self.session.process_single_payload(item)
             if part:
                 self.parts.append(part)
             self.done += 1
 
-        # Re-enqueue to continue the rest later
-        self.session.enqueue_request(self)
-        return b''
+        total_len = sum(len(part) + 2 for part in self.parts)
+        self.session.check_oversized_request(total_len)
+
+        if not self.remaining():
+            if self.parts:
+                binary = b'[' + b', '.join(self.parts) + b']'
+                self.session._send_bytes(binary)
+
+        return count
+
+    def __str__(self):
+        return str(self.payload)
 
 
 class JSONRPC(asyncio.Protocol, LoggedClass):
@@ -135,6 +151,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         self.bandwidth_used = 0
         self.bandwidth_limit = 5000000
         self.transport = None
+        self.socket = None
         # Parts of an incomplete JSON line. We buffer them until
         # getting a newline.
         self.parts = []
@@ -170,11 +187,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         '''Handle an incoming client connection.'''
         self.transport = transport
         self.peer_info = transport.get_extra_info('peername')
+        self.socket = transport.get_extra_info('socket')
+        self.socket.settimeout(10)
 
     def connection_lost(self, exc):
         '''Handle client disconnection.'''
         pass
 
+    def close_connection(self):
+        if self.transport:
+            self.transport.close()
+            self.socket.shutdown(socket.SHUT_RDWR)
+
     def using_bandwidth(self, amount):
         now = time.time()
         # Reduce the recorded usage in proportion to the elapsed time
@@ -200,7 +224,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
                            'byte limit, closing {}'
                            .format(buffer_size, self.max_buffer_size,
                                    self.peername()))
-            self.transport.close()
+            self.close_connection()
 
         # Do nothing if this connection is closing
         if self.transport.is_closing():
@@ -274,7 +298,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         self.transport.write(binary)
         self.transport.write(b'\n')
         if close or self.error_count > 10:
-            self.transport.close()
+            self.close_connection()
 
     def send_json_error(self, message, code, id_=None, close=False):
         '''Send a JSON error and close the connection by default.'''
diff --git a/server/protocol.py b/server/protocol.py
index 740590e..1a9283f 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -67,8 +67,6 @@ class MemPool(util.LoggedClass):
                 await asyncio.sleep(5)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
-            except asyncio.CancelledError:
-                break
 
     async def update(self):
         '''Update state given the current mempool to the passed set of hashes.
@@ -217,12 +215,18 @@ class ServerManager(util.LoggedClass):
     up with the daemon.
     '''
 
-    MgrTask = namedtuple('MgrTask', 'session task')
-    N = 5
+    BANDS = 5
 
     class NotificationRequest(object):
         def __init__(self, fn_call):
-            self.process = fn_call
+            self.fn_call = fn_call
+
+        def remaining(self):
+            return 0
+
+        async def process(self, limit):
+            await self.fn_call()
+            return 0
 
     def __init__(self, env):
         super().__init__()
@@ -242,8 +246,8 @@ class ServerManager(util.LoggedClass):
         self.history_cache = pylru.lrucache(256)
         self.header_cache = pylru.lrucache(8)
         self.queue = asyncio.PriorityQueue()
-        self.delayed_queue = []
-        self.next_request_id = 0
+        self.delayed_sessions = []
+        self.next_queue_id = 0
         self.height = 0
         self.futures = []
         env.max_send = max(350000, env.max_send)
@@ -273,27 +277,14 @@ class ServerManager(util.LoggedClass):
         '''
         return self.mempool.value(hash168)
 
-    async def serve_requests(self):
-        '''Asynchronously run through the task queue.'''
-        while True:
-            priority_, id_, request = await self.queue.get()
-            try:
-                await request.process()
-            except asyncio.CancelledError:
-                break
-            except Exception:
-                # Getting here should probably be considered a bug and fixed
-                self.log_error('error handling request {}'.format(request))
-                traceback.print_exc()
-
     def setup_bands(self):
         bands = []
-        limit = env.bandwidth_limit
-        for n in range(self.N):
+        limit = self.env.bandwidth_limit
+        for n in range(self.BANDS):
             bands.append(limit)
             limit //= 4
-        limit = env.bandwidth_limit
-        for n in range(self.N):
+        limit = self.env.bandwidth_limit
+        for n in range(self.BANDS):
             limit += limit // 2
             bands.append(limit)
         self.bands = sorted(bands)
@@ -306,17 +297,40 @@ class ServerManager(util.LoggedClass):
         return (bisect_left(self.bands, session.bandwidth_used)
                 + bisect_left(self.bands, group_bandwidth) + 1) // 2
 
-    def enqueue_request(self, session, request):
+    async def enqueue_delayed_sessions(self):
+        now = time.time()
+        keep = []
+        for pair in self.delayed_sessions:
+            timeout, session = pair
+            if timeout <= now:
+                self.queue.put_nowait(session)
+            else:
+                keep.append(pair)
+        self.delayed_sessions = keep
+        await asyncio.sleep(1)
+
+    def enqueue_session(self, session):
+        # Might have disconnected whilst waiting
+        if not session in self.sessions:
+            return
         priority = self.session_priority(session)
-        item = (priority, self.next_request_id, request)
-        self.next_request_id += 1
+        item = (priority, self.next_queue_id, session)
+        self.next_queue_id += 1
 
-        secs = priority - self.N
+        secs = priority - self.BANDS
         if secs >= 0:
-            self.delayed_queue.append((time.time() + secs, item))
+            session.log_info('delaying response {:d}s'.format(secs))
+            self.delayed_sessions.append((time.time() + secs, item))
         else:
             self.queue.put_nowait(item)
 
+    async def serve_requests(self):
+        '''Asynchronously run through the task queue.'''
+        while True:
+            priority_, id_, session = await self.queue.get()
+            if session in self.sessions:
+                await session.serve_requests()
+
     async def main_loop(self):
         '''Server manager main loop.'''
         def add_future(coro):
@@ -328,6 +342,7 @@ class ServerManager(util.LoggedClass):
         add_future(self.mempool.main_loop(self.bp.event))
         add_future(self.irc.start(self.bp.event))
         add_future(self.start_servers(self.bp.event))
+        add_future(self.enqueue_delayed_sessions())
         for n in range(4):
             add_future(self.serve_requests())
 
@@ -460,7 +475,7 @@ class ServerManager(util.LoggedClass):
         if not self.servers:
             return
         self.clear_stale_sessions()
-        group = self.groups[int(self.start - self.manager.start) // 60]
+        group = self.groups[int(session.start - self.start) // 60]
         group.add(session)
         self.sessions[session] = group
         session.log_info('connection from {}, {:,d} total'
@@ -473,7 +488,7 @@ class ServerManager(util.LoggedClass):
 
     def close_session(self, session):
         '''Close the session's transport and cancel its future.'''
-        session.transport.close()
+        session.close_connection()
         return 'disconnected {:d}'.format(session.id_)
 
     def toggle_logging(self, session):
@@ -488,7 +503,7 @@ class ServerManager(util.LoggedClass):
             self.next_stale_check = now + 60
             # Clear out empty groups
             for key in [k for k, v in self.groups.items() if not v]:
-                del self.groups[k]
+                del self.groups[key]
             cutoff = now - self.env.session_timeout
             stale = [session for session in self.sessions
                      if session.last_recv < cutoff
@@ -518,8 +533,10 @@ class ServerManager(util.LoggedClass):
             'blocks': self.bp.db_height,
             'closing': len([s for s in self.sessions if s.is_closing()]),
             'errors': sum(s.error_count for s in self.sessions),
+            'groups': len(self.groups),
             'logged': len([s for s in self.sessions if s.log_me]),
             'peers': len(self.irc.peers),
+            'requests': sum(s.requests_remaining() for s in self.sessions),
             'sessions': self.session_count(),
             'txs_sent': self.txs_sent,
             'watched': self.subscription_count,
@@ -539,17 +556,18 @@ class ServerManager(util.LoggedClass):
         fmt = ('{:<6} {:<5} {:>23} {:>15} {:>7} '
                '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
         yield fmt.format('ID', 'Flags', 'Peer', 'Client', 'Subs',
-                         'Recv', 'Recv KB', 'Sent', 'Sent KB',
-                         'Txs', 'Time')
-        for (id_, flags, peer, subs, client, recv_count, recv_size,
-             send_count, send_size, txs_sent, time) in data:
+                         'Reqs', 'Txs', 'Recv', 'Recv KB', 'Sent',
+                         'Sent KB', 'Time')
+        for (id_, flags, peer, client, subs, reqs, txs_sent,
+             recv_count, recv_size, send_count, send_size, time) in data:
             yield fmt.format(id_, flags, peer, client,
                              '{:,d}'.format(subs),
+                             '{:,d}'.format(reqs),
+                             '{:,d}'.format(txs_sent),
                              '{:,d}'.format(recv_count),
                              '{:,d}'.format(recv_size // 1024),
                              '{:,d}'.format(send_count),
                              '{:,d}'.format(send_size // 1024),
-                             '{:,d}'.format(txs_sent),
                              time_fmt(time))
 
     def session_data(self, for_log):
@@ -559,11 +577,12 @@ class ServerManager(util.LoggedClass):
         return [(session.id_,
                  session.flags(),
                  session.peername(for_log=for_log),
-                 session.sub_count(),
                  session.client,
+                 session.requests_remaining(),
+                 session.txs_sent,
+                 session.sub_count(),
                  session.recv_count, session.recv_size,
                  session.send_count, session.send_size,
-                 session.txs_sent,
                  now - session.start)
                 for session in sessions]
 
@@ -597,6 +616,15 @@ class ServerManager(util.LoggedClass):
 
     async def rpc_getinfo(self, params):
         return self.server_summary()
 
+    async def rpc_groups(self, params):
+        result = {}
+        msg = '{:,d} sessions, {:,d} requests, {:,d}KB b/w quota used'
+        for group, sessions in self.groups.items():
+            bandwidth = sum(s.bandwidth_used for s in sessions)
+            reqs = sum(s.requests_remaining() for s in sessions)
+            result[group] = msg.format(len(sessions), reqs, bandwidth // 1024)
+        return result
+
     async def rpc_sessions(self, params):
         return self.session_data(for_log=False)
@@ -631,7 +659,7 @@ class Session(JSONRPC):
         self.max_send = env.max_send
         self.bandwidth_limit = env.bandwidth_limit
         self.txs_sent = 0
-        self.priority = 1
+        self.requests = []
 
     def is_closing(self):
         '''True if this session is closing.'''
@@ -644,10 +672,44 @@ class Session(JSONRPC):
             status += 'C'
         if self.log_me:
             status += 'L'
+        status += str(self.manager.session_priority(self))
         return status
 
+    def requests_remaining(self):
+        return sum(request.remaining() for request in self.requests)
+
     def enqueue_request(self, request):
-        self.manager.enqueue_request(self, request)
+        '''Add a request to the session's list.'''
+        if not self.requests:
+            self.manager.enqueue_session(self)
+        self.requests.append(request)
+
+    async def serve_requests(self):
+        '''Serve requests in batches.'''
+        done_reqs = 0
+        done_jobs = 0
+        limit = 4
+        for request in self.requests:
+            try:
+                done_jobs += await request.process(limit - done_jobs)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                # Getting here should probably be considered a bug and fixed
+                self.log_error('error handling request {}'.format(request))
+                traceback.print_exc()
+                done_reqs += 1
+            else:
+                if not request.remaining():
+                    done_reqs += 1
+            if done_jobs >= limit:
+                break
+
+        # Remove completed requests and re-enqueue ourself if any remain.
+        if done_reqs:
+            self.requests = self.requests[done_reqs:]
+        if self.requests:
+            self.manager.enqueue_session(self)
 
     def connection_made(self, transport):
         '''Handle an incoming client connection.'''
@@ -1030,7 +1092,7 @@ class ElectrumX(Session):
     async def version(self, params):
         '''Return the server version as a string.'''
         if params:
-            self.client = str(params[0])
+            self.client = str(params[0])[:15]
             if len(params) > 1:
                 self.protocol_version = params[1]
         return VERSION
@@ -1041,7 +1103,8 @@ class LocalRPC(Session):
 
     def __init__(self, *args):
         super().__init__(*args)
-        cmds = ('disconnect getinfo log numpeers numsessions peers sessions'
+        cmds = ('disconnect getinfo groups log numpeers numsessions '
+                'peers sessions'
                 .split())
        self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
                          for cmd in cmds}
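
As a standalone illustration of the bandwidth-band scheduling the patch introduces in ServerManager.setup_bands(), session_priority() and enqueue_session(), the sketch below reproduces the same arithmetic outside the server. It is a minimal sketch under stated assumptions: the BandScheduler name, the 2000000 byte quota and the sample usage figures are hypothetical and exist only for this example; the band construction, bisect lookup and delay rule mirror the diff above.

# Illustration only: a standalone sketch of the bandwidth-band scheduling that
# setup_bands(), session_priority() and enqueue_session() implement above.
# The BandScheduler name and the example numbers are hypothetical.
from bisect import bisect_left


class BandScheduler(object):
    BANDS = 5

    def __init__(self, bandwidth_limit):
        bands = []
        limit = bandwidth_limit
        for n in range(self.BANDS):
            # Bands at and below the limit, each four times smaller.
            bands.append(limit)
            limit //= 4
        limit = bandwidth_limit
        for n in range(self.BANDS):
            # Bands above the limit, each half as large again.
            limit += limit // 2
            bands.append(limit)
        self.bands = sorted(bands)

    def priority(self, session_bw, group_bw):
        # Average of the session's and its group's positions in the band
        # table; heavier bandwidth users end up in higher (worse) bands.
        return (bisect_left(self.bands, session_bw)
                + bisect_left(self.bands, group_bw) + 1) // 2


sched = BandScheduler(bandwidth_limit=2000000)
print(sched.bands)
for used in (1000, 500000, 50000000):
    # priority < BANDS is served immediately; otherwise the session is
    # delayed by (priority - BANDS) seconds, as in enqueue_session().
    print(used, sched.priority(used, used))

With a 2,000,000 byte quota the bands run from 7,812 to 15,187,500 bytes, so a session that has used 50 MB falls past the top band (priority 10) and would have its queue entry delayed by 5 seconds, while a light session is queued immediately.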