Browse Source

Server work

Avoid touching the block preprocessor hot loop for now
master
Neil Booth 8 years ago
parent
commit
ceecdc54ac
  1. 19
      lib/coins.py
  2. 12
      server/block_processor.py
  3. 19
      server/cache.py
  4. 55
      server/controller.py
  5. 25
      server/daemon.py
  6. 378
      server/protocol.py

19
lib/coins.py

@ -13,9 +13,10 @@ necessary for appropriate handling.
from decimal import Decimal from decimal import Decimal
import inspect import inspect
import struct
import sys import sys
from lib.hash import Base58, hash160, double_sha256 from lib.hash import Base58, hash160, double_sha256, hash_to_str
from lib.script import ScriptPubKey from lib.script import ScriptPubKey
from lib.tx import Deserializer from lib.tx import Deserializer
@ -31,6 +32,7 @@ class Coin(object):
HEADER_LEN = 80 HEADER_LEN = 80
DEFAULT_RPC_PORT = 8332 DEFAULT_RPC_PORT = 8332
VALUE_PER_COIN = 100000000 VALUE_PER_COIN = 100000000
CHUNK_SIZE=2016
@staticmethod @staticmethod
def coin_classes(): def coin_classes():
@ -168,6 +170,21 @@ class Coin(object):
''' '''
return Decimal(value) / cls.VALUE_PER_COIN return Decimal(value) / cls.VALUE_PER_COIN
@classmethod
def electrum_header(cls, header, height):
version, = struct.unpack('<I', header[:4])
timestamp, bits, nonce = struct.unpack('<III', header[68:80])
return {
'block_height': height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}
class Bitcoin(Coin): class Bitcoin(Coin):
NAME = "Bitcoin" NAME = "Bitcoin"

12
server/block_processor.py

@ -125,11 +125,15 @@ class BlockProcessor(LoggedClass):
Coordinate backing up in case of chain reorganisations. Coordinate backing up in case of chain reorganisations.
''' '''
def __init__(self, env, daemon, on_catchup=None): def __init__(self, env, daemon, on_update=None):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.
'''
super().__init__() super().__init__()
self.daemon = daemon self.daemon = daemon
self.on_catchup = on_catchup self.on_update = on_update
# Meta # Meta
self.utxo_MB = env.utxo_MB self.utxo_MB = env.utxo_MB
@ -200,8 +204,8 @@ class BlockProcessor(LoggedClass):
if not self.have_caught_up: if not self.have_caught_up:
self.have_caught_up = True self.have_caught_up = True
self.logger.info('caught up to height {:,d}'.format(self.height)) self.logger.info('caught up to height {:,d}'.format(self.height))
if self.on_catchup: if self.on_update:
await self.on_catchup() await self.on_update(self.height, set())
async def start(self): async def start(self):
'''External entry point for block processing. '''External entry point for block processing.

19
server/cache.py

@ -390,22 +390,3 @@ class FSCache(LoggedClass):
headers = self.read_headers(height, count) headers = self.read_headers(height, count)
hlen = self.coin.HEADER_LEN hlen = self.coin.HEADER_LEN
return [double_sha256(header) for header in chunks(headers, hlen)] return [double_sha256(header) for header in chunks(headers, hlen)]
def encode_header(self, height):
if height < 0 or height > self.height + len(self.headers):
raise Exception('no header information for height {:,d}'
.format(height))
header = self.read_headers(self.height, 1)
unpack = struct.unpack
version, = unpack('<I', header[:4])
timestamp, bits, nonce = unpack('<III', header[68:80])
return {
'block_height': self.height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}

55
server/controller.py

@ -19,7 +19,7 @@ from functools import partial
from server.daemon import Daemon, DaemonError from server.daemon import Daemon, DaemonError
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC from server.protocol import ElectrumX, LocalRPC, RPCError, JSONRPC
from lib.hash import (sha256, double_sha256, hash_to_str, from lib.hash import (sha256, double_sha256, hash_to_str,
Base58, hex_str_to_hash) Base58, hex_str_to_hash)
from lib.util import LoggedClass from lib.util import LoggedClass
@ -35,11 +35,12 @@ class Controller(LoggedClass):
super().__init__() super().__init__()
self.loop = loop self.loop = loop
self.env = env self.env = env
self.coin = env.coin
self.daemon = Daemon(env.daemon_url) self.daemon = Daemon(env.daemon_url)
self.block_processor = BlockProcessor(env, self.daemon, self.block_processor = BlockProcessor(env, self.daemon,
on_catchup=self.start_servers) on_update=self.on_update)
JSONRPC.init(self.block_processor, self.coin)
self.servers = [] self.servers = []
self.sessions = set()
self.addresses = {} self.addresses = {}
self.jobs = asyncio.Queue() self.jobs = asyncio.Queue()
self.peers = {} self.peers = {}
@ -57,15 +58,18 @@ class Controller(LoggedClass):
self.loop.add_signal_handler(getattr(signal, signame), self.loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame)) partial(self.on_signal, signame))
async def on_update(self, height, touched):
if not self.servers:
self.servers = await self.start_servers()
ElectrumX.notify(height, touched)
async def start_servers(self): async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports. '''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified. Does Does not start a server if the port wasn't specified. Does
nothing if servers are already running. nothing if servers are already running.
''' '''
if self.servers: servers = []
return
env = self.env env = self.env
loop = self.loop loop = self.loop
@ -73,14 +77,14 @@ class Controller(LoggedClass):
if env.rpc_port is not None: if env.rpc_port is not None:
host = 'localhost' host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port) rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(await rpc_server) servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}' self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port)) .format(host, env.rpc_port))
protocol = partial(ElectrumX, self, self.daemon, env) protocol = partial(ElectrumX, self, self.daemon, env)
if env.tcp_port is not None: if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port) tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(await tcp_server) servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}' self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port)) .format(env.host, env.tcp_port))
@ -91,10 +95,12 @@ class Controller(LoggedClass):
keyfile=env.ssl_keyfile) keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port, ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context) ssl=ssl_context)
self.servers.append(await ssl_server) servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}' self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port)) .format(env.host, env.ssl_port))
return servers
def stop(self): def stop(self):
'''Close the listening servers.''' '''Close the listening servers.'''
for server in self.servers: for server in self.servers:
@ -107,14 +113,6 @@ class Controller(LoggedClass):
for task in asyncio.Task.all_tasks(self.loop): for task in asyncio.Task.all_tasks(self.loop):
task.cancel() task.cancel()
def add_session(self, session):
'''Add a session representing one incoming connection.'''
self.sessions.add(session)
def remove_session(self, session):
'''Remove a session.'''
self.sessions.remove(session)
def add_job(self, coro): def add_job(self, coro):
'''Queue a job for asynchronous processing.''' '''Queue a job for asynchronous processing.'''
self.jobs.put_nowait(coro) self.jobs.put_nowait(coro)
@ -174,12 +172,29 @@ class Controller(LoggedClass):
def height(self): def height(self):
return self.block_processor.height return self.block_processor.height
def get_current_header(self):
return self.block_processor.get_current_header()
def get_history(self, hash168): def get_history(self, hash168):
history = self.block_processor.get_history(hash168, limit=None) history = self.block_processor.get_history(hash168, limit=None)
return [ return [
{'tx_hash': hash_to_str(tx_hash), 'height': height} {'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history for tx_hash, height in history
] ]
def get_chunk(self, index):
'''Return header chunk as hex. Index is a non-negative integer.'''
chunk_size = self.coin.CHUNK_SIZE
next_height = self.height() + 1
start_height = min(index * chunk_size, next_height)
count = min(next_height - start_height, chunk_size)
return self.block_processor.read_headers(start_height, count).hex()
def get_balance(self, hash168):
confirmed = self.block_processor.get_balance(hash168)
unconfirmed = -1 # FIXME
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
def list_unspent(self, hash168):
utxos = self.block_processor.get_utxos_sorted(hash168)
return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, 'height': utxo.height,
'value': utxo.value}
for utxo in utxos)

25
server/daemon.py

@ -71,8 +71,7 @@ class Daemon(LoggedClass):
msg = 'daemon still warming up.' msg = 'daemon still warming up.'
secs = 30 secs = 30
else: else:
msg = '{}'.format(errs) raise DaemonError(errs)
raise DaemonError(msg)
self.logger.error('{}. Sleeping {:d}s and trying again...' self.logger.error('{}. Sleeping {:d}s and trying again...'
.format(msg, secs)) .format(msg, secs))
@ -90,6 +89,28 @@ class Daemon(LoggedClass):
# Convert hex string to bytes # Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks] return [bytes.fromhex(block) for block in blocks]
async def mempool_hashes(self):
'''Return the hashes of the txs in the daemon's mempool.'''
return await self.send_single('getrawmempool')
async def estimatefee(self, params):
'''Return the fee estimate for the given parameters.'''
return await self.send_single('estimatefee', params)
async def relayfee(self):
'''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.'''
net_info = await self.send_single('getnetworkinfo')
return net_info['relayfee']
async def getrawtransaction(self, hex_hash):
'''Return the serialized raw transaction with the given hash.'''
return await self.send_single('getrawtransaction', (hex_hash, 0))
async def sendrawtransaction(self, params):
'''Broadcast a transaction to the network.'''
return await self.send_single('sendrawtransaction', params)
async def height(self): async def height(self):
'''Query the daemon for its current height.''' '''Query the daemon for its current height.'''
self._height = await self.send_single('getblockcount') self._height = await self.send_single('getblockcount')

378
server/protocol.py

@ -11,44 +11,55 @@
import asyncio import asyncio
import codecs import codecs
import json import json
import struct
import traceback import traceback
from functools import partial from functools import partial
from server.daemon import DaemonError
from lib.util import LoggedClass from lib.util import LoggedClass
from server.version import VERSION from server.version import VERSION
class Error(Exception): class RPCError(Exception):
BAD_REQUEST = 1 '''RPC handlers raise this error.'''
INTERNAL_ERROR = 2
class JSONRPC(asyncio.Protocol, LoggedClass): def json_notification(method, params):
'''Base class that manages a JSONRPC connection. '''Create a json notification.'''
return {'id': None, 'method': method, 'params': params}
When a request comes in for an RPC method M, then a member class JSONRPC(asyncio.Protocol, LoggedClass):
function handle_M is called with the request params array, except '''Base class that manages a JSONRPC connection.'''
that periods in M are replaced with underscores. So a RPC call SESSIONS = set()
for method 'blockchain.estimatefee' will be passed to BLOCK_PROCESSOR = None
handle_blockchain_estimatefee. COIN = None
'''
def __init__(self, controller): def __init__(self, controller):
super().__init__() super().__init__()
self.controller = controller self.controller = controller
self.parts = [] self.parts = []
self.send_count = 0
self.send_size = 0
self.error_count = 0
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
self.transport = transport self.transport = transport
self.peername = transport.get_extra_info('peername') self.peername = transport.get_extra_info('peername')
self.logger.info('connection from {}'.format(self.peername)) self.logger.info('connection from {}'.format(self.peername))
self.controller.add_session(self) self.SESSIONS.add(self)
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
self.logger.info('disconnected: {}'.format(self.peername)) self.logger.info('{} disconnected. '
self.controller.remove_session(self) 'Sent {:,d} bytes in {:,d} messages {:,d} errors'
.format(self.peername, self.send_size,
self.send_count, self.error_count))
self.SESSIONS.remove(self)
def data_received(self, data): def data_received(self, data):
'''Handle incoming data (synchronously). '''Handle incoming data (synchronously).
@ -79,40 +90,129 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Called asynchronously.''' '''Called asynchronously.'''
error = result = None error = result = None
try: try:
result = await self.json_handler(request) handler = self.rpc_handler(request.get('method'),
except Error as e: request.get('params', []))
error = {'code': e.args[0], 'message': e.args[1]} result = await handler()
except asyncio.CancelledError: except RPCError as e:
raise self.error_count += 1
except Exception as e: error = {'code': 1, 'message': e.args[0]}
# This should be considered a bug and fixed
traceback.print_exc()
error = {'code': Error.INTERNAL_ERROR, 'message': str(e)}
payload = {'id': request.get('id'), 'error': error, 'result': result} payload = {'id': request.get('id'), 'error': error, 'result': result}
try: self.json_send(payload)
data = json.dumps(payload) + '\n'
except TypeError: def json_send(self, payload):
msg = 'cannot JSON encode response to request {}'.format(request) data = (json.dumps(payload) + '\n').encode()
self.logger.error(msg) self.transport.write(data)
error = {'code': Error.INTERNAL_ERROR, 'message': msg} self.send_count += 1
payload = {'id': request.get('id'), 'error': error, 'result': None} self.send_size += len(data)
data = json.dumps(payload) + '\n'
self.transport.write(data.encode()) def rpc_handler(self, method, params):
async def json_handler(self, request):
method = request.get('method')
handler = None handler = None
if isinstance(method, str): if isinstance(method, str):
handler_name = 'handle_{}'.format(method.replace('.', '_')) handler = self.handlers.get(method)
handler = getattr(self, handler_name, None)
if not handler: if not handler:
self.logger.info('unknown method: {}'.format(method)) self.logger.info('unknown method: {}'.format(method))
raise Error(Error.BAD_REQUEST, 'unknown method: {}'.format(method)) raise RPCError('unknown method: {}'.format(method))
params = request.get('params', [])
if not isinstance(params, list): if not isinstance(params, list):
raise Error(Error.BAD_REQUEST, 'params should be an array') raise RPCError('params should be an array')
return await handler(params)
return partial(handler, self, params)
@classmethod
def tx_hash_from_param(cls, param):
'''Raise an RPCError if the parameter is not a valid transaction
hash.'''
if isinstance(param, str) and len(param) == 64:
try:
bytes.fromhex(param)
return param
except ValueError:
pass
raise RPCError('parameter should be a transaction hash: {}'
.format(param))
@classmethod
def hash168_from_param(cls, param):
if isinstance(param, str):
try:
return cls.COIN.address_to_hash168(param)
except:
pass
raise RPCError('parameter should be a valid address: {}'.format(param))
@classmethod
def non_negative_integer_from_param(cls, param):
if isinstance(param, int) and param >= 0:
return param
raise RPCError('param should be a non-negative integer: {}'
.format(param))
@classmethod
def extract_hash168(cls, params):
if len(params) == 1:
return cls.hash168_from_param(params[0])
raise RPCError('params should contain a single address: {}'
.format(params))
@classmethod
def extract_non_negative_integer(cls, params):
if len(params) == 1:
return cls.non_negative_integer_from_param(params[0])
raise RPCError('params should contain a non-negative integer: {}'
.format(params))
@classmethod
def require_empty_params(cls, params):
if params:
raise RPCError('params should be empty: {}'.format(params))
@classmethod
def init(cls, block_processor, coin):
cls.BLOCK_PROCESSOR = block_processor
cls.COIN = coin
@classmethod
def height(cls):
'''Return the current height.'''
return cls.BLOCK_PROCESSOR.height
@classmethod
def electrum_header(cls, height=None):
'''Return the binary header at the given height.'''
if not 0 <= height <= cls.height():
raise RPCError('height {:,d} out of range'.format(height))
header = cls.BLOCK_PROCESSOR.read_headers(height, 1)
return cls.COIN.electrum_header(header, height)
@classmethod
def current_electrum_header(cls):
'''Used as response to a headers subscription request.'''
return cls.electrum_header(cls.height())
@classmethod
def notify(cls, height, touched):
'''Notify electrum clients about height changes and touched
addresses.'''
headers_payload = json_notification(
'blockchain.headers.subscribe',
(cls.electrum_header(height), ),
)
height_payload = json_notification(
'blockchain.numblocks.subscribe',
(height, ),
)
for session in cls.SESSIONS:
if height != session.notified_height:
session.notified_height = height
if session.subscribe_headers:
session.json_send(headers_payload)
if session.subscribe_height:
session.json_send(height_payload)
for hash168 in session.hash168s.intersection(touched):
payload = json_notification('blockchain.address.subscribe',
(Base58.encode_check(hash168), ))
session.json_send(payload)
class ElectrumX(JSONRPC): class ElectrumX(JSONRPC):
@ -122,60 +222,138 @@ class ElectrumX(JSONRPC):
super().__init__(controller) super().__init__(controller)
self.daemon = daemon self.daemon = daemon
self.env = env self.env = env
self.addresses = set() self.hash168s = set()
self.subscribe_headers = False rpcs = [(
'blockchain',
'address.get_balance address.get_history address.get_mempool '
'address.get_proof address.listunspent address.subscribe '
'block.get_header block.get_chunk estimatefee headers.subscribe '
'numblocks.subscribe relayfee transaction.broadcast '
'transaction.get transaction.get_merkle utxo.get_address'),
(
'server',
'banner donation_address peers.subscribe version'),
]
self.handlers = {'.'.join([prefix, suffix]):
getattr(self.__class__, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
@classmethod
def watched_address_count(cls):
return sum(len(session.hash168s) for session in self.SESSIONS
if isinstance(session, cls))
# --- blockchain commands
async def address_get_balance(self, params):
hash168 = self.extract_hash168(params)
return self.controller.get_balance(hash168)
async def address_get_history(self, params):
hash168 = self.extract_hash168(params)
return self.controller.get_history(hash168)
def params_to_hash168(self, params): async def address_get_mempool(self, params):
if len(params) != 1: hash168 = self.extract_hash168(params)
raise Error(Error.BAD_REQUEST, raise RPCError('get_mempool is not yet implemented')
'params should contain a single address')
address = params[0]
try:
return self.env.coin.address_to_hash168(address)
except:
raise Error(Error.BAD_REQUEST,
'invalid address: {}'.format(address))
async def handle_blockchain_address_get_history(self, params): async def address_get_proof(self, params):
hash168 = self.params_to_hash168(params) hash168 = self.extract_hash168(params)
return self.controller.get_history(hash168) raise RPCError('get_proof is not yet implemented')
async def handle_blockchain_address_subscribe(self, params): async def address_listunspent(self, params):
hash168 = self.params_to_hash168(params) hash168 = self.extract_hash168(params)
return self.controller.list_unspent(hash168)
async def address_subscribe(self, params):
hash168 = self.extract_hash168(params)
self.hash168s.add(hash168)
status = self.controller.address_status(hash168) status = self.controller.address_status(hash168)
return status.hex() if status else None return status.hex() if status else None
async def handle_blockchain_estimatefee(self, params): async def block_get_chunk(self, params):
result = await self.daemon.send_single('estimatefee', params) index = self.extract_non_negative_integer(params)
return result return self.controller.get_chunk(index)
async def block_get_header(self, params):
height = self.extract_non_negative_integer(params)
return self.electrum_header(height)
async def estimatefee(self, params):
return await self.daemon.estimatefee(params)
async def handle_blockchain_headers_subscribe(self, params): async def headers_subscribe(self, params):
self.require_empty_params(params)
self.subscribe_headers = True self.subscribe_headers = True
return self.controller.get_current_header() return self.current_electrum_header()
async def handle_blockchain_relayfee(self, params): async def numblocks_subscribe(self, params):
self.require_empty_params(params)
self.subscribe_height = True
return self.height()
async def relayfee(self, params):
'''The minimum fee a low-priority tx must pay in order to be accepted '''The minimum fee a low-priority tx must pay in order to be accepted
to this daemon's memory pool. to the daemon's memory pool.'''
self.require_empty_params(params)
return await self.daemon.relayfee()
async def transaction_broadcast(self, params):
'''Pass through the parameters to the daemon.
An ugly API: current Electrum clients only pass the raw
transaction in hex and expect error messages to be returned in
the result field. And the server shouldn't be doing the client's
user interface job here.
''' '''
net_info = await self.daemon.send_single('getnetworkinfo') try:
return net_info['relayfee'] tx_hash = await self.daemon.sendrawtransaction(params)
self.logger.info('sent tx: {}'.format(tx_hash))
async def handle_blockchain_transaction_get(self, params): return tx_hash
if len(params) != 1: except DaemonError as e:
raise Error(Error.BAD_REQUEST, errors = e.args[0]
'params should contain a transaction hash') error = errors[0]
tx_hash = params[0] message = error['message']
return await self.daemon.send_single('getrawtransaction', (tx_hash, 0)) self.logger.info('sendrawtransaction: {}'.format(message))
if 'non-mandatory-script-verify-flag' in message:
async def handle_blockchain_transaction_get_merkle(self, params): return (
if len(params) != 2: 'Your client produced a transaction that is not accepted '
raise Error(Error.BAD_REQUEST, 'by the network any more. Please upgrade to Electrum '
'params should contain a transaction hash and height') '2.5.1 or newer.'
tx_hash, height = params )
return await self.controller.get_merkle(tx_hash, height)
return (
async def handle_server_banner(self, params): 'The transaction was rejected by network rules. ({})\n[{}]'
.format(message, params[0])
)
async def transaction_get(self, params):
'''Return the serialized raw transaction.'''
# For some reason Electrum passes a height. Don't require it
# in anticipation it might be dropped in the future.
if 1 <= len(params) <= 2:
tx_hash = self.tx_hash_from_param(params[0])
return await self.daemon.getrawtransaction(tx_hash)
raise RPCError('params wrong length: {}'.format(params))
async def transaction_get_merkle(self, params):
if len(params) == 2:
tx_hash = self.tx_hash_from_param(params[0])
height = self.non_negative_integer_from_param(params[1])
return await self.controller.get_merkle(tx_hash, height)
raise RPCError('params should contain a transaction hash and height')
async def utxo_get_address(self, params):
pass # TODO
# --- server commands
async def banner(self, params):
'''Return the server banner.''' '''Return the server banner.'''
self.require_empty_params(params)
banner = 'Welcome to Electrum!' banner = 'Welcome to Electrum!'
if self.env.banner_file: if self.env.banner_file:
try: try:
@ -186,23 +364,25 @@ class ElectrumX(JSONRPC):
.format(self.env.banner_file, e)) .format(self.env.banner_file, e))
return banner return banner
async def handle_server_donation_address(self, params): async def donation_address(self, params):
'''Return the donation address as a string. '''Return the donation address as a string.
If none is specified return the empty string. If none is specified return the empty string.
''' '''
self.require_empty_params(params)
return self.env.donation_address return self.env.donation_address
async def handle_server_peers_subscribe(self, params): async def peers_subscribe(self, params):
'''Returns the peer (ip, host, ports) tuples. '''Returns the peer (ip, host, ports) tuples.
Despite the name electrum-server does not treat this as a Despite the name electrum-server does not treat this as a
subscription. subscription.
''' '''
self.require_empty_params(params)
peers = self.controller.get_peers() peers = self.controller.get_peers()
return tuple(peers.values()) return tuple(peers.values())
async def handle_server_version(self, params): async def version(self, params):
'''Return the server version as a string.''' '''Return the server version as a string.'''
return VERSION return VERSION
@ -210,24 +390,28 @@ class ElectrumX(JSONRPC):
class LocalRPC(JSONRPC): class LocalRPC(JSONRPC):
'''A local TCP RPC server for querying status.''' '''A local TCP RPC server for querying status.'''
async def handle_getinfo(self, params): def __init__(self):
super().__init__()
cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds}
async def getinfo(self, params):
return { return {
'blocks': self.controller.height(), 'blocks': self.height(),
'peers': len(self.controller.get_peers()), 'peers': len(self.controller.get_peers()),
'sessions': len(self.controller.sessions), 'sessions': len(self.SESSIONS),
'watched': sum(len(s.addresses) for s in self.controller.sessions 'watched': ElectrumX.watched_address_count(),
if isinstance(s, ElectrumX)),
'cached': 0, 'cached': 0,
} }
async def handle_sessions(self, params): async def sessions(self, params):
return [] return []
async def handle_numsessions(self, params): async def numsessions(self, params):
return len(self.controller.sessions) return len(self.SESSIONS)
async def handle_peers(self, params): async def peers(self, params):
return tuple(self.controller.get_peers().keys()) return tuple(self.controller.get_peers().keys())
async def handle_numpeers(self, params): async def numpeers(self, params):
return len(self.controller.get_peers()) return len(self.controller.get_peers())

Loading…
Cancel
Save