Browse Source

Merge branch 'release-0.4.2'

master 0.4.2
Neil Booth 8 years ago
parent
commit
c64eb5748f
  1. 5
      .travis.yml
  2. 6
      README.rst
  3. 3
      docs/HOWTO.rst
  4. 9
      docs/RELEASE-NOTES
  5. 28
      electrumx_rpc.py
  6. 23
      lib/coins.py
  7. 249
      lib/jsonrpc.py
  8. 452
      server/protocol.py
  9. 1
      setup.py

5
.travis.yml

@ -15,5 +15,8 @@ install:
- pip install lmdb - pip install lmdb
- pip install plyvel - pip install plyvel
- pip install pyrocksdb - pip install pyrocksdb
- pip install pytest-cov
- pip install python-coveralls
# command to run tests # command to run tests
script: pytest script: pytest --cov=server --cov=lib
after_success: coveralls

6
README.rst

@ -1,5 +1,8 @@
.. image:: https://travis-ci.org/kyuupichan/electrumx.svg?branch=master .. image:: https://travis-ci.org/kyuupichan/electrumx.svg?branch=master
:target: https://travis-ci.org/kyuupichan/electrumx :target: https://travis-ci.org/kyuupichan/electrumx
.. image:: https://coveralls.io/repos/github/kyuupichan/electrumx/badge.svg
:target: https://coveralls.io/github/kyuupichan/electrumx
ElectrumX - Reimplementation of Electrum-server ElectrumX - Reimplementation of Electrum-server
=============================================== ===============================================
@ -76,12 +79,11 @@ ElectrumX should not have any need of threads.
Roadmap Roadmap
======= =======
- store all UTXOs, not just those with addresses
- test a few more performance improvement ideas - test a few more performance improvement ideas
- implement light caching of client responses - implement light caching of client responses
- yield during expensive requests and/or penalize the connection - yield during expensive requests and/or penalize the connection
- improve DB abstraction so LMDB is not penalized - improve DB abstraction so LMDB is not penalized
- continue to clean up the code and remove layering violations
- store all UTXOs, not just those with addresses
- potentially move some functionality to C or C++ - potentially move some functionality to C or C++
The above are in no particular order. The above are in no particular order.

3
docs/HOWTO.rst

@ -14,7 +14,8 @@ small - patches welcome.
+ irc: Python IRC package. Only required if you enable IRC; ElectrumX + irc: Python IRC package. Only required if you enable IRC; ElectrumX
will happily serve clients that try to connect directly. will happily serve clients that try to connect directly.
I use 15.0.4 but older versions likely are fine. I use 15.0.4 but older versions likely are fine.
+ x11_hash: Python X11 Hash package. Only required if you use ElectrumX
with Dash Mainnet or Testnet. Version 1.4 tested.
While not requirements for running ElectrumX, it is intended to be run While not requirements for running ElectrumX, it is intended to be run
with supervisor software such as Daniel Bernstein's daemontools, with supervisor software such as Daniel Bernstein's daemontools,

9
docs/RELEASE-NOTES

@ -1,3 +1,12 @@
version 0.4.2
-------------
- split out JSON RPC protcol handling. Now more robust and we should
fully support JSON RPC 2.0 clients, including batch requests
(Electrum client does not yet support these)
- refactored and cleaned up server handling
- improved DASH support (thelazier)
version 0.4.1 version 0.4.1
------------- -------------

28
electrumx_rpc.py

@ -32,7 +32,7 @@ class RPCClient(asyncio.Protocol):
def send(self, method, params): def send(self, method, params):
self.method = method self.method = method
payload = {'method': method, 'params': params} payload = {'method': method, 'params': params, 'id': 'RPC'}
data = json.dumps(payload) + '\n' data = json.dumps(payload) + '\n'
self.transport.write(data.encode()) self.transport.write(data.encode())
@ -44,12 +44,28 @@ class RPCClient(asyncio.Protocol):
if error: if error:
print("ERROR: {}".format(error)) print("ERROR: {}".format(error))
else: else:
def data_fmt(count, size):
return '{:,d}/{:,d}KB'.format(count, size // 1024)
def time_fmt(t):
t = int(t)
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
if self.method == 'sessions': if self.method == 'sessions':
fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}' fmt = ('{:<4} {:>23} {:>15} {:>5} '
print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time')) '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
for kind, peer, subs, client, time in result: print(fmt.format('Type', 'Peer', 'Client', 'Subs',
print(fmt.format(kind, peer, '{:,d}'.format(subs), 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB',
client, '{:,d}'.format(int(time)))) 'Errs', 'Time'))
for (kind, peer, subs, client, recv_count, recv_size,
send_count, send_size, error_count, time) in result:
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,.1f}'.format(recv_size / 1048576),
'{:,d}'.format(send_count),
'{:,.1f}'.format(send_size / 1048576),
'{:,d}'.format(error_count),
time_fmt(time)))
else: else:
pprint.pprint(result, indent=4) pprint.pprint(result, indent=4)

23
lib/coins.py

@ -307,24 +307,39 @@ class DogecoinTestnet(Coin):
WIF_BYTE = 0xf1 WIF_BYTE = 0xf1
# Source: pycoin # Source: https://github.com/dashpay/dash
class Dash(Coin): class Dash(Coin):
NAME = "Dash" NAME = "Dash"
SHORTNAME = "DASH" SHORTNAME = "DASH"
NET = "mainnet" NET = "mainnet"
XPUB_VERBYTES = bytes.fromhex("02fe52cc") XPUB_VERBYTES = bytes.fromhex("02fe52cc")
XPRV_VERBYTES = bytes.fromhex("02fe52f8") XPRV_VERBYTES = bytes.fromhex("02fe52f8")
GENESIS_HASH = (b'00000ffd590b1485b3caadc19b22e637'
b'9c733355108f107a430458cdf3407ab6')
P2PKH_VERBYTE = 0x4c P2PKH_VERBYTE = 0x4c
P2SH_VERBYTE = 0x10 P2SH_VERBYTE = 0x10
WIF_BYTE = 0xcc WIF_BYTE = 0xcc
TX_COUNT_HEIGHT = 569399
TX_COUNT = 2157510
TX_PER_BLOCK = 4
@classmethod
def header_hashes(cls, header):
'''Given a header return the previous and current block hashes.'''
import x11_hash
return header[4:36], x11_hash.getPoWHash(header)
class DashTestnet(Coin): class DashTestnet(Dash):
NAME = "Dogecoin" NAME = "Dash"
SHORTNAME = "tDASH" SHORTNAME = "tDASH"
NET = "testnet" NET = "testnet"
XPUB_VERBYTES = bytes.fromhex("3a805837") XPUB_VERBYTES = bytes.fromhex("3a805837")
XPRV_VERBYTES = bytes.fromhex("3a8061a0") XPRV_VERBYTES = bytes.fromhex("3a8061a0")
P2PKH_VERBYTE = 0x8b GENESIS_HASH = (b'00000bafbc94add76cb75e2ec9289483'
b'7288a481e5c005f6563d91623bf8bc2c')
P2PKH_VERBYTE = 0x8c
P2SH_VERBYTE = 0x13 P2SH_VERBYTE = 0x13
WIF_BYTE = 0xef WIF_BYTE = 0xef
TX_COUNT_HEIGHT = 101619
TX_COUNT = 132681
TX_PER_BLOCK = 1

249
lib/jsonrpc.py

@ -0,0 +1,249 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Class for handling JSON RPC 2.0 connections, server or client.'''
import asyncio
import json
import numbers
import time
from lib.util import LoggedClass
def json_result_payload(result, id_):
# We should not respond to notifications
assert id_ is not None
return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_}
def json_error_payload(message, code, id_=None):
error = {'message': message, 'code': code}
return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_}
def json_notification_payload(method, params):
return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params}
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection.
Assumes JSON messages are newline-separated and that newlines
cannot appear in the JSON other than to separate lines.
Derived classes need to implement the synchronous functions
on_json_request() and method_handler(). They probably also want
to override connection_made() and connection_lost() but should be
sure to call the implementation in this base class first.
on_json_request() is passed a JSON request as a python object
after decoding. It should arrange to pass on to the asynchronous
handle_json_request() method.
method_handler() takes a method string and should return a function
that can be passed a parameters array, or None for an unknown method.
Handlers should raise an RPCError on error.
'''
# See http://www.jsonrpc.org/specification
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERAL_ERROR = -32603
ID_TYPES = (type(None), str, numbers.Number)
class RPCError(Exception):
'''RPC handlers raise this error.'''
def __init__(self, msg, code=-1, **kw_args):
super().__init__(**kw_args)
self.msg = msg
self.code
def __init__(self):
super().__init__()
self.start = time.time()
self.transport = None
# Parts of an incomplete JSON line. We buffer them until
# getting a newline.
self.parts = []
# recv_count is JSON messages not calls to data_received()
self.recv_count = 0
self.recv_size = 0
self.send_count = 0
self.send_size = 0
self.error_count = 0
def connection_made(self, transport):
'''Handle an incoming client connection.'''
self.transport = transport
def peer_info(self):
'''Return peer info.'''
if self.transport:
return self.transport.get_extra_info('peername')
return None
def connection_lost(self, exc):
'''Handle client disconnection.'''
pass
def data_received(self, data):
'''Handle incoming data (synchronously).
Requests end in newline characters. Pass complete requests to
decode_message for handling.
'''
self.recv_size += len(data)
while True:
npos = data.find(ord('\n'))
if npos == -1:
self.parts.append(data)
break
self.recv_count += 1
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
parts.append(tail)
self.decode_message(b''.join(parts))
def decode_message(self, message):
'''Decode a binary message and queue it for asynchronous handling.
Messages that cannot be decoded are logged and dropped.
'''
try:
message = message.decode()
except UnicodeDecodeError as e:
msg = 'cannot decode binary bytes: {}'.format(e)
self.logger.warning(msg)
self.send_json_error(msg, self.PARSE_ERROR)
return
try:
message = json.loads(message)
except json.JSONDecodeError as e:
msg = 'cannot decode JSON: {}'.format(e)
self.logger.warning(msg)
self.send_json_error(msg, self.PARSE_ERROR)
return
self.on_json_request(message)
def send_json_notification(self, method, params):
'''Create a json notification.'''
return self.send_json(json_notification_payload(method, params))
def send_json_result(self, result, id_):
'''Send a JSON result.'''
return self.send_json(json_result_payload(result, id_))
def send_json_error(self, message, code, id_=None):
'''Send a JSON error.'''
self.error_count += 1
return self.send_json(json_error_payload(message, code, id_))
def send_json(self, payload):
'''Send a JSON payload.'''
if self.transport.is_closing():
self.logger.info('send_json: connection closing, not sending')
return False
try:
data = (json.dumps(payload) + '\n').encode()
except TypeError:
msg = 'JSON encoding failure: {}'.format(payload)
self.logger.error(msg)
return self.send_json_error(msg, self.INTERNAL_ERROR,
payload.get('id'))
self.send_count += 1
self.send_size += len(data)
self.transport.write(data)
return True
async def handle_json_request(self, request):
'''Asynchronously handle a JSON request.
Handles batch requests. Returns True if the request response
was sent (or if nothing was sent because the request was a
notification). Returns False if the send was aborted because
the connection is closing.
'''
if isinstance(request, list):
payload = self.batch_request_payload(request)
else:
payload = await self.single_request_payload(request)
if not payload:
return True
return self.send_json(payload)
async def batch_request_payload(self, batch):
'''Return the JSON payload corresponding to a batch JSON request.'''
# Batches must have at least one request.
if not batch:
return json_error_payload('empty request list',
self.INVALID_REQUEST)
# PYTHON 3.6: use asynchronous comprehensions when supported
payload = []
for item in request:
item_payload = await self.single_request_payload(item)
if item_payload:
payload.append(item_payload)
return payload
async def single_request_payload(self, request):
'''Return the JSON payload corresponding to a single JSON request.
Return None if the request is a notification.
'''
if not isinstance(request, dict):
return json_error_payload('request must be a dict',
self.INVALID_REQUEST)
id_ = request.get('id')
if not isinstance(id_, self.ID_TYPES):
return json_error_payload('invalid id: {}'.format(id_),
self.INVALID_REQUEST)
try:
result = await self.method_result(request.get('method'),
request.get('params', []))
if id_ is None:
return None
return json_result_payload(result, id_)
except self.RPCError as e:
if id_ is None:
return None
return json_error_payload(e.msg, e.code, id_)
async def method_result(self, method, params):
if not isinstance(method, str):
raise self.RPCError('invalid method: {}'.format(method),
self.INVALID_REQUEST)
if not isinstance(params, list):
raise self.RPCError('params should be an array',
self.INVALID_REQUEST)
handler = self.method_handler(method)
if not handler:
raise self.RPCError('unknown method: {}'.format(method),
self.METHOD_NOT_FOUND)
return await handler(params)
def on_json_request(self, request):
raise NotImplementedError('on_json_request in class {}'.
format(self.__class__.__name__))
def method_handler(self, method):
raise NotImplementedError('method_handler in class {}'.
format(self.__class__.__name__))

452
server/protocol.py

@ -18,6 +18,7 @@ from collections import namedtuple
from functools import partial from functools import partial
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.jsonrpc import JSONRPC, json_notification_payload
from lib.util import LoggedClass from lib.util import LoggedClass
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
from server.daemon import DaemonError from server.daemon import DaemonError
@ -25,38 +26,49 @@ from server.irc import IRC
from server.version import VERSION from server.version import VERSION
class RPCError(Exception):
'''RPC handlers raise this error.'''
def json_notification(method, params):
'''Create a json notification.'''
return {'id': None, 'method': method, 'params': params}
class BlockServer(BlockProcessor): class BlockServer(BlockProcessor):
'''Like BlockProcessor but also starts servers when caught up.''' '''Like BlockProcessor but also has a server manager and starts
servers when caught up.'''
def __init__(self, env): def __init__(self, env):
super().__init__(env) super().__init__(env)
self.servers = [] self.server_mgr = ServerManager(self, env)
self.irc = IRC(env) self.bs_caught_up = False
async def caught_up(self, mempool_hashes): async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes) await super().caught_up(mempool_hashes)
if not self.servers: if not self.bs_caught_up:
await self.start_servers() await self.server_mgr.start_servers()
if self.env.irc: self.bs_caught_up = True
self.logger.info('starting IRC coroutine') self.server_mgr.notify(self.height, self.touched)
asyncio.ensure_future(self.irc.start())
else: def stop(self):
self.logger.info('IRC disabled') '''Close the listening servers.'''
ElectrumX.notify(self.height, self.touched) self.server_mgr.stop()
class ServerManager(LoggedClass):
'''Manages the servers.'''
AsyncTask = namedtuple('AsyncTask', 'session job')
async def start_server(self, class_name, kind, host, port, *, ssl=None): def __init__(self, bp, env):
super().__init__()
self.bp = bp
self.env = env
self.servers = []
self.irc = IRC(env)
self.sessions = set()
self.tasks = asyncio.Queue()
self.current_task = None
async def start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
protocol = partial(class_name, self.env, kind) protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
server = loop.create_server(protocol, host, port, ssl=ssl) protocol = partial(protocol_class, self, self.bp, self.env, kind)
server = loop.create_server(protocol, *args, **kw_args)
host, port = args[:2]
try: try:
self.servers.append(await server) self.servers.append(await server)
except asyncio.CancelledError: except asyncio.CancelledError:
@ -69,45 +81,44 @@ class BlockServer(BlockProcessor):
.format(kind, host, port)) .format(kind, host, port))
async def start_servers(self): async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports. '''Connect to IRC and start listening for incoming connections.
Does not start a server if the port wasn't specified. Only connect to IRC if enabled. Start listening on RCP, TCP
and SSL ports only if the port wasn pecified.
''' '''
env = self.env env = self.env
JSONRPC.init(self, self.daemon, self.coin)
if env.rpc_port is not None: if env.rpc_port is not None:
await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) await self.start_server('RPC', 'localhost', env.rpc_port)
if env.tcp_port is not None: if env.tcp_port is not None:
await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port) await self.start_server('TCP', env.host, env.tcp_port)
if env.ssl_port is not None: if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3 # FIXME: update if we want to require Python >= 3.5.3
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server(ElectrumX, 'SSL', env.host, await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
env.ssl_port, ssl=sslc)
asyncio.ensure_future(self.run_tasks())
if env.irc:
self.logger.info('starting IRC coroutine')
asyncio.ensure_future(self.irc.start())
else:
self.logger.info('IRC disabled')
def notify(self, height, touched):
'''Notify sessions about height changes and touched addresses.'''
sessions = [session for session in self.sessions
if isinstance(session, ElectrumX)]
ElectrumX.notify(sessions, height, touched)
def stop(self): def stop(self):
'''Close the listening servers.''' '''Close the listening servers.'''
for server in self.servers: for server in self.servers:
server.close() server.close()
def irc_peers(self):
return self.irc.peers
AsyncTask = namedtuple('AsyncTask', 'session job')
class SessionManager(LoggedClass):
def __init__(self):
super().__init__()
self.sessions = set()
self.tasks = asyncio.Queue()
self.current_task = None
asyncio.ensure_future(self.run_tasks())
def add_session(self, session): def add_session(self, session):
assert session not in self.sessions assert session not in self.sessions
self.sessions.add(session) self.sessions.add(session)
@ -121,7 +132,7 @@ class SessionManager(LoggedClass):
def add_task(self, session, job): def add_task(self, session, job):
assert session in self.sessions assert session in self.sessions
task = asyncio.ensure_future(job) task = asyncio.ensure_future(job)
self.tasks.put_nowait(AsyncTask(session, task)) self.tasks.put_nowait(self.AsyncTask(session, task))
async def run_tasks(self): async def run_tasks(self):
'''Asynchronously run through the task queue.''' '''Asynchronously run through the task queue.'''
@ -141,104 +152,82 @@ class SessionManager(LoggedClass):
finally: finally:
self.current_task = None self.current_task = None
def irc_peers(self):
return self.irc.peers
def session_count(self):
return len(self.sessions)
class JSONRPC(asyncio.Protocol, LoggedClass): def info(self):
'''Base class that manages a JSONRPC connection.''' '''Returned in the RPC 'getinfo' call.'''
address_count = sum(len(session.hash168s)
for session in self.sessions
if isinstance(session, ElectrumX))
return {
'blocks': self.bp.height,
'peers': len(self.irc_peers()),
'sessions': self.session_count(),
'watched': address_count,
'cached': 0,
}
def __init__(self): def sessions_info(self):
'''Returned to the RPC 'sessions' call.'''
now = time.time()
return [(session.kind,
session.peername(),
len(session.hash168s),
'RPC' if isinstance(session, LocalRPC) else session.client,
session.recv_count, session.recv_size,
session.send_count, session.send_size,
session.error_count,
now - session.start)
for session in self.sessions]
class Session(JSONRPC):
'''Base class of ElectrumX JSON session protocols.'''
def __init__(self, manager, bp, env, kind):
super().__init__() super().__init__()
self.parts = [] self.manager = manager
self.send_count = 0 self.bp = bp
self.send_size = 0 self.env = env
self.error_count = 0 self.daemon = bp.daemon
self.coin = bp.coin
self.kind = kind
self.hash168s = set() self.hash168s = set()
self.start = time.time()
self.client = 'unknown' self.client = 'unknown'
self.peername = 'unknown'
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
self.transport = transport super().connection_made(transport)
peer = transport.get_extra_info('peername') self.logger.info('connection from {}'.format(self.peername()))
self.peername = '{}:{}'.format(peer[0], peer[1]) self.manager.add_session(self)
self.logger.info('connection from {}'.format(self.peername))
self.SESSION_MGR.add_session(self)
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
super().connection_lost(exc)
if self.error_count or self.send_size >= 250000:
self.logger.info('{} disconnected. ' self.logger.info('{} disconnected. '
'Sent {:,d} bytes in {:,d} messages {:,d} errors' 'Sent {:,d} bytes in {:,d} messages {:,d} errors'
.format(self.peername, self.send_size, .format(self.peername(), self.send_size,
self.send_count, self.error_count)) self.send_count, self.error_count))
self.SESSION_MGR.remove_session(self) self.manager.remove_session(self)
def data_received(self, data): def method_handler(self, method):
'''Handle incoming data (synchronously). '''Return the handler that will handle the RPC method.'''
return self.handlers.get(method)
Requests end in newline characters. Pass complete requests to def on_json_request(self, request):
decode_message for handling. '''Queue the request for asynchronous handling.'''
''' self.manager.add_task(self, self.handle_json_request(request))
while True:
npos = data.find(ord('\n'))
if npos == -1:
self.parts.append(data)
break
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
parts.append(tail)
self.decode_message(b''.join(parts))
def decode_message(self, message):
'''Decode a binary message and queue it for asynchronous handling.'''
try:
message = json.loads(message.decode())
except Exception as e:
self.logger.info('error decoding JSON message: {}'.format(e))
else:
self.SESSION_MGR.add_task(self, self.request_handler(message))
async def request_handler(self, request): def peername(self):
'''Called asynchronously.''' info = self.peer_info()
error = result = None return 'unknown' if not info else '{}:{}'.format(info[0], info[1])
try:
handler = self.rpc_handler(request.get('method'),
request.get('params', []))
result = await handler()
except RPCError as e:
self.error_count += 1
error = {'code': 1, 'message': e.args[0]}
payload = {'id': request.get('id'), 'error': error, 'result': result}
if not self.json_send(payload):
# Let asyncio call connection_lost() so we stop this
# session's tasks
await asyncio.sleep(0)
def json_send(self, payload):
if self.transport.is_closing():
self.logger.info('connection closing, not writing')
return False
data = (json.dumps(payload) + '\n').encode()
self.transport.write(data)
self.send_count += 1
self.send_size += len(data)
return True
def rpc_handler(self, method, params):
handler = None
if isinstance(method, str):
handler = self.handlers.get(method)
if not handler:
self.logger.info('unknown method: {}'.format(method))
raise RPCError('unknown method: {}'.format(method))
if not isinstance(params, list):
raise RPCError('params should be an array')
return partial(handler, self, params)
@classmethod def tx_hash_from_param(self, param):
def tx_hash_from_param(cls, param):
'''Raise an RPCError if the parameter is not a valid transaction '''Raise an RPCError if the parameter is not a valid transaction
hash.''' hash.'''
if isinstance(param, str) and len(param) == 64: if isinstance(param, str) and len(param) == 64:
@ -250,17 +239,15 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('parameter should be a transaction hash: {}' raise RPCError('parameter should be a transaction hash: {}'
.format(param)) .format(param))
@classmethod def hash168_from_param(self, param):
def hash168_from_param(cls, param):
if isinstance(param, str): if isinstance(param, str):
try: try:
return cls.COIN.address_to_hash168(param) return self.coin.address_to_hash168(param)
except: except:
pass pass
raise RPCError('parameter should be a valid address: {}'.format(param)) raise RPCError('parameter should be a valid address: {}'.format(param))
@classmethod def non_negative_integer_from_param(self, param):
def non_negative_integer_from_param(cls, param):
try: try:
param = int(param) param = int(param)
except ValueError: except ValueError:
@ -272,62 +259,28 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('param should be a non-negative integer: {}' raise RPCError('param should be a non-negative integer: {}'
.format(param)) .format(param))
@classmethod def extract_hash168(self, params):
def extract_hash168(cls, params):
if len(params) == 1: if len(params) == 1:
return cls.hash168_from_param(params[0]) return self.hash168_from_param(params[0])
raise RPCError('params should contain a single address: {}' raise RPCError('params should contain a single address: {}'
.format(params)) .format(params))
@classmethod def extract_non_negative_integer(self, params):
def extract_non_negative_integer(cls, params):
if len(params) == 1: if len(params) == 1:
return cls.non_negative_integer_from_param(params[0]) return self.non_negative_integer_from_param(params[0])
raise RPCError('params should contain a non-negative integer: {}' raise RPCError('params should contain a non-negative integer: {}'
.format(params)) .format(params))
@classmethod def require_empty_params(self, params):
def require_empty_params(cls, params):
if params: if params:
raise RPCError('params should be empty: {}'.format(params)) raise RPCError('params should be empty: {}'.format(params))
@classmethod
def init(cls, block_processor, daemon, coin):
cls.BLOCK_PROCESSOR = block_processor
cls.DAEMON = daemon
cls.COIN = coin
cls.SESSION_MGR = SessionManager()
@classmethod
def irc_peers(cls):
return cls.BLOCK_PROCESSOR.irc_peers()
@classmethod
def height(cls):
'''Return the current height.'''
return cls.BLOCK_PROCESSOR.height
@classmethod class ElectrumX(Session):
def electrum_header(cls, height=None):
'''Return the binary header at the given height.'''
if not 0 <= height <= cls.height():
raise RPCError('height {:,d} out of range'.format(height))
header = cls.BLOCK_PROCESSOR.read_headers(height, 1)
return cls.COIN.electrum_header(header, height)
@classmethod
def current_electrum_header(cls):
'''Used as response to a headers subscription request.'''
return cls.electrum_header(cls.height())
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.''' '''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env, kind): def __init__(self, *args):
super().__init__() super().__init__(*args)
self.env = env
self.kind = kind
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_height = False self.subscribe_height = False
self.notified_height = None self.notified_height = None
@ -342,54 +295,62 @@ class ElectrumX(JSONRPC):
'banner donation_address peers.subscribe version'), 'banner donation_address peers.subscribe version'),
] ]
self.handlers = {'.'.join([prefix, suffix]): self.handlers = {'.'.join([prefix, suffix]):
getattr(self.__class__, suffix.replace('.', '_')) getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs for prefix, suffixes in rpcs
for suffix in suffixes.split()} for suffix in suffixes.split()}
@classmethod @classmethod
def watched_address_count(cls): def notify(cls, sessions, height, touched):
sessions = cls.SESSION_MGR.sessions headers_payload = height_payload = None
return sum(len(session.hash168s) for session in sessions)
@classmethod for session in sessions:
def notify(cls, height, touched): if height != session.notified_height:
'''Notify electrum clients about height changes and touched session.notified_height = height
addresses.''' if session.subscribe_headers:
headers_payload = json_notification( if headers_payload is None:
headers_payload = json_notification_payload(
'blockchain.headers.subscribe', 'blockchain.headers.subscribe',
(cls.electrum_header(height), ), (session.electrum_header(height), ),
) )
height_payload = json_notification( session.send_json(headers_payload)
if session.subscribe_height:
if height_payload is None:
height_payload = json_notification_payload(
'blockchain.numblocks.subscribe', 'blockchain.numblocks.subscribe',
(height, ), (height, ),
) )
hash168_to_address = cls.COIN.hash168_to_address session.send_json(height_payload)
for session in cls.SESSION_MGR.sessions:
if not isinstance(session, ElectrumX):
continue
if height != session.notified_height:
session.notified_height = height
if session.subscribe_headers:
session.json_send(headers_payload)
if session.subscribe_height:
session.json_send(height_payload)
hash168_to_address = session.coin.hash168_to_address
for hash168 in session.hash168s.intersection(touched): for hash168 in session.hash168s.intersection(touched):
address = hash168_to_address(hash168) address = hash168_to_address(hash168)
status = cls.address_status(hash168) status = session.address_status(hash168)
payload = json_notification('blockchain.address.subscribe', payload = json_notification_payload(
(address, status)) 'blockchain.address.subscribe', (address, status))
session.json_send(payload) session.send_json(payload)
@classmethod def height(self):
def address_status(cls, hash168): '''Return the block processor's current height.'''
return self.bp.height
def current_electrum_header(self):
'''Used as response to a headers subscription request.'''
return self.electrum_header(self.height())
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.height():
raise RPCError('height {:,d} out of range'.format(height))
header = self.bp.read_headers(height, 1)
return self.coin.electrum_header(header, height)
def address_status(self, hash168):
'''Returns status as 32 bytes.''' '''Returns status as 32 bytes.'''
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = cls.BLOCK_PROCESSOR.get_history(hash168) history = self.bp.get_history(hash168)
mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) mempool = self.bp.mempool_transactions(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history) for tx_hash, height in history)
@ -399,11 +360,10 @@ class ElectrumX(JSONRPC):
return sha256(status.encode()).hex() return sha256(status.encode()).hex()
return None return None
@classmethod async def tx_merkle(self, tx_hash, height):
async def tx_merkle(cls, tx_hash, height):
'''tx_hash is a hex string.''' '''tx_hash is a hex string.'''
hex_hashes = await cls.DAEMON.block_hex_hashes(height, 1) hex_hashes = await self.daemon.block_hex_hashes(height, 1)
block = await cls.DAEMON.deserialised_block(hex_hashes[0]) block = await self.daemon.deserialised_block(hex_hashes[0])
tx_hashes = block['tx'] tx_hashes = block['tx']
# This will throw if the tx_hash is bad # This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash) pos = tx_hashes.index(tx_hash)
@ -422,16 +382,11 @@ class ElectrumX(JSONRPC):
return {"block_height": height, "merkle": merkle_branch, "pos": pos} return {"block_height": height, "merkle": merkle_branch, "pos": pos}
@classmethod def get_history(self, hash168):
def height(cls):
return cls.BLOCK_PROCESSOR.height
@classmethod
def get_history(cls, hash168):
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) history = self.bp.get_history(hash168, limit=None)
mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) mempool = self.bp.mempool_transactions(hash168)
conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height} conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history) for tx_hash, height in history)
@ -439,24 +394,21 @@ class ElectrumX(JSONRPC):
for tx_hash, fee, unconfirmed in mempool) for tx_hash, fee, unconfirmed in mempool)
return conf + unconf return conf + unconf
@classmethod def get_chunk(self, index):
def get_chunk(cls, index):
'''Return header chunk as hex. Index is a non-negative integer.''' '''Return header chunk as hex. Index is a non-negative integer.'''
chunk_size = cls.COIN.CHUNK_SIZE chunk_size = self.coin.CHUNK_SIZE
next_height = cls.height() + 1 next_height = self.height() + 1
start_height = min(index * chunk_size, next_height) start_height = min(index * chunk_size, next_height)
count = min(next_height - start_height, chunk_size) count = min(next_height - start_height, chunk_size)
return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() return self.bp.read_headers(start_height, count).hex()
@classmethod def get_balance(self, hash168):
def get_balance(cls, hash168): confirmed = self.bp.get_balance(hash168)
confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) unconfirmed = self.bp.mempool_value(hash168)
unconfirmed = cls.BLOCK_PROCESSOR.mempool_value(hash168)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed} return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@classmethod def list_unspent(self, hash168):
def list_unspent(cls, hash168): utxos = self.bp.get_utxos_sorted(hash168)
utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168)
return tuple({'tx_hash': hash_to_str(utxo.tx_hash), return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, 'height': utxo.height, 'tx_pos': utxo.tx_pos, 'height': utxo.height,
'value': utxo.value} 'value': utxo.value}
@ -498,7 +450,7 @@ class ElectrumX(JSONRPC):
return self.electrum_header(height) return self.electrum_header(height)
async def estimatefee(self, params): async def estimatefee(self, params):
return await self.DAEMON.estimatefee(params) return await self.daemon.estimatefee(params)
async def headers_subscribe(self, params): async def headers_subscribe(self, params):
self.require_empty_params(params) self.require_empty_params(params)
@ -514,7 +466,7 @@ class ElectrumX(JSONRPC):
'''The minimum fee a low-priority tx must pay in order to be accepted '''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.''' to the daemon's memory pool.'''
self.require_empty_params(params) self.require_empty_params(params)
return await self.DAEMON.relayfee() return await self.daemon.relayfee()
async def transaction_broadcast(self, params): async def transaction_broadcast(self, params):
'''Pass through the parameters to the daemon. '''Pass through the parameters to the daemon.
@ -525,7 +477,7 @@ class ElectrumX(JSONRPC):
user interface job here. user interface job here.
''' '''
try: try:
tx_hash = await self.DAEMON.sendrawtransaction(params) tx_hash = await self.daemon.sendrawtransaction(params)
self.logger.info('sent tx: {}'.format(tx_hash)) self.logger.info('sent tx: {}'.format(tx_hash))
return tx_hash return tx_hash
except DaemonError as e: except DaemonError as e:
@ -550,7 +502,7 @@ class ElectrumX(JSONRPC):
# in anticipation it might be dropped in the future. # in anticipation it might be dropped in the future.
if 1 <= len(params) <= 2: if 1 <= len(params) <= 2:
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
return await self.DAEMON.getrawtransaction(tx_hash) return await self.daemon.getrawtransaction(tx_hash)
raise RPCError('params wrong length: {}'.format(params)) raise RPCError('params wrong length: {}'.format(params))
@ -567,9 +519,9 @@ class ElectrumX(JSONRPC):
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
index = self.non_negative_integer_from_param(params[1]) index = self.non_negative_integer_from_param(params[1])
tx_hash = hex_str_to_hash(tx_hash) tx_hash = hex_str_to_hash(tx_hash)
hash168 = self.BLOCK_PROCESSOR.get_utxo_hash168(tx_hash, index) hash168 = self.bp.get_utxo_hash168(tx_hash, index)
if hash168: if hash168:
return self.COIN.hash168_to_address(hash168) return self.coin.hash168_to_address(hash168)
return None return None
raise RPCError('params should contain a transaction hash and index') raise RPCError('params should contain a transaction hash and index')
@ -604,7 +556,7 @@ class ElectrumX(JSONRPC):
subscription. subscription.
''' '''
self.require_empty_params(params) self.require_empty_params(params)
return list(self.irc_peers().values()) return list(self.manager.irc_peers().values())
async def version(self, params): async def version(self, params):
'''Return the server version as a string.''' '''Return the server version as a string.'''
@ -614,37 +566,25 @@ class ElectrumX(JSONRPC):
return VERSION return VERSION
class LocalRPC(JSONRPC): class LocalRPC(Session):
'''A local TCP RPC server for querying status.''' '''A local TCP RPC server for querying status.'''
def __init__(self, env, kind): def __init__(self, *args):
super().__init__() super().__init__(*args)
cmds = 'getinfo sessions numsessions peers numpeers'.split() cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} self.handlers = {cmd: getattr(self, cmd) for cmd in cmds}
self.env = env
self.kind = kind
async def getinfo(self, params): async def getinfo(self, params):
return { return self.manager.info()
'blocks': self.height(),
'peers': len(self.irc_peers()),
'sessions': len(self.SESSION_MGR.sessions),
'watched': ElectrumX.watched_address_count(),
'cached': 0,
}
async def sessions(self, params): async def sessions(self, params):
now = time.time() return self.manager.sessions_info()
return [(session.kind,
'this RPC client' if session == self else session.peername,
len(session.hash168s), session.client, now - session.start)
for session in self.SESSION_MGR.sessions]
async def numsessions(self, params): async def numsessions(self, params):
return len(self.SESSION_MGR.sessions) return self.manager.session_count()
async def peers(self, params): async def peers(self, params):
return self.irc_peers() return self.manager.irc_peers()
async def numpeers(self, params): async def numpeers(self, params):
return len(self.irc_peers()) return len(self.manager.irc_peers())

1
setup.py

@ -9,6 +9,7 @@ setuptools.setup(
python_requires='>=3.5', python_requires='>=3.5',
# "irc" package is only required if IRC connectivity is enabled # "irc" package is only required if IRC connectivity is enabled
# via environment variables, in which case I've tested with 15.0.4 # via environment variables, in which case I've tested with 15.0.4
# "x11_hash" package (1.4) is required to sync DASH network.
install_requires=['plyvel', 'aiohttp >= 1'], install_requires=['plyvel', 'aiohttp >= 1'],
packages=setuptools.find_packages(), packages=setuptools.find_packages(),
description='ElectrumX Server', description='ElectrumX Server',

Loading…
Cancel
Save