@ -1,21 +1,26 @@
# Copyright (c) 2016, Neil Booth
# Copyright (c) 2016-2017 , Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
''' Class for handling JSON RPC 2.0 connections, server or client. '''
''' Classes for acting as a peer over a transport and speaking the JSON
RPC versions 1.0 and 2.0 .
JSONSessionBase can use an arbitrary transport .
JSONSession integrates asyncio . Protocol to provide the transport .
'''
import asyncio
import collections
import inspect
import json
import numbers
import time
import traceback
from functools import partial
from lib . util import LoggedClass
import lib . util as util
class RPCError ( Exception ) :
@ -26,307 +31,310 @@ class RPCError(Exception):
self . code = code
class RequestBase ( object ) :
''' An object that represents a queued request .'''
class JSONRPC ( object ) :
''' Base class of JSON RPC versions .'''
def __init__ ( self , remaining ) :
self . remaining = remaining
# See http://www.jsonrpc.org/specification
PARSE_ERROR = - 32700
INVALID_REQUEST = - 32600
METHOD_NOT_FOUND = - 32601
INVALID_ARGS = - 32602
INTERNAL_ERROR = - 32603
ID_TYPES = ( type ( None ) , str , numbers . Number )
HAS_BATCHES = False
class SingleRequest ( RequestBase ) :
''' An object that represents a single request. '''
def __init__ ( self , payload ) :
super ( ) . __init__ ( 1 )
self . payload = payload
class JSONRPCv1 ( JSONRPC ) :
''' JSON RPC version 1.0. '''
async def process ( self , session ) :
''' Asynchronously handle the JSON request. '''
self . remaining = 0
binary = await session . process_single_payload ( self . payload )
if binary :
session . _send_bytes ( binary )
@classmethod
def request_payload ( cls , id_ , method , params = None ) :
''' JSON v1 request payload. Params is mandatory. '''
return { ' method ' : method , ' params ' : params or [ ] , ' id ' : id_ }
def __str__ ( self ) :
return str ( self . payload )
@classmethod
def notification_payload ( cls , method , params = None ) :
''' JSON v1 notification payload. Params and id are mandatory. '''
return { ' method ' : method , ' params ' : params or [ ] , ' id ' : None }
@classmethod
def response_payload ( cls , result , id_ ) :
''' JSON v1 response payload. error is present and None. '''
return { ' id ' : id_ , ' result ' : result , ' error ' : None }
class BatchRequest ( RequestBase ) :
''' An object that represents a batch request and its processing state.
@classmethod
def error_payload ( cls , message , code , id_ ) :
''' JSON v1 error payload. result is present and None. '''
return { ' id ' : id_ , ' result ' : None ,
' error ' : { ' message ' : message , ' code ' : code } }
Batches are processed in chunks .
'''
@classmethod
def handle_response ( cls , handler , payload ) :
''' JSON v1 response handler. Both ' error ' and ' response '
should exist with exactly one being None .
'''
try :
result = payload [ ' result ' ]
error = payload [ ' error ' ]
except KeyError :
pass
else :
if ( result is None ) != ( error is None ) :
handler ( result , error )
def __init__ ( self , payload ) :
super ( ) . __init__ ( len ( payload ) )
self . payload = payload
self . parts = [ ]
@classmethod
def is_request ( cls , payload ) :
''' Returns True if the payload (which has a method) is a request.
False means it is a notification . '''
return payload . get ( ' id ' ) != None
async def process ( self , session ) :
''' Asynchronously handle the JSON batch according to the JSON 2.0
spec . '''
target = max ( self . remaining - 4 , 0 )
while self . remaining > target :
item = self . payload [ len ( self . payload ) - self . remaining ]
self . remaining - = 1
part = await session . process_single_payload ( item )
if part :
self . parts . append ( part )
total_len = sum ( len ( part ) + 2 for part in self . parts )
if session . is_oversized_request ( total_len ) :
raise RPCError ( ' request too large ' , JSONRPC . INVALID_REQUEST )
if not self . remaining :
if self . parts :
binary = b ' [ ' + b ' , ' . join ( self . parts ) + b ' ] '
session . _send_bytes ( binary )
def __str__ ( self ) :
return str ( self . payload )
class JSONRPC ( asyncio . Protocol , LoggedClass ) :
''' Manages a JSONRPC connection.
Assumes JSON messages are newline - separated and that newlines
cannot appear in the JSON other than to separate lines . Incoming
requests are passed to enqueue_request ( ) , which should arrange for
their asynchronous handling via the request ' s process() method.
Derived classes may want to override connection_made ( ) and
connection_lost ( ) but should be sure to call the implementation in
this base class first . They may also want to implement the asynchronous
function handle_response ( ) which by default does nothing .
The functions request_handler ( ) and notification_handler ( ) are
passed an RPC method name , and should return an asynchronous
function to call to handle it . The functions ' docstrings are used
for help , and the arguments are what can be used as JSONRPC 2.0
named arguments ( and thus become part of the external interface ) .
If the method is unknown return None .
Request handlers should return a Python object to return to the
caller , or raise an RPCError on error . Notification handlers
should not return a value or raise any exceptions .
'''
# See http://www.jsonrpc.org/specification
PARSE_ERROR = - 32700
INVALID_REQUEST = - 32600
METHOD_NOT_FOUND = - 32601
INVALID_ARGS = - 32602
INTERNAL_ERROR = - 32603
class JSONRPCv2 ( JSONRPC ) :
''' JSON RPC version 2.0. '''
ID_TYPES = ( type ( None ) , str , numbers . Number )
NEXT_SESSION_ID = 0
HAS_BATCHES = True
@classmethod
def request_payload ( cls , id_ , method , params = None ) :
payload = { ' jsonrpc ' : ' 2.0 ' , ' id ' : id_ , ' method ' : method }
''' JSON v2 request payload. Params is optional. '''
payload = { ' jsonrpc ' : ' 2.0 ' , ' method ' : method , ' id ' : id_ }
if params :
payload [ ' params ' ] = params
return payload
@classmethod
def notification_payload ( cls , method , params = None ) :
''' JSON v2 notification payload. There must be no id. '''
payload = { ' jsonrpc ' : ' 2.0 ' , ' method ' : method }
if params :
payload [ ' params ' ] = params
return payload
@classmethod
def response_payload ( cls , result , id_ ) :
return { ' jsonrpc ' : ' 2.0 ' , ' result ' : result , ' id ' : id_ }
''' JSON v2 response payload. error is not present. '''
return { ' jsonrpc ' : ' 2.0 ' , ' id ' : id_ , ' result ' : result }
@classmethod
def error_payload ( cls , message , code , id_ = None ) :
error = { ' message ' : message , ' code ' : code }
return { ' jsonrpc ' : ' 2.0 ' , ' error ' : error , ' id ' : id_ }
def error_payload ( cls , message , code , id_ ) :
''' JSON v2 error payload. result is not present. '''
return { ' jsonrpc ' : ' 2.0 ' , ' id ' : id_ ,
' error ' : { ' message ' : message , ' code ' : code } }
@classmethod
def check_payload_id ( cls , payload ) :
''' Extract and return the ID from the payload.
def handle_response ( cls , handler , payload ) :
''' JSON v2 response handler. Exactly one of ' error ' and ' response '
must exist . Errors must have ' code ' and ' message ' members .
'''
if ( ' error ' in payload ) != ( ' result ' in payload ) :
if ' result ' in payload :
handler ( payload [ ' result ' ] , None )
else :
error = payload [ ' error ' ]
if ( isinstance ( error , dict ) and ' code ' in error
and ' message ' in error ) :
handler ( None , error )
Raises an RPCError if it is missing or invalid . '''
if not ' id ' in payload :
raise RPCError ( ' missing id ' , JSONRPC . INVALID_REQUEST )
@classmethod
def batch_size ( cls , parts ) :
''' Return the size of a JSON batch from its parts. '''
return sum ( len ( part ) for part in parts ) + 2 * len ( parts )
id_ = payload [ ' id ' ]
if not isinstance ( id_ , JSONRPC . ID_TYPES ) :
raise RPCError ( ' invalid id: {} ' . format ( id_ ) ,
JSONRPC . INVALID_REQUEST )
return id_
@classmethod
def batch_bytes ( cls , parts ) :
''' Return the bytes of a JSON batch from its parts. '''
if parts :
return b ' [ ' + b ' , ' . join ( parts ) + b ' ] '
return b ' '
@classmethod
def payload_id ( cls , payload ) :
''' Extract and return the ID from the payload.
def is_request ( cls , payload ) :
''' Returns True if the payload (which has a method) is a request.
False means it is a notification . '''
return ' id ' in payload
Returns None if it is missing or invalid . '''
try :
return cls . check_payload_id ( payload )
except RPCError :
class JSONRPCCompat ( JSONRPC ) :
''' Intended to be used until receiving a response from the peer, at
which point detect_version should be used to choose which version
to use .
Sends requests compatible with v1 and v2 . Errors cannot be
compatible so v2 errors are sent .
Does not send responses or notifications , nor handle responses .
'''
@classmethod
def request_payload ( cls , id_ , method , params = None ) :
''' JSON v2 request payload but with params present. '''
return { ' jsonrpc ' : ' 2.0 ' , ' id ' : id_ ,
' method ' : method , ' params ' : params or [ ] }
@classmethod
def error_payload ( cls , message , code , id_ ) :
''' JSON v2 error payload. result is not present. '''
return { ' jsonrpc ' : ' 2.0 ' , ' id ' : id_ ,
' error ' : { ' message ' : message , ' code ' : code } }
@classmethod
def detect_version ( cls , payload ) :
''' Return a best guess at a version compatible with the received
payload .
Return None if one cannot be determined .
'''
def item_version ( item ) :
if isinstance ( item , dict ) :
version = item . get ( ' jsonrpc ' )
if version is None :
return JSONRPCv1
if version == ' 2.0 ' :
return JSONRPCv2
return None
def __init__ ( self ) :
if isinstance ( payload , list ) and payload :
version = item_version ( payload [ 0 ] )
# If a batch return at least JSONRPCv2
if version in ( JSONRPCv1 , None ) :
version = JSONRPCv2
else :
version = item_version ( payload )
return version
class JSONSessionBase ( util . LoggedClass ) :
''' Acts as the application layer session, communicating via JSON RPC
over an underlying transport .
Processes incoming and sends outgoing requests , notifications and
responses . Incoming messages are queued . When the queue goes
from empty
'''
NEXT_SESSION_ID = 0
@classmethod
def next_session_id ( cls ) :
session_id = cls . NEXT_SESSION_ID
cls . NEXT_SESSION_ID + = 1
return session_id
def __init__ ( self , version = JSONRPCCompat ) :
super ( ) . __init__ ( )
self . send_notification = partial ( self . send_request , None )
self . start = time . time ( )
self . stop = 0
self . last_recv = self . start
self . bandwidth_start = self . start
self . bandwidth_interval = 3600
self . bandwidth_used = 0
self . bandwidth_limit = 5000000
self . transport = None
self . pause = False
# Parts of an incomplete JSON line. We buffer them until
# getting a newline.
self . parts = [ ]
# recv_count is JSON messages not calls to data_received()
self . recv_count = 0
self . recv_size = 0
self . version = version
self . log_me = False
self . session_id = None
# Count of incoming complete JSON requests and the time of the
# last one. A batch counts as just one here.
self . last_recv = time . time ( )
self . send_count = 0
self . send_size = 0
self . recv_size = 0
self . recv_count = 0
self . error_count = 0
self . close_after_send = False
self . peer_info = None
# Sends longer than max_send are prevented, instead returning
# an oversized request error to other end of the network
# connection. The request causing it is logged. Values under
# 1000 are treated as 1000.
self . max_send = 0
self . pause = False
# Handling of incoming items
self . items = collections . deque ( )
self . batch_results = [ ]
# Handling of outgoing requests
self . next_request_id = 0
self . pending_responses = { }
# If buffered incoming data exceeds this the connection is closed
self . max_buffer_size = 1000000
self . anon_logs = False
self . id_ = JSONRPC . NEXT_SESSION_ID
JSONRPC . NEXT_SESSION_ID + = 1
self . log_prefix = ' [ {:d} ] ' . format ( self . id_ )
self . log_me = False
def peername ( self , * , for_log = True ) :
''' Return the peer name of this connection. '''
if not self . peer_info :
return ' unknown '
if for_log and self . anon_logs :
return ' xx.xx.xx.xx:xx '
if ' : ' in self . peer_info [ 0 ] :
return ' [ {} ]: {} ' . format ( self . peer_info [ 0 ] , self . peer_info [ 1 ] )
else :
return ' {} : {} ' . format ( self . peer_info [ 0 ] , self . peer_info [ 1 ] )
def connection_made ( self , transport ) :
''' Handle an incoming client connection. '''
self . transport = transport
self . peer_info = transport . get_extra_info ( ' peername ' )
transport . set_write_buffer_limits ( high = 500000 )
def connection_lost ( self , exc ) :
''' Handle client disconnection. '''
pass
self . max_send = 50000
self . close_after_send = False
def pause_writing ( self ) :
''' Called by asyncio when the write buffer is full.'''
self . log_info ( ' pausing request processing whilst socket drains ' )
''' Transport calls when the send buffer is full. '''
self . log_info ( ' pausing processing whilst socket drains ' )
self . pause = True
def resume_writing ( self ) :
''' Called by asyncio when the write buffer has room.'''
self . log_info ( ' resuming request processing ' )
''' Transport calls when the send buffer has room. '''
self . log_info ( ' resuming processing ' )
self . pause = False
def close_connection ( self ) :
self . stop = time . time ( )
if self . transport :
self . transport . close ( )
def using_bandwidth ( self , amount ) :
now = time . time ( )
# Reduce the recorded usage in proportion to the elapsed time
elapsed = now - self . bandwidth_start
self . bandwidth_start = now
refund = int ( elapsed / self . bandwidth_interval * self . bandwidth_limit )
refund = min ( refund , self . bandwidth_used )
self . bandwidth_used + = amount - refund
self . throttled = max ( 0 , self . throttled - int ( elapsed ) / / 60 )
def is_oversized ( self , length ) :
''' Return True if the given outgoing message size is too large. '''
if self . max_send and length > max ( 1000 , self . max_send ) :
msg = ' response too large (at least {:d} bytes) ' . format ( length )
return self . error_bytes ( msg , JSONRPC . INVALID_REQUEST ,
payload . get ( ' id ' ) )
return False
def data_received ( self , data ) :
''' Handle incoming data (synchronously) .
def send_binary ( self , binary ) :
''' Pass the bytes through to the transport.
Requests end in newline characters . Pass complete requests to
decode_message for handling .
Close the connection if close_after_send is set .
'''
self . recv_size + = len ( data )
self . using_bandwidth ( len ( data ) )
# Close abusive connections where buffered data exceeds limit
buffer_size = len ( data ) + sum ( len ( part ) for part in self . parts )
if buffer_size > self . max_buffer_size :
self . log_error ( ' read buffer of {:,d} bytes exceeds {:,d} '
' byte limit, closing {} '
. format ( buffer_size , self . max_buffer_size ,
self . peername ( ) ) )
self . close_connection ( )
# Do nothing if this connection is closing
if self . transport . is_closing ( ) :
if self . is_closing ( ) :
return
self . using_bandwidth ( len ( binary ) )
self . send_count + = 1
self . send_size + = len ( binary )
self . send_bytes ( binary )
if self . close_after_send :
self . close_connection ( )
while True :
npos = data . find ( ord ( ' \n ' ) )
if npos == - 1 :
self . parts . append ( data )
break
self . last_recv = time . time ( )
self . recv_count + = 1
tail , data = data [ : npos ] , data [ npos + 1 : ]
parts , self . parts = self . parts , [ ]
parts . append ( tail )
self . decode_message ( b ' ' . join ( parts ) )
def decode_message ( self , message ) :
''' Decode a binary message and queue it for asynchronous handling.
def payload_id ( self , payload ) :
''' Extract and return the ID from the payload.
Messages that cannot be decoded are logged and dropped .
'''
Returns None if it is missing or invalid . '''
try :
message = message . decode ( )
except UnicodeDecodeError as e :
msg = ' cannot decode binary bytes: {} ' . format ( e )
self . send_json_error ( msg , JSONRPC . PARSE_ERROR )
return
return self . check_payload_id ( payload )
except RPCError :
return None
try :
message = json . loads ( message )
except json . JSONDecodeError as e :
msg = ' cannot decode JSON: {} ' . format ( e )
self . send_json_error ( msg , JSONRPC . PARSE_ERROR )
return
def check_payload_id ( self , payload ) :
''' Extract and return the ID from the payload.
if isinstance ( message , list ) :
# Batches must have at least one object.
if not message :
self . send_json_error ( ' empty batch ' , JSONRPC . INVALID_REQUEST )
return
request = BatchRequest ( message )
else :
request = SingleRequest ( message )
Raises an RPCError if it is missing or invalid . '''
if not ' id ' in payload :
raise RPCError ( ' missing id ' , JSONRPC . INVALID_REQUEST )
''' Queue the request for asynchronous handling. '''
self . enqueue_request ( request )
if self . log_me :
self . log_info ( ' queued {} ' . format ( message ) )
id_ = payload [ ' id ' ]
if not isinstance ( id_ , self . version . ID_TYPES ) :
raise RPCError ( ' invalid id type {} ' . format ( type ( id_ ) ) ,
JSONRPC . INVALID_REQUEST )
return id_
def send_json_error ( self , message , code , id_ = None ) :
''' Send a JSON error. '''
self . _send_bytes ( self . json_error_bytes ( message , code , id_ ) )
def request_bytes ( self , id_ , method , params = None ) :
''' Return the bytes of a JSON request. '''
payload = self . version . request_payload ( id_ , method , params )
return self . encode_payload ( payload )
def send_request ( self , id_ , method , params = None ) :
''' Send a request. If id_ is None it is a notification. '''
self . encode_and_send_payload ( self . request_payload ( id_ , method , params ) )
def notification_bytes ( self , method , params = None ) :
payload = self . version . notification_payload ( method , params )
return self . encode_payload ( payload )
def send_notifications ( self , mp_iterable ) :
''' Send an iterable of (method, params) notification pairs.
def response_bytes ( self , result , id_ ) :
''' Return the bytes of a JSON response. '''
return self . encode_payload ( self . version . response_payload ( result , id_ ) )
A 1 - tuple is also valid in which case there are no params . '''
# TODO: maybe send batches if remote side supports it
for pair in mp_iterable :
self . send_notification ( * pair )
def error_bytes ( self , message , code , id_ = None ) :
''' Return the bytes of a JSON error.
Flag the connection to close on a fatal error or too many errors . '''
version = self . version
self . error_count + = 1
if code in ( version . PARSE_ERROR , version . INVALID_REQUEST ) :
self . log_info ( message )
self . close_after_send = True
elif self . error_count > = 10 :
self . log_info ( ' too many errors, last: {} ' . format ( message ) )
self . close_after_send = True
return self . encode_payload ( self . version . error_payload
( message , code , id_ ) )
def encode_payload ( self , payload ) :
''' Encode a Python object as binary bytes. '''
assert isinstance ( payload , dict )
try :
@ -334,86 +342,118 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except TypeError :
msg = ' JSON encoding failure: {} ' . format ( payload )
self . log_error ( msg )
binary = self . json_ error_bytes( msg , JSONRPC . INTERNAL_ERROR ,
payload . get ( ' id ' ) )
binary = self . error_bytes ( msg , JSONRPC . INTERNAL_ERROR ,
payload . get ( ' id ' ) )
if self . is_oversized_request ( len ( binary ) ) :
binary = self . json_error_bytes ( ' request too large ' ,
JSONRPC . INVALID_REQUEST ,
payload . get ( ' id ' ) )
self . send_count + = 1
self . send_size + = len ( binary )
self . using_bandwidth ( len ( binary ) )
return binary
error_bytes = self . is_oversized ( len ( binary ) )
return error_bytes or binary
def is_oversized_request ( self , total_len ) :
return total_len > max ( 1000 , self . max_send )
def decode_message ( self , payload ) :
''' Decode a binary message and pass it on to process_single_item or
process_batch as appropriate .
def _send_bytes ( self , binary ) :
''' Send JSON text over the transport. Close it if close is True. '''
# Confirmed this happens, sometimes a lot
if self . transport . is_closing ( ) :
Messages that cannot be decoded are logged and dropped .
'''
try :
payload = payload . decode ( )
except UnicodeDecodeError as e :
msg = ' cannot decode message: {} ' . format ( e )
self . send_error ( msg , JSONRPC . PARSE_ERROR )
return
try :
payload = json . loads ( payload )
except json . JSONDecodeError as e :
msg = ' cannot decode JSON: {} ' . format ( e )
self . send_error ( msg , JSONRPC . PARSE_ERROR )
return
self . transport . write ( binary )
self . transport . write ( b ' \n ' )
if self . close_after_send :
self . close_connection ( )
def encode_and_send_payload ( self , payload ) :
''' Encode the payload and send it. '''
self . _send_bytes ( self . encode_payload ( payload ) )
if self . version is JSONRPCCompat :
# Attempt to detect peer's JSON RPC version
version = self . version . detect_version ( payload )
if not version :
version = JSONRPCv2
self . log_info ( ' unable to detect JSON RPC version, using 2.0 ' )
self . version = version
# Batches must have at least one object.
if payload == [ ] and self . version . HAS_BATCHES :
self . send_error ( ' empty batch ' , JSONRPC . INVALID_REQUEST )
return
def json_request_bytes ( self , method , id_ , params = None ) :
''' Return the bytes of a JSON request. '''
return self . encode_payload ( self . request_payload ( method , id_ , params ) )
# Incoming items get queued for later asynchronous processing.
if not self . items :
self . have_pending_items ( )
self . items . append ( payload )
def json_response_bytes ( self , result , id_ ) :
''' Return the bytes of a JSON response. '''
return self . encode_payload ( self . response_payload ( result , id_ ) )
async def process_batch ( self , batch , count ) :
''' Processes count items from the batch according to the JSON 2.0
sp ec .
def json_error_bytes ( self , message , code , id_ = None ) :
''' Return the bytes of a JSON error.
If any remain , puts what is left of the batch back in the deque
and returns None . Otherwise returns the binary batch result . '''
results = self . batch_results
self . batch_results = [ ]
Flag the connection to close on a fatal error or too many errors . '''
self . error_count + = 1
if ( code in ( JSONRPC . PARSE_ERROR , JSONRPC . INVALID_REQUEST )
or self . error_count > 10 ) :
self . close_after_send = True
return self . encode_payload ( self . error_payload ( message , code , id_ ) )
for n in range ( count ) :
item = batch . pop ( )
result = await self . process_single_item ( item )
if result :
results . append ( result )
if not batch :
return self . version . batch_bytes ( results )
error_bytes = self . is_oversized ( self . batch_size ( results ) )
if error_bytes :
return error_bytes
async def process_single_payload ( self , payload ) :
self . items . appendleft ( item )
self . batch_results = results
return None
async def process_single_item ( self , payload ) :
''' Handle a single JSON request, notification or response.
If it is a request , return the binary response , oterhwise None . '''
if self . log_me :
self . log_info ( ' processing {} ' . format ( payload ) )
if not isinstance ( payload , dict ) :
return self . json_error_bytes ( ' request must be a dict ' ,
JSONRPC . INVALID_REQUEST )
# Requests and notifications must have a method.
# Notifications are distinguished by having no 'id'.
if ' method ' in payload :
if ' id ' in payload :
return await self . process_single_request ( payload )
return self . error_bytes ( ' request must be a dictionary ' ,
JSONRPC . INVALID_REQUEST )
try :
# Requests and notifications must have a method.
if ' method ' in payload :
if self . version . is_request ( payload ) :
return await self . process_single_request ( payload )
else :
await self . process_single_notification ( payload )
else :
await self . process_single_notification ( payload )
else :
await self . process_single_response ( payload )
self . process_single_response ( payload )
return None
return None
except asyncio . CancelledError :
raise
except Exception :
self . log_error ( traceback . format_exc ( ) )
return self . error_bytes ( ' internal error processing request ' ,
JSONRPC . INTERNAL_ERROR ,
self . payload_id ( payload ) )
async def process_single_request ( self , payload ) :
''' Handle a single JSON request and return the binary response. '''
try :
result = await self . handle_payload ( payload , self . request_handler )
return self . json_response_bytes ( result , payload [ ' id ' ] )
return self . response_bytes ( result , payload [ ' id ' ] )
except RPCError as e :
return self . json_error_bytes ( e . msg , e . code ,
self . payload_id ( payload ) )
return self . error_bytes ( e . msg , e . code , self . payload_id ( payload ) )
except Exception :
self . log_error ( traceback . format_exc ( ) )
return self . json_ error_bytes( ' internal error processing request ' ,
JSONRPC . INTERNAL_ERROR ,
self . payload_id ( payload ) )
return self . error_bytes ( ' internal error processing request ' ,
JSONRPC . INTERNAL_ERROR ,
self . payload_id ( payload ) )
async def process_single_notification ( self , payload ) :
''' Handle a single JSON notification. '''
@ -424,18 +464,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except Exception :
self . log_error ( traceback . format_exc ( ) )
async def process_single_response ( self , payload ) :
def process_single_response ( self , payload ) :
''' Handle a single JSON response. '''
try :
id_ = self . check_payload_id ( payload )
# Only one of result and error should exist
if ' error ' in payload :
error = payload [ ' error ' ]
if ( not ' result ' in payload and isinstance ( error , dict )
and ' code ' in error and ' message ' in error ) :
await self . handle_response ( None , error , id_ )
elif ' result ' in payload :
await self . handle_response ( payload [ ' result ' ] , None , id_ )
handler = self . pending_responses . pop ( id_ , None )
if handler :
self . version . handle_response ( handler , payload )
else :
self . log_info ( ' response for unsent id {} ' . format ( id_ ) ,
throttle = True )
except RPCError :
pass
except Exception :
@ -448,7 +486,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
method = payload . get ( ' method ' )
if not isinstance ( method , str ) :
raise RPCError ( " invalid method: ' {} ' " . format ( method ) ,
raise RPCError ( " invalid method type {} " . format ( type ( method ) ) ,
JSONRPC . INVALID_REQUEST )
handler = get_handler ( method )
@ -457,7 +495,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
JSONRPC . METHOD_NOT_FOUND )
if not isinstance ( args , ( list , dict ) ) :
raise RPCError ( ' arguments should be an array or a dict ' ,
raise RPCError ( ' arguments should be an array or dictionary ' ,
JSONRPC . INVALID_REQUEST )
params = inspect . signature ( handler ) . parameters
@ -465,12 +503,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
min_args = sum ( p . default is p . empty for p in params . values ( ) )
if len ( args ) < min_args :
raise RPCError ( ' too few arguments: expected {:d} got {:d} '
. format ( min_args , len ( args ) ) , JSONRPC . INVALID_ARGS )
raise RPCError ( ' too few arguments to {} : expected {:d} got {:d} '
. format ( method , min_args , len ( args ) ) ,
JSONRPC . INVALID_ARGS )
if len ( args ) > len ( params ) :
raise RPCError ( ' too many arguments: expected {:d} got {:d} '
. format ( len ( params ) , len ( args ) ) ,
raise RPCError ( ' too many arguments to {} : expected {:d} got {:d} '
. format ( method , len ( params ) , len ( args ) ) ,
JSONRPC . INVALID_ARGS )
if isinstance ( args , list ) :
@ -483,23 +522,179 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError ( ' invalid parameter names: {} '
. format ( ' , ' . join ( bad_names ) ) )
return await handler ( * * kw_args )
if inspect . iscoroutinefunction ( handler ) :
return await handler ( * * kw_args )
else :
return handler ( * * kw_args )
# ---- External Interface ----
async def process_pending_items ( self , limit = 8 ) :
''' Processes up to LIMIT pending items asynchronously. '''
while limit > 0 and self . items :
item = self . items . popleft ( )
if isinstance ( item , list ) and self . version . HAS_BATCHES :
count = min ( limit , len ( item ) )
binary = await self . process_batch ( item , count )
limit - = count
else :
binary = await self . process_single_item ( item )
limit - = 1
if binary :
self . send_binary ( binary )
def count_pending_items ( self ) :
''' Counts the number of pending items. '''
return sum ( len ( item ) if isinstance ( item , list ) else 1
for item in self . items )
def connection_made ( self ) :
''' Call when an incoming client connection is established. '''
self . session_id = self . next_session_id ( )
self . log_prefix = ' [ {:d} ] ' . format ( self . session_id )
def data_received ( self , data ) :
''' Underlying transport calls this when new data comes in.
Look for newline separators terminating full requests .
'''
if self . is_closing ( ) :
return
self . using_bandwidth ( len ( data ) )
self . recv_size + = len ( data )
# Close abusive connections where buffered data exceeds limit
buffer_size = len ( data ) + sum ( len ( part ) for part in self . parts )
if buffer_size > self . max_buffer_size :
self . log_error ( ' read buffer of {:,d} bytes over {:,d} byte limit '
. format ( buffer_size , self . max_buffer_size ) )
self . close_connection ( )
return
while True :
npos = data . find ( ord ( ' \n ' ) )
if npos == - 1 :
self . parts . append ( data )
break
tail , data = data [ : npos ] , data [ npos + 1 : ]
parts , self . parts = self . parts , [ ]
parts . append ( tail )
self . recv_count + = 1
self . last_recv = time . time ( )
self . decode_message ( b ' ' . join ( parts ) )
def send_error ( self , message , code , id_ = None ) :
''' Send a JSON error. '''
self . send_binary ( self . error_bytes ( message , code , id_ ) )
def send_request ( self , handler , method , params = None ) :
''' Sends a request and arranges for handler to be called with the
response when it comes in .
'''
id_ = self . next_request_id
self . next_request_id + = 1
self . send_binary ( self . request_bytes ( id_ , method , params ) )
self . pending_responses [ id_ ] = handler
def send_notification ( self , method , params = None ) :
''' Send a notification. '''
self . send_binary ( self . notification_bytes ( method , params ) )
def send_notifications ( self , mp_iterable ) :
''' Send an iterable of (method, params) notification pairs.
A 1 - tuple is also valid in which case there are no params . '''
if self . version . HAS_BATCHES :
parts = [ self . notification_bytes ( * pair ) for pair in mp_iterable ]
self . send_binary ( batch_bytes ( parts ) )
else :
for pair in mp_iterable :
self . send_notification ( * pair )
# -- derived classes are intended to override these functions
# Transport layer
# --- derived classes are intended to override these functions
def enqueue_request ( self , request ) :
''' Enqueue a request for later asynchronous processing. '''
def is_closing ( self ) :
''' Return True if the underlying transport is closing. '''
raise NotImplementedError
async def handle_response ( self , result , error , id_ ) :
''' Handle a JSON response.
def close_connection ( self ) :
''' Close the connection. '''
raise NotImplementedError
Should not raise an exception . Return values are ignored .
def send_bytes ( self , binary ) :
''' Pass the bytes through to the underlying transport. '''
raise NotImplementedError
# App layer
def have_pending_items ( self ) :
''' Called to indicate there are items pending to be processed
asynchronously by calling process_pending_items .
This is * not * called every time an item is added , just when
there were previously none and now there is at least one .
'''
raise NotImplementedError
def using_bandwidth ( self , amount ) :
''' Called as bandwidth is consumed.
Override to implement bandwidth management .
'''
pass
def notification_handler ( self , method ) :
''' Return the async handler for the given notification method. '''
''' Return the handler for the given notification.
The handler can be synchronous or asynchronous . '''
return None
def request_handler ( self , method ) :
''' Return the async handler for the given request method. '''
''' Return the handler for the given request method.
The handler can be synchronous or asynchronous . '''
return None
class JSONSession ( JSONSessionBase , asyncio . Protocol ) :
''' A JSONSessionBase instance specialized for use with
asyncio . protocol to implement the transport layer .
Derived classes must provide have_pending_items ( ) and may want to
override the request and notification handlers .
'''
def __init__ ( self , version = JSONRPCCompat ) :
super ( ) . __init__ ( version = version )
self . transport = None
self . write_buffer_high = 500000
def peer_info ( self ) :
''' Returns information about the peer. '''
return self . transport . get_extra_info ( ' peername ' )
def abort ( self ) :
''' Cut the connection abruptly. '''
self . transport . abort ( )
def connection_made ( self , transport ) :
''' Handle an incoming client connection. '''
transport . set_write_buffer_limits ( high = self . write_buffer_high )
self . transport = transport
super ( ) . connection_made ( )
def is_closing ( self ) :
''' True if the underlying transport is closing. '''
return self . transport and self . transport . is_closing ( )
def close_connection ( self ) :
''' Close the connection. '''
if self . transport :
self . transport . close ( )
def send_bytes ( self , binary ) :
''' Send JSON text over the transport. '''
self . transport . writelines ( ( binary , b ' \n ' ) )