diff --git a/.travis.yml b/.travis.yml index 4729f1d..54f6b7d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,5 +15,8 @@ install: - pip install lmdb - pip install plyvel - pip install pyrocksdb + - pip install pytest-cov + - pip install python-coveralls # command to run tests -script: pytest \ No newline at end of file +script: pytest --cov=server --cov=lib +after_success: coveralls \ No newline at end of file diff --git a/README.rst b/README.rst index 3efb463..4b796da 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,8 @@ .. image:: https://travis-ci.org/kyuupichan/electrumx.svg?branch=master :target: https://travis-ci.org/kyuupichan/electrumx +.. image:: https://coveralls.io/repos/github/kyuupichan/electrumx/badge.svg + :target: https://coveralls.io/github/kyuupichan/electrumx + ElectrumX - Reimplementation of Electrum-server =============================================== @@ -76,12 +79,11 @@ ElectrumX should not have any need of threads. Roadmap ======= +- store all UTXOs, not just those with addresses - test a few more performance improvement ideas - implement light caching of client responses - yield during expensive requests and/or penalize the connection - improve DB abstraction so LMDB is not penalized -- continue to clean up the code and remove layering violations -- store all UTXOs, not just those with addresses - potentially move some functionality to C or C++ The above are in no particular order. diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index 21f7379..ad468c1 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -14,7 +14,8 @@ small - patches welcome. + irc: Python IRC package. Only required if you enable IRC; ElectrumX will happily serve clients that try to connect directly. I use 15.0.4 but older versions likely are fine. - ++ x11_hash: Python X11 Hash package. Only required if you use ElectrumX + with Dash Mainnet or Testnet. Version 1.4 tested. While not requirements for running ElectrumX, it is intended to be run with supervisor software such as Daniel Bernstein's daemontools, diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 3243c45..6627a61 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,12 @@ +version 0.4.2 +------------- + +- split out JSON RPC protcol handling. Now more robust and we should + fully support JSON RPC 2.0 clients, including batch requests + (Electrum client does not yet support these) +- refactored and cleaned up server handling +- improved DASH support (thelazier) + version 0.4.1 ------------- diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 9d3a9c2..f7f05e1 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -32,7 +32,7 @@ class RPCClient(asyncio.Protocol): def send(self, method, params): self.method = method - payload = {'method': method, 'params': params} + payload = {'method': method, 'params': params, 'id': 'RPC'} data = json.dumps(payload) + '\n' self.transport.write(data.encode()) @@ -44,12 +44,28 @@ class RPCClient(asyncio.Protocol): if error: print("ERROR: {}".format(error)) else: + def data_fmt(count, size): + return '{:,d}/{:,d}KB'.format(count, size // 1024) + def time_fmt(t): + t = int(t) + return ('{:3d}:{:02d}:{:02d}' + .format(t // 3600, (t % 3600) // 60, t % 60)) + if self.method == 'sessions': - fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}' - print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time')) - for kind, peer, subs, client, time in result: - print(fmt.format(kind, peer, '{:,d}'.format(subs), - client, '{:,d}'.format(int(time)))) + fmt = ('{:<4} {:>23} {:>15} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') + print(fmt.format('Type', 'Peer', 'Client', 'Subs', + 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB', + 'Errs', 'Time')) + for (kind, peer, subs, client, recv_count, recv_size, + send_count, send_size, error_count, time) in result: + print(fmt.format(kind, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,.1f}'.format(recv_size / 1048576), + '{:,d}'.format(send_count), + '{:,.1f}'.format(send_size / 1048576), + '{:,d}'.format(error_count), + time_fmt(time))) else: pprint.pprint(result, indent=4) diff --git a/lib/coins.py b/lib/coins.py index 8e0ad0a..b9b9688 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -307,24 +307,39 @@ class DogecoinTestnet(Coin): WIF_BYTE = 0xf1 -# Source: pycoin +# Source: https://github.com/dashpay/dash class Dash(Coin): NAME = "Dash" SHORTNAME = "DASH" NET = "mainnet" XPUB_VERBYTES = bytes.fromhex("02fe52cc") XPRV_VERBYTES = bytes.fromhex("02fe52f8") + GENESIS_HASH = (b'00000ffd590b1485b3caadc19b22e637' + b'9c733355108f107a430458cdf3407ab6') P2PKH_VERBYTE = 0x4c P2SH_VERBYTE = 0x10 WIF_BYTE = 0xcc + TX_COUNT_HEIGHT = 569399 + TX_COUNT = 2157510 + TX_PER_BLOCK = 4 + @classmethod + def header_hashes(cls, header): + '''Given a header return the previous and current block hashes.''' + import x11_hash + return header[4:36], x11_hash.getPoWHash(header) -class DashTestnet(Coin): - NAME = "Dogecoin" +class DashTestnet(Dash): + NAME = "Dash" SHORTNAME = "tDASH" NET = "testnet" XPUB_VERBYTES = bytes.fromhex("3a805837") XPRV_VERBYTES = bytes.fromhex("3a8061a0") - P2PKH_VERBYTE = 0x8b + GENESIS_HASH = (b'00000bafbc94add76cb75e2ec9289483' + b'7288a481e5c005f6563d91623bf8bc2c') + P2PKH_VERBYTE = 0x8c P2SH_VERBYTE = 0x13 WIF_BYTE = 0xef + TX_COUNT_HEIGHT = 101619 + TX_COUNT = 132681 + TX_PER_BLOCK = 1 diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py new file mode 100644 index 0000000..d334d4f --- /dev/null +++ b/lib/jsonrpc.py @@ -0,0 +1,249 @@ +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Class for handling JSON RPC 2.0 connections, server or client.''' + +import asyncio +import json +import numbers +import time + +from lib.util import LoggedClass + + +def json_result_payload(result, id_): + # We should not respond to notifications + assert id_ is not None + return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_} + +def json_error_payload(message, code, id_=None): + error = {'message': message, 'code': code} + return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_} + +def json_notification_payload(method, params): + return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params} + + +class JSONRPC(asyncio.Protocol, LoggedClass): + '''Manages a JSONRPC connection. + + Assumes JSON messages are newline-separated and that newlines + cannot appear in the JSON other than to separate lines. + + Derived classes need to implement the synchronous functions + on_json_request() and method_handler(). They probably also want + to override connection_made() and connection_lost() but should be + sure to call the implementation in this base class first. + + on_json_request() is passed a JSON request as a python object + after decoding. It should arrange to pass on to the asynchronous + handle_json_request() method. + + method_handler() takes a method string and should return a function + that can be passed a parameters array, or None for an unknown method. + + Handlers should raise an RPCError on error. + ''' + + # See http://www.jsonrpc.org/specification + PARSE_ERROR = -32700 + INVALID_REQUEST = -32600 + METHOD_NOT_FOUND = -32601 + INVALID_PARAMS = -32602 + INTERAL_ERROR = -32603 + + ID_TYPES = (type(None), str, numbers.Number) + + class RPCError(Exception): + '''RPC handlers raise this error.''' + def __init__(self, msg, code=-1, **kw_args): + super().__init__(**kw_args) + self.msg = msg + self.code + + + def __init__(self): + super().__init__() + self.start = time.time() + self.transport = None + # Parts of an incomplete JSON line. We buffer them until + # getting a newline. + self.parts = [] + # recv_count is JSON messages not calls to data_received() + self.recv_count = 0 + self.recv_size = 0 + self.send_count = 0 + self.send_size = 0 + self.error_count = 0 + + def connection_made(self, transport): + '''Handle an incoming client connection.''' + self.transport = transport + + def peer_info(self): + '''Return peer info.''' + if self.transport: + return self.transport.get_extra_info('peername') + return None + + def connection_lost(self, exc): + '''Handle client disconnection.''' + pass + + def data_received(self, data): + '''Handle incoming data (synchronously). + + Requests end in newline characters. Pass complete requests to + decode_message for handling. + ''' + self.recv_size += len(data) + while True: + npos = data.find(ord('\n')) + if npos == -1: + self.parts.append(data) + break + self.recv_count += 1 + tail, data = data[:npos], data[npos + 1:] + parts, self.parts = self.parts, [] + parts.append(tail) + self.decode_message(b''.join(parts)) + + def decode_message(self, message): + '''Decode a binary message and queue it for asynchronous handling. + + Messages that cannot be decoded are logged and dropped. + ''' + try: + message = message.decode() + except UnicodeDecodeError as e: + msg = 'cannot decode binary bytes: {}'.format(e) + self.logger.warning(msg) + self.send_json_error(msg, self.PARSE_ERROR) + return + + try: + message = json.loads(message) + except json.JSONDecodeError as e: + msg = 'cannot decode JSON: {}'.format(e) + self.logger.warning(msg) + self.send_json_error(msg, self.PARSE_ERROR) + return + + self.on_json_request(message) + + def send_json_notification(self, method, params): + '''Create a json notification.''' + return self.send_json(json_notification_payload(method, params)) + + def send_json_result(self, result, id_): + '''Send a JSON result.''' + return self.send_json(json_result_payload(result, id_)) + + def send_json_error(self, message, code, id_=None): + '''Send a JSON error.''' + self.error_count += 1 + return self.send_json(json_error_payload(message, code, id_)) + + def send_json(self, payload): + '''Send a JSON payload.''' + if self.transport.is_closing(): + self.logger.info('send_json: connection closing, not sending') + return False + + try: + data = (json.dumps(payload) + '\n').encode() + except TypeError: + msg = 'JSON encoding failure: {}'.format(payload) + self.logger.error(msg) + return self.send_json_error(msg, self.INTERNAL_ERROR, + payload.get('id')) + + self.send_count += 1 + self.send_size += len(data) + self.transport.write(data) + return True + + async def handle_json_request(self, request): + '''Asynchronously handle a JSON request. + + Handles batch requests. Returns True if the request response + was sent (or if nothing was sent because the request was a + notification). Returns False if the send was aborted because + the connection is closing. + ''' + if isinstance(request, list): + payload = self.batch_request_payload(request) + else: + payload = await self.single_request_payload(request) + + if not payload: + return True + return self.send_json(payload) + + async def batch_request_payload(self, batch): + '''Return the JSON payload corresponding to a batch JSON request.''' + # Batches must have at least one request. + if not batch: + return json_error_payload('empty request list', + self.INVALID_REQUEST) + + # PYTHON 3.6: use asynchronous comprehensions when supported + payload = [] + for item in request: + item_payload = await self.single_request_payload(item) + if item_payload: + payload.append(item_payload) + return payload + + async def single_request_payload(self, request): + '''Return the JSON payload corresponding to a single JSON request. + + Return None if the request is a notification. + ''' + if not isinstance(request, dict): + return json_error_payload('request must be a dict', + self.INVALID_REQUEST) + + id_ = request.get('id') + if not isinstance(id_, self.ID_TYPES): + return json_error_payload('invalid id: {}'.format(id_), + self.INVALID_REQUEST) + + try: + result = await self.method_result(request.get('method'), + request.get('params', [])) + if id_ is None: + return None + return json_result_payload(result, id_) + except self.RPCError as e: + if id_ is None: + return None + return json_error_payload(e.msg, e.code, id_) + + async def method_result(self, method, params): + if not isinstance(method, str): + raise self.RPCError('invalid method: {}'.format(method), + self.INVALID_REQUEST) + + if not isinstance(params, list): + raise self.RPCError('params should be an array', + self.INVALID_REQUEST) + + handler = self.method_handler(method) + if not handler: + raise self.RPCError('unknown method: {}'.format(method), + self.METHOD_NOT_FOUND) + + return await handler(params) + + def on_json_request(self, request): + raise NotImplementedError('on_json_request in class {}'. + format(self.__class__.__name__)) + + def method_handler(self, method): + raise NotImplementedError('method_handler in class {}'. + format(self.__class__.__name__)) diff --git a/server/protocol.py b/server/protocol.py index ab3899d..fb55466 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -18,6 +18,7 @@ from collections import namedtuple from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash +from lib.jsonrpc import JSONRPC, json_notification_payload from lib.util import LoggedClass from server.block_processor import BlockProcessor from server.daemon import DaemonError @@ -25,38 +26,49 @@ from server.irc import IRC from server.version import VERSION -class RPCError(Exception): - '''RPC handlers raise this error.''' +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also has a server manager and starts + servers when caught up.''' + + def __init__(self, env): + super().__init__(env) + self.server_mgr = ServerManager(self, env) + self.bs_caught_up = False + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.bs_caught_up: + await self.server_mgr.start_servers() + self.bs_caught_up = True + self.server_mgr.notify(self.height, self.touched) + def stop(self): + '''Close the listening servers.''' + self.server_mgr.stop() -def json_notification(method, params): - '''Create a json notification.''' - return {'id': None, 'method': method, 'params': params} +class ServerManager(LoggedClass): + '''Manages the servers.''' -class BlockServer(BlockProcessor): - '''Like BlockProcessor but also starts servers when caught up.''' + AsyncTask = namedtuple('AsyncTask', 'session job') - def __init__(self, env): - super().__init__(env) + def __init__(self, bp, env): + super().__init__() + self.bp = bp + self.env = env self.servers = [] self.irc = IRC(env) + self.sessions = set() + self.tasks = asyncio.Queue() + self.current_task = None - async def caught_up(self, mempool_hashes): - await super().caught_up(mempool_hashes) - if not self.servers: - await self.start_servers() - if self.env.irc: - self.logger.info('starting IRC coroutine') - asyncio.ensure_future(self.irc.start()) - else: - self.logger.info('IRC disabled') - ElectrumX.notify(self.height, self.touched) - - async def start_server(self, class_name, kind, host, port, *, ssl=None): + async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() - protocol = partial(class_name, self.env, kind) - server = loop.create_server(protocol, host, port, ssl=ssl) + protocol_class = LocalRPC if kind == 'RPC' else ElectrumX + protocol = partial(protocol_class, self, self.bp, self.env, kind) + server = loop.create_server(protocol, *args, **kw_args) + + host, port = args[:2] try: self.servers.append(await server) except asyncio.CancelledError: @@ -69,45 +81,44 @@ class BlockServer(BlockProcessor): .format(kind, host, port)) async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. + '''Connect to IRC and start listening for incoming connections. - Does not start a server if the port wasn't specified. + Only connect to IRC if enabled. Start listening on RCP, TCP + and SSL ports only if the port wasn pecified. ''' env = self.env - JSONRPC.init(self, self.daemon, self.coin) + if env.rpc_port is not None: - await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) + await self.start_server('RPC', 'localhost', env.rpc_port) if env.tcp_port is not None: - await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port) + await self.start_server('TCP', env.host, env.tcp_port) if env.ssl_port is not None: # FIXME: update if we want to require Python >= 3.5.3 sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) - await self.start_server(ElectrumX, 'SSL', env.host, - env.ssl_port, ssl=sslc) + await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) + + asyncio.ensure_future(self.run_tasks()) + + if env.irc: + self.logger.info('starting IRC coroutine') + asyncio.ensure_future(self.irc.start()) + else: + self.logger.info('IRC disabled') + + def notify(self, height, touched): + '''Notify sessions about height changes and touched addresses.''' + sessions = [session for session in self.sessions + if isinstance(session, ElectrumX)] + ElectrumX.notify(sessions, height, touched) def stop(self): '''Close the listening servers.''' for server in self.servers: server.close() - def irc_peers(self): - return self.irc.peers - - -AsyncTask = namedtuple('AsyncTask', 'session job') - -class SessionManager(LoggedClass): - - def __init__(self): - super().__init__() - self.sessions = set() - self.tasks = asyncio.Queue() - self.current_task = None - asyncio.ensure_future(self.run_tasks()) - def add_session(self, session): assert session not in self.sessions self.sessions.add(session) @@ -121,7 +132,7 @@ class SessionManager(LoggedClass): def add_task(self, session, job): assert session in self.sessions task = asyncio.ensure_future(job) - self.tasks.put_nowait(AsyncTask(session, task)) + self.tasks.put_nowait(self.AsyncTask(session, task)) async def run_tasks(self): '''Asynchronously run through the task queue.''' @@ -141,104 +152,82 @@ class SessionManager(LoggedClass): finally: self.current_task = None + def irc_peers(self): + return self.irc.peers + + def session_count(self): + return len(self.sessions) + + def info(self): + '''Returned in the RPC 'getinfo' call.''' + address_count = sum(len(session.hash168s) + for session in self.sessions + if isinstance(session, ElectrumX)) + return { + 'blocks': self.bp.height, + 'peers': len(self.irc_peers()), + 'sessions': self.session_count(), + 'watched': address_count, + 'cached': 0, + } + + def sessions_info(self): + '''Returned to the RPC 'sessions' call.''' + now = time.time() + return [(session.kind, + session.peername(), + len(session.hash168s), + 'RPC' if isinstance(session, LocalRPC) else session.client, + session.recv_count, session.recv_size, + session.send_count, session.send_size, + session.error_count, + now - session.start) + for session in self.sessions] -class JSONRPC(asyncio.Protocol, LoggedClass): - '''Base class that manages a JSONRPC connection.''' - def __init__(self): +class Session(JSONRPC): + '''Base class of ElectrumX JSON session protocols.''' + + def __init__(self, manager, bp, env, kind): super().__init__() - self.parts = [] - self.send_count = 0 - self.send_size = 0 - self.error_count = 0 + self.manager = manager + self.bp = bp + self.env = env + self.daemon = bp.daemon + self.coin = bp.coin + self.kind = kind self.hash168s = set() - self.start = time.time() self.client = 'unknown' - self.peername = 'unknown' def connection_made(self, transport): '''Handle an incoming client connection.''' - self.transport = transport - peer = transport.get_extra_info('peername') - self.peername = '{}:{}'.format(peer[0], peer[1]) - self.logger.info('connection from {}'.format(self.peername)) - self.SESSION_MGR.add_session(self) + super().connection_made(transport) + self.logger.info('connection from {}'.format(self.peername())) + self.manager.add_session(self) def connection_lost(self, exc): '''Handle client disconnection.''' - self.logger.info('{} disconnected. ' - 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername, self.send_size, - self.send_count, self.error_count)) - self.SESSION_MGR.remove_session(self) - - def data_received(self, data): - '''Handle incoming data (synchronously). - - Requests end in newline characters. Pass complete requests to - decode_message for handling. - ''' - while True: - npos = data.find(ord('\n')) - if npos == -1: - self.parts.append(data) - break - tail, data = data[:npos], data[npos + 1:] - parts, self.parts = self.parts, [] - parts.append(tail) - self.decode_message(b''.join(parts)) - - def decode_message(self, message): - '''Decode a binary message and queue it for asynchronous handling.''' - try: - message = json.loads(message.decode()) - except Exception as e: - self.logger.info('error decoding JSON message: {}'.format(e)) - else: - self.SESSION_MGR.add_task(self, self.request_handler(message)) - - async def request_handler(self, request): - '''Called asynchronously.''' - error = result = None - try: - handler = self.rpc_handler(request.get('method'), - request.get('params', [])) - result = await handler() - except RPCError as e: - self.error_count += 1 - error = {'code': 1, 'message': e.args[0]} - payload = {'id': request.get('id'), 'error': error, 'result': result} - if not self.json_send(payload): - # Let asyncio call connection_lost() so we stop this - # session's tasks - await asyncio.sleep(0) - - def json_send(self, payload): - if self.transport.is_closing(): - self.logger.info('connection closing, not writing') - return False - - data = (json.dumps(payload) + '\n').encode() - self.transport.write(data) - self.send_count += 1 - self.send_size += len(data) - return True - - def rpc_handler(self, method, params): - handler = None - if isinstance(method, str): - handler = self.handlers.get(method) - if not handler: - self.logger.info('unknown method: {}'.format(method)) - raise RPCError('unknown method: {}'.format(method)) - - if not isinstance(params, list): - raise RPCError('params should be an array') - - return partial(handler, self, params) - - @classmethod - def tx_hash_from_param(cls, param): + super().connection_lost(exc) + if self.error_count or self.send_size >= 250000: + self.logger.info('{} disconnected. ' + 'Sent {:,d} bytes in {:,d} messages {:,d} errors' + .format(self.peername(), self.send_size, + self.send_count, self.error_count)) + self.manager.remove_session(self) + + def method_handler(self, method): + '''Return the handler that will handle the RPC method.''' + return self.handlers.get(method) + + def on_json_request(self, request): + '''Queue the request for asynchronous handling.''' + self.manager.add_task(self, self.handle_json_request(request)) + + def peername(self): + info = self.peer_info() + return 'unknown' if not info else '{}:{}'.format(info[0], info[1]) + + def tx_hash_from_param(self, param): '''Raise an RPCError if the parameter is not a valid transaction hash.''' if isinstance(param, str) and len(param) == 64: @@ -250,17 +239,15 @@ class JSONRPC(asyncio.Protocol, LoggedClass): raise RPCError('parameter should be a transaction hash: {}' .format(param)) - @classmethod - def hash168_from_param(cls, param): + def hash168_from_param(self, param): if isinstance(param, str): try: - return cls.COIN.address_to_hash168(param) + return self.coin.address_to_hash168(param) except: pass raise RPCError('parameter should be a valid address: {}'.format(param)) - @classmethod - def non_negative_integer_from_param(cls, param): + def non_negative_integer_from_param(self, param): try: param = int(param) except ValueError: @@ -272,62 +259,28 @@ class JSONRPC(asyncio.Protocol, LoggedClass): raise RPCError('param should be a non-negative integer: {}' .format(param)) - @classmethod - def extract_hash168(cls, params): + def extract_hash168(self, params): if len(params) == 1: - return cls.hash168_from_param(params[0]) + return self.hash168_from_param(params[0]) raise RPCError('params should contain a single address: {}' .format(params)) - @classmethod - def extract_non_negative_integer(cls, params): + def extract_non_negative_integer(self, params): if len(params) == 1: - return cls.non_negative_integer_from_param(params[0]) + return self.non_negative_integer_from_param(params[0]) raise RPCError('params should contain a non-negative integer: {}' .format(params)) - @classmethod - def require_empty_params(cls, params): + def require_empty_params(self, params): if params: raise RPCError('params should be empty: {}'.format(params)) - @classmethod - def init(cls, block_processor, daemon, coin): - cls.BLOCK_PROCESSOR = block_processor - cls.DAEMON = daemon - cls.COIN = coin - cls.SESSION_MGR = SessionManager() - - @classmethod - def irc_peers(cls): - return cls.BLOCK_PROCESSOR.irc_peers() - - @classmethod - def height(cls): - '''Return the current height.''' - return cls.BLOCK_PROCESSOR.height - - @classmethod - def electrum_header(cls, height=None): - '''Return the binary header at the given height.''' - if not 0 <= height <= cls.height(): - raise RPCError('height {:,d} out of range'.format(height)) - header = cls.BLOCK_PROCESSOR.read_headers(height, 1) - return cls.COIN.electrum_header(header, height) - @classmethod - def current_electrum_header(cls): - '''Used as response to a headers subscription request.''' - return cls.electrum_header(cls.height()) - - -class ElectrumX(JSONRPC): +class ElectrumX(Session): '''A TCP server that handles incoming Electrum connections.''' - def __init__(self, env, kind): - super().__init__() - self.env = env - self.kind = kind + def __init__(self, *args): + super().__init__(*args) self.subscribe_headers = False self.subscribe_height = False self.notified_height = None @@ -342,54 +295,62 @@ class ElectrumX(JSONRPC): 'banner donation_address peers.subscribe version'), ] self.handlers = {'.'.join([prefix, suffix]): - getattr(self.__class__, suffix.replace('.', '_')) + getattr(self, suffix.replace('.', '_')) for prefix, suffixes in rpcs for suffix in suffixes.split()} @classmethod - def watched_address_count(cls): - sessions = cls.SESSION_MGR.sessions - return sum(len(session.hash168s) for session in sessions) - - @classmethod - def notify(cls, height, touched): - '''Notify electrum clients about height changes and touched - addresses.''' - headers_payload = json_notification( - 'blockchain.headers.subscribe', - (cls.electrum_header(height), ), - ) - height_payload = json_notification( - 'blockchain.numblocks.subscribe', - (height, ), - ) - hash168_to_address = cls.COIN.hash168_to_address - - for session in cls.SESSION_MGR.sessions: - if not isinstance(session, ElectrumX): - continue + def notify(cls, sessions, height, touched): + headers_payload = height_payload = None + for session in sessions: if height != session.notified_height: session.notified_height = height if session.subscribe_headers: - session.json_send(headers_payload) - if session.subscribe_height: - session.json_send(height_payload) + if headers_payload is None: + headers_payload = json_notification_payload( + 'blockchain.headers.subscribe', + (session.electrum_header(height), ), + ) + session.send_json(headers_payload) + if session.subscribe_height: + if height_payload is None: + height_payload = json_notification_payload( + 'blockchain.numblocks.subscribe', + (height, ), + ) + session.send_json(height_payload) + + hash168_to_address = session.coin.hash168_to_address for hash168 in session.hash168s.intersection(touched): address = hash168_to_address(hash168) - status = cls.address_status(hash168) - payload = json_notification('blockchain.address.subscribe', - (address, status)) - session.json_send(payload) + status = session.address_status(hash168) + payload = json_notification_payload( + 'blockchain.address.subscribe', (address, status)) + session.send_json(payload) - @classmethod - def address_status(cls, hash168): + def height(self): + '''Return the block processor's current height.''' + return self.bp.height + + def current_electrum_header(self): + '''Used as response to a headers subscription request.''' + return self.electrum_header(self.height()) + + def electrum_header(self, height): + '''Return the binary header at the given height.''' + if not 0 <= height <= self.height(): + raise RPCError('height {:,d} out of range'.format(height)) + header = self.bp.read_headers(height, 1) + return self.coin.electrum_header(header, height) + + def address_status(self, hash168): '''Returns status as 32 bytes.''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = cls.BLOCK_PROCESSOR.get_history(hash168) - mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) + history = self.bp.get_history(hash168) + mempool = self.bp.mempool_transactions(hash168) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) for tx_hash, height in history) @@ -399,11 +360,10 @@ class ElectrumX(JSONRPC): return sha256(status.encode()).hex() return None - @classmethod - async def tx_merkle(cls, tx_hash, height): + async def tx_merkle(self, tx_hash, height): '''tx_hash is a hex string.''' - hex_hashes = await cls.DAEMON.block_hex_hashes(height, 1) - block = await cls.DAEMON.deserialised_block(hex_hashes[0]) + hex_hashes = await self.daemon.block_hex_hashes(height, 1) + block = await self.daemon.deserialised_block(hex_hashes[0]) tx_hashes = block['tx'] # This will throw if the tx_hash is bad pos = tx_hashes.index(tx_hash) @@ -422,16 +382,11 @@ class ElectrumX(JSONRPC): return {"block_height": height, "merkle": merkle_branch, "pos": pos} - @classmethod - def height(cls): - return cls.BLOCK_PROCESSOR.height - - @classmethod - def get_history(cls, hash168): + def get_history(self, hash168): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) - mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) + history = self.bp.get_history(hash168, limit=None) + mempool = self.bp.mempool_transactions(hash168) conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height} for tx_hash, height in history) @@ -439,24 +394,21 @@ class ElectrumX(JSONRPC): for tx_hash, fee, unconfirmed in mempool) return conf + unconf - @classmethod - def get_chunk(cls, index): + def get_chunk(self, index): '''Return header chunk as hex. Index is a non-negative integer.''' - chunk_size = cls.COIN.CHUNK_SIZE - next_height = cls.height() + 1 + chunk_size = self.coin.CHUNK_SIZE + next_height = self.height() + 1 start_height = min(index * chunk_size, next_height) count = min(next_height - start_height, chunk_size) - return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() + return self.bp.read_headers(start_height, count).hex() - @classmethod - def get_balance(cls, hash168): - confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) - unconfirmed = cls.BLOCK_PROCESSOR.mempool_value(hash168) + def get_balance(self, hash168): + confirmed = self.bp.get_balance(hash168) + unconfirmed = self.bp.mempool_value(hash168) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} - @classmethod - def list_unspent(cls, hash168): - utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168) + def list_unspent(self, hash168): + utxos = self.bp.get_utxos_sorted(hash168) return tuple({'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos, 'height': utxo.height, 'value': utxo.value} @@ -498,7 +450,7 @@ class ElectrumX(JSONRPC): return self.electrum_header(height) async def estimatefee(self, params): - return await self.DAEMON.estimatefee(params) + return await self.daemon.estimatefee(params) async def headers_subscribe(self, params): self.require_empty_params(params) @@ -514,7 +466,7 @@ class ElectrumX(JSONRPC): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' self.require_empty_params(params) - return await self.DAEMON.relayfee() + return await self.daemon.relayfee() async def transaction_broadcast(self, params): '''Pass through the parameters to the daemon. @@ -525,7 +477,7 @@ class ElectrumX(JSONRPC): user interface job here. ''' try: - tx_hash = await self.DAEMON.sendrawtransaction(params) + tx_hash = await self.daemon.sendrawtransaction(params) self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: @@ -550,7 +502,7 @@ class ElectrumX(JSONRPC): # in anticipation it might be dropped in the future. if 1 <= len(params) <= 2: tx_hash = self.tx_hash_from_param(params[0]) - return await self.DAEMON.getrawtransaction(tx_hash) + return await self.daemon.getrawtransaction(tx_hash) raise RPCError('params wrong length: {}'.format(params)) @@ -567,9 +519,9 @@ class ElectrumX(JSONRPC): tx_hash = self.tx_hash_from_param(params[0]) index = self.non_negative_integer_from_param(params[1]) tx_hash = hex_str_to_hash(tx_hash) - hash168 = self.BLOCK_PROCESSOR.get_utxo_hash168(tx_hash, index) + hash168 = self.bp.get_utxo_hash168(tx_hash, index) if hash168: - return self.COIN.hash168_to_address(hash168) + return self.coin.hash168_to_address(hash168) return None raise RPCError('params should contain a transaction hash and index') @@ -604,7 +556,7 @@ class ElectrumX(JSONRPC): subscription. ''' self.require_empty_params(params) - return list(self.irc_peers().values()) + return list(self.manager.irc_peers().values()) async def version(self, params): '''Return the server version as a string.''' @@ -614,37 +566,25 @@ class ElectrumX(JSONRPC): return VERSION -class LocalRPC(JSONRPC): +class LocalRPC(Session): '''A local TCP RPC server for querying status.''' - def __init__(self, env, kind): - super().__init__() + def __init__(self, *args): + super().__init__(*args) cmds = 'getinfo sessions numsessions peers numpeers'.split() - self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} - self.env = env - self.kind = kind + self.handlers = {cmd: getattr(self, cmd) for cmd in cmds} async def getinfo(self, params): - return { - 'blocks': self.height(), - 'peers': len(self.irc_peers()), - 'sessions': len(self.SESSION_MGR.sessions), - 'watched': ElectrumX.watched_address_count(), - 'cached': 0, - } + return self.manager.info() async def sessions(self, params): - now = time.time() - return [(session.kind, - 'this RPC client' if session == self else session.peername, - len(session.hash168s), session.client, now - session.start) - for session in self.SESSION_MGR.sessions] + return self.manager.sessions_info() async def numsessions(self, params): - return len(self.SESSION_MGR.sessions) + return self.manager.session_count() async def peers(self, params): - return self.irc_peers() + return self.manager.irc_peers() async def numpeers(self, params): - return len(self.irc_peers()) + return len(self.manager.irc_peers()) diff --git a/setup.py b/setup.py index be23287..e8f6574 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,7 @@ setuptools.setup( python_requires='>=3.5', # "irc" package is only required if IRC connectivity is enabled # via environment variables, in which case I've tested with 15.0.4 + # "x11_hash" package (1.4) is required to sync DASH network. install_requires=['plyvel', 'aiohttp >= 1'], packages=setuptools.find_packages(), description='ElectrumX Server',