diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 303ef4c..a793349 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.7.9 +------------- + +- rewrite jsonrpc.py to also operate as a client. Use this class + for a robust electrumx_rpc.py. Fixes issue #43 + version 0.7.8 ------------- diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 5f02bbf..fb6e854 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -16,24 +16,30 @@ import json from functools import partial from os import environ +from lib.jsonrpc import JSONRPC -class RPCClient(asyncio.Protocol): - def __init__(self, loop): - self.loop = loop - self.method = None +class RPCClient(JSONRPC): - def connection_made(self, transport): - self.transport = transport + async def send_and_wait(self, method, params, timeout=None): + self.send_json_request(method, id_=method, params=params) - def connection_lost(self, exc): - self.loop.stop() + future = asyncio.ensure_future(self.messages.get()) + for f in asyncio.as_completed([future], timeout=timeout): + try: + message = await f + except asyncio.TimeoutError: + future.cancel() + print('request timed out') + else: + await self.handle_message(message) - def send(self, method, params): - self.method = method - payload = {'method': method, 'params': params, 'id': 'RPC'} - data = json.dumps(payload) + '\n' - self.transport.write(data.encode()) + async def handle_response(self, result, error, method): + if result and method == 'sessions': + self.print_sessions(result) + else: + value = {'error': error} if error else result + print(json.dumps(value, indent=4, sort_keys=True)) def print_sessions(self, result): def data_fmt(count, size): @@ -58,17 +64,6 @@ class RPCClient(asyncio.Protocol): '{:,d}'.format(error_count), time_fmt(time))) - def data_received(self, data): - payload = json.loads(data.decode()) - self.transport.close() - result = payload['result'] - error = payload['error'] - if not error and self.method == 'sessions': - self.print_sessions(result) - else: - value = {'error': error} if error else result - print(json.dumps(value, indent=4, sort_keys=True)) - def main(): '''Send the RPC command to the server and print the result.''' parser = argparse.ArgumentParser('Send electrumx an RPC command' ) @@ -84,12 +79,11 @@ def main(): args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) loop = asyncio.get_event_loop() - proto_factory = partial(RPCClient, loop) - coro = loop.create_connection(proto_factory, 'localhost', args.port) + coro = loop.create_connection(RPCClient, 'localhost', args.port) try: transport, protocol = loop.run_until_complete(coro) - protocol.send(args.command[0], args.param) - loop.run_forever() + coro = protocol.send_and_wait(args.command[0], args.param, timeout=5) + loop.run_until_complete(coro) except OSError: print('error connecting - is ElectrumX catching up or not running?') finally: diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index e2ddda2..fda25f4 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -15,38 +15,45 @@ import time from lib.util import LoggedClass -def json_result_payload(result, id_): +def json_response_payload(result, id_): # We should not respond to notifications assert id_ is not None - return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_} + return {'jsonrpc': '2.0', 'result': result, 'id': id_} def json_error_payload(message, code, id_=None): error = {'message': message, 'code': code} - return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_} + return {'jsonrpc': '2.0', 'error': error, 'id': id_} -def json_notification_payload(method, params): - return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params} +def json_request_payload(method, id_, params=None): + payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} + if params: + payload['params'] = params + return payload + +def json_notification_payload(method, params=None): + return json_request_payload(method, None, params) class JSONRPC(asyncio.Protocol, LoggedClass): '''Manages a JSONRPC connection. Assumes JSON messages are newline-separated and that newlines - cannot appear in the JSON other than to separate lines. - - Derived classes need to implement the synchronous functions - on_json_request() and method_handler(). They probably also want - to override connection_made() and connection_lost() but should be - sure to call the implementation in this base class first. - - on_json_request() is passed a JSON request as a python object - after decoding. It should arrange to pass on to the asynchronous - handle_json_request() method. - - method_handler() takes a method string and should return a function - that can be passed a parameters array, or None for an unknown method. + cannot appear in the JSON other than to separate lines. Incoming + messages are queued on the messages queue for later asynchronous + processing, and should be passed to the handle_message() function. + + Derived classes may want to override connection_made() and + connection_lost() but should be sure to call the implementation in + this base class first. They will also want to implement some or + all of the asynchronous functions handle_notification(), + handle_response() and handle_request(). + + handle_request() returns the result to pass over the network, and + must raise an RPCError if there is an error. + handle_notification() and handle_response() should not return + anything or raise any exceptions. All three functions have + default "ignore" implementations supplied by this class. - Handlers should raise an RPCError on error. ''' # See http://www.jsonrpc.org/specification @@ -54,7 +61,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): INVALID_REQUEST = -32600 METHOD_NOT_FOUND = -32601 INVALID_PARAMS = -32602 - INTERAL_ERROR = -32603 + INTERNAL_ERROR = -32603 ID_TYPES = (type(None), str, numbers.Number) @@ -65,7 +72,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.msg = msg self.code = code - def __init__(self): super().__init__() self.start = time.time() @@ -80,6 +86,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_size = 0 self.error_count = 0 self.peer_info = None + self.messages = asyncio.Queue() def connection_made(self, transport): '''Handle an incoming client connection.''' @@ -132,15 +139,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.transport.close() return - self.on_json_request(message) + '''Queue the request for asynchronous handling.''' + self.messages.put_nowait(message) def send_json_notification(self, method, params): '''Create a json notification.''' self.send_json(json_notification_payload(method, params)) - def send_json_result(self, result, id_): + def send_json_request(self, method, id_, params=None): + '''Send a JSON request.''' + self.send_json(json_request_payload(method, id_, params)) + + def send_json_response(self, result, id_): '''Send a JSON result.''' - self.send_json(json_result_payload(result, id_)) + self.send_json(json_response_payload(result, id_)) def send_json_error(self, message, code, id_=None): '''Send a JSON error.''' @@ -167,20 +179,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_size += len(data) self.transport.write(data) - async def handle_json_request(self, request): - '''Asynchronously handle a JSON request. + async def handle_message(self, message): + '''Asynchronously handle a JSON request or response. - Handles batch requests. + Handles batches according to the JSON 2.0 spec. ''' - if isinstance(request, list): - payload = await self.batch_request_payload(request) + if isinstance(message, list): + payload = await self.batch_payload(message) else: - payload = await self.single_request_payload(request) + payload = await self.single_payload(message) if payload: self.send_json(payload) - async def batch_request_payload(self, batch): + async def batch_payload(self, batch): '''Return the JSON payload corresponding to a batch JSON request.''' # Batches must have at least one request. if not batch: @@ -189,38 +201,39 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # PYTHON 3.6: use asynchronous comprehensions when supported payload = [] - for item in request: - item_payload = await self.single_request_payload(item) - if item_payload: - payload.append(item_payload) + for message in batch: + message_payload = await self.single_payload(message) + if message_payload: + payload.append(message_payload) return payload - async def single_request_payload(self, request): - '''Return the JSON payload corresponding to a single JSON request. + async def single_payload(self, message): + '''Return the JSON payload corresponding to a single JSON request, + response or notification. - Return None if the request is a notification. + Return None if the request is a notification or a response. ''' - if not isinstance(request, dict): + if not isinstance(message, dict): return json_error_payload('request must be a dict', self.INVALID_REQUEST) - id_ = request.get('id') + if not 'id' in message: + return await self.json_notification(message) + + id_ = message['id'] if not isinstance(id_, self.ID_TYPES): return json_error_payload('invalid id: {}'.format(id_), self.INVALID_REQUEST) - try: - result = await self.method_result(request.get('method'), - request.get('params', [])) - if id_ is None: - return None - return json_result_payload(result, id_) - except self.RPCError as e: - if id_ is None: - return None - return json_error_payload(e.msg, e.code, id_) + if 'method' in message: + return await self.json_request(message) + + return await self.json_response(message) + + def method_and_params(self, message): + method = message.get('method') + params = message.get('params', []) - async def method_result(self, method, params): if not isinstance(method, str): raise self.RPCError('invalid method: {}'.format(method), self.INVALID_REQUEST) @@ -229,17 +242,46 @@ class JSONRPC(asyncio.Protocol, LoggedClass): raise self.RPCError('params should be an array', self.INVALID_REQUEST) - handler = self.method_handler(method) - if not handler: - raise self.RPCError('unknown method: "{}"'.format(method), - self.METHOD_NOT_FOUND) - - return await handler(params) + return method, params - def on_json_request(self, request): - raise NotImplementedError('on_json_request in class {}'. - format(self.__class__.__name__)) + async def json_notification(self, message): + try: + method, params = self.method_and_params(message) + except RCPError: + pass + else: + self.handle_notification(method, params) + return None - def method_handler(self, method): - raise NotImplementedError('method_handler in class {}'. - format(self.__class__.__name__)) + async def json_request(self, message): + try: + method, params = self.method_and_params(message) + result = await self.handle_request(method, params) + return json_response_payload(result, message['id']) + except self.RPCError as e: + return json_error_payload(e.msg, e.code, message['id']) + + async def json_response(self, message): + # Only one of result and error should exist; we go with 'error' + # if both are supplied. + if 'error' in message: + await self.handle_response(None, message['error'], message['id']) + elif 'result' in message: + await self.handle_response(message['result'], None, message['id']) + return None + + def raise_unknown_method(method): + '''Respond to a request with an unknown method.''' + raise self.RPCError('unknown method: "{}"'.format(method), + self.METHOD_NOT_FOUND) + + # --- derived classes are intended to override these functions + async def handle_notification(self, method, params): + '''Handle a notification.''' + + async def handle_request(self, method, params): + '''Handle a request.''' + return None + + async def handle_response(self, result, error, id_): + '''Handle a response.''' diff --git a/server/protocol.py b/server/protocol.py index dcb9e63..9af9e06 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -309,7 +309,7 @@ class ServerManager(util.LoggedClass): for session in self.sessions: if isinstance(session, ElectrumX): # Use a tuple to distinguish from JSON - session.jobs.put_nowait((self.bp.height, touched, cache)) + session.messages.put_nowait((self.bp.height, touched, cache)) async def shutdown(self): '''Call to shutdown the servers. Returns when done.''' @@ -420,7 +420,6 @@ class Session(JSONRPC): self.daemon = bp.daemon self.coin = bp.coin self.kind = kind - self.jobs = asyncio.Queue() self.client = 'unknown' def connection_made(self, transport): @@ -438,29 +437,30 @@ class Session(JSONRPC): self.send_count, self.error_count)) self.manager.remove_session(self) - def method_handler(self, method): - '''Return the handler that will handle the RPC method.''' - return self.handlers.get(method) + async def handle_request(self, method, params): + '''Handle a request.''' + handler = self.handlers.get(method) + if not handler: + self.raise_unknown_method(method) - def on_json_request(self, request): - '''Queue the request for asynchronous handling.''' - self.jobs.put_nowait(request) + return await handler(params) async def serve_requests(self): '''Asynchronously run through the task queue.''' while True: await asyncio.sleep(0) - job = await self.jobs.get() + message = await self.messages.get() try: - if isinstance(job, tuple): # Height / mempool notification - await self.notify(*job) + # Height / mempool notification? + if isinstance(message, tuple): + await self.notify(*message) else: - await self.handle_json_request(job) + await self.handle_message(message) except asyncio.CancelledError: break except Exception: # Getting here should probably be considered a bug and fixed - self.logger.error('error handling request {}'.format(job)) + self.logger.error('error handling request {}'.format(message)) traceback.print_exc() def peername(self, *, for_log=True): diff --git a/server/version.py b/server/version.py index a659742..e2b598b 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.7.8" +VERSION = "ElectrumX 0.7.9"