Browse Source

Merge branch 'release-0.7.9'

master 0.7.9
Neil Booth 8 years ago
parent
commit
b6de915890
  1. 6
      docs/RELEASE-NOTES
  2. 50
      electrumx_rpc.py
  3. 162
      lib/jsonrpc.py
  4. 26
      server/protocol.py
  5. 2
      server/version.py

6
docs/RELEASE-NOTES

@ -1,3 +1,9 @@
version 0.7.9
-------------
- rewrite jsonrpc.py to also operate as a client. Use this class
for a robust electrumx_rpc.py. Fixes issue #43
version 0.7.8
-------------

50
electrumx_rpc.py

@ -16,24 +16,30 @@ import json
from functools import partial
from os import environ
from lib.jsonrpc import JSONRPC
class RPCClient(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.method = None
class RPCClient(JSONRPC):
def connection_made(self, transport):
self.transport = transport
async def send_and_wait(self, method, params, timeout=None):
self.send_json_request(method, id_=method, params=params)
def connection_lost(self, exc):
self.loop.stop()
future = asyncio.ensure_future(self.messages.get())
for f in asyncio.as_completed([future], timeout=timeout):
try:
message = await f
except asyncio.TimeoutError:
future.cancel()
print('request timed out')
else:
await self.handle_message(message)
def send(self, method, params):
self.method = method
payload = {'method': method, 'params': params, 'id': 'RPC'}
data = json.dumps(payload) + '\n'
self.transport.write(data.encode())
async def handle_response(self, result, error, method):
if result and method == 'sessions':
self.print_sessions(result)
else:
value = {'error': error} if error else result
print(json.dumps(value, indent=4, sort_keys=True))
def print_sessions(self, result):
def data_fmt(count, size):
@ -58,17 +64,6 @@ class RPCClient(asyncio.Protocol):
'{:,d}'.format(error_count),
time_fmt(time)))
def data_received(self, data):
payload = json.loads(data.decode())
self.transport.close()
result = payload['result']
error = payload['error']
if not error and self.method == 'sessions':
self.print_sessions(result)
else:
value = {'error': error} if error else result
print(json.dumps(value, indent=4, sort_keys=True))
def main():
'''Send the RPC command to the server and print the result.'''
parser = argparse.ArgumentParser('Send electrumx an RPC command' )
@ -84,12 +79,11 @@ def main():
args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
loop = asyncio.get_event_loop()
proto_factory = partial(RPCClient, loop)
coro = loop.create_connection(proto_factory, 'localhost', args.port)
coro = loop.create_connection(RPCClient, 'localhost', args.port)
try:
transport, protocol = loop.run_until_complete(coro)
protocol.send(args.command[0], args.param)
loop.run_forever()
coro = protocol.send_and_wait(args.command[0], args.param, timeout=5)
loop.run_until_complete(coro)
except OSError:
print('error connecting - is ElectrumX catching up or not running?')
finally:

162
lib/jsonrpc.py

@ -15,38 +15,45 @@ import time
from lib.util import LoggedClass
def json_result_payload(result, id_):
def json_response_payload(result, id_):
# We should not respond to notifications
assert id_ is not None
return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_}
return {'jsonrpc': '2.0', 'result': result, 'id': id_}
def json_error_payload(message, code, id_=None):
error = {'message': message, 'code': code}
return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_}
return {'jsonrpc': '2.0', 'error': error, 'id': id_}
def json_notification_payload(method, params):
return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params}
def json_request_payload(method, id_, params=None):
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
if params:
payload['params'] = params
return payload
def json_notification_payload(method, params=None):
return json_request_payload(method, None, params)
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection.
Assumes JSON messages are newline-separated and that newlines
cannot appear in the JSON other than to separate lines.
Derived classes need to implement the synchronous functions
on_json_request() and method_handler(). They probably also want
to override connection_made() and connection_lost() but should be
sure to call the implementation in this base class first.
on_json_request() is passed a JSON request as a python object
after decoding. It should arrange to pass on to the asynchronous
handle_json_request() method.
cannot appear in the JSON other than to separate lines. Incoming
messages are queued on the messages queue for later asynchronous
processing, and should be passed to the handle_message() function.
Derived classes may want to override connection_made() and
connection_lost() but should be sure to call the implementation in
this base class first. They will also want to implement some or
all of the asynchronous functions handle_notification(),
handle_response() and handle_request().
handle_request() returns the result to pass over the network, and
must raise an RPCError if there is an error.
handle_notification() and handle_response() should not return
anything or raise any exceptions. All three functions have
default "ignore" implementations supplied by this class.
method_handler() takes a method string and should return a function
that can be passed a parameters array, or None for an unknown method.
Handlers should raise an RPCError on error.
'''
# See http://www.jsonrpc.org/specification
@ -54,7 +61,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERAL_ERROR = -32603
INTERNAL_ERROR = -32603
ID_TYPES = (type(None), str, numbers.Number)
@ -65,7 +72,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.msg = msg
self.code = code
def __init__(self):
super().__init__()
self.start = time.time()
@ -80,6 +86,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_size = 0
self.error_count = 0
self.peer_info = None
self.messages = asyncio.Queue()
def connection_made(self, transport):
'''Handle an incoming client connection.'''
@ -132,15 +139,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.transport.close()
return
self.on_json_request(message)
'''Queue the request for asynchronous handling.'''
self.messages.put_nowait(message)
def send_json_notification(self, method, params):
'''Create a json notification.'''
self.send_json(json_notification_payload(method, params))
def send_json_result(self, result, id_):
def send_json_request(self, method, id_, params=None):
'''Send a JSON request.'''
self.send_json(json_request_payload(method, id_, params))
def send_json_response(self, result, id_):
'''Send a JSON result.'''
self.send_json(json_result_payload(result, id_))
self.send_json(json_response_payload(result, id_))
def send_json_error(self, message, code, id_=None):
'''Send a JSON error.'''
@ -167,20 +179,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_size += len(data)
self.transport.write(data)
async def handle_json_request(self, request):
'''Asynchronously handle a JSON request.
async def handle_message(self, message):
'''Asynchronously handle a JSON request or response.
Handles batch requests.
Handles batches according to the JSON 2.0 spec.
'''
if isinstance(request, list):
payload = await self.batch_request_payload(request)
if isinstance(message, list):
payload = await self.batch_payload(message)
else:
payload = await self.single_request_payload(request)
payload = await self.single_payload(message)
if payload:
self.send_json(payload)
async def batch_request_payload(self, batch):
async def batch_payload(self, batch):
'''Return the JSON payload corresponding to a batch JSON request.'''
# Batches must have at least one request.
if not batch:
@ -189,38 +201,39 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
# PYTHON 3.6: use asynchronous comprehensions when supported
payload = []
for item in request:
item_payload = await self.single_request_payload(item)
if item_payload:
payload.append(item_payload)
for message in batch:
message_payload = await self.single_payload(message)
if message_payload:
payload.append(message_payload)
return payload
async def single_request_payload(self, request):
'''Return the JSON payload corresponding to a single JSON request.
async def single_payload(self, message):
'''Return the JSON payload corresponding to a single JSON request,
response or notification.
Return None if the request is a notification.
Return None if the request is a notification or a response.
'''
if not isinstance(request, dict):
if not isinstance(message, dict):
return json_error_payload('request must be a dict',
self.INVALID_REQUEST)
id_ = request.get('id')
if not 'id' in message:
return await self.json_notification(message)
id_ = message['id']
if not isinstance(id_, self.ID_TYPES):
return json_error_payload('invalid id: {}'.format(id_),
self.INVALID_REQUEST)
try:
result = await self.method_result(request.get('method'),
request.get('params', []))
if id_ is None:
return None
return json_result_payload(result, id_)
except self.RPCError as e:
if id_ is None:
return None
return json_error_payload(e.msg, e.code, id_)
if 'method' in message:
return await self.json_request(message)
return await self.json_response(message)
def method_and_params(self, message):
method = message.get('method')
params = message.get('params', [])
async def method_result(self, method, params):
if not isinstance(method, str):
raise self.RPCError('invalid method: {}'.format(method),
self.INVALID_REQUEST)
@ -229,17 +242,46 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise self.RPCError('params should be an array',
self.INVALID_REQUEST)
handler = self.method_handler(method)
if not handler:
return method, params
async def json_notification(self, message):
try:
method, params = self.method_and_params(message)
except RCPError:
pass
else:
self.handle_notification(method, params)
return None
async def json_request(self, message):
try:
method, params = self.method_and_params(message)
result = await self.handle_request(method, params)
return json_response_payload(result, message['id'])
except self.RPCError as e:
return json_error_payload(e.msg, e.code, message['id'])
async def json_response(self, message):
# Only one of result and error should exist; we go with 'error'
# if both are supplied.
if 'error' in message:
await self.handle_response(None, message['error'], message['id'])
elif 'result' in message:
await self.handle_response(message['result'], None, message['id'])
return None
def raise_unknown_method(method):
'''Respond to a request with an unknown method.'''
raise self.RPCError('unknown method: "{}"'.format(method),
self.METHOD_NOT_FOUND)
return await handler(params)
# --- derived classes are intended to override these functions
async def handle_notification(self, method, params):
'''Handle a notification.'''
def on_json_request(self, request):
raise NotImplementedError('on_json_request in class {}'.
format(self.__class__.__name__))
async def handle_request(self, method, params):
'''Handle a request.'''
return None
def method_handler(self, method):
raise NotImplementedError('method_handler in class {}'.
format(self.__class__.__name__))
async def handle_response(self, result, error, id_):
'''Handle a response.'''

26
server/protocol.py

@ -309,7 +309,7 @@ class ServerManager(util.LoggedClass):
for session in self.sessions:
if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON
session.jobs.put_nowait((self.bp.height, touched, cache))
session.messages.put_nowait((self.bp.height, touched, cache))
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
@ -420,7 +420,6 @@ class Session(JSONRPC):
self.daemon = bp.daemon
self.coin = bp.coin
self.kind = kind
self.jobs = asyncio.Queue()
self.client = 'unknown'
def connection_made(self, transport):
@ -438,29 +437,30 @@ class Session(JSONRPC):
self.send_count, self.error_count))
self.manager.remove_session(self)
def method_handler(self, method):
'''Return the handler that will handle the RPC method.'''
return self.handlers.get(method)
async def handle_request(self, method, params):
'''Handle a request.'''
handler = self.handlers.get(method)
if not handler:
self.raise_unknown_method(method)
def on_json_request(self, request):
'''Queue the request for asynchronous handling.'''
self.jobs.put_nowait(request)
return await handler(params)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
await asyncio.sleep(0)
job = await self.jobs.get()
message = await self.messages.get()
try:
if isinstance(job, tuple): # Height / mempool notification
await self.notify(*job)
# Height / mempool notification?
if isinstance(message, tuple):
await self.notify(*message)
else:
await self.handle_json_request(job)
await self.handle_message(message)
except asyncio.CancelledError:
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(job))
self.logger.error('error handling request {}'.format(message))
traceback.print_exc()
def peername(self, *, for_log=True):

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.7.8"
VERSION = "ElectrumX 0.7.9"

Loading…
Cancel
Save