diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index ed1c0e0..b796d8c 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -36,7 +36,7 @@ Not started until the Block Processor has caught up with bitcoind. Daemon ------ -Encapsulates the RPC wire protcol with bitcoind for the whole server. +Encapsulates the RPC wire protocol with bitcoind for the whole server. Transparently handles temporary bitcoind connection errors, and fails over if necessary. diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index f323356..41f4284 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -205,7 +205,8 @@ below are low and encourage you to raise them. An integer number of seconds defaulting to 600. Sessions with no activity for longer than this are disconnected. Properly functioning Electrum clients by default will send pings roughly - every 60 seconds. + every 60 seconds, and servers doing peer discovery roughly every 300 + seconds. IRC --- diff --git a/electrumx_rpc.py b/electrumx_rpc.py index deecd1e..122a7df 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -16,44 +16,60 @@ import json from functools import partial from os import environ -from lib.jsonrpc import JSONRPC +from lib.jsonrpc import JSONSession, JSONRPCv2 from server.controller import Controller -class RPCClient(JSONRPC): +class RPCClient(JSONSession): def __init__(self): - super().__init__() - self.queue = asyncio.Queue() - self.max_send = 1000000 - - def enqueue_request(self, request): - self.queue.put_nowait(request) - - async def send_and_wait(self, method, params, timeout=None): - # Raise incoming buffer size - presumably connection is trusted - self.max_buffer_size = 5000000 - if params: - params = [params] - self.send_request(method, method, params) - - future = asyncio.ensure_future(self.queue.get()) - for f in asyncio.as_completed([future], timeout=timeout): - try: - request = await f - except asyncio.TimeoutError: - future.cancel() - print('request timed out after {}s'.format(timeout)) + super().__init__(version=JSONRPCv2) + self.max_send = 0 + self.max_buffer_size = 5*10**6 + self.event = asyncio.Event() + + def have_pending_items(self): + self.event.set() + + async def wait_for_response(self): + await self.event.wait() + await self.process_pending_items() + + def send_rpc_request(self, method, params): + handler = partial(self.handle_response, method) + self.send_request(handler, method, params) + + def handle_response(self, method, result, error): + if method in ('groups', 'sessions') and not error: + if method == 'groups': + lines = Controller.groups_text_lines(result) else: - await request.process(self) - - async def handle_response(self, result, error, method): - if result and method in ('groups', 'sessions'): - for line in Controller.text_lines(method, result): + lines = Controller.sessions_text_lines(result) + for line in lines: print(line) + elif error: + print('error: {} (code {:d})' + .format(error['message'], error['code'])) else: - value = {'error': error} if error else result - print(json.dumps(value, indent=4, sort_keys=True)) + print(json.dumps(result, indent=4, sort_keys=True)) + + +def rpc_send_and_wait(port, method, params, timeout=15): + loop = asyncio.get_event_loop() + coro = loop.create_connection(RPCClient, 'localhost', port) + try: + transport, rpc_client = loop.run_until_complete(coro) + rpc_client.send_rpc_request(method, params) + try: + coro = rpc_client.wait_for_response() + loop.run_until_complete(asyncio.wait_for(coro, timeout)) + except 
asyncio.TimeoutError: + print('request timed out after {}s'.format(timeout)) + except OSError: + print('cannot connect - is ElectrumX catching up, not running, or ' + 'is {:d} the wrong RPC port?'.format(port)) + finally: + loop.close() def main(): @@ -67,20 +83,17 @@ def main(): help='params to send') args = parser.parse_args() - if args.port is None: - args.port = int(environ.get('RPC_PORT', 8000)) + port = args.port + if port is None: + port = int(environ.get('RPC_PORT', 8000)) - loop = asyncio.get_event_loop() - coro = loop.create_connection(RPCClient, 'localhost', args.port) - try: - transport, protocol = loop.run_until_complete(coro) - coro = protocol.send_and_wait(args.command[0], args.param, timeout=15) - loop.run_until_complete(coro) - except OSError: - print('error connecting - is ElectrumX catching up or not running?') - finally: - loop.stop() - loop.close() + # Get the RPC request. + method = args.command[0] + params = args.param + if method in ('log', 'disconnect'): + params = [params] + + rpc_send_and_wait(port, method, params) if __name__ == '__main__': diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7657cfb..b46992c 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -1,21 +1,26 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. # # See the file "LICENCE" for information about the copyright # and warranty status of this software. -'''Class for handling JSON RPC 2.0 connections, server or client.''' +'''Classes for acting as a peer over a transport and speaking the JSON +RPC versions 1.0 and 2.0. + +JSONSessionBase can use an arbitrary transport. +JSONSession integrates asyncio.Protocol to provide the transport. +''' import asyncio +import collections import inspect import json import numbers import time import traceback -from functools import partial -from lib.util import LoggedClass +import lib.util as util class RPCError(Exception): @@ -26,307 +31,310 @@ class RPCError(Exception): self.code = code -class RequestBase(object): - '''An object that represents a queued request.''' +class JSONRPC(object): + '''Base class of JSON RPC versions.''' - def __init__(self, remaining): - self.remaining = remaining + # See http://www.jsonrpc.org/specification + PARSE_ERROR = -32700 + INVALID_REQUEST = -32600 + METHOD_NOT_FOUND = -32601 + INVALID_ARGS = -32602 + INTERNAL_ERROR = -32603 + ID_TYPES = (type(None), str, numbers.Number) + HAS_BATCHES = False -class SingleRequest(RequestBase): - '''An object that represents a single request.''' - def __init__(self, payload): - super().__init__(1) - self.payload = payload +class JSONRPCv1(JSONRPC): + '''JSON RPC version 1.0.''' - async def process(self, session): - '''Asynchronously handle the JSON request.''' - self.remaining = 0 - binary = await session.process_single_payload(self.payload) - if binary: - session._send_bytes(binary) + @classmethod + def request_payload(cls, id_, method, params=None): + '''JSON v1 request payload. Params is mandatory.''' + return {'method': method, 'params': params or [], 'id': id_} - def __str__(self): - return str(self.payload) + @classmethod + def notification_payload(cls, method, params=None): + '''JSON v1 notification payload. Params and id are mandatory.''' + return {'method': method, 'params': params or [], 'id': None} + @classmethod + def response_payload(cls, result, id_): + '''JSON v1 response payload. 
error is present and None.'''
+        return {'id': id_, 'result': result, 'error': None}
 
-class BatchRequest(RequestBase):
-    '''An object that represents a batch request and its processing state.
+    @classmethod
+    def error_payload(cls, message, code, id_):
+        '''JSON v1 error payload. result is present and None.'''
+        return {'id': id_, 'result': None,
+                'error': {'message': message, 'code': code}}
 
-    Batches are processed in chunks.
-    '''
+    @classmethod
+    def handle_response(cls, handler, payload):
+        '''JSON v1 response handler. Both 'result' and 'error'
+        should exist with exactly one being None.
+        '''
+        try:
+            result = payload['result']
+            error = payload['error']
+        except KeyError:
+            pass
+        else:
+            if (result is None) != (error is None):
+                handler(result, error)
 
-    def __init__(self, payload):
-        super().__init__(len(payload))
-        self.payload = payload
-        self.parts = []
+    @classmethod
+    def is_request(cls, payload):
+        '''Returns True if the payload (which has a method) is a request.
+        False means it is a notification.'''
+        return payload.get('id') is not None
 
-    async def process(self, session):
-        '''Asynchronously handle the JSON batch according to the JSON 2.0
-        spec.'''
-        target = max(self.remaining - 4, 0)
-        while self.remaining > target:
-            item = self.payload[len(self.payload) - self.remaining]
-            self.remaining -= 1
-            part = await session.process_single_payload(item)
-            if part:
-                self.parts.append(part)
-
-        total_len = sum(len(part) + 2 for part in self.parts)
-        if session.is_oversized_request(total_len):
-            raise RPCError('request too large', JSONRPC.INVALID_REQUEST)
-
-        if not self.remaining:
-            if self.parts:
-                binary = b'[' + b', '.join(self.parts) + b']'
-                session._send_bytes(binary)
-
-    def __str__(self):
-        return str(self.payload)
-
-
-class JSONRPC(asyncio.Protocol, LoggedClass):
-    '''Manages a JSONRPC connection.
-
-    Assumes JSON messages are newline-separated and that newlines
-    cannot appear in the JSON other than to separate lines.  Incoming
-    requests are passed to enqueue_request(), which should arrange for
-    their asynchronous handling via the request's process() method.
-
-    Derived classes may want to override connection_made() and
-    connection_lost() but should be sure to call the implementation in
-    this base class first.  They may also want to implement the asynchronous
-    function handle_response() which by default does nothing.
-
-    The functions request_handler() and notification_handler() are
-    passed an RPC method name, and should return an asynchronous
-    function to call to handle it.  The functions' docstrings are used
-    for help, and the arguments are what can be used as JSONRPC 2.0
-    named arguments (and thus become part of the external interface).
-    If the method is unknown return None.
-
-    Request handlers should return a Python object to return to the
-    caller, or raise an RPCError on error.  Notification handlers
-    should not return a value or raise any exceptions.
-    '''
-    # See http://www.jsonrpc.org/specification
-    PARSE_ERROR = -32700
-    INVALID_REQUEST = -32600
-    METHOD_NOT_FOUND = -32601
-    INVALID_ARGS = -32602
-    INTERNAL_ERROR = -32603
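
For concreteness, the JSONRPCv1 classmethods above produce wire objects
shaped as follows; a minimal sketch, with the method names chosen purely
for illustration:

    # Sketch of the v1 payload shapes built by the classmethods above.
    JSONRPCv1.request_payload(0, 'server.version')
    #   {'method': 'server.version', 'params': [], 'id': 0}
    JSONRPCv1.notification_payload('blockchain.headers.subscribe')
    #   {'method': 'blockchain.headers.subscribe', 'params': [], 'id': None}
    JSONRPCv1.response_payload('1.0', 0)
    #   {'id': 0, 'result': '1.0', 'error': None}
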
 
+class JSONRPCv2(JSONRPC):
+    '''JSON RPC version 2.0.'''
 
-    ID_TYPES = (type(None), str, numbers.Number)
-    NEXT_SESSION_ID = 0
+    HAS_BATCHES = True
 
     @classmethod
     def request_payload(cls, id_, method, params=None):
-        payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
+        '''JSON v2 request payload. Params is optional.'''
+        payload = {'jsonrpc': '2.0', 'method': method, 'id': id_}
+        if params:
+            payload['params'] = params
+        return payload
+
+    @classmethod
+    def notification_payload(cls, method, params=None):
+        '''JSON v2 notification payload. There must be no id.'''
+        payload = {'jsonrpc': '2.0', 'method': method}
         if params:
             payload['params'] = params
         return payload
 
     @classmethod
     def response_payload(cls, result, id_):
-        return {'jsonrpc': '2.0', 'result': result, 'id': id_}
+        '''JSON v2 response payload. error is not present.'''
+        return {'jsonrpc': '2.0', 'id': id_, 'result': result}
 
     @classmethod
-    def error_payload(cls, message, code, id_=None):
-        error = {'message': message, 'code': code}
-        return {'jsonrpc': '2.0', 'error': error, 'id': id_}
+    def error_payload(cls, message, code, id_):
+        '''JSON v2 error payload. result is not present.'''
+        return {'jsonrpc': '2.0', 'id': id_,
+                'error': {'message': message, 'code': code}}
 
     @classmethod
-    def check_payload_id(cls, payload):
-        '''Extract and return the ID from the payload.
+    def handle_response(cls, handler, payload):
+        '''JSON v2 response handler. Exactly one of 'result' and 'error'
+        must exist. Errors must have 'code' and 'message' members.
+        '''
+        if ('error' in payload) != ('result' in payload):
+            if 'result' in payload:
+                handler(payload['result'], None)
+            else:
+                error = payload['error']
+                if (isinstance(error, dict) and 'code' in error
+                        and 'message' in error):
+                    handler(None, error)
 
-        Raises an RPCError if it is missing or invalid.'''
-        if not 'id' in payload:
-            raise RPCError('missing id', JSONRPC.INVALID_REQUEST)
+    @classmethod
+    def batch_size(cls, parts):
+        '''Return the size of a JSON batch from its parts.'''
+        return sum(len(part) for part in parts) + 2 * len(parts)
 
-        id_ = payload['id']
-        if not isinstance(id_, JSONRPC.ID_TYPES):
-            raise RPCError('invalid id: {}'.format(id_),
-                           JSONRPC.INVALID_REQUEST)
-        return id_
+    @classmethod
+    def batch_bytes(cls, parts):
+        '''Return the bytes of a JSON batch from its parts.'''
+        if parts:
+            return b'[' + b', '.join(parts) + b']'
+        return b''
 
     @classmethod
-    def payload_id(cls, payload):
-        '''Extract and return the ID from the payload.
+    def is_request(cls, payload):
+        '''Returns True if the payload (which has a method) is a request.
+        False means it is a notification.'''
+        return 'id' in payload
 
-        Returns None if it is missing or invalid.'''
-        try:
-            return cls.check_payload_id(payload)
-        except RPCError:
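
To make the v2 batch helpers concrete, a small sketch; the two encoded
parts are invented for the example:

    # Two already-encoded payloads, as encode_payload would produce.
    parts = [b'{"jsonrpc": "2.0", "method": "a"}',
             b'{"jsonrpc": "2.0", "method": "b"}']
    JSONRPCv2.batch_bytes(parts)
    #   b'[{"jsonrpc": "2.0", "method": "a"}, {"jsonrpc": "2.0", "method": "b"}]'
    JSONRPCv2.batch_size(parts)
    #   70 here: the part lengths plus 2 bytes per part, matching the
    #   '[', ']' and ', ' framing that batch_bytes adds
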
+
+class JSONRPCCompat(JSONRPC):
+    '''Intended to be used until receiving a response from the peer, at
+    which point detect_version should be used to choose which version
+    to use.
+
+    Sends requests compatible with v1 and v2. Errors cannot be
+    compatible so v2 errors are sent.
+
+    Does not send responses or notifications, nor handle responses.
+    '''
+    @classmethod
+    def request_payload(cls, id_, method, params=None):
+        '''JSON v2 request payload but with params present.'''
+        return {'jsonrpc': '2.0', 'id': id_,
+                'method': method, 'params': params or []}
+
+    @classmethod
+    def error_payload(cls, message, code, id_):
+        '''JSON v2 error payload. result is not present.'''
+        return {'jsonrpc': '2.0', 'id': id_,
+                'error': {'message': message, 'code': code}}
+
+    @classmethod
+    def detect_version(cls, payload):
+        '''Return a best guess at a version compatible with the received
+        payload.
+
+        Return None if one cannot be determined.
+        '''
+        def item_version(item):
+            if isinstance(item, dict):
+                version = item.get('jsonrpc')
+                if version is None:
+                    return JSONRPCv1
+                if version == '2.0':
+                    return JSONRPCv2
             return None
 
-    def __init__(self):
+        if isinstance(payload, list) and payload:
+            version = item_version(payload[0])
+            # If a batch return at least JSONRPCv2
+            if version in (JSONRPCv1, None):
+                version = JSONRPCv2
+        else:
+            version = item_version(payload)
+
+        return version
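
Roughly, detect_version classifies a peer's first message like this; the
payloads are invented for illustration:

    # Hypothetical first payloads from a peer, and the guessed version.
    JSONRPCCompat.detect_version({'method': 'ping', 'id': 1})
    #   JSONRPCv1: a dict with no 'jsonrpc' member
    JSONRPCCompat.detect_version({'jsonrpc': '2.0', 'method': 'ping', 'id': 1})
    #   JSONRPCv2: 'jsonrpc' is '2.0'
    JSONRPCCompat.detect_version([{'method': 'ping', 'id': 1}])
    #   JSONRPCv2: batches are assumed to be at least v2
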
+
+
+class JSONSessionBase(util.LoggedClass):
+    '''Acts as the application layer session, communicating via JSON RPC
+    over an underlying transport.
+
+    Processes incoming and sends outgoing requests, notifications and
+    responses. Incoming messages are queued. When the queue goes
+    from empty to non-empty, have_pending_items() is called.
+    '''
+
+    NEXT_SESSION_ID = 0
+
+    @classmethod
+    def next_session_id(cls):
+        session_id = cls.NEXT_SESSION_ID
+        cls.NEXT_SESSION_ID += 1
+        return session_id
+
+    def __init__(self, version=JSONRPCCompat):
         super().__init__()
-        self.send_notification = partial(self.send_request, None)
-        self.start = time.time()
-        self.stop = 0
-        self.last_recv = self.start
-        self.bandwidth_start = self.start
-        self.bandwidth_interval = 3600
-        self.bandwidth_used = 0
-        self.bandwidth_limit = 5000000
-        self.transport = None
-        self.pause = False
+
         # Parts of an incomplete JSON line.  We buffer them until
         # getting a newline.
         self.parts = []
-        # recv_count is JSON messages not calls to data_received()
-        self.recv_count = 0
-        self.recv_size = 0
+        self.version = version
+        self.log_me = False
+        self.session_id = None
+        # Count of incoming complete JSON requests and the time of the
+        # last one.  A batch counts as just one here.
+        self.last_recv = time.time()
         self.send_count = 0
         self.send_size = 0
+        self.recv_size = 0
+        self.recv_count = 0
         self.error_count = 0
-        self.close_after_send = False
-        self.peer_info = None
-        # Sends longer than max_send are prevented, instead returning
-        # an oversized request error to other end of the network
-        # connection.  The request causing it is logged.  Values under
-        # 1000 are treated as 1000.
-        self.max_send = 0
+        self.pause = False
+        # Handling of incoming items
+        self.items = collections.deque()
+        self.batch_results = []
+        # Handling of outgoing requests
+        self.next_request_id = 0
+        self.pending_responses = {}
         # If buffered incoming data exceeds this the connection is closed
         self.max_buffer_size = 1000000
-        self.anon_logs = False
-        self.id_ = JSONRPC.NEXT_SESSION_ID
-        JSONRPC.NEXT_SESSION_ID += 1
-        self.log_prefix = '[{:d}] '.format(self.id_)
-        self.log_me = False
-
-    def peername(self, *, for_log=True):
-        '''Return the peer name of this connection.'''
-        if not self.peer_info:
-            return 'unknown'
-        if for_log and self.anon_logs:
-            return 'xx.xx.xx.xx:xx'
-        if ':' in self.peer_info[0]:
-            return '[{}]:{}'.format(self.peer_info[0], self.peer_info[1])
-        else:
-            return '{}:{}'.format(self.peer_info[0], self.peer_info[1])
-
-    def connection_made(self, transport):
-        '''Handle an incoming client connection.'''
-        self.transport = transport
-        self.peer_info = transport.get_extra_info('peername')
-        transport.set_write_buffer_limits(high=500000)
-
-    def connection_lost(self, exc):
-        '''Handle client disconnection.'''
-        pass
+        self.max_send = 50000
+        self.close_after_send = False
 
     def pause_writing(self):
-        '''Called by asyncio when the write buffer is full.'''
-        self.log_info('pausing request processing whilst socket drains')
+        '''Transport calls when the send buffer is full.'''
+        self.log_info('pausing processing whilst socket drains')
         self.pause = True
 
     def resume_writing(self):
-        '''Called by asyncio when the write buffer has room.'''
-        self.log_info('resuming request processing')
+        '''Transport calls when the send buffer has room.'''
+        self.log_info('resuming processing')
         self.pause = False
 
-    def close_connection(self):
-        self.stop = time.time()
-        if self.transport:
-            self.transport.close()
-
-    def using_bandwidth(self, amount):
-        now = time.time()
-        # Reduce the recorded usage in proportion to the elapsed time
-        elapsed = now - self.bandwidth_start
-        self.bandwidth_start = now
-        refund = int(elapsed / self.bandwidth_interval * self.bandwidth_limit)
-        refund = min(refund, self.bandwidth_used)
-        self.bandwidth_used += amount - refund
-        self.throttled = max(0, self.throttled - int(elapsed) // 60)
+    def is_oversized(self, length, id_=None):
+        '''Return the error bytes of an "oversized" error response if the
+        given outgoing message size is too large, otherwise False.'''
+        if self.max_send and length > max(1000, self.max_send):
+            msg = 'response too large (at least {:d} bytes)'.format(length)
+            return self.error_bytes(msg, JSONRPC.INVALID_REQUEST, id_)
+        return False
 
-    def data_received(self, data):
-        '''Handle incoming data (synchronously).
+    def send_binary(self, binary):
+        '''Pass the bytes through to the transport.
 
-        Requests end in newline characters.  Pass complete requests to
-        decode_message for handling.
+        Close the connection if close_after_send is set.
''' - self.recv_size += len(data) - self.using_bandwidth(len(data)) - - # Close abusive connections where buffered data exceeds limit - buffer_size = len(data) + sum(len(part) for part in self.parts) - if buffer_size > self.max_buffer_size: - self.log_error('read buffer of {:,d} bytes exceeds {:,d} ' - 'byte limit, closing {}' - .format(buffer_size, self.max_buffer_size, - self.peername())) - self.close_connection() - - # Do nothing if this connection is closing - if self.transport.is_closing(): + if self.is_closing(): return + self.using_bandwidth(len(binary)) + self.send_count += 1 + self.send_size += len(binary) + self.send_bytes(binary) + if self.close_after_send: + self.close_connection() - while True: - npos = data.find(ord('\n')) - if npos == -1: - self.parts.append(data) - break - self.last_recv = time.time() - self.recv_count += 1 - tail, data = data[:npos], data[npos + 1:] - parts, self.parts = self.parts, [] - parts.append(tail) - self.decode_message(b''.join(parts)) - - def decode_message(self, message): - '''Decode a binary message and queue it for asynchronous handling. + def payload_id(self, payload): + '''Extract and return the ID from the payload. - Messages that cannot be decoded are logged and dropped. - ''' + Returns None if it is missing or invalid.''' try: - message = message.decode() - except UnicodeDecodeError as e: - msg = 'cannot decode binary bytes: {}'.format(e) - self.send_json_error(msg, JSONRPC.PARSE_ERROR) - return + return self.check_payload_id(payload) + except RPCError: + return None - try: - message = json.loads(message) - except json.JSONDecodeError as e: - msg = 'cannot decode JSON: {}'.format(e) - self.send_json_error(msg, JSONRPC.PARSE_ERROR) - return + def check_payload_id(self, payload): + '''Extract and return the ID from the payload. - if isinstance(message, list): - # Batches must have at least one object. - if not message: - self.send_json_error('empty batch', JSONRPC.INVALID_REQUEST) - return - request = BatchRequest(message) - else: - request = SingleRequest(message) + Raises an RPCError if it is missing or invalid.''' + if not 'id' in payload: + raise RPCError('missing id', JSONRPC.INVALID_REQUEST) - '''Queue the request for asynchronous handling.''' - self.enqueue_request(request) - if self.log_me: - self.log_info('queued {}'.format(message)) + id_ = payload['id'] + if not isinstance(id_, self.version.ID_TYPES): + raise RPCError('invalid id type {}'.format(type(id_)), + JSONRPC.INVALID_REQUEST) + return id_ - def send_json_error(self, message, code, id_=None): - '''Send a JSON error.''' - self._send_bytes(self.json_error_bytes(message, code, id_)) + def request_bytes(self, id_, method, params=None): + '''Return the bytes of a JSON request.''' + payload = self.version.request_payload(id_, method, params) + return self.encode_payload(payload) - def send_request(self, id_, method, params=None): - '''Send a request. If id_ is None it is a notification.''' - self.encode_and_send_payload(self.request_payload(id_, method, params)) + def notification_bytes(self, method, params=None): + payload = self.version.notification_payload(method, params) + return self.encode_payload(payload) - def send_notifications(self, mp_iterable): - '''Send an iterable of (method, params) notification pairs. 
+ def response_bytes(self, result, id_): + '''Return the bytes of a JSON response.''' + return self.encode_payload(self.version.response_payload(result, id_)) - A 1-tuple is also valid in which case there are no params.''' - # TODO: maybe send batches if remote side supports it - for pair in mp_iterable: - self.send_notification(*pair) + def error_bytes(self, message, code, id_=None): + '''Return the bytes of a JSON error. + + Flag the connection to close on a fatal error or too many errors.''' + version = self.version + self.error_count += 1 + if code in (version.PARSE_ERROR, version.INVALID_REQUEST): + self.log_info(message) + self.close_after_send = True + elif self.error_count >= 10: + self.log_info('too many errors, last: {}'.format(message)) + self.close_after_send = True + return self.encode_payload(self.version.error_payload + (message, code, id_)) def encode_payload(self, payload): + '''Encode a Python object as binary bytes.''' assert isinstance(payload, dict) try: @@ -334,86 +342,118 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except TypeError: msg = 'JSON encoding failure: {}'.format(payload) self.log_error(msg) - binary = self.json_error_bytes(msg, JSONRPC.INTERNAL_ERROR, - payload.get('id')) + binary = self.error_bytes(msg, JSONRPC.INTERNAL_ERROR, + payload.get('id')) - if self.is_oversized_request(len(binary)): - binary = self.json_error_bytes('request too large', - JSONRPC.INVALID_REQUEST, - payload.get('id')) - self.send_count += 1 - self.send_size += len(binary) - self.using_bandwidth(len(binary)) - return binary + error_bytes = self.is_oversized(len(binary)) + return error_bytes or binary - def is_oversized_request(self, total_len): - return total_len > max(1000, self.max_send) + def decode_message(self, payload): + '''Decode a binary message and pass it on to process_single_item or + process_batch as appropriate. - def _send_bytes(self, binary): - '''Send JSON text over the transport. Close it if close is True.''' - # Confirmed this happens, sometimes a lot - if self.transport.is_closing(): + Messages that cannot be decoded are logged and dropped. + ''' + try: + payload = payload.decode() + except UnicodeDecodeError as e: + msg = 'cannot decode message: {}'.format(e) + self.send_error(msg, JSONRPC.PARSE_ERROR) + return + + try: + payload = json.loads(payload) + except json.JSONDecodeError as e: + msg = 'cannot decode JSON: {}'.format(e) + self.send_error(msg, JSONRPC.PARSE_ERROR) return - self.transport.write(binary) - self.transport.write(b'\n') - if self.close_after_send: - self.close_connection() - def encode_and_send_payload(self, payload): - '''Encode the payload and send it.''' - self._send_bytes(self.encode_payload(payload)) + if self.version is JSONRPCCompat: + # Attempt to detect peer's JSON RPC version + version = self.version.detect_version(payload) + if not version: + version = JSONRPCv2 + self.log_info('unable to detect JSON RPC version, using 2.0') + self.version = version + + # Batches must have at least one object. + if payload == [] and self.version.HAS_BATCHES: + self.send_error('empty batch', JSONRPC.INVALID_REQUEST) + return - def json_request_bytes(self, method, id_, params=None): - '''Return the bytes of a JSON request.''' - return self.encode_payload(self.request_payload(method, id_, params)) + # Incoming items get queued for later asynchronous processing. 
+        if not self.items:
+            self.have_pending_items()
+        self.items.append(payload)
 
-    def json_request_bytes(self, method, id_, params=None):
-        '''Return the bytes of a JSON request.'''
-        return self.encode_payload(self.request_payload(method, id_, params))
+    async def process_batch(self, batch, count):
+        '''Processes count items from the batch according to the JSON 2.0
+        spec.
 
-    def json_response_bytes(self, result, id_):
-        '''Return the bytes of a JSON response.'''
-        return self.encode_payload(self.response_payload(result, id_))
+        If any remain, puts what is left of the batch back in the deque
+        and returns None.  Otherwise returns the binary batch result.'''
+        results = self.batch_results
+        self.batch_results = []
 
-    def json_error_bytes(self, message, code, id_=None):
-        '''Return the bytes of a JSON error.
+        for n in range(count):
+            item = batch.pop()
+            result = await self.process_single_item(item)
+            if result:
+                results.append(result)
+
+        if not batch:
+            return self.version.batch_bytes(results)
+
+        error_bytes = self.is_oversized(self.version.batch_size(results))
+        if error_bytes:
+            return error_bytes
 
-        Flag the connection to close on a fatal error or too many errors.'''
-        self.error_count += 1
-        if (code in (JSONRPC.PARSE_ERROR, JSONRPC.INVALID_REQUEST)
-                or self.error_count > 10):
-            self.close_after_send = True
-        return self.encode_payload(self.error_payload(message, code, id_))
 
-    async def process_single_payload(self, payload):
+        self.items.appendleft(batch)
+        self.batch_results = results
+        return None
+
+    async def process_single_item(self, payload):
         '''Handle a single JSON request, notification or response.
 
         If it is a request, return the binary response, otherwise None.'''
+        if self.log_me:
+            self.log_info('processing {}'.format(payload))
+
         if not isinstance(payload, dict):
-            return self.json_error_bytes('request must be a dict',
-                                         JSONRPC.INVALID_REQUEST)
-
-        # Requests and notifications must have a method.
-        # Notifications are distinguished by having no 'id'.
-        if 'method' in payload:
-            if 'id' in payload:
-                return await self.process_single_request(payload)
+            return self.error_bytes('request must be a dictionary',
+                                    JSONRPC.INVALID_REQUEST)
+
+        try:
+            # Requests and notifications must have a method.
+ if 'method' in payload: + if self.version.is_request(payload): + return await self.process_single_request(payload) + else: + await self.process_single_notification(payload) else: - await self.process_single_notification(payload) - else: - await self.process_single_response(payload) + self.process_single_response(payload) - return None + return None + except asyncio.CancelledError: + raise + except Exception: + self.log_error(traceback.format_exc()) + return self.error_bytes('internal error processing request', + JSONRPC.INTERNAL_ERROR, + self.payload_id(payload)) async def process_single_request(self, payload): '''Handle a single JSON request and return the binary response.''' try: result = await self.handle_payload(payload, self.request_handler) - return self.json_response_bytes(result, payload['id']) + return self.response_bytes(result, payload['id']) except RPCError as e: - return self.json_error_bytes(e.msg, e.code, - self.payload_id(payload)) + return self.error_bytes(e.msg, e.code, self.payload_id(payload)) except Exception: self.log_error(traceback.format_exc()) - return self.json_error_bytes('internal error processing request', - JSONRPC.INTERNAL_ERROR, - self.payload_id(payload)) + return self.error_bytes('internal error processing request', + JSONRPC.INTERNAL_ERROR, + self.payload_id(payload)) async def process_single_notification(self, payload): '''Handle a single JSON notification.''' @@ -424,18 +464,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except Exception: self.log_error(traceback.format_exc()) - async def process_single_response(self, payload): + def process_single_response(self, payload): '''Handle a single JSON response.''' try: id_ = self.check_payload_id(payload) - # Only one of result and error should exist - if 'error' in payload: - error = payload['error'] - if (not 'result' in payload and isinstance(error, dict) - and 'code' in error and 'message' in error): - await self.handle_response(None, error, id_) - elif 'result' in payload: - await self.handle_response(payload['result'], None, id_) + handler = self.pending_responses.pop(id_, None) + if handler: + self.version.handle_response(handler, payload) + else: + self.log_info('response for unsent id {}'.format(id_), + throttle=True) except RPCError: pass except Exception: @@ -448,7 +486,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): method = payload.get('method') if not isinstance(method, str): - raise RPCError("invalid method: '{}'".format(method), + raise RPCError("invalid method type {}".format(type(method)), JSONRPC.INVALID_REQUEST) handler = get_handler(method) @@ -457,7 +495,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): JSONRPC.METHOD_NOT_FOUND) if not isinstance(args, (list, dict)): - raise RPCError('arguments should be an array or a dict', + raise RPCError('arguments should be an array or dictionary', JSONRPC.INVALID_REQUEST) params = inspect.signature(handler).parameters @@ -465,12 +503,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass): min_args = sum(p.default is p.empty for p in params.values()) if len(args) < min_args: - raise RPCError('too few arguments: expected {:d} got {:d}' - .format(min_args, len(args)), JSONRPC.INVALID_ARGS) + raise RPCError('too few arguments to {}: expected {:d} got {:d}' + .format(method, min_args, len(args)), + JSONRPC.INVALID_ARGS) if len(args) > len(params): - raise RPCError('too many arguments: expected {:d} got {:d}' - .format(len(params), len(args)), + raise RPCError('too many arguments to {}: expected {:d} got {:d}' + .format(method, len(params), 
len(args)),
                            JSONRPC.INVALID_ARGS)
 
         if isinstance(args, list):
@@ -483,23 +522,179 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
                 raise RPCError('invalid parameter names: {}'
                                .format(', '.join(bad_names)))
 
-        return await handler(**kw_args)
+        if inspect.iscoroutinefunction(handler):
+            return await handler(**kw_args)
+        else:
+            return handler(**kw_args)
+
+    # ---- External Interface ----
+
+    async def process_pending_items(self, limit=8):
+        '''Processes up to LIMIT pending items asynchronously.'''
+        while limit > 0 and self.items:
+            item = self.items.popleft()
+            if isinstance(item, list) and self.version.HAS_BATCHES:
+                count = min(limit, len(item))
+                binary = await self.process_batch(item, count)
+                limit -= count
+            else:
+                binary = await self.process_single_item(item)
+                limit -= 1
+
+            if binary:
+                self.send_binary(binary)
+
+    def count_pending_items(self):
+        '''Counts the number of pending items.'''
+        return sum(len(item) if isinstance(item, list) else 1
+                   for item in self.items)
+
+    def connection_made(self):
+        '''Call when an incoming client connection is established.'''
+        self.session_id = self.next_session_id()
+        self.log_prefix = '[{:d}] '.format(self.session_id)
+
+    def data_received(self, data):
+        '''Underlying transport calls this when new data comes in.
+
+        Look for newline separators terminating full requests.
+        '''
+        if self.is_closing():
+            return
+        self.using_bandwidth(len(data))
+        self.recv_size += len(data)
+
+        # Close abusive connections where buffered data exceeds limit
+        buffer_size = len(data) + sum(len(part) for part in self.parts)
+        if buffer_size > self.max_buffer_size:
+            self.log_error('read buffer of {:,d} bytes over {:,d} byte limit'
+                           .format(buffer_size, self.max_buffer_size))
+            self.close_connection()
+            return
+
+        while True:
+            npos = data.find(ord('\n'))
+            if npos == -1:
+                self.parts.append(data)
+                break
+            tail, data = data[:npos], data[npos + 1:]
+            parts, self.parts = self.parts, []
+            parts.append(tail)
+            self.recv_count += 1
+            self.last_recv = time.time()
+            self.decode_message(b''.join(parts))
+
+    def send_error(self, message, code, id_=None):
+        '''Send a JSON error.'''
+        self.send_binary(self.error_bytes(message, code, id_))
+
+    def send_request(self, handler, method, params=None):
+        '''Sends a request and arranges for handler to be called with the
+        response when it comes in.
+        '''
+        id_ = self.next_request_id
+        self.next_request_id += 1
+        self.send_binary(self.request_bytes(id_, method, params))
+        self.pending_responses[id_] = handler
+
+    def send_notification(self, method, params=None):
+        '''Send a notification.'''
+        self.send_binary(self.notification_bytes(method, params))
+
+    def send_notifications(self, mp_iterable):
+        '''Send an iterable of (method, params) notification pairs.
+
+        A 1-tuple is also valid in which case there are no params.'''
+        if self.version.HAS_BATCHES:
+            parts = [self.notification_bytes(*pair) for pair in mp_iterable]
+            self.send_binary(self.version.batch_bytes(parts))
+        else:
+            for pair in mp_iterable:
+                self.send_notification(*pair)
+
+    # -- derived classes are intended to override these functions
+
+    # Transport layer
 
-    # --- derived classes are intended to override these functions
-    def enqueue_request(self, request):
-        '''Enqueue a request for later asynchronous processing.'''
+    def is_closing(self):
+        '''Return True if the underlying transport is closing.'''
         raise NotImplementedError
 
-    async def handle_response(self, result, error, id_):
-        '''Handle a JSON response.
+ def close_connection(self): + '''Close the connection.''' + raise NotImplementedError - Should not raise an exception. Return values are ignored. + def send_bytes(self, binary): + '''Pass the bytes through to the underlying transport.''' + raise NotImplementedError + + # App layer + + def have_pending_items(self): + '''Called to indicate there are items pending to be processed + asynchronously by calling process_pending_items. + + This is *not* called every time an item is added, just when + there were previously none and now there is at least one. ''' + raise NotImplementedError + + def using_bandwidth(self, amount): + '''Called as bandwidth is consumed. + + Override to implement bandwidth management. + ''' + pass def notification_handler(self, method): - '''Return the async handler for the given notification method.''' + '''Return the handler for the given notification. + + The handler can be synchronous or asynchronous.''' return None def request_handler(self, method): - '''Return the async handler for the given request method.''' + '''Return the handler for the given request method. + + The handler can be synchronous or asynchronous.''' return None + + +class JSONSession(JSONSessionBase, asyncio.Protocol): + '''A JSONSessionBase instance specialized for use with + asyncio.protocol to implement the transport layer. + + Derived classes must provide have_pending_items() and may want to + override the request and notification handlers. + ''' + + def __init__(self, version=JSONRPCCompat): + super().__init__(version=version) + self.transport = None + self.write_buffer_high = 500000 + + def peer_info(self): + '''Returns information about the peer.''' + return self.transport.get_extra_info('peername') + + def abort(self): + '''Cut the connection abruptly.''' + self.transport.abort() + + def connection_made(self, transport): + '''Handle an incoming client connection.''' + transport.set_write_buffer_limits(high=self.write_buffer_high) + self.transport = transport + super().connection_made() + + def is_closing(self): + '''True if the underlying transport is closing.''' + return self.transport and self.transport.is_closing() + + def close_connection(self): + '''Close the connection.''' + if self.transport: + self.transport.close() + + def send_bytes(self, binary): + '''Send JSON text over the transport.''' + self.transport.writelines((binary, b'\n')) diff --git a/server/controller.py b/server/controller.py index f2a8a6d..0372862 100644 --- a/server/controller.py +++ b/server/controller.py @@ -18,14 +18,14 @@ from functools import partial import pylru -from lib.jsonrpc import JSONRPC, RPCError, RequestBase +from lib.jsonrpc import JSONRPC, RPCError from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor from server.daemon import Daemon, DaemonError -from server.session import LocalRPC, ElectrumX -from server.peers import PeerManager from server.mempool import MemPool +from server.peers import PeerManager +from server.session import LocalRPC, ElectrumX from server.version import VERSION @@ -39,16 +39,6 @@ class Controller(util.LoggedClass): BANDS = 5 CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) - class NotificationRequest(RequestBase): - def __init__(self, height, touched): - super().__init__(1) - self.height = height - self.touched = touched - - async def process(self, session): - self.remaining = 0 - await session.notify(self.height, self.touched) - def __init__(self, env): super().__init__() # 
Set this event to cleanly shutdown @@ -56,7 +46,7 @@ class Controller(util.LoggedClass): self.loop = asyncio.get_event_loop() self.executor = ThreadPoolExecutor() self.loop.set_default_executor(self.executor) - self.start = time.time() + self.start_time = time.time() self.coin = env.coin self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) self.bp = BlockProcessor(env, self.daemon) @@ -141,9 +131,9 @@ class Controller(util.LoggedClass): if isinstance(session, LocalRPC): return 0 gid = self.sessions[session] - group_bandwidth = sum(s.bandwidth_used for s in self.groups[gid]) - return 1 + (bisect_left(self.bands, session.bandwidth_used) - + bisect_left(self.bands, group_bandwidth)) // 2 + group_bw = sum(session.bw_used for session in self.groups[gid]) + return 1 + (bisect_left(self.bands, session.bw_used) + + bisect_left(self.bands, group_bw)) // 2 def is_deprioritized(self, session): return self.session_priority(session) > self.BANDS @@ -166,6 +156,15 @@ class Controller(util.LoggedClass): and self.state == self.PAUSED): await self.start_external_servers() + # Periodically log sessions + if self.env.log_sessions and time.time() > self.next_log_sessions: + if self.next_log_sessions: + data = self.session_data(for_log=True) + for line in Controller.sessions_text_lines(data): + self.logger.info(line) + self.logger.info(json.dumps(self.server_summary())) + self.next_log_sessions = time.time() + self.env.log_sessions + await asyncio.sleep(1) def enqueue_session(self, session): @@ -195,7 +194,10 @@ class Controller(util.LoggedClass): while True: priority_, id_, session = await self.queue.get() if session in self.sessions: - await session.serve_requests() + await session.process_pending_items() + # Re-enqueue the session if stuff is left + if session.items: + self.enqueue_session(session) def initiate_shutdown(self): '''Call this function to start the shutdown process.''' @@ -265,8 +267,8 @@ class Controller(util.LoggedClass): async def start_server(self, kind, *args, **kw_args): protocol_class = LocalRPC if kind == 'RPC' else ElectrumX - protocol = partial(protocol_class, self, self.bp, self.env, kind) - server = self.loop.create_server(protocol, *args, **kw_args) + protocol_factory = partial(protocol_class, self, kind) + server = self.loop.create_server(protocol_factory, *args, **kw_args) host, port = args[:2] try: @@ -329,17 +331,7 @@ class Controller(util.LoggedClass): for session in self.sessions: if isinstance(session, ElectrumX): - request = self.NotificationRequest(self.bp.db_height, - touched) - session.enqueue_request(request) - # Periodically log sessions - if self.env.log_sessions and time.time() > self.next_log_sessions: - if self.next_log_sessions: - data = self.session_data(for_log=True) - for line in Controller.sessions_text_lines(data): - self.logger.info(line) - self.logger.info(json.dumps(self.server_summary())) - self.next_log_sessions = time.time() + self.env.log_sessions + await session.notify(self.bp.db_height, touched) def electrum_header(self, height): '''Return the binary header at the given height.''' @@ -357,7 +349,7 @@ class Controller(util.LoggedClass): if now > self.next_stale_check: self.next_stale_check = now + 300 self.clear_stale_sessions() - gid = int(session.start - self.start) // 900 + gid = int(session.start_time - self.start_time) // 900 self.groups[gid].append(session) self.sessions[session] = gid session.log_info('{} {}, {:,d} total' @@ -381,12 +373,12 @@ class Controller(util.LoggedClass): def close_session(self, session): '''Close the session's 
transport and cancel its future.''' session.close_connection() - return 'disconnected {:d}'.format(session.id_) + return 'disconnected {:d}'.format(session.session_id) def toggle_logging(self, session): '''Toggle logging of the session.''' session.log_me = not session.log_me - return 'log {:d}: {}'.format(session.id_, session.log_me) + return 'log {:d}: {}'.format(session.session_id, session.log_me) def clear_stale_sessions(self, grace=15): '''Cut off sessions that haven't done anything for 10 minutes. Force @@ -400,17 +392,17 @@ class Controller(util.LoggedClass): stale = [] for session in self.sessions: if session.is_closing(): - if session.stop <= shutdown_cutoff: - session.transport.abort() + if session.close_time <= shutdown_cutoff: + session.abort() elif session.last_recv < stale_cutoff: self.close_session(session) - stale.append(session.id_) + stale.append(session.session_id) if stale: self.logger.info('closing stale connections {}'.format(stale)) # Consolidate small groups gids = [gid for gid, l in self.groups.items() if len(l) <= 4 - and sum(session.bandwidth_used for session in l) < 10000] + and sum(session.bw_used for session in l) < 10000] if len(gids) > 1: sessions = sum([self.groups[gid] for gid in gids], []) new_gid = max(gids) @@ -436,7 +428,7 @@ class Controller(util.LoggedClass): 'paused': sum(s.pause for s in self.sessions), 'pid': os.getpid(), 'peers': self.peers.count(), - 'requests': sum(s.requests_remaining() for s in self.sessions), + 'requests': sum(s.count_pending_items() for s in self.sessions), 'sessions': self.session_count(), 'subs': self.sub_count(), 'txs_sent': self.txs_sent, @@ -445,13 +437,6 @@ class Controller(util.LoggedClass): def sub_count(self): return sum(s.sub_count() for s in self.sessions) - @staticmethod - def text_lines(method, data): - if method == 'sessions': - return Controller.sessions_text_lines(data) - else: - return Controller.groups_text_lines(data) - @staticmethod def groups_text_lines(data): '''A generator returning lines for a list of groups. @@ -482,8 +467,8 @@ class Controller(util.LoggedClass): sessions = self.groups[gid] result.append([gid, len(sessions), - sum(s.bandwidth_used for s in sessions), - sum(s.requests_remaining() for s in sessions), + sum(s.bw_used for s in sessions), + sum(s.count_pending_items() for s in sessions), sum(s.txs_sent for s in sessions), sum(s.sub_count() for s in sessions), sum(s.recv_count for s in sessions), @@ -523,17 +508,17 @@ class Controller(util.LoggedClass): def session_data(self, for_log): '''Returned to the RPC 'sessions' call.''' now = time.time() - sessions = sorted(self.sessions, key=lambda s: s.start) - return [(session.id_, + sessions = sorted(self.sessions, key=lambda s: s.start_time) + return [(session.session_id, session.flags(), session.peername(for_log=for_log), session.client, - session.requests_remaining(), + session.count_pending_items(), session.txs_sent, session.sub_count(), session.recv_count, session.recv_size, session.send_count, session.send_size, - now - session.start) + now - session.start_time) for session in sessions] def lookup_session(self, session_id): @@ -543,7 +528,7 @@ class Controller(util.LoggedClass): pass else: for session in self.sessions: - if session.id_ == session_id: + if session.session_id == session_id: return session return None @@ -562,42 +547,42 @@ class Controller(util.LoggedClass): # Local RPC command handlers - async def rpc_disconnect(self, session_ids): + def rpc_disconnect(self, session_ids): '''Disconnect sesssions. 
session_ids: array of session IDs ''' return self.for_each_session(session_ids, self.close_session) - async def rpc_log(self, session_ids): + def rpc_log(self, session_ids): '''Toggle logging of sesssions. session_ids: array of session IDs ''' return self.for_each_session(session_ids, self.toggle_logging) - async def rpc_stop(self): + def rpc_stop(self): '''Shut down the server cleanly.''' self.initiate_shutdown() return 'stopping' - async def rpc_getinfo(self): + def rpc_getinfo(self): '''Return summary information about the server process.''' return self.server_summary() - async def rpc_groups(self): + def rpc_groups(self): '''Return statistics about the session groups.''' return self.group_data() - async def rpc_sessions(self): + def rpc_sessions(self): '''Return statistics about connected sessions.''' return self.session_data(for_log=False) - async def rpc_peers(self): + def rpc_peers(self): '''Return a list of server peers, currently taken from IRC.''' return self.peers.peer_list() - async def rpc_reorg(self, count=3): + def rpc_reorg(self, count=3): '''Force a reorg of the given number of blocks. count: number of blocks to reorg (default 3) @@ -779,14 +764,14 @@ class Controller(util.LoggedClass): 'height': utxo.height, 'value': utxo.value} for utxo in sorted(await self.get_utxos(hashX))] - async def block_get_chunk(self, index): + def block_get_chunk(self, index): '''Return a chunk of block headers. index: the chunk index''' index = self.non_negative_integer(index) return self.get_chunk(index) - async def block_get_header(self, height): + def block_get_header(self, height): '''The deserialized header at a given height. height: the header's height''' @@ -879,6 +864,6 @@ class Controller(util.LoggedClass): return banner - async def donation_address(self): + def donation_address(self): '''Return the donation address as a string, empty if there is none.''' return self.env.donation_address diff --git a/server/peers.py b/server/peers.py index 541c983..31519a3 100644 --- a/server/peers.py +++ b/server/peers.py @@ -132,7 +132,7 @@ class PeerManager(util.LoggedClass): def peer_list(self): return self.irc_peers - async def subscribe(self): + def subscribe(self): '''Returns the server peers as a list of (ip, host, details) tuples. Despite the name this is not currently treated as a subscription.''' diff --git a/server/session.py b/server/session.py index 46ca43e..8916f49 100644 --- a/server/session.py +++ b/server/session.py @@ -9,39 +9,61 @@ import asyncio +import time import traceback +from functools import partial -from lib.jsonrpc import JSONRPC, RPCError +from lib.jsonrpc import JSONSession, RPCError from server.daemon import DaemonError from server.version import VERSION -class Session(JSONRPC): - '''Base class of ElectrumX JSON session protocols. +class SessionBase(JSONSession): + '''Base class of ElectrumX JSON sessions. Each session runs its tasks in asynchronous parallelism with other - sessions. To prevent some sessions blocking others, potentially - long-running requests should yield. + sessions. ''' - def __init__(self, controller, bp, env, kind): + def __init__(self, controller, kind): super().__init__() + self.kind = kind # 'RPC', 'TCP' etc. 
self.controller = controller - self.bp = bp - self.env = env - self.daemon = bp.daemon - self.kind = kind + self.bp = controller.bp + self.env = controller.env + self.daemon = self.bp.daemon self.client = 'unknown' - self.anon_logs = env.anon_logs - self.max_send = env.max_send - self.bandwidth_limit = env.bandwidth_limit + self.anon_logs = self.env.anon_logs self.last_delay = 0 self.txs_sent = 0 self.requests = [] - - def is_closing(self): - '''True if this session is closing.''' - return self.transport and self.transport.is_closing() + self.start_time = time.time() + self.close_time = 0 + self.bw_time = self.start_time + self.bw_interval = 3600 + self.bw_used = 0 + + def have_pending_items(self): + '''Called each time the pending item queue goes from empty to having + one item.''' + self.controller.enqueue_session(self) + + def close_connection(self): + '''Call this to close the connection.''' + self.close_time = time.time() + super().close_connection() + + def peername(self, *, for_log=True): + '''Return the peer name of this connection.''' + peer_info = self.peer_info() + if not peer_info: + return 'unknown' + if for_log and self.anon_logs: + return 'xx.xx.xx.xx:xx' + if ':' in peer_info[0]: + return '[{}]:{}'.format(peer_info[0], peer_info[1]) + else: + return '{}:{}'.format(peer_info[0], peer_info[1]) def flags(self): '''Status flags.''' @@ -53,42 +75,6 @@ class Session(JSONRPC): status += str(self.controller.session_priority(self)) return status - def requests_remaining(self): - return sum(request.remaining for request in self.requests) - - def enqueue_request(self, request): - '''Add a request to the session's list.''' - self.requests.append(request) - if len(self.requests) == 1: - self.controller.enqueue_session(self) - - async def serve_requests(self): - '''Serve requests in batches.''' - total = 0 - errs = [] - # Process 8 items at a time - for request in self.requests: - try: - initial = request.remaining - await request.process(self) - total += initial - request.remaining - except asyncio.CancelledError: - raise - except Exception: - # Should probably be considered a bug and fixed - self.log_error('error handling request {}'.format(request)) - traceback.print_exc() - errs.append(request) - await asyncio.sleep(0) - if total >= 8: - break - - # Remove completed requests and re-enqueue ourself if any remain. - self.requests = [req for req in self.requests - if req.remaining and not req in errs] - if self.requests: - self.controller.enqueue_session(self) - def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) @@ -96,27 +82,32 @@ class Session(JSONRPC): def connection_lost(self, exc): '''Handle client disconnection.''' - super().connection_lost(exc) - if (self.pause or self.controller.is_deprioritized(self) - or self.send_size >= 1024*1024 or self.error_count): - self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages ' - '{:,d} errors' - .format(self.send_size, self.send_count, - self.error_count)) + msg = '' + if self.pause: + msg += ' whilst paused' + if self.controller.is_deprioritized(self): + msg += ' whilst deprioritized' + if self.send_size >= 1024*1024: + msg += ('. 
Sent {:,d} bytes in {:,d} messages' + .format(self.send_size, self.send_count)) + if msg: + msg = 'disconnected' + msg + self.log_info(msg) self.controller.remove_session(self) def sub_count(self): return 0 -class ElectrumX(Session): +class ElectrumX(SessionBase): '''A TCP server that handles incoming Electrum connections.''' - def __init__(self, *args): - super().__init__(*args) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.subscribe_headers = False self.subscribe_height = False self.notified_height = None + self.max_send = self.env.max_send self.max_subs = self.env.max_session_subs self.hashX_subs = {} self.electrumx_handlers = { @@ -124,7 +115,7 @@ class ElectrumX(Session): 'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.transaction.broadcast': self.transaction_broadcast, - 'server.version': self.version, + 'server.version': self.server_version, } def sub_count(self): @@ -167,12 +158,12 @@ class ElectrumX(Session): '''Used as response to a headers subscription request.''' return self.controller.electrum_header(self.height()) - async def headers_subscribe(self): + def headers_subscribe(self): '''Subscribe to get headers of new blocks.''' self.subscribe_headers = True return self.current_electrum_header() - async def numblocks_subscribe(self): + def numblocks_subscribe(self): '''Subscribe to get height of new blocks.''' self.subscribe_height = True return self.height() @@ -190,7 +181,7 @@ class ElectrumX(Session): self.hashX_subs[hashX] = address return status - async def version(self, client_name=None, protocol_version=None): + def server_version(self, client_name=None, protocol_version=None): '''Returns the server version as a string. client_name: a string identifying the client @@ -241,13 +232,13 @@ class ElectrumX(Session): return handler -class LocalRPC(Session): - '''A local TCP RPC server for querying status.''' +class LocalRPC(SessionBase): + '''A local TCP RPC server session.''' - def __init__(self, *args): - super().__init__(*args) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.client = 'RPC' - self.max_send = 5000000 + self.max_send = 0 def request_handler(self, method): '''Return the async handler for the given request method.'''
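
Finally, a rough illustration of the transport/application split that
JSONSessionBase introduces: a subclass only has to supply the transport
hooks (is_closing, close_connection, send_bytes) and have_pending_items.
The sketch below is hypothetical and not part of the patch; BufferSession,
the 'echo' method and the in-memory transport are invented to show the
wiring, assuming the interfaces defined in lib/jsonrpc.py above:

    import asyncio

    from lib.jsonrpc import JSONSessionBase, JSONRPCv2

    class BufferSession(JSONSessionBase):
        '''Hypothetical session whose "transport" is an in-memory list.'''

        def __init__(self):
            super().__init__(version=JSONRPCv2)
            self.outgoing = []
            self.closed = False

        # Transport layer hooks
        def is_closing(self):
            return self.closed

        def close_connection(self):
            self.closed = True

        def send_bytes(self, binary):
            self.outgoing.append(binary)

        # App layer hooks
        def have_pending_items(self):
            pass  # drained explicitly below rather than via an event loop

        def request_handler(self, method):
            if method == 'echo':
                return lambda value: value  # handlers may be synchronous now
            return None

    session = BufferSession()
    session.connection_made()
    session.data_received(b'{"jsonrpc": "2.0", "id": 1, "method": "echo", '
                          b'"params": ["hi"]}\n')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(session.process_pending_items())
    print(session.outgoing)  # expect one v2 response whose result is "hi"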