Browse Source

Rework the JSON layer

Batch requests are now processed and encoded incrementally.
Their bandwidth usage is also incrementally added, and so overlarge
responses are rejected as soon as they become too large.
JSON text is also more memory efficient than the python data
structures they represent.
master
Neil Booth 8 years ago
parent
commit
f904ce2f5d
  1. 213
      lib/jsonrpc.py
  2. 6
      server/protocol.py

213
lib/jsonrpc.py

@ -33,6 +33,9 @@ def json_request_payload(method, id_, params=None):
def json_notification_payload(method, params=None):
return json_request_payload(method, None, params)
def json_payload_id(payload):
return payload.get('id') if isinstance(payload, dict) else None
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection.
@ -179,16 +182,14 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
message = message.decode()
except UnicodeDecodeError as e:
msg = 'cannot decode binary bytes: {}'.format(e)
self.send_json_error(msg, self.PARSE_ERROR)
self.transport.close()
self.send_json_error(msg, self.INVALID_REQUEST)
return
try:
message = json.loads(message)
except json.JSONDecodeError as e:
msg = 'cannot decode JSON: {}'.format(e)
self.send_json_error(msg, self.PARSE_ERROR)
self.transport.close()
self.send_json_error(msg, self.INVALID_REQUEST)
return
'''Queue the request for asynchronous handling.'''
@ -196,118 +197,129 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
if self.log_me:
self.log_info('queued {}'.format(message))
def send_json_notification(self, method, params):
'''Create a json notification.'''
self.send_json(json_notification_payload(method, params))
def send_json_request(self, method, id_, params=None):
'''Send a JSON request.'''
self.send_json(json_request_payload(method, id_, params))
def send_json_response(self, result, id_):
'''Send a JSON result.'''
self.send_json(json_response_payload(result, id_))
def encode_payload(self, payload):
try:
text = (json.dumps(payload) + '\n').encode()
except TypeError:
msg = 'JSON encoding failure: {}'.format(payload)
self.log_error(msg)
return self.json_error(msg, self.INTERNAL_ERROR,
json_payload_id(payload))
def send_json_error(self, message, code, id_=None):
'''Send a JSON error.'''
self.send_json(json_error_payload(message, code, id_))
self.check_oversized_request(len(text))
if 'error' in payload:
self.error_count += 1
# Close abusive clients
if self.error_count >= 10:
self.transport.close()
self.send_count += 1
self.send_size += len(text)
self.using_bandwidth(len(text))
return text
def send_json(self, payload):
'''Send a JSON payload.'''
def send_text(self, text, close):
'''Send JSON text over the transport. Close it if close is True.'''
# Confirmed this happens, sometimes a lot
if self.transport.is_closing():
return
self.transport.write(text)
if close:
self.transport.close()
id_ = payload.get('id') if isinstance(payload, dict) else None
try:
data = (json.dumps(payload) + '\n').encode()
except TypeError:
msg = 'JSON encoding failure: {}'.format(payload)
self.log_error(msg)
self.send_json_error(msg, self.INTERNAL_ERROR, id_)
else:
if len(data) > max(1000, self.max_send):
self.send_json_error('request too large',
self.INVALID_REQUEST, id_)
raise self.LargeRequestError
else:
self.send_count += 1
self.send_size += len(data)
self.using_bandwidth(len(data))
self.transport.write(data)
def send_json_error(self, message, code, id_=None, close=True):
'''Send a JSON error and close the connection by default.'''
self.send_text(self.json_error_text(message, code, id_), close)
def encode_and_send_payload(self, payload):
'''Encode the payload and send it.'''
self.send_text(self.encode_payload(payload), False)
async def handle_message(self, message):
def json_notification_text(self, method, params):
'''Return the text of a json notification.'''
return self.encode_payload(json_notification_payload(method, params))
def json_request_text(self, method, id_, params=None):
'''Return the text of a JSON request.'''
return self.encode_payload(json_request_payload(method, params))
def json_response_text(self, result, id_):
'''Return the text of a JSON response.'''
return self.encode_payload(json_response_payload(result, id_))
def json_error_text(self, message, code, id_=None):
'''Return the text of a JSON error.'''
return self.encode_payload(json_error_payload(message, code, id_))
async def handle_message(self, payload):
'''Asynchronously handle a JSON request or response.
Handles batches according to the JSON 2.0 spec.
'''
# Throttle high-bandwidth connections by delaying processing
# their requests. Delay more the higher the excessive usage.
excess = self.bandwidth_used - self.bandwidth_limit
if excess > 0:
secs = 1 + excess // self.bandwidth_limit
self.log_warning('high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.bandwidth_used, secs))
await asyncio.sleep(secs)
if isinstance(message, list):
payload = await self.batch_payload(message)
try:
if isinstance(payload, list):
text = await self.process_json_batch(payload)
else:
payload = await self.single_payload(message)
text = await self.process_single_json(payload)
except self.RPCError as e:
text = self.json_error_text(e.msg, e.code,
json_payload_id(payload))
if payload:
try:
self.send_json(payload)
except self.LargeRequestError:
self.log_warning('blocked large request {}'.format(message))
if text:
self.send_text(text, self.error_count > 10)
async def batch_payload(self, batch):
'''Return the JSON payload corresponding to a batch JSON request.'''
async def process_json_batch(self, batch):
'''Return the text response to a JSON batch request.'''
# Batches must have at least one request.
if not batch:
return json_error_payload('empty request list',
self.INVALID_REQUEST)
return self.json_error_text('empty batch', self.INVALID_REQUEST)
# PYTHON 3.6: use asynchronous comprehensions when supported
payload = []
for message in batch:
message_payload = await self.single_payload(message)
if message_payload:
payload.append(message_payload)
return payload
async def single_payload(self, message):
'''Return the JSON payload corresponding to a single JSON request,
response or notification.
parts = []
total_len = 0
for item in batch:
part = await self.process_single_json(item)
if part:
parts.append(part)
total_len += len(part) + 2
self.check_oversized_request(total_len)
if parts:
return '{' + ', '.join(parts) + '}'
return ''
async def process_single_json(self, payload):
'''Return the JSON result of a single JSON request, response or
notification.
Return None if the request is a notification or a response.
'''
if not isinstance(message, dict):
return json_error_payload('request must be a dict',
# Throttle high-bandwidth connections by delaying processing
# their requests. Delay more the higher the excessive usage.
excess = self.bandwidth_used - self.bandwidth_limit
if excess > 0:
secs = 1 + excess // self.bandwidth_limit
self.log_warning('high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.bandwidth_used, secs))
await asyncio.sleep(secs)
if not isinstance(payload, dict):
return self.json_error_text('request must be a dict',
self.INVALID_REQUEST)
if not 'id' in message:
return await self.json_notification(message)
if not 'id' in payload:
return await self.process_json_notification(payload)
id_ = message['id']
id_ = payload['id']
if not isinstance(id_, self.ID_TYPES):
return json_error_payload('invalid id: {}'.format(id_),
return self.json_error_text('invalid id: {}'.format(id_),
self.INVALID_REQUEST)
if 'method' in message:
return await self.json_request(message)
if 'method' in payload:
return await self.process_json_request(payload)
return await self.json_response(message)
return await self.process_json_response(payload)
@classmethod
def method_and_params(cls, message):
method = message.get('method')
params = message.get('params', [])
def method_and_params(cls, payload):
method = payload.get('method')
params = payload.get('params', [])
if not isinstance(method, str):
raise cls.RPCError('invalid method: {}'.format(method),
@ -319,31 +331,32 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
return method, params
async def json_notification(self, message):
async def process_json_notification(self, payload):
try:
method, params = self.method_and_params(message)
method, params = self.method_and_params(payload)
except self.RPCError:
pass
else:
await self.handle_notification(method, params)
return None
return ''
async def json_request(self, message):
try:
method, params = self.method_and_params(message)
async def process_json_request(self, payload):
method, params = self.method_and_params(payload)
result = await self.handle_request(method, params)
return json_response_payload(result, message['id'])
except self.RPCError as e:
return json_error_payload(e.msg, e.code, message['id'])
return self.json_response_text(result, payload['id'])
async def json_response(self, message):
async def process_json_response(self, payload):
# Only one of result and error should exist; we go with 'error'
# if both are supplied.
if 'error' in message:
await self.handle_response(None, message['error'], message['id'])
elif 'result' in message:
await self.handle_response(message['result'], None, message['id'])
return None
if 'error' in payload:
await self.handle_response(None, payload['error'], payload['id'])
elif 'result' in payload:
await self.handle_response(payload['result'], None, payload['id'])
return ''
def check_oversized_request(self, total_len):
if total_len > max(1000, self.max_send):
raise self.RPCError('request too large', self.INVALID_REQUEST)
def raise_unknown_method(self, method):
'''Respond to a request with an unknown method.'''

6
server/protocol.py

@ -692,14 +692,14 @@ class ElectrumX(Session):
'blockchain.headers.subscribe',
(self.electrum_header(height), ),
)
self.send_json(cache[key])
self.encode_and_send_payload(cache[key])
if self.subscribe_height:
payload = json_notification_payload(
'blockchain.numblocks.subscribe',
(height, ),
)
self.send_json(payload)
self.encode_and_send_payload(payload)
hash168_to_address = self.coin.hash168_to_address
matches = self.hash168s.intersection(touched)
@ -708,7 +708,7 @@ class ElectrumX(Session):
status = await self.address_status(hash168)
payload = json_notification_payload(
'blockchain.address.subscribe', (address, status))
self.send_json(payload)
self.encode_and_send_payload(payload)
if matches:
self.log_info('notified of {:,d} addresses'.format(len(matches)))

Loading…
Cancel
Save