From 3b6ab77e4725b0fca6f5ffa7140724f5169ee761 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 13:39:37 +0900 Subject: [PATCH 1/3] Break out JSONRPC into own file - improved handling of JSON 2.0 RPC protocol - permits batched requests (not yet supported by Electrum client) --- electrumx_rpc.py | 2 +- lib/jsonrpc.py | 249 +++++++++++++++++++++++++++++++++++++++++++++ server/protocol.py | 149 ++++++++------------------- 3 files changed, 291 insertions(+), 109 deletions(-) create mode 100644 lib/jsonrpc.py diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 9d3a9c2..0bc7a0e 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -32,7 +32,7 @@ class RPCClient(asyncio.Protocol): def send(self, method, params): self.method = method - payload = {'method': method, 'params': params} + payload = {'method': method, 'params': params, 'id': 'RPC'} data = json.dumps(payload) + '\n' self.transport.write(data.encode()) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py new file mode 100644 index 0000000..d334d4f --- /dev/null +++ b/lib/jsonrpc.py @@ -0,0 +1,249 @@ +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Class for handling JSON RPC 2.0 connections, server or client.''' + +import asyncio +import json +import numbers +import time + +from lib.util import LoggedClass + + +def json_result_payload(result, id_): + # We should not respond to notifications + assert id_ is not None + return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_} + +def json_error_payload(message, code, id_=None): + error = {'message': message, 'code': code} + return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_} + +def json_notification_payload(method, params): + return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params} + + +class JSONRPC(asyncio.Protocol, LoggedClass): + '''Manages a JSONRPC connection. + + Assumes JSON messages are newline-separated and that newlines + cannot appear in the JSON other than to separate lines. + + Derived classes need to implement the synchronous functions + on_json_request() and method_handler(). They probably also want + to override connection_made() and connection_lost() but should be + sure to call the implementation in this base class first. + + on_json_request() is passed a JSON request as a python object + after decoding. It should arrange to pass on to the asynchronous + handle_json_request() method. + + method_handler() takes a method string and should return a function + that can be passed a parameters array, or None for an unknown method. + + Handlers should raise an RPCError on error. + ''' + + # See http://www.jsonrpc.org/specification + PARSE_ERROR = -32700 + INVALID_REQUEST = -32600 + METHOD_NOT_FOUND = -32601 + INVALID_PARAMS = -32602 + INTERAL_ERROR = -32603 + + ID_TYPES = (type(None), str, numbers.Number) + + class RPCError(Exception): + '''RPC handlers raise this error.''' + def __init__(self, msg, code=-1, **kw_args): + super().__init__(**kw_args) + self.msg = msg + self.code + + + def __init__(self): + super().__init__() + self.start = time.time() + self.transport = None + # Parts of an incomplete JSON line. We buffer them until + # getting a newline. + self.parts = [] + # recv_count is JSON messages not calls to data_received() + self.recv_count = 0 + self.recv_size = 0 + self.send_count = 0 + self.send_size = 0 + self.error_count = 0 + + def connection_made(self, transport): + '''Handle an incoming client connection.''' + self.transport = transport + + def peer_info(self): + '''Return peer info.''' + if self.transport: + return self.transport.get_extra_info('peername') + return None + + def connection_lost(self, exc): + '''Handle client disconnection.''' + pass + + def data_received(self, data): + '''Handle incoming data (synchronously). + + Requests end in newline characters. Pass complete requests to + decode_message for handling. + ''' + self.recv_size += len(data) + while True: + npos = data.find(ord('\n')) + if npos == -1: + self.parts.append(data) + break + self.recv_count += 1 + tail, data = data[:npos], data[npos + 1:] + parts, self.parts = self.parts, [] + parts.append(tail) + self.decode_message(b''.join(parts)) + + def decode_message(self, message): + '''Decode a binary message and queue it for asynchronous handling. + + Messages that cannot be decoded are logged and dropped. + ''' + try: + message = message.decode() + except UnicodeDecodeError as e: + msg = 'cannot decode binary bytes: {}'.format(e) + self.logger.warning(msg) + self.send_json_error(msg, self.PARSE_ERROR) + return + + try: + message = json.loads(message) + except json.JSONDecodeError as e: + msg = 'cannot decode JSON: {}'.format(e) + self.logger.warning(msg) + self.send_json_error(msg, self.PARSE_ERROR) + return + + self.on_json_request(message) + + def send_json_notification(self, method, params): + '''Create a json notification.''' + return self.send_json(json_notification_payload(method, params)) + + def send_json_result(self, result, id_): + '''Send a JSON result.''' + return self.send_json(json_result_payload(result, id_)) + + def send_json_error(self, message, code, id_=None): + '''Send a JSON error.''' + self.error_count += 1 + return self.send_json(json_error_payload(message, code, id_)) + + def send_json(self, payload): + '''Send a JSON payload.''' + if self.transport.is_closing(): + self.logger.info('send_json: connection closing, not sending') + return False + + try: + data = (json.dumps(payload) + '\n').encode() + except TypeError: + msg = 'JSON encoding failure: {}'.format(payload) + self.logger.error(msg) + return self.send_json_error(msg, self.INTERNAL_ERROR, + payload.get('id')) + + self.send_count += 1 + self.send_size += len(data) + self.transport.write(data) + return True + + async def handle_json_request(self, request): + '''Asynchronously handle a JSON request. + + Handles batch requests. Returns True if the request response + was sent (or if nothing was sent because the request was a + notification). Returns False if the send was aborted because + the connection is closing. + ''' + if isinstance(request, list): + payload = self.batch_request_payload(request) + else: + payload = await self.single_request_payload(request) + + if not payload: + return True + return self.send_json(payload) + + async def batch_request_payload(self, batch): + '''Return the JSON payload corresponding to a batch JSON request.''' + # Batches must have at least one request. + if not batch: + return json_error_payload('empty request list', + self.INVALID_REQUEST) + + # PYTHON 3.6: use asynchronous comprehensions when supported + payload = [] + for item in request: + item_payload = await self.single_request_payload(item) + if item_payload: + payload.append(item_payload) + return payload + + async def single_request_payload(self, request): + '''Return the JSON payload corresponding to a single JSON request. + + Return None if the request is a notification. + ''' + if not isinstance(request, dict): + return json_error_payload('request must be a dict', + self.INVALID_REQUEST) + + id_ = request.get('id') + if not isinstance(id_, self.ID_TYPES): + return json_error_payload('invalid id: {}'.format(id_), + self.INVALID_REQUEST) + + try: + result = await self.method_result(request.get('method'), + request.get('params', [])) + if id_ is None: + return None + return json_result_payload(result, id_) + except self.RPCError as e: + if id_ is None: + return None + return json_error_payload(e.msg, e.code, id_) + + async def method_result(self, method, params): + if not isinstance(method, str): + raise self.RPCError('invalid method: {}'.format(method), + self.INVALID_REQUEST) + + if not isinstance(params, list): + raise self.RPCError('params should be an array', + self.INVALID_REQUEST) + + handler = self.method_handler(method) + if not handler: + raise self.RPCError('unknown method: {}'.format(method), + self.METHOD_NOT_FOUND) + + return await handler(params) + + def on_json_request(self, request): + raise NotImplementedError('on_json_request in class {}'. + format(self.__class__.__name__)) + + def method_handler(self, method): + raise NotImplementedError('method_handler in class {}'. + format(self.__class__.__name__)) diff --git a/server/protocol.py b/server/protocol.py index ab3899d..f3b6e2a 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -18,6 +18,7 @@ from collections import namedtuple from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash +from lib.jsonrpc import JSONRPC, json_notification_payload from lib.util import LoggedClass from server.block_processor import BlockProcessor from server.daemon import DaemonError @@ -25,15 +26,6 @@ from server.irc import IRC from server.version import VERSION -class RPCError(Exception): - '''RPC handlers raise this error.''' - - -def json_notification(method, params): - '''Create a json notification.''' - return {'id': None, 'method': method, 'params': params} - - class BlockServer(BlockProcessor): '''Like BlockProcessor but also starts servers when caught up.''' @@ -74,7 +66,7 @@ class BlockServer(BlockProcessor): Does not start a server if the port wasn't specified. ''' env = self.env - JSONRPC.init(self, self.daemon, self.coin) + Session.init(self, self.daemon, self.coin) if env.rpc_port is not None: await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) @@ -142,100 +134,43 @@ class SessionManager(LoggedClass): self.current_task = None -class JSONRPC(asyncio.Protocol, LoggedClass): - '''Base class that manages a JSONRPC connection.''' +class Session(JSONRPC): + '''Base class of ElectrumX JSON session protocols.''' - def __init__(self): + def __init__(self, env, kind): super().__init__() - self.parts = [] - self.send_count = 0 - self.send_size = 0 - self.error_count = 0 self.hash168s = set() - self.start = time.time() self.client = 'unknown' - self.peername = 'unknown' + self.env = env + self.kind = kind def connection_made(self, transport): '''Handle an incoming client connection.''' - self.transport = transport - peer = transport.get_extra_info('peername') - self.peername = '{}:{}'.format(peer[0], peer[1]) - self.logger.info('connection from {}'.format(self.peername)) + super().connection_made(transport) + self.logger.info('connection from {}'.format(self.peername())) self.SESSION_MGR.add_session(self) def connection_lost(self, exc): '''Handle client disconnection.''' - self.logger.info('{} disconnected. ' - 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername, self.send_size, - self.send_count, self.error_count)) + super().connection_lost(exc) + if self.error_count or self.send_size >= 250000: + self.logger.info('{} disconnected. ' + 'Sent {:,d} bytes in {:,d} messages {:,d} errors' + .format(self.peername(), self.send_size, + self.send_count, self.error_count)) self.SESSION_MGR.remove_session(self) - def data_received(self, data): - '''Handle incoming data (synchronously). + def method_handler(self, method): + '''Return the handler that will handle the RPC method.''' + return self.handlers.get(method) - Requests end in newline characters. Pass complete requests to - decode_message for handling. - ''' - while True: - npos = data.find(ord('\n')) - if npos == -1: - self.parts.append(data) - break - tail, data = data[:npos], data[npos + 1:] - parts, self.parts = self.parts, [] - parts.append(tail) - self.decode_message(b''.join(parts)) - - def decode_message(self, message): - '''Decode a binary message and queue it for asynchronous handling.''' - try: - message = json.loads(message.decode()) - except Exception as e: - self.logger.info('error decoding JSON message: {}'.format(e)) - else: - self.SESSION_MGR.add_task(self, self.request_handler(message)) + def on_json_request(self, request): + '''Queue the request for asynchronous handling.''' + self.SESSION_MGR.add_task(self, self.handle_json_request(request)) - async def request_handler(self, request): - '''Called asynchronously.''' - error = result = None - try: - handler = self.rpc_handler(request.get('method'), - request.get('params', [])) - result = await handler() - except RPCError as e: - self.error_count += 1 - error = {'code': 1, 'message': e.args[0]} - payload = {'id': request.get('id'), 'error': error, 'result': result} - if not self.json_send(payload): - # Let asyncio call connection_lost() so we stop this - # session's tasks - await asyncio.sleep(0) - - def json_send(self, payload): - if self.transport.is_closing(): - self.logger.info('connection closing, not writing') - return False - - data = (json.dumps(payload) + '\n').encode() - self.transport.write(data) - self.send_count += 1 - self.send_size += len(data) - return True - - def rpc_handler(self, method, params): - handler = None - if isinstance(method, str): - handler = self.handlers.get(method) - if not handler: - self.logger.info('unknown method: {}'.format(method)) - raise RPCError('unknown method: {}'.format(method)) - - if not isinstance(params, list): - raise RPCError('params should be an array') - - return partial(handler, self, params) + def peername(self): + info = self.peer_info() + return 'unknown' if not info else '{}:{}'.format(info[0], info[1]) @classmethod def tx_hash_from_param(cls, param): @@ -321,13 +256,11 @@ class JSONRPC(asyncio.Protocol, LoggedClass): return cls.electrum_header(cls.height()) -class ElectrumX(JSONRPC): +class ElectrumX(Session): '''A TCP server that handles incoming Electrum connections.''' def __init__(self, env, kind): - super().__init__() - self.env = env - self.kind = kind + super().__init__(env, kind) self.subscribe_headers = False self.subscribe_height = False self.notified_height = None @@ -342,7 +275,7 @@ class ElectrumX(JSONRPC): 'banner donation_address peers.subscribe version'), ] self.handlers = {'.'.join([prefix, suffix]): - getattr(self.__class__, suffix.replace('.', '_')) + getattr(self, suffix.replace('.', '_')) for prefix, suffixes in rpcs for suffix in suffixes.split()} @@ -355,11 +288,11 @@ class ElectrumX(JSONRPC): def notify(cls, height, touched): '''Notify electrum clients about height changes and touched addresses.''' - headers_payload = json_notification( + headers_payload = json_notification_payload( 'blockchain.headers.subscribe', (cls.electrum_header(height), ), ) - height_payload = json_notification( + height_payload = json_notification_payload( 'blockchain.numblocks.subscribe', (height, ), ) @@ -372,16 +305,16 @@ class ElectrumX(JSONRPC): if height != session.notified_height: session.notified_height = height if session.subscribe_headers: - session.json_send(headers_payload) + session.send_json(headers_payload) if session.subscribe_height: - session.json_send(height_payload) + session.send_json(height_payload) for hash168 in session.hash168s.intersection(touched): address = hash168_to_address(hash168) status = cls.address_status(hash168) - payload = json_notification('blockchain.address.subscribe', - (address, status)) - session.json_send(payload) + payload = json_notification_payload( + 'blockchain.address.subscribe', (address, status)) + session.send_json(payload) @classmethod def address_status(cls, hash168): @@ -614,15 +547,13 @@ class ElectrumX(JSONRPC): return VERSION -class LocalRPC(JSONRPC): +class LocalRPC(Session): '''A local TCP RPC server for querying status.''' def __init__(self, env, kind): - super().__init__() + super().__init__(env, kind) cmds = 'getinfo sessions numsessions peers numpeers'.split() - self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} - self.env = env - self.kind = kind + self.handlers = {cmd: getattr(self, cmd) for cmd in cmds} async def getinfo(self, params): return { @@ -636,8 +567,10 @@ class LocalRPC(JSONRPC): async def sessions(self, params): now = time.time() return [(session.kind, - 'this RPC client' if session == self else session.peername, - len(session.hash168s), session.client, now - session.start) + '' if session == self else session.peername(), + len(session.hash168s), + 'this RPC client' if session == self else session.client, + now - session.start) for session in self.SESSION_MGR.sessions] async def numsessions(self, params): From 7523735f99c5d564a9ab5ac0e077ccf21a9a2f6e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 14:43:13 +0900 Subject: [PATCH 2/3] Split out server and session management --- server/protocol.py | 335 +++++++++++++++++++++++---------------------- 1 file changed, 170 insertions(+), 165 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index f3b6e2a..5a17257 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -27,28 +27,43 @@ from server.version import VERSION class BlockServer(BlockProcessor): - '''Like BlockProcessor but also starts servers when caught up.''' + '''Like BlockProcessor but also has a server manager and starts + servers when caught up.''' def __init__(self, env): super().__init__(env) - self.servers = [] - self.irc = IRC(env) + self.server_mgr = ServerManager(self, env) async def caught_up(self, mempool_hashes): await super().caught_up(mempool_hashes) - if not self.servers: - await self.start_servers() - if self.env.irc: - self.logger.info('starting IRC coroutine') - asyncio.ensure_future(self.irc.start()) - else: - self.logger.info('IRC disabled') - ElectrumX.notify(self.height, self.touched) - - async def start_server(self, class_name, kind, host, port, *, ssl=None): + self.server_mgr.notify(self.height, self.touched) + + def stop(self): + '''Close the listening servers.''' + self.server_mgr.stop() + + +class ServerManager(LoggedClass): + '''Manages the servers.''' + + AsyncTask = namedtuple('AsyncTask', 'session job') + + def __init__(self, bp, env): + super().__init__() + self.bp = bp + self.env = env + self.servers = [] + self.irc = IRC(env) + self.sessions = set() + self.tasks = asyncio.Queue() + self.current_task = None + + async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() - protocol = partial(class_name, self.env, kind) - server = loop.create_server(protocol, host, port, ssl=ssl) + protocol_class = LocalRPC if kind == 'RPC' else ElectrumX + protocol = partial(protocol_class, self, self.bp, self.env, kind) + server = loop.create_server(protocol, *args, **kw_args) + try: self.servers.append(await server) except asyncio.CancelledError: @@ -61,44 +76,49 @@ class BlockServer(BlockProcessor): .format(kind, host, port)) async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. + '''Connect to IRC and start listening for incoming connections. - Does not start a server if the port wasn't specified. + Only connect to IRC if enabled. Start listening on RCP, TCP + and SSL ports only if the port wasn pecified. ''' env = self.env - Session.init(self, self.daemon, self.coin) + if env.rpc_port is not None: - await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) + await self.start_server('RPC', 'localhost', env.rpc_port) if env.tcp_port is not None: - await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port) + await self.start_server('TCP', env.host, env.tcp_port) if env.ssl_port is not None: # FIXME: update if we want to require Python >= 3.5.3 sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) - await self.start_server(ElectrumX, 'SSL', env.host, - env.ssl_port, ssl=sslc) + await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - def stop(self): - '''Close the listening servers.''' - for server in self.servers: - server.close() + asyncio.ensure_future(self.run_tasks()) - def irc_peers(self): - return self.irc.peers + if env.irc: + self.logger.info('starting IRC coroutine') + asyncio.ensure_future(self.irc.start()) + else: + self.logger.info('IRC disabled') + async def notify(self, height, touched): + '''Notify electrum clients about height changes and touched addresses. -AsyncTask = namedtuple('AsyncTask', 'session job') + Start listening if not yet listening. + ''' + if not self.servers: + await self.start_servers() -class SessionManager(LoggedClass): + sessions = [session for session in self.sessions + if isinstance(session, ElectrumX)] + self.ElectrumX.notify(sessions, height, touched) - def __init__(self): - super().__init__() - self.sessions = set() - self.tasks = asyncio.Queue() - self.current_task = None - asyncio.ensure_future(self.run_tasks()) + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() def add_session(self, session): assert session not in self.sessions @@ -113,7 +133,7 @@ class SessionManager(LoggedClass): def add_task(self, session, job): assert session in self.sessions task = asyncio.ensure_future(job) - self.tasks.put_nowait(AsyncTask(session, task)) + self.tasks.put_nowait(self.AsyncTask(session, task)) async def run_tasks(self): '''Asynchronously run through the task queue.''' @@ -133,22 +153,55 @@ class SessionManager(LoggedClass): finally: self.current_task = None + def irc_peers(self): + return self.irc.peers + + def session_count(self): + return len(self.manager.sessions) + + def info(self): + '''Returned in the RPC 'getinfo' call.''' + address_count = sum(len(session.hash168s) + for session in self.sessions + if isinstance(session, ElectrumX)) + return { + 'blocks': self.bp.height, + 'peers': len(self.irc_peers()), + 'sessions': self.session_count(), + 'watched': address_count, + 'cached': 0, + } + + def sessions_info(self): + '''Returned to the RPC 'sessions' call.''' + now = time.time() + return [(session.kind, + session.peername(), + len(session.hash168s), + 'RPC' if isinstance(session, LocalRPC) else session.client, + now - session.start) + for session in self.sessions] + class Session(JSONRPC): '''Base class of ElectrumX JSON session protocols.''' - def __init__(self, env, kind): + def __init__(self, manager, bp, env, kind): super().__init__() - self.hash168s = set() - self.client = 'unknown' + self.manager = manager + self.bp = bp self.env = env + self.daemon = bp.daemon + self.coin = bp.coin self.kind = kind + self.hash168s = set() + self.client = 'unknown' def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) self.logger.info('connection from {}'.format(self.peername())) - self.SESSION_MGR.add_session(self) + self.manager.add_session(self) def connection_lost(self, exc): '''Handle client disconnection.''' @@ -158,7 +211,7 @@ class Session(JSONRPC): 'Sent {:,d} bytes in {:,d} messages {:,d} errors' .format(self.peername(), self.send_size, self.send_count, self.error_count)) - self.SESSION_MGR.remove_session(self) + self.maanger.remove_session(self) def method_handler(self, method): '''Return the handler that will handle the RPC method.''' @@ -166,14 +219,13 @@ class Session(JSONRPC): def on_json_request(self, request): '''Queue the request for asynchronous handling.''' - self.SESSION_MGR.add_task(self, self.handle_json_request(request)) + self.manager.add_task(self, self.handle_json_request(request)) def peername(self): info = self.peer_info() return 'unknown' if not info else '{}:{}'.format(info[0], info[1]) - @classmethod - def tx_hash_from_param(cls, param): + def tx_hash_from_param(self, param): '''Raise an RPCError if the parameter is not a valid transaction hash.''' if isinstance(param, str) and len(param) == 64: @@ -185,17 +237,15 @@ class Session(JSONRPC): raise RPCError('parameter should be a transaction hash: {}' .format(param)) - @classmethod - def hash168_from_param(cls, param): + def hash168_from_param(self, param): if isinstance(param, str): try: - return cls.COIN.address_to_hash168(param) + return self.coin.address_to_hash168(param) except: pass raise RPCError('parameter should be a valid address: {}'.format(param)) - @classmethod - def non_negative_integer_from_param(cls, param): + def non_negative_integer_from_param(self, param): try: param = int(param) except ValueError: @@ -207,60 +257,28 @@ class Session(JSONRPC): raise RPCError('param should be a non-negative integer: {}' .format(param)) - @classmethod - def extract_hash168(cls, params): + def extract_hash168(self, params): if len(params) == 1: - return cls.hash168_from_param(params[0]) + return self.hash168_from_param(params[0]) raise RPCError('params should contain a single address: {}' .format(params)) - @classmethod - def extract_non_negative_integer(cls, params): + def extract_non_negative_integer(self, params): if len(params) == 1: - return cls.non_negative_integer_from_param(params[0]) + return self.non_negative_integer_from_param(params[0]) raise RPCError('params should contain a non-negative integer: {}' .format(params)) - @classmethod - def require_empty_params(cls, params): + def require_empty_params(self, params): if params: raise RPCError('params should be empty: {}'.format(params)) - @classmethod - def init(cls, block_processor, daemon, coin): - cls.BLOCK_PROCESSOR = block_processor - cls.DAEMON = daemon - cls.COIN = coin - cls.SESSION_MGR = SessionManager() - - @classmethod - def irc_peers(cls): - return cls.BLOCK_PROCESSOR.irc_peers() - - @classmethod - def height(cls): - '''Return the current height.''' - return cls.BLOCK_PROCESSOR.height - - @classmethod - def electrum_header(cls, height=None): - '''Return the binary header at the given height.''' - if not 0 <= height <= cls.height(): - raise RPCError('height {:,d} out of range'.format(height)) - header = cls.BLOCK_PROCESSOR.read_headers(height, 1) - return cls.COIN.electrum_header(header, height) - - @classmethod - def current_electrum_header(cls): - '''Used as response to a headers subscription request.''' - return cls.electrum_header(cls.height()) - class ElectrumX(Session): '''A TCP server that handles incoming Electrum connections.''' - def __init__(self, env, kind): - super().__init__(env, kind) + def __init__(self, *args): + super().__init__(*args) self.subscribe_headers = False self.subscribe_height = False self.notified_height = None @@ -280,49 +298,57 @@ class ElectrumX(Session): for suffix in suffixes.split()} @classmethod - def watched_address_count(cls): - sessions = cls.SESSION_MGR.sessions - return sum(len(session.hash168s) for session in sessions) - - @classmethod - def notify(cls, height, touched): - '''Notify electrum clients about height changes and touched - addresses.''' - headers_payload = json_notification_payload( - 'blockchain.headers.subscribe', - (cls.electrum_header(height), ), - ) - height_payload = json_notification_payload( - 'blockchain.numblocks.subscribe', - (height, ), - ) - hash168_to_address = cls.COIN.hash168_to_address - - for session in cls.SESSION_MGR.sessions: - if not isinstance(session, ElectrumX): - continue + def notify(cls, sessions, height, touched): + headers_payload = height_payload = None + for session in sessions: if height != session.notified_height: session.notified_height = height if session.subscribe_headers: + if headers_payload is None: + headers_payload = json_notification_payload( + 'blockchain.headers.subscribe', + (session.electrum_header(height), ), + ) session.send_json(headers_payload) + if session.subscribe_height: + if height_payload is None: + height_payload = json_notification_payload( + 'blockchain.numblocks.subscribe', + (height, ), + ) session.send_json(height_payload) + hash168_to_address = session.coin.hash168_to_address for hash168 in session.hash168s.intersection(touched): address = hash168_to_address(hash168) - status = cls.address_status(hash168) + status = session.address_status(hash168) payload = json_notification_payload( 'blockchain.address.subscribe', (address, status)) session.send_json(payload) - @classmethod - def address_status(cls, hash168): + def height(self): + '''Return the block processor's current height.''' + return self.bp.height + + def current_electrum_header(self): + '''Used as response to a headers subscription request.''' + return self.electrum_header(self.height()) + + def electrum_header(self, height): + '''Return the binary header at the given height.''' + if not 0 <= height <= self.height(): + raise RPCError('height {:,d} out of range'.format(height)) + header = self.bp.read_headers(height, 1) + return self.coin.electrum_header(header, height) + + def address_status(self, hash168): '''Returns status as 32 bytes.''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = cls.BLOCK_PROCESSOR.get_history(hash168) - mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) + history = self.bp.get_history(hash168) + mempool = self.bp.mempool_transactions(hash168) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) for tx_hash, height in history) @@ -332,11 +358,10 @@ class ElectrumX(Session): return sha256(status.encode()).hex() return None - @classmethod - async def tx_merkle(cls, tx_hash, height): + async def tx_merkle(self, tx_hash, height): '''tx_hash is a hex string.''' - hex_hashes = await cls.DAEMON.block_hex_hashes(height, 1) - block = await cls.DAEMON.deserialised_block(hex_hashes[0]) + hex_hashes = await self.daemon.block_hex_hashes(height, 1) + block = await self.daemon.deserialised_block(hex_hashes[0]) tx_hashes = block['tx'] # This will throw if the tx_hash is bad pos = tx_hashes.index(tx_hash) @@ -355,16 +380,11 @@ class ElectrumX(Session): return {"block_height": height, "merkle": merkle_branch, "pos": pos} - @classmethod - def height(cls): - return cls.BLOCK_PROCESSOR.height - - @classmethod - def get_history(cls, hash168): + def get_history(self, hash168): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) - mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) + history = self.bp.get_history(hash168, limit=None) + mempool = self.bp.mempool_transactions(hash168) conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height} for tx_hash, height in history) @@ -372,24 +392,21 @@ class ElectrumX(Session): for tx_hash, fee, unconfirmed in mempool) return conf + unconf - @classmethod - def get_chunk(cls, index): + def get_chunk(self, index): '''Return header chunk as hex. Index is a non-negative integer.''' - chunk_size = cls.COIN.CHUNK_SIZE - next_height = cls.height() + 1 + chunk_size = self.coin.CHUNK_SIZE + next_height = self.height() + 1 start_height = min(index * chunk_size, next_height) count = min(next_height - start_height, chunk_size) - return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() + return self.bp.read_headers(start_height, count).hex() - @classmethod - def get_balance(cls, hash168): - confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) - unconfirmed = cls.BLOCK_PROCESSOR.mempool_value(hash168) + def get_balance(self, hash168): + confirmed = self.bp.get_balance(hash168) + unconfirmed = self.bp.mempool_value(hash168) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} - @classmethod - def list_unspent(cls, hash168): - utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168) + def list_unspent(self, hash168): + utxos = self.bp.get_utxos_sorted(hash168) return tuple({'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos, 'height': utxo.height, 'value': utxo.value} @@ -431,7 +448,7 @@ class ElectrumX(Session): return self.electrum_header(height) async def estimatefee(self, params): - return await self.DAEMON.estimatefee(params) + return await self.daemon.estimatefee(params) async def headers_subscribe(self, params): self.require_empty_params(params) @@ -447,7 +464,7 @@ class ElectrumX(Session): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' self.require_empty_params(params) - return await self.DAEMON.relayfee() + return await self.daemon.relayfee() async def transaction_broadcast(self, params): '''Pass through the parameters to the daemon. @@ -458,7 +475,7 @@ class ElectrumX(Session): user interface job here. ''' try: - tx_hash = await self.DAEMON.sendrawtransaction(params) + tx_hash = await self.daemon.sendrawtransaction(params) self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: @@ -483,7 +500,7 @@ class ElectrumX(Session): # in anticipation it might be dropped in the future. if 1 <= len(params) <= 2: tx_hash = self.tx_hash_from_param(params[0]) - return await self.DAEMON.getrawtransaction(tx_hash) + return await self.daemon.getrawtransaction(tx_hash) raise RPCError('params wrong length: {}'.format(params)) @@ -500,9 +517,9 @@ class ElectrumX(Session): tx_hash = self.tx_hash_from_param(params[0]) index = self.non_negative_integer_from_param(params[1]) tx_hash = hex_str_to_hash(tx_hash) - hash168 = self.BLOCK_PROCESSOR.get_utxo_hash168(tx_hash, index) + hash168 = self.bp.get_utxo_hash168(tx_hash, index) if hash168: - return self.COIN.hash168_to_address(hash168) + return self.coin.hash168_to_address(hash168) return None raise RPCError('params should contain a transaction hash and index') @@ -537,7 +554,7 @@ class ElectrumX(Session): subscription. ''' self.require_empty_params(params) - return list(self.irc_peers().values()) + return list(self.manager.irc_peers().values()) async def version(self, params): '''Return the server version as a string.''' @@ -550,34 +567,22 @@ class ElectrumX(Session): class LocalRPC(Session): '''A local TCP RPC server for querying status.''' - def __init__(self, env, kind): - super().__init__(env, kind) + def __init__(self, *args): + super().__init__(*args) cmds = 'getinfo sessions numsessions peers numpeers'.split() self.handlers = {cmd: getattr(self, cmd) for cmd in cmds} async def getinfo(self, params): - return { - 'blocks': self.height(), - 'peers': len(self.irc_peers()), - 'sessions': len(self.SESSION_MGR.sessions), - 'watched': ElectrumX.watched_address_count(), - 'cached': 0, - } + return self.manager.info() async def sessions(self, params): - now = time.time() - return [(session.kind, - '' if session == self else session.peername(), - len(session.hash168s), - 'this RPC client' if session == self else session.client, - now - session.start) - for session in self.SESSION_MGR.sessions] + return self.manager.sessions_info() async def numsessions(self, params): - return len(self.SESSION_MGR.sessions) + return self.manager.session_count() async def peers(self, params): - return self.irc_peers() + return self.manager.irc_peers() async def numpeers(self, params): - return len(self.irc_peers()) + return len(self.manager.irc_peers()) From 831225492eeafb8cd7a9e768be7f25d28c57046c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 14:54:02 +0900 Subject: [PATCH 3/3] Better RPC sessions stats --- electrumx_rpc.py | 26 +++++++++++++++++++++----- server/protocol.py | 24 +++++++++++++----------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 0bc7a0e..f7f05e1 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -44,12 +44,28 @@ class RPCClient(asyncio.Protocol): if error: print("ERROR: {}".format(error)) else: + def data_fmt(count, size): + return '{:,d}/{:,d}KB'.format(count, size // 1024) + def time_fmt(t): + t = int(t) + return ('{:3d}:{:02d}:{:02d}' + .format(t // 3600, (t % 3600) // 60, t % 60)) + if self.method == 'sessions': - fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}' - print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time')) - for kind, peer, subs, client, time in result: - print(fmt.format(kind, peer, '{:,d}'.format(subs), - client, '{:,d}'.format(int(time)))) + fmt = ('{:<4} {:>23} {:>15} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') + print(fmt.format('Type', 'Peer', 'Client', 'Subs', + 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB', + 'Errs', 'Time')) + for (kind, peer, subs, client, recv_count, recv_size, + send_count, send_size, error_count, time) in result: + print(fmt.format(kind, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,.1f}'.format(recv_size / 1048576), + '{:,d}'.format(send_count), + '{:,.1f}'.format(send_size / 1048576), + '{:,d}'.format(error_count), + time_fmt(time))) else: pprint.pprint(result, indent=4) diff --git a/server/protocol.py b/server/protocol.py index 5a17257..fb55466 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -33,9 +33,13 @@ class BlockServer(BlockProcessor): def __init__(self, env): super().__init__(env) self.server_mgr = ServerManager(self, env) + self.bs_caught_up = False async def caught_up(self, mempool_hashes): await super().caught_up(mempool_hashes) + if not self.bs_caught_up: + await self.server_mgr.start_servers() + self.bs_caught_up = True self.server_mgr.notify(self.height, self.touched) def stop(self): @@ -64,6 +68,7 @@ class ServerManager(LoggedClass): protocol = partial(protocol_class, self, self.bp, self.env, kind) server = loop.create_server(protocol, *args, **kw_args) + host, port = args[:2] try: self.servers.append(await server) except asyncio.CancelledError: @@ -103,17 +108,11 @@ class ServerManager(LoggedClass): else: self.logger.info('IRC disabled') - async def notify(self, height, touched): - '''Notify electrum clients about height changes and touched addresses. - - Start listening if not yet listening. - ''' - if not self.servers: - await self.start_servers() - + def notify(self, height, touched): + '''Notify sessions about height changes and touched addresses.''' sessions = [session for session in self.sessions if isinstance(session, ElectrumX)] - self.ElectrumX.notify(sessions, height, touched) + ElectrumX.notify(sessions, height, touched) def stop(self): '''Close the listening servers.''' @@ -157,7 +156,7 @@ class ServerManager(LoggedClass): return self.irc.peers def session_count(self): - return len(self.manager.sessions) + return len(self.sessions) def info(self): '''Returned in the RPC 'getinfo' call.''' @@ -179,6 +178,9 @@ class ServerManager(LoggedClass): session.peername(), len(session.hash168s), 'RPC' if isinstance(session, LocalRPC) else session.client, + session.recv_count, session.recv_size, + session.send_count, session.send_size, + session.error_count, now - session.start) for session in self.sessions] @@ -211,7 +213,7 @@ class Session(JSONRPC): 'Sent {:,d} bytes in {:,d} messages {:,d} errors' .format(self.peername(), self.send_size, self.send_count, self.error_count)) - self.maanger.remove_session(self) + self.manager.remove_session(self) def method_handler(self, method): '''Return the handler that will handle the RPC method.'''