Browse Source

Merge branch 'protocol' into develop

master
Neil Booth 8 years ago
parent
commit
ad46fbb61f
  1. 28
      electrumx_rpc.py
  2. 249
      lib/jsonrpc.py
  3. 470
      server/protocol.py

28
electrumx_rpc.py

@ -32,7 +32,7 @@ class RPCClient(asyncio.Protocol):
def send(self, method, params): def send(self, method, params):
self.method = method self.method = method
payload = {'method': method, 'params': params} payload = {'method': method, 'params': params, 'id': 'RPC'}
data = json.dumps(payload) + '\n' data = json.dumps(payload) + '\n'
self.transport.write(data.encode()) self.transport.write(data.encode())
@ -44,12 +44,28 @@ class RPCClient(asyncio.Protocol):
if error: if error:
print("ERROR: {}".format(error)) print("ERROR: {}".format(error))
else: else:
def data_fmt(count, size):
return '{:,d}/{:,d}KB'.format(count, size // 1024)
def time_fmt(t):
t = int(t)
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
if self.method == 'sessions': if self.method == 'sessions':
fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}' fmt = ('{:<4} {:>23} {:>15} {:>5} '
print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time')) '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
for kind, peer, subs, client, time in result: print(fmt.format('Type', 'Peer', 'Client', 'Subs',
print(fmt.format(kind, peer, '{:,d}'.format(subs), 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB',
client, '{:,d}'.format(int(time)))) 'Errs', 'Time'))
for (kind, peer, subs, client, recv_count, recv_size,
send_count, send_size, error_count, time) in result:
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,.1f}'.format(recv_size / 1048576),
'{:,d}'.format(send_count),
'{:,.1f}'.format(send_size / 1048576),
'{:,d}'.format(error_count),
time_fmt(time)))
else: else:
pprint.pprint(result, indent=4) pprint.pprint(result, indent=4)

249
lib/jsonrpc.py

@ -0,0 +1,249 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Class for handling JSON RPC 2.0 connections, server or client.'''
import asyncio
import json
import numbers
import time
from lib.util import LoggedClass
def json_result_payload(result, id_):
# We should not respond to notifications
assert id_ is not None
return {'jsonrpc': '2.0', 'error': None, 'result': result, 'id': id_}
def json_error_payload(message, code, id_=None):
error = {'message': message, 'code': code}
return {'jsonrpc': '2.0', 'error': error, 'result': None, 'id': id_}
def json_notification_payload(method, params):
return {'jsonrpc': '2.0', 'id': None, 'method': method, 'params': params}
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection.
Assumes JSON messages are newline-separated and that newlines
cannot appear in the JSON other than to separate lines.
Derived classes need to implement the synchronous functions
on_json_request() and method_handler(). They probably also want
to override connection_made() and connection_lost() but should be
sure to call the implementation in this base class first.
on_json_request() is passed a JSON request as a python object
after decoding. It should arrange to pass on to the asynchronous
handle_json_request() method.
method_handler() takes a method string and should return a function
that can be passed a parameters array, or None for an unknown method.
Handlers should raise an RPCError on error.
'''
# See http://www.jsonrpc.org/specification
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERAL_ERROR = -32603
ID_TYPES = (type(None), str, numbers.Number)
class RPCError(Exception):
'''RPC handlers raise this error.'''
def __init__(self, msg, code=-1, **kw_args):
super().__init__(**kw_args)
self.msg = msg
self.code
def __init__(self):
super().__init__()
self.start = time.time()
self.transport = None
# Parts of an incomplete JSON line. We buffer them until
# getting a newline.
self.parts = []
# recv_count is JSON messages not calls to data_received()
self.recv_count = 0
self.recv_size = 0
self.send_count = 0
self.send_size = 0
self.error_count = 0
def connection_made(self, transport):
'''Handle an incoming client connection.'''
self.transport = transport
def peer_info(self):
'''Return peer info.'''
if self.transport:
return self.transport.get_extra_info('peername')
return None
def connection_lost(self, exc):
'''Handle client disconnection.'''
pass
def data_received(self, data):
'''Handle incoming data (synchronously).
Requests end in newline characters. Pass complete requests to
decode_message for handling.
'''
self.recv_size += len(data)
while True:
npos = data.find(ord('\n'))
if npos == -1:
self.parts.append(data)
break
self.recv_count += 1
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
parts.append(tail)
self.decode_message(b''.join(parts))
def decode_message(self, message):
'''Decode a binary message and queue it for asynchronous handling.
Messages that cannot be decoded are logged and dropped.
'''
try:
message = message.decode()
except UnicodeDecodeError as e:
msg = 'cannot decode binary bytes: {}'.format(e)
self.logger.warning(msg)
self.send_json_error(msg, self.PARSE_ERROR)
return
try:
message = json.loads(message)
except json.JSONDecodeError as e:
msg = 'cannot decode JSON: {}'.format(e)
self.logger.warning(msg)
self.send_json_error(msg, self.PARSE_ERROR)
return
self.on_json_request(message)
def send_json_notification(self, method, params):
'''Create a json notification.'''
return self.send_json(json_notification_payload(method, params))
def send_json_result(self, result, id_):
'''Send a JSON result.'''
return self.send_json(json_result_payload(result, id_))
def send_json_error(self, message, code, id_=None):
'''Send a JSON error.'''
self.error_count += 1
return self.send_json(json_error_payload(message, code, id_))
def send_json(self, payload):
'''Send a JSON payload.'''
if self.transport.is_closing():
self.logger.info('send_json: connection closing, not sending')
return False
try:
data = (json.dumps(payload) + '\n').encode()
except TypeError:
msg = 'JSON encoding failure: {}'.format(payload)
self.logger.error(msg)
return self.send_json_error(msg, self.INTERNAL_ERROR,
payload.get('id'))
self.send_count += 1
self.send_size += len(data)
self.transport.write(data)
return True
async def handle_json_request(self, request):
'''Asynchronously handle a JSON request.
Handles batch requests. Returns True if the request response
was sent (or if nothing was sent because the request was a
notification). Returns False if the send was aborted because
the connection is closing.
'''
if isinstance(request, list):
payload = self.batch_request_payload(request)
else:
payload = await self.single_request_payload(request)
if not payload:
return True
return self.send_json(payload)
async def batch_request_payload(self, batch):
'''Return the JSON payload corresponding to a batch JSON request.'''
# Batches must have at least one request.
if not batch:
return json_error_payload('empty request list',
self.INVALID_REQUEST)
# PYTHON 3.6: use asynchronous comprehensions when supported
payload = []
for item in request:
item_payload = await self.single_request_payload(item)
if item_payload:
payload.append(item_payload)
return payload
async def single_request_payload(self, request):
'''Return the JSON payload corresponding to a single JSON request.
Return None if the request is a notification.
'''
if not isinstance(request, dict):
return json_error_payload('request must be a dict',
self.INVALID_REQUEST)
id_ = request.get('id')
if not isinstance(id_, self.ID_TYPES):
return json_error_payload('invalid id: {}'.format(id_),
self.INVALID_REQUEST)
try:
result = await self.method_result(request.get('method'),
request.get('params', []))
if id_ is None:
return None
return json_result_payload(result, id_)
except self.RPCError as e:
if id_ is None:
return None
return json_error_payload(e.msg, e.code, id_)
async def method_result(self, method, params):
if not isinstance(method, str):
raise self.RPCError('invalid method: {}'.format(method),
self.INVALID_REQUEST)
if not isinstance(params, list):
raise self.RPCError('params should be an array',
self.INVALID_REQUEST)
handler = self.method_handler(method)
if not handler:
raise self.RPCError('unknown method: {}'.format(method),
self.METHOD_NOT_FOUND)
return await handler(params)
def on_json_request(self, request):
raise NotImplementedError('on_json_request in class {}'.
format(self.__class__.__name__))
def method_handler(self, method):
raise NotImplementedError('method_handler in class {}'.
format(self.__class__.__name__))

470
server/protocol.py

@ -18,6 +18,7 @@ from collections import namedtuple
from functools import partial from functools import partial
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.jsonrpc import JSONRPC, json_notification_payload
from lib.util import LoggedClass from lib.util import LoggedClass
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
from server.daemon import DaemonError from server.daemon import DaemonError
@ -25,38 +26,49 @@ from server.irc import IRC
from server.version import VERSION from server.version import VERSION
class RPCError(Exception): class BlockServer(BlockProcessor):
'''RPC handlers raise this error.''' '''Like BlockProcessor but also has a server manager and starts
servers when caught up.'''
def __init__(self, env):
super().__init__(env)
self.server_mgr = ServerManager(self, env)
self.bs_caught_up = False
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.bs_caught_up:
await self.server_mgr.start_servers()
self.bs_caught_up = True
self.server_mgr.notify(self.height, self.touched)
def stop(self):
'''Close the listening servers.'''
self.server_mgr.stop()
def json_notification(method, params):
'''Create a json notification.'''
return {'id': None, 'method': method, 'params': params}
class ServerManager(LoggedClass):
'''Manages the servers.'''
class BlockServer(BlockProcessor): AsyncTask = namedtuple('AsyncTask', 'session job')
'''Like BlockProcessor but also starts servers when caught up.'''
def __init__(self, env): def __init__(self, bp, env):
super().__init__(env) super().__init__()
self.bp = bp
self.env = env
self.servers = [] self.servers = []
self.irc = IRC(env) self.irc = IRC(env)
self.sessions = set()
self.tasks = asyncio.Queue()
self.current_task = None
async def caught_up(self, mempool_hashes): async def start_server(self, kind, *args, **kw_args):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
if self.env.irc:
self.logger.info('starting IRC coroutine')
asyncio.ensure_future(self.irc.start())
else:
self.logger.info('IRC disabled')
ElectrumX.notify(self.height, self.touched)
async def start_server(self, class_name, kind, host, port, *, ssl=None):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
protocol = partial(class_name, self.env, kind) protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
server = loop.create_server(protocol, host, port, ssl=ssl) protocol = partial(protocol_class, self, self.bp, self.env, kind)
server = loop.create_server(protocol, *args, **kw_args)
host, port = args[:2]
try: try:
self.servers.append(await server) self.servers.append(await server)
except asyncio.CancelledError: except asyncio.CancelledError:
@ -69,45 +81,44 @@ class BlockServer(BlockProcessor):
.format(kind, host, port)) .format(kind, host, port))
async def start_servers(self): async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports. '''Connect to IRC and start listening for incoming connections.
Does not start a server if the port wasn't specified. Only connect to IRC if enabled. Start listening on RCP, TCP
and SSL ports only if the port wasn pecified.
''' '''
env = self.env env = self.env
JSONRPC.init(self, self.daemon, self.coin)
if env.rpc_port is not None: if env.rpc_port is not None:
await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) await self.start_server('RPC', 'localhost', env.rpc_port)
if env.tcp_port is not None: if env.tcp_port is not None:
await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port) await self.start_server('TCP', env.host, env.tcp_port)
if env.ssl_port is not None: if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3 # FIXME: update if we want to require Python >= 3.5.3
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server(ElectrumX, 'SSL', env.host, await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
env.ssl_port, ssl=sslc)
asyncio.ensure_future(self.run_tasks())
if env.irc:
self.logger.info('starting IRC coroutine')
asyncio.ensure_future(self.irc.start())
else:
self.logger.info('IRC disabled')
def notify(self, height, touched):
'''Notify sessions about height changes and touched addresses.'''
sessions = [session for session in self.sessions
if isinstance(session, ElectrumX)]
ElectrumX.notify(sessions, height, touched)
def stop(self): def stop(self):
'''Close the listening servers.''' '''Close the listening servers.'''
for server in self.servers: for server in self.servers:
server.close() server.close()
def irc_peers(self):
return self.irc.peers
AsyncTask = namedtuple('AsyncTask', 'session job')
class SessionManager(LoggedClass):
def __init__(self):
super().__init__()
self.sessions = set()
self.tasks = asyncio.Queue()
self.current_task = None
asyncio.ensure_future(self.run_tasks())
def add_session(self, session): def add_session(self, session):
assert session not in self.sessions assert session not in self.sessions
self.sessions.add(session) self.sessions.add(session)
@ -121,7 +132,7 @@ class SessionManager(LoggedClass):
def add_task(self, session, job): def add_task(self, session, job):
assert session in self.sessions assert session in self.sessions
task = asyncio.ensure_future(job) task = asyncio.ensure_future(job)
self.tasks.put_nowait(AsyncTask(session, task)) self.tasks.put_nowait(self.AsyncTask(session, task))
async def run_tasks(self): async def run_tasks(self):
'''Asynchronously run through the task queue.''' '''Asynchronously run through the task queue.'''
@ -141,104 +152,82 @@ class SessionManager(LoggedClass):
finally: finally:
self.current_task = None self.current_task = None
def irc_peers(self):
return self.irc.peers
def session_count(self):
return len(self.sessions)
def info(self):
'''Returned in the RPC 'getinfo' call.'''
address_count = sum(len(session.hash168s)
for session in self.sessions
if isinstance(session, ElectrumX))
return {
'blocks': self.bp.height,
'peers': len(self.irc_peers()),
'sessions': self.session_count(),
'watched': address_count,
'cached': 0,
}
def sessions_info(self):
'''Returned to the RPC 'sessions' call.'''
now = time.time()
return [(session.kind,
session.peername(),
len(session.hash168s),
'RPC' if isinstance(session, LocalRPC) else session.client,
session.recv_count, session.recv_size,
session.send_count, session.send_size,
session.error_count,
now - session.start)
for session in self.sessions]
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Base class that manages a JSONRPC connection.'''
def __init__(self): class Session(JSONRPC):
'''Base class of ElectrumX JSON session protocols.'''
def __init__(self, manager, bp, env, kind):
super().__init__() super().__init__()
self.parts = [] self.manager = manager
self.send_count = 0 self.bp = bp
self.send_size = 0 self.env = env
self.error_count = 0 self.daemon = bp.daemon
self.coin = bp.coin
self.kind = kind
self.hash168s = set() self.hash168s = set()
self.start = time.time()
self.client = 'unknown' self.client = 'unknown'
self.peername = 'unknown'
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
self.transport = transport super().connection_made(transport)
peer = transport.get_extra_info('peername') self.logger.info('connection from {}'.format(self.peername()))
self.peername = '{}:{}'.format(peer[0], peer[1]) self.manager.add_session(self)
self.logger.info('connection from {}'.format(self.peername))
self.SESSION_MGR.add_session(self)
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
self.logger.info('{} disconnected. ' super().connection_lost(exc)
'Sent {:,d} bytes in {:,d} messages {:,d} errors' if self.error_count or self.send_size >= 250000:
.format(self.peername, self.send_size, self.logger.info('{} disconnected. '
self.send_count, self.error_count)) 'Sent {:,d} bytes in {:,d} messages {:,d} errors'
self.SESSION_MGR.remove_session(self) .format(self.peername(), self.send_size,
self.send_count, self.error_count))
def data_received(self, data): self.manager.remove_session(self)
'''Handle incoming data (synchronously).
def method_handler(self, method):
Requests end in newline characters. Pass complete requests to '''Return the handler that will handle the RPC method.'''
decode_message for handling. return self.handlers.get(method)
'''
while True: def on_json_request(self, request):
npos = data.find(ord('\n')) '''Queue the request for asynchronous handling.'''
if npos == -1: self.manager.add_task(self, self.handle_json_request(request))
self.parts.append(data)
break def peername(self):
tail, data = data[:npos], data[npos + 1:] info = self.peer_info()
parts, self.parts = self.parts, [] return 'unknown' if not info else '{}:{}'.format(info[0], info[1])
parts.append(tail)
self.decode_message(b''.join(parts)) def tx_hash_from_param(self, param):
def decode_message(self, message):
'''Decode a binary message and queue it for asynchronous handling.'''
try:
message = json.loads(message.decode())
except Exception as e:
self.logger.info('error decoding JSON message: {}'.format(e))
else:
self.SESSION_MGR.add_task(self, self.request_handler(message))
async def request_handler(self, request):
'''Called asynchronously.'''
error = result = None
try:
handler = self.rpc_handler(request.get('method'),
request.get('params', []))
result = await handler()
except RPCError as e:
self.error_count += 1
error = {'code': 1, 'message': e.args[0]}
payload = {'id': request.get('id'), 'error': error, 'result': result}
if not self.json_send(payload):
# Let asyncio call connection_lost() so we stop this
# session's tasks
await asyncio.sleep(0)
def json_send(self, payload):
if self.transport.is_closing():
self.logger.info('connection closing, not writing')
return False
data = (json.dumps(payload) + '\n').encode()
self.transport.write(data)
self.send_count += 1
self.send_size += len(data)
return True
def rpc_handler(self, method, params):
handler = None
if isinstance(method, str):
handler = self.handlers.get(method)
if not handler:
self.logger.info('unknown method: {}'.format(method))
raise RPCError('unknown method: {}'.format(method))
if not isinstance(params, list):
raise RPCError('params should be an array')
return partial(handler, self, params)
@classmethod
def tx_hash_from_param(cls, param):
'''Raise an RPCError if the parameter is not a valid transaction '''Raise an RPCError if the parameter is not a valid transaction
hash.''' hash.'''
if isinstance(param, str) and len(param) == 64: if isinstance(param, str) and len(param) == 64:
@ -250,17 +239,15 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('parameter should be a transaction hash: {}' raise RPCError('parameter should be a transaction hash: {}'
.format(param)) .format(param))
@classmethod def hash168_from_param(self, param):
def hash168_from_param(cls, param):
if isinstance(param, str): if isinstance(param, str):
try: try:
return cls.COIN.address_to_hash168(param) return self.coin.address_to_hash168(param)
except: except:
pass pass
raise RPCError('parameter should be a valid address: {}'.format(param)) raise RPCError('parameter should be a valid address: {}'.format(param))
@classmethod def non_negative_integer_from_param(self, param):
def non_negative_integer_from_param(cls, param):
try: try:
param = int(param) param = int(param)
except ValueError: except ValueError:
@ -272,62 +259,28 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('param should be a non-negative integer: {}' raise RPCError('param should be a non-negative integer: {}'
.format(param)) .format(param))
@classmethod def extract_hash168(self, params):
def extract_hash168(cls, params):
if len(params) == 1: if len(params) == 1:
return cls.hash168_from_param(params[0]) return self.hash168_from_param(params[0])
raise RPCError('params should contain a single address: {}' raise RPCError('params should contain a single address: {}'
.format(params)) .format(params))
@classmethod def extract_non_negative_integer(self, params):
def extract_non_negative_integer(cls, params):
if len(params) == 1: if len(params) == 1:
return cls.non_negative_integer_from_param(params[0]) return self.non_negative_integer_from_param(params[0])
raise RPCError('params should contain a non-negative integer: {}' raise RPCError('params should contain a non-negative integer: {}'
.format(params)) .format(params))
@classmethod def require_empty_params(self, params):
def require_empty_params(cls, params):
if params: if params:
raise RPCError('params should be empty: {}'.format(params)) raise RPCError('params should be empty: {}'.format(params))
@classmethod
def init(cls, block_processor, daemon, coin):
cls.BLOCK_PROCESSOR = block_processor
cls.DAEMON = daemon
cls.COIN = coin
cls.SESSION_MGR = SessionManager()
@classmethod
def irc_peers(cls):
return cls.BLOCK_PROCESSOR.irc_peers()
@classmethod
def height(cls):
'''Return the current height.'''
return cls.BLOCK_PROCESSOR.height
@classmethod
def electrum_header(cls, height=None):
'''Return the binary header at the given height.'''
if not 0 <= height <= cls.height():
raise RPCError('height {:,d} out of range'.format(height))
header = cls.BLOCK_PROCESSOR.read_headers(height, 1)
return cls.COIN.electrum_header(header, height)
@classmethod class ElectrumX(Session):
def current_electrum_header(cls):
'''Used as response to a headers subscription request.'''
return cls.electrum_header(cls.height())
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.''' '''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env, kind): def __init__(self, *args):
super().__init__() super().__init__(*args)
self.env = env
self.kind = kind
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_height = False self.subscribe_height = False
self.notified_height = None self.notified_height = None
@ -342,54 +295,62 @@ class ElectrumX(JSONRPC):
'banner donation_address peers.subscribe version'), 'banner donation_address peers.subscribe version'),
] ]
self.handlers = {'.'.join([prefix, suffix]): self.handlers = {'.'.join([prefix, suffix]):
getattr(self.__class__, suffix.replace('.', '_')) getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs for prefix, suffixes in rpcs
for suffix in suffixes.split()} for suffix in suffixes.split()}
@classmethod @classmethod
def watched_address_count(cls): def notify(cls, sessions, height, touched):
sessions = cls.SESSION_MGR.sessions headers_payload = height_payload = None
return sum(len(session.hash168s) for session in sessions)
@classmethod
def notify(cls, height, touched):
'''Notify electrum clients about height changes and touched
addresses.'''
headers_payload = json_notification(
'blockchain.headers.subscribe',
(cls.electrum_header(height), ),
)
height_payload = json_notification(
'blockchain.numblocks.subscribe',
(height, ),
)
hash168_to_address = cls.COIN.hash168_to_address
for session in cls.SESSION_MGR.sessions:
if not isinstance(session, ElectrumX):
continue
for session in sessions:
if height != session.notified_height: if height != session.notified_height:
session.notified_height = height session.notified_height = height
if session.subscribe_headers: if session.subscribe_headers:
session.json_send(headers_payload) if headers_payload is None:
if session.subscribe_height: headers_payload = json_notification_payload(
session.json_send(height_payload) 'blockchain.headers.subscribe',
(session.electrum_header(height), ),
)
session.send_json(headers_payload)
if session.subscribe_height:
if height_payload is None:
height_payload = json_notification_payload(
'blockchain.numblocks.subscribe',
(height, ),
)
session.send_json(height_payload)
hash168_to_address = session.coin.hash168_to_address
for hash168 in session.hash168s.intersection(touched): for hash168 in session.hash168s.intersection(touched):
address = hash168_to_address(hash168) address = hash168_to_address(hash168)
status = cls.address_status(hash168) status = session.address_status(hash168)
payload = json_notification('blockchain.address.subscribe', payload = json_notification_payload(
(address, status)) 'blockchain.address.subscribe', (address, status))
session.json_send(payload) session.send_json(payload)
@classmethod def height(self):
def address_status(cls, hash168): '''Return the block processor's current height.'''
return self.bp.height
def current_electrum_header(self):
'''Used as response to a headers subscription request.'''
return self.electrum_header(self.height())
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.height():
raise RPCError('height {:,d} out of range'.format(height))
header = self.bp.read_headers(height, 1)
return self.coin.electrum_header(header, height)
def address_status(self, hash168):
'''Returns status as 32 bytes.''' '''Returns status as 32 bytes.'''
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = cls.BLOCK_PROCESSOR.get_history(hash168) history = self.bp.get_history(hash168)
mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) mempool = self.bp.mempool_transactions(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history) for tx_hash, height in history)
@ -399,11 +360,10 @@ class ElectrumX(JSONRPC):
return sha256(status.encode()).hex() return sha256(status.encode()).hex()
return None return None
@classmethod async def tx_merkle(self, tx_hash, height):
async def tx_merkle(cls, tx_hash, height):
'''tx_hash is a hex string.''' '''tx_hash is a hex string.'''
hex_hashes = await cls.DAEMON.block_hex_hashes(height, 1) hex_hashes = await self.daemon.block_hex_hashes(height, 1)
block = await cls.DAEMON.deserialised_block(hex_hashes[0]) block = await self.daemon.deserialised_block(hex_hashes[0])
tx_hashes = block['tx'] tx_hashes = block['tx']
# This will throw if the tx_hash is bad # This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash) pos = tx_hashes.index(tx_hash)
@ -422,16 +382,11 @@ class ElectrumX(JSONRPC):
return {"block_height": height, "merkle": merkle_branch, "pos": pos} return {"block_height": height, "merkle": merkle_branch, "pos": pos}
@classmethod def get_history(self, hash168):
def height(cls):
return cls.BLOCK_PROCESSOR.height
@classmethod
def get_history(cls, hash168):
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) history = self.bp.get_history(hash168, limit=None)
mempool = cls.BLOCK_PROCESSOR.mempool_transactions(hash168) mempool = self.bp.mempool_transactions(hash168)
conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height} conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history) for tx_hash, height in history)
@ -439,24 +394,21 @@ class ElectrumX(JSONRPC):
for tx_hash, fee, unconfirmed in mempool) for tx_hash, fee, unconfirmed in mempool)
return conf + unconf return conf + unconf
@classmethod def get_chunk(self, index):
def get_chunk(cls, index):
'''Return header chunk as hex. Index is a non-negative integer.''' '''Return header chunk as hex. Index is a non-negative integer.'''
chunk_size = cls.COIN.CHUNK_SIZE chunk_size = self.coin.CHUNK_SIZE
next_height = cls.height() + 1 next_height = self.height() + 1
start_height = min(index * chunk_size, next_height) start_height = min(index * chunk_size, next_height)
count = min(next_height - start_height, chunk_size) count = min(next_height - start_height, chunk_size)
return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() return self.bp.read_headers(start_height, count).hex()
@classmethod def get_balance(self, hash168):
def get_balance(cls, hash168): confirmed = self.bp.get_balance(hash168)
confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) unconfirmed = self.bp.mempool_value(hash168)
unconfirmed = cls.BLOCK_PROCESSOR.mempool_value(hash168)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed} return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@classmethod def list_unspent(self, hash168):
def list_unspent(cls, hash168): utxos = self.bp.get_utxos_sorted(hash168)
utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168)
return tuple({'tx_hash': hash_to_str(utxo.tx_hash), return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, 'height': utxo.height, 'tx_pos': utxo.tx_pos, 'height': utxo.height,
'value': utxo.value} 'value': utxo.value}
@ -498,7 +450,7 @@ class ElectrumX(JSONRPC):
return self.electrum_header(height) return self.electrum_header(height)
async def estimatefee(self, params): async def estimatefee(self, params):
return await self.DAEMON.estimatefee(params) return await self.daemon.estimatefee(params)
async def headers_subscribe(self, params): async def headers_subscribe(self, params):
self.require_empty_params(params) self.require_empty_params(params)
@ -514,7 +466,7 @@ class ElectrumX(JSONRPC):
'''The minimum fee a low-priority tx must pay in order to be accepted '''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.''' to the daemon's memory pool.'''
self.require_empty_params(params) self.require_empty_params(params)
return await self.DAEMON.relayfee() return await self.daemon.relayfee()
async def transaction_broadcast(self, params): async def transaction_broadcast(self, params):
'''Pass through the parameters to the daemon. '''Pass through the parameters to the daemon.
@ -525,7 +477,7 @@ class ElectrumX(JSONRPC):
user interface job here. user interface job here.
''' '''
try: try:
tx_hash = await self.DAEMON.sendrawtransaction(params) tx_hash = await self.daemon.sendrawtransaction(params)
self.logger.info('sent tx: {}'.format(tx_hash)) self.logger.info('sent tx: {}'.format(tx_hash))
return tx_hash return tx_hash
except DaemonError as e: except DaemonError as e:
@ -550,7 +502,7 @@ class ElectrumX(JSONRPC):
# in anticipation it might be dropped in the future. # in anticipation it might be dropped in the future.
if 1 <= len(params) <= 2: if 1 <= len(params) <= 2:
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
return await self.DAEMON.getrawtransaction(tx_hash) return await self.daemon.getrawtransaction(tx_hash)
raise RPCError('params wrong length: {}'.format(params)) raise RPCError('params wrong length: {}'.format(params))
@ -567,9 +519,9 @@ class ElectrumX(JSONRPC):
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
index = self.non_negative_integer_from_param(params[1]) index = self.non_negative_integer_from_param(params[1])
tx_hash = hex_str_to_hash(tx_hash) tx_hash = hex_str_to_hash(tx_hash)
hash168 = self.BLOCK_PROCESSOR.get_utxo_hash168(tx_hash, index) hash168 = self.bp.get_utxo_hash168(tx_hash, index)
if hash168: if hash168:
return self.COIN.hash168_to_address(hash168) return self.coin.hash168_to_address(hash168)
return None return None
raise RPCError('params should contain a transaction hash and index') raise RPCError('params should contain a transaction hash and index')
@ -604,7 +556,7 @@ class ElectrumX(JSONRPC):
subscription. subscription.
''' '''
self.require_empty_params(params) self.require_empty_params(params)
return list(self.irc_peers().values()) return list(self.manager.irc_peers().values())
async def version(self, params): async def version(self, params):
'''Return the server version as a string.''' '''Return the server version as a string.'''
@ -614,37 +566,25 @@ class ElectrumX(JSONRPC):
return VERSION return VERSION
class LocalRPC(JSONRPC): class LocalRPC(Session):
'''A local TCP RPC server for querying status.''' '''A local TCP RPC server for querying status.'''
def __init__(self, env, kind): def __init__(self, *args):
super().__init__() super().__init__(*args)
cmds = 'getinfo sessions numsessions peers numpeers'.split() cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} self.handlers = {cmd: getattr(self, cmd) for cmd in cmds}
self.env = env
self.kind = kind
async def getinfo(self, params): async def getinfo(self, params):
return { return self.manager.info()
'blocks': self.height(),
'peers': len(self.irc_peers()),
'sessions': len(self.SESSION_MGR.sessions),
'watched': ElectrumX.watched_address_count(),
'cached': 0,
}
async def sessions(self, params): async def sessions(self, params):
now = time.time() return self.manager.sessions_info()
return [(session.kind,
'this RPC client' if session == self else session.peername,
len(session.hash168s), session.client, now - session.start)
for session in self.SESSION_MGR.sessions]
async def numsessions(self, params): async def numsessions(self, params):
return len(self.SESSION_MGR.sessions) return self.manager.session_count()
async def peers(self, params): async def peers(self, params):
return self.irc_peers() return self.manager.irc_peers()
async def numpeers(self, params): async def numpeers(self, params):
return len(self.irc_peers()) return len(self.manager.irc_peers())

Loading…
Cancel
Save