From 3b6ab77e4725b0fca6f5ffa7140724f5169ee761 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 13 Nov 2016 13:39:37 +0900 Subject: [PATCH] Break out JSONRPC into own file - improved handling of JSON 2.0 RPC protocol - permits batched requests (not yet supported by Electrum client) --- electrumx_rpc.py | 2 +- lib/jsonrpc.py | 249 +++++++++++++++++++++++++++++++++++++++++++++ server/protocol.py | 149 ++++++++------------------- 3 files changed, 291 insertions(+), 109 deletions(-) create mode 100644 lib/jsonrpc.py diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 9d3a9c2..0bc7a0e 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -32,7 +32,7 @@ class RPCClient(asyncio.Protocol): def send(self, method, params): self.method = method - payload = {'method': method, 'params': params} + payload = {'method': method, 'params': params, 'id': 'RPC'} data = json.dumps(payload) + '\n' self.transport.write(data.encode()) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py new file mode 100644 index 0000000..d334d4f --- /dev/null +++ b/lib/jsonrpc.py @@ -0,0 +1,249 @@ +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Class for handling JSON RPC 2.0 connections, server or client.''' + +import asyncio +import json +import numbers +import time + +from lib.util import LoggedClass + + +def json_result_payload(result, id_): + # We should not respond to notifications + assert id_ is not None + return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_} + +def json_error_payload(message, code, id_=None): + error = {'message': message, 'code': code} + return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_} + +def json_notification_payload(method, params): + return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params} + + +class JSONRPC(asyncio.Protocol, LoggedClass): + '''Manages a JSONRPC connection. + + Assumes JSON messages are newline-separated and that newlines + cannot appear in the JSON other than to separate lines. + + Derived classes need to implement the synchronous functions + on_json_request() and method_handler(). They probably also want + to override connection_made() and connection_lost() but should be + sure to call the implementation in this base class first. + + on_json_request() is passed a JSON request as a python object + after decoding. It should arrange to pass on to the asynchronous + handle_json_request() method. + + method_handler() takes a method string and should return a function + that can be passed a parameters array, or None for an unknown method. + + Handlers should raise an RPCError on error. + ''' + + # See http://www.jsonrpc.org/specification + PARSE_ERROR = -32700 + INVALID_REQUEST = -32600 + METHOD_NOT_FOUND = -32601 + INVALID_PARAMS = -32602 + INTERAL_ERROR = -32603 + + ID_TYPES = (type(None), str, numbers.Number) + + class RPCError(Exception): + '''RPC handlers raise this error.''' + def __init__(self, msg, code=-1, **kw_args): + super().__init__(**kw_args) + self.msg = msg + self.code + + + def __init__(self): + super().__init__() + self.start = time.time() + self.transport = None + # Parts of an incomplete JSON line. We buffer them until + # getting a newline. + self.parts = [] + # recv_count is JSON messages not calls to data_received() + self.recv_count = 0 + self.recv_size = 0 + self.send_count = 0 + self.send_size = 0 + self.error_count = 0 + + def connection_made(self, transport): + '''Handle an incoming client connection.''' + self.transport = transport + + def peer_info(self): + '''Return peer info.''' + if self.transport: + return self.transport.get_extra_info('peername') + return None + + def connection_lost(self, exc): + '''Handle client disconnection.''' + pass + + def data_received(self, data): + '''Handle incoming data (synchronously). + + Requests end in newline characters. Pass complete requests to + decode_message for handling. + ''' + self.recv_size += len(data) + while True: + npos = data.find(ord('\n')) + if npos == -1: + self.parts.append(data) + break + self.recv_count += 1 + tail, data = data[:npos], data[npos + 1:] + parts, self.parts = self.parts, [] + parts.append(tail) + self.decode_message(b''.join(parts)) + + def decode_message(self, message): + '''Decode a binary message and queue it for asynchronous handling. + + Messages that cannot be decoded are logged and dropped. + ''' + try: + message = message.decode() + except UnicodeDecodeError as e: + msg = 'cannot decode binary bytes: {}'.format(e) + self.logger.warning(msg) + self.send_json_error(msg, self.PARSE_ERROR) + return + + try: + message = json.loads(message) + except json.JSONDecodeError as e: + msg = 'cannot decode JSON: {}'.format(e) + self.logger.warning(msg) + self.send_json_error(msg, self.PARSE_ERROR) + return + + self.on_json_request(message) + + def send_json_notification(self, method, params): + '''Create a json notification.''' + return self.send_json(json_notification_payload(method, params)) + + def send_json_result(self, result, id_): + '''Send a JSON result.''' + return self.send_json(json_result_payload(result, id_)) + + def send_json_error(self, message, code, id_=None): + '''Send a JSON error.''' + self.error_count += 1 + return self.send_json(json_error_payload(message, code, id_)) + + def send_json(self, payload): + '''Send a JSON payload.''' + if self.transport.is_closing(): + self.logger.info('send_json: connection closing, not sending') + return False + + try: + data = (json.dumps(payload) + '\n').encode() + except TypeError: + msg = 'JSON encoding failure: {}'.format(payload) + self.logger.error(msg) + return self.send_json_error(msg, self.INTERNAL_ERROR, + payload.get('id')) + + self.send_count += 1 + self.send_size += len(data) + self.transport.write(data) + return True + + async def handle_json_request(self, request): + '''Asynchronously handle a JSON request. + + Handles batch requests. Returns True if the request response + was sent (or if nothing was sent because the request was a + notification). Returns False if the send was aborted because + the connection is closing. + ''' + if isinstance(request, list): + payload = self.batch_request_payload(request) + else: + payload = await self.single_request_payload(request) + + if not payload: + return True + return self.send_json(payload) + + async def batch_request_payload(self, batch): + '''Return the JSON payload corresponding to a batch JSON request.''' + # Batches must have at least one request. + if not batch: + return json_error_payload('empty request list', + self.INVALID_REQUEST) + + # PYTHON 3.6: use asynchronous comprehensions when supported + payload = [] + for item in request: + item_payload = await self.single_request_payload(item) + if item_payload: + payload.append(item_payload) + return payload + + async def single_request_payload(self, request): + '''Return the JSON payload corresponding to a single JSON request. + + Return None if the request is a notification. + ''' + if not isinstance(request, dict): + return json_error_payload('request must be a dict', + self.INVALID_REQUEST) + + id_ = request.get('id') + if not isinstance(id_, self.ID_TYPES): + return json_error_payload('invalid id: {}'.format(id_), + self.INVALID_REQUEST) + + try: + result = await self.method_result(request.get('method'), + request.get('params', [])) + if id_ is None: + return None + return json_result_payload(result, id_) + except self.RPCError as e: + if id_ is None: + return None + return json_error_payload(e.msg, e.code, id_) + + async def method_result(self, method, params): + if not isinstance(method, str): + raise self.RPCError('invalid method: {}'.format(method), + self.INVALID_REQUEST) + + if not isinstance(params, list): + raise self.RPCError('params should be an array', + self.INVALID_REQUEST) + + handler = self.method_handler(method) + if not handler: + raise self.RPCError('unknown method: {}'.format(method), + self.METHOD_NOT_FOUND) + + return await handler(params) + + def on_json_request(self, request): + raise NotImplementedError('on_json_request in class {}'. + format(self.__class__.__name__)) + + def method_handler(self, method): + raise NotImplementedError('method_handler in class {}'. + format(self.__class__.__name__)) diff --git a/server/protocol.py b/server/protocol.py index ab3899d..f3b6e2a 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -18,6 +18,7 @@ from collections import namedtuple from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash +from lib.jsonrpc import JSONRPC, json_notification_payload from lib.util import LoggedClass from server.block_processor import BlockProcessor from server.daemon import DaemonError @@ -25,15 +26,6 @@ from server.irc import IRC from server.version import VERSION -class RPCError(Exception): - '''RPC handlers raise this error.''' - - -def json_notification(method, params): - '''Create a json notification.''' - return {'id': None, 'method': method, 'params': params} - - class BlockServer(BlockProcessor): '''Like BlockProcessor but also starts servers when caught up.''' @@ -74,7 +66,7 @@ class BlockServer(BlockProcessor): Does not start a server if the port wasn't specified. ''' env = self.env - JSONRPC.init(self, self.daemon, self.coin) + Session.init(self, self.daemon, self.coin) if env.rpc_port is not None: await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) @@ -142,100 +134,43 @@ class SessionManager(LoggedClass): self.current_task = None -class JSONRPC(asyncio.Protocol, LoggedClass): - '''Base class that manages a JSONRPC connection.''' +class Session(JSONRPC): + '''Base class of ElectrumX JSON session protocols.''' - def __init__(self): + def __init__(self, env, kind): super().__init__() - self.parts = [] - self.send_count = 0 - self.send_size = 0 - self.error_count = 0 self.hash168s = set() - self.start = time.time() self.client = 'unknown' - self.peername = 'unknown' + self.env = env + self.kind = kind def connection_made(self, transport): '''Handle an incoming client connection.''' - self.transport = transport - peer = transport.get_extra_info('peername') - self.peername = '{}:{}'.format(peer[0], peer[1]) - self.logger.info('connection from {}'.format(self.peername)) + super().connection_made(transport) + self.logger.info('connection from {}'.format(self.peername())) self.SESSION_MGR.add_session(self) def connection_lost(self, exc): '''Handle client disconnection.''' - self.logger.info('{} disconnected. ' - 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername, self.send_size, - self.send_count, self.error_count)) + super().connection_lost(exc) + if self.error_count or self.send_size >= 250000: + self.logger.info('{} disconnected. ' + 'Sent {:,d} bytes in {:,d} messages {:,d} errors' + .format(self.peername(), self.send_size, + self.send_count, self.error_count)) self.SESSION_MGR.remove_session(self) - def data_received(self, data): - '''Handle incoming data (synchronously). + def method_handler(self, method): + '''Return the handler that will handle the RPC method.''' + return self.handlers.get(method) - Requests end in newline characters. Pass complete requests to - decode_message for handling. - ''' - while True: - npos = data.find(ord('\n')) - if npos == -1: - self.parts.append(data) - break - tail, data = data[:npos], data[npos + 1:] - parts, self.parts = self.parts, [] - parts.append(tail) - self.decode_message(b''.join(parts)) - - def decode_message(self, message): - '''Decode a binary message and queue it for asynchronous handling.''' - try: - message = json.loads(message.decode()) - except Exception as e: - self.logger.info('error decoding JSON message: {}'.format(e)) - else: - self.SESSION_MGR.add_task(self, self.request_handler(message)) + def on_json_request(self, request): + '''Queue the request for asynchronous handling.''' + self.SESSION_MGR.add_task(self, self.handle_json_request(request)) - async def request_handler(self, request): - '''Called asynchronously.''' - error = result = None - try: - handler = self.rpc_handler(request.get('method'), - request.get('params', [])) - result = await handler() - except RPCError as e: - self.error_count += 1 - error = {'code': 1, 'message': e.args[0]} - payload = {'id': request.get('id'), 'error': error, 'result': result} - if not self.json_send(payload): - # Let asyncio call connection_lost() so we stop this - # session's tasks - await asyncio.sleep(0) - - def json_send(self, payload): - if self.transport.is_closing(): - self.logger.info('connection closing, not writing') - return False - - data = (json.dumps(payload) + '\n').encode() - self.transport.write(data) - self.send_count += 1 - self.send_size += len(data) - return True - - def rpc_handler(self, method, params): - handler = None - if isinstance(method, str): - handler = self.handlers.get(method) - if not handler: - self.logger.info('unknown method: {}'.format(method)) - raise RPCError('unknown method: {}'.format(method)) - - if not isinstance(params, list): - raise RPCError('params should be an array') - - return partial(handler, self, params) + def peername(self): + info = self.peer_info() + return 'unknown' if not info else '{}:{}'.format(info[0], info[1]) @classmethod def tx_hash_from_param(cls, param): @@ -321,13 +256,11 @@ class JSONRPC(asyncio.Protocol, LoggedClass): return cls.electrum_header(cls.height()) -class ElectrumX(JSONRPC): +class ElectrumX(Session): '''A TCP server that handles incoming Electrum connections.''' def __init__(self, env, kind): - super().__init__() - self.env = env - self.kind = kind + super().__init__(env, kind) self.subscribe_headers = False self.subscribe_height = False self.notified_height = None @@ -342,7 +275,7 @@ class ElectrumX(JSONRPC): 'banner donation_address peers.subscribe version'), ] self.handlers = {'.'.join([prefix, suffix]): - getattr(self.__class__, suffix.replace('.', '_')) + getattr(self, suffix.replace('.', '_')) for prefix, suffixes in rpcs for suffix in suffixes.split()} @@ -355,11 +288,11 @@ class ElectrumX(JSONRPC): def notify(cls, height, touched): '''Notify electrum clients about height changes and touched addresses.''' - headers_payload = json_notification( + headers_payload = json_notification_payload( 'blockchain.headers.subscribe', (cls.electrum_header(height), ), ) - height_payload = json_notification( + height_payload = json_notification_payload( 'blockchain.numblocks.subscribe', (height, ), ) @@ -372,16 +305,16 @@ class ElectrumX(JSONRPC): if height != session.notified_height: session.notified_height = height if session.subscribe_headers: - session.json_send(headers_payload) + session.send_json(headers_payload) if session.subscribe_height: - session.json_send(height_payload) + session.send_json(height_payload) for hash168 in session.hash168s.intersection(touched): address = hash168_to_address(hash168) status = cls.address_status(hash168) - payload = json_notification('blockchain.address.subscribe', - (address, status)) - session.json_send(payload) + payload = json_notification_payload( + 'blockchain.address.subscribe', (address, status)) + session.send_json(payload) @classmethod def address_status(cls, hash168): @@ -614,15 +547,13 @@ class ElectrumX(JSONRPC): return VERSION -class LocalRPC(JSONRPC): +class LocalRPC(Session): '''A local TCP RPC server for querying status.''' def __init__(self, env, kind): - super().__init__() + super().__init__(env, kind) cmds = 'getinfo sessions numsessions peers numpeers'.split() - self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} - self.env = env - self.kind = kind + self.handlers = {cmd: getattr(self, cmd) for cmd in cmds} async def getinfo(self, params): return { @@ -636,8 +567,10 @@ class LocalRPC(JSONRPC): async def sessions(self, params): now = time.time() return [(session.kind, - 'this RPC client' if session == self else session.peername, - len(session.hash168s), session.client, now - session.start) + '' if session == self else session.peername(), + len(session.hash168s), + 'this RPC client' if session == self else session.client, + now - session.start) for session in self.SESSION_MGR.sessions] async def numsessions(self, params):