Browse Source

Controller clean up

master
Neil Booth 8 years ago
parent
commit
fb43712869
  1. 80
      server/controller.py
  2. 171
      server/protocol.py

80
server/controller.py

@ -20,7 +20,6 @@ from functools import partial
from server.daemon import Daemon from server.daemon import Daemon
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC, JSONRPC from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass from lib.util import LoggedClass
@ -38,11 +37,10 @@ class Controller(LoggedClass):
self.daemon = Daemon(env.daemon_url) self.daemon = Daemon(env.daemon_url)
self.block_processor = BlockProcessor(env, self.daemon, self.block_processor = BlockProcessor(env, self.daemon,
on_update=self.on_update) on_update=self.on_update)
JSONRPC.init(self.block_processor, self.coin) JSONRPC.init(self.block_processor, self.daemon, self.coin,
self.add_job)
self.servers = [] self.servers = []
self.addresses = {}
self.jobs = asyncio.Queue() self.jobs = asyncio.Queue()
self.peers = {}
def start(self): def start(self):
'''Prime the event loop with asynchronous jobs.''' '''Prime the event loop with asynchronous jobs.'''
@ -72,7 +70,7 @@ class Controller(LoggedClass):
env = self.env env = self.env
loop = self.loop loop = self.loop
protocol = partial(LocalRPC, self) protocol = LocalRPC
if env.rpc_port is not None: if env.rpc_port is not None:
host = 'localhost' host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port) rpc_server = loop.create_server(protocol, host, env.rpc_port)
@ -80,7 +78,7 @@ class Controller(LoggedClass):
self.logger.info('RPC server listening on {}:{:d}' self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port)) .format(host, env.rpc_port))
protocol = partial(ElectrumX, self, self.daemon, env) protocol = partial(ElectrumX, env)
if env.tcp_port is not None: if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port) tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
servers.append(await tcp_server) servers.append(await tcp_server)
@ -127,73 +125,3 @@ class Controller(LoggedClass):
except Exception: except Exception:
# Getting here should probably be considered a bug and fixed # Getting here should probably be considered a bug and fixed
traceback.print_exc() traceback.print_exc()
def address_status(self, hash168):
'''Returns status as 32 bytes.'''
status = self.addresses.get(hash168)
if status is None:
history = self.block_processor.get_history(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
if status:
status = sha256(status.encode())
self.addresses[hash168] = status
return status
async def get_merkle(self, tx_hash, height):
'''tx_hash is a hex string.'''
block_hash = await self.daemon.send_single('getblockhash', (height,))
block = await self.daemon.send_single('getblock', (block_hash, True))
tx_hashes = block['tx']
# This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash)
idx = pos
hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
merkle_branch = []
while len(hashes) > 1:
if len(hashes) & 1:
hashes.append(hashes[-1])
idx = idx - 1 if (idx & 1) else idx + 1
merkle_branch.append(hash_to_str(hashes[idx]))
idx //= 2
hashes = [double_sha256(hashes[n] + hashes[n + 1])
for n in range(0, len(hashes), 2)]
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
def get_peers(self):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return self.peers
def height(self):
return self.block_processor.height
def get_history(self, hash168):
history = self.block_processor.get_history(hash168, limit=None)
return [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
]
def get_chunk(self, index):
'''Return header chunk as hex. Index is a non-negative integer.'''
chunk_size = self.coin.CHUNK_SIZE
next_height = self.height() + 1
start_height = min(index * chunk_size, next_height)
count = min(next_height - start_height, chunk_size)
return self.block_processor.read_headers(start_height, count).hex()
def get_balance(self, hash168):
confirmed = self.block_processor.get_balance(hash168)
unconfirmed = -1 # FIXME
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
def list_unspent(self, hash168):
utxos = self.block_processor.get_utxos_sorted(hash168)
return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, 'height': utxo.height,
'value': utxo.value}
for utxo in utxos)

171
server/protocol.py

@ -16,7 +16,7 @@ import traceback
from functools import partial from functools import partial
from server.daemon import DaemonError from server.daemon import DaemonError
from lib.hash import hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass from lib.util import LoggedClass
from server.version import VERSION from server.version import VERSION
@ -33,19 +33,13 @@ def json_notification(method, params):
class JSONRPC(asyncio.Protocol, LoggedClass): class JSONRPC(asyncio.Protocol, LoggedClass):
'''Base class that manages a JSONRPC connection.''' '''Base class that manages a JSONRPC connection.'''
SESSIONS = set() SESSIONS = set()
BLOCK_PROCESSOR = None
COIN = None
def __init__(self, controller): def __init__(self):
super().__init__() super().__init__()
self.controller = controller
self.parts = [] self.parts = []
self.send_count = 0 self.send_count = 0
self.send_size = 0 self.send_size = 0
self.error_count = 0 self.error_count = 0
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
@ -85,7 +79,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except Exception as e: except Exception as e:
self.logger.info('error decoding JSON message'.format(e)) self.logger.info('error decoding JSON message'.format(e))
else: else:
self.controller.add_job(self.request_handler(message)) self.ADD_JOB(self.request_handler(message))
async def request_handler(self, request): async def request_handler(self, request):
'''Called asynchronously.''' '''Called asynchronously.'''
@ -174,9 +168,11 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('params should be empty: {}'.format(params)) raise RPCError('params should be empty: {}'.format(params))
@classmethod @classmethod
def init(cls, block_processor, coin): def init(cls, block_processor, daemon, coin, add_job):
cls.BLOCK_PROCESSOR = block_processor cls.BLOCK_PROCESSOR = block_processor
cls.DAEMON = daemon
cls.COIN = coin cls.COIN = coin
cls.ADD_JOB = add_job
@classmethod @classmethod
def height(cls): def height(cls):
@ -196,6 +192,37 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Used as response to a headers subscription request.''' '''Used as response to a headers subscription request.'''
return cls.electrum_header(cls.height()) return cls.electrum_header(cls.height())
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env):
super().__init__()
self.env = env
self.hash168s = set()
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
rpcs = [
('blockchain',
'address.get_balance address.get_history address.get_mempool '
'address.get_proof address.listunspent address.subscribe '
'block.get_header block.get_chunk estimatefee headers.subscribe '
'numblocks.subscribe relayfee transaction.broadcast '
'transaction.get transaction.get_merkle utxo.get_address'),
('server',
'banner donation_address peers.subscribe version'),
]
self.handlers = {'.'.join([prefix, suffix]):
getattr(self.__class__, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
@classmethod
def watched_address_count(cls):
return sum(len(session.hash168s) for session in self.SESSIONS
if isinstance(session, cls))
@classmethod @classmethod
def notify(cls, height, touched): def notify(cls, height, touched):
'''Notify electrum clients about height changes and touched '''Notify electrum clients about height changes and touched
@ -220,49 +247,94 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
for hash168 in session.hash168s.intersection(touched): for hash168 in session.hash168s.intersection(touched):
address = hash168_to_address(hash168) address = hash168_to_address(hash168)
status = cls.address_status(hash168)
payload = json_notification('blockchain.address.subscribe', payload = json_notification('blockchain.address.subscribe',
(address, )) (address, status))
session.json_send(payload) session.json_send(payload)
@classmethod
def address_status(cls, hash168):
'''Returns status as 32 bytes.'''
history = cls.BLOCK_PROCESSOR.get_history(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
if status:
return sha256(status.encode()).hex()
return None
class ElectrumX(JSONRPC): @classmethod
'''A TCP server that handles incoming Electrum connections.''' async def tx_merkle(cls, tx_hash, height):
'''tx_hash is a hex string.'''
block_hash = await cls.DAEMON.send_single('getblockhash', (height,))
block = await cls.DAEMON.send_single('getblock', (block_hash, True))
tx_hashes = block['tx']
# This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash)
idx = pos
hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
merkle_branch = []
while len(hashes) > 1:
if len(hashes) & 1:
hashes.append(hashes[-1])
idx = idx - 1 if (idx & 1) else idx + 1
merkle_branch.append(hash_to_str(hashes[idx]))
idx //= 2
hashes = [double_sha256(hashes[n] + hashes[n + 1])
for n in range(0, len(hashes), 2)]
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
def __init__(self, controller, daemon, env): @classmethod
super().__init__(controller) def irc_peers(cls):
self.daemon = daemon '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
self.env = env per peer.'''
self.hash168s = set() return {}
rpcs = [(
'blockchain', @classmethod
'address.get_balance address.get_history address.get_mempool ' def height(cls):
'address.get_proof address.listunspent address.subscribe ' return cls.BLOCK_PROCESSOR.height
'block.get_header block.get_chunk estimatefee headers.subscribe '
'numblocks.subscribe relayfee transaction.broadcast ' @classmethod
'transaction.get transaction.get_merkle utxo.get_address'), def get_history(cls, hash168):
( history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None)
'server', return [
'banner donation_address peers.subscribe version'), {'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
] ]
self.handlers = {'.'.join([prefix, suffix]):
getattr(self.__class__, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
@classmethod @classmethod
def watched_address_count(cls): def get_chunk(cls, index):
return sum(len(session.hash168s) for session in self.SESSIONS '''Return header chunk as hex. Index is a non-negative integer.'''
if isinstance(session, cls)) chunk_size = cls.COIN.CHUNK_SIZE
next_height = cls.height() + 1
start_height = min(index * chunk_size, next_height)
count = min(next_height - start_height, chunk_size)
return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex()
@classmethod
def get_balance(cls, hash168):
confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168)
unconfirmed = -1 # FIXME
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@classmethod
def list_unspent(cls, hash168):
utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168)
return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, 'height': utxo.height,
'value': utxo.value}
for utxo in utxos)
# --- blockchain commands # --- blockchain commands
async def address_get_balance(self, params): async def address_get_balance(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
return self.controller.get_balance(hash168) return self.get_balance(hash168)
async def address_get_history(self, params): async def address_get_history(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
return self.controller.get_history(hash168) return self.get_history(hash168)
async def address_get_mempool(self, params): async def address_get_mempool(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
@ -274,24 +346,23 @@ class ElectrumX(JSONRPC):
async def address_listunspent(self, params): async def address_listunspent(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
return self.controller.list_unspent(hash168) return self.list_unspent(hash168)
async def address_subscribe(self, params): async def address_subscribe(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
self.hash168s.add(hash168) self.hash168s.add(hash168)
status = self.controller.address_status(hash168) return self.address_status(hash168)
return status.hex() if status else None
async def block_get_chunk(self, params): async def block_get_chunk(self, params):
index = self.extract_non_negative_integer(params) index = self.extract_non_negative_integer(params)
return self.controller.get_chunk(index) return self.get_chunk(index)
async def block_get_header(self, params): async def block_get_header(self, params):
height = self.extract_non_negative_integer(params) height = self.extract_non_negative_integer(params)
return self.electrum_header(height) return self.electrum_header(height)
async def estimatefee(self, params): async def estimatefee(self, params):
return await self.daemon.estimatefee(params) return await self.DAEMON.estimatefee(params)
async def headers_subscribe(self, params): async def headers_subscribe(self, params):
self.require_empty_params(params) self.require_empty_params(params)
@ -307,7 +378,7 @@ class ElectrumX(JSONRPC):
'''The minimum fee a low-priority tx must pay in order to be accepted '''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.''' to the daemon's memory pool.'''
self.require_empty_params(params) self.require_empty_params(params)
return await self.daemon.relayfee() return await self.DAEMON.relayfee()
async def transaction_broadcast(self, params): async def transaction_broadcast(self, params):
'''Pass through the parameters to the daemon. '''Pass through the parameters to the daemon.
@ -318,7 +389,7 @@ class ElectrumX(JSONRPC):
user interface job here. user interface job here.
''' '''
try: try:
tx_hash = await self.daemon.sendrawtransaction(params) tx_hash = await self.DAEMON.sendrawtransaction(params)
self.logger.info('sent tx: {}'.format(tx_hash)) self.logger.info('sent tx: {}'.format(tx_hash))
return tx_hash return tx_hash
except DaemonError as e: except DaemonError as e:
@ -344,7 +415,7 @@ class ElectrumX(JSONRPC):
# in anticipation it might be dropped in the future. # in anticipation it might be dropped in the future.
if 1 <= len(params) <= 2: if 1 <= len(params) <= 2:
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
return await self.daemon.getrawtransaction(tx_hash) return await self.DAEMON.getrawtransaction(tx_hash)
raise RPCError('params wrong length: {}'.format(params)) raise RPCError('params wrong length: {}'.format(params))
@ -352,7 +423,7 @@ class ElectrumX(JSONRPC):
if len(params) == 2: if len(params) == 2:
tx_hash = self.tx_hash_from_param(params[0]) tx_hash = self.tx_hash_from_param(params[0])
height = self.non_negative_integer_from_param(params[1]) height = self.non_negative_integer_from_param(params[1])
return await self.controller.get_merkle(tx_hash, height) return await self.tx_merkle(tx_hash, height)
raise RPCError('params should contain a transaction hash and height') raise RPCError('params should contain a transaction hash and height')
@ -398,7 +469,7 @@ class ElectrumX(JSONRPC):
subscription. subscription.
''' '''
self.require_empty_params(params) self.require_empty_params(params)
peers = self.controller.get_peers() peers = ElectrumX.irc_peers()
return tuple(peers.values()) return tuple(peers.values())
async def version(self, params): async def version(self, params):
@ -417,7 +488,7 @@ class LocalRPC(JSONRPC):
async def getinfo(self, params): async def getinfo(self, params):
return { return {
'blocks': self.height(), 'blocks': self.height(),
'peers': len(self.controller.get_peers()), 'peers': len(ElectrumX.irc_peers()),
'sessions': len(self.SESSIONS), 'sessions': len(self.SESSIONS),
'watched': ElectrumX.watched_address_count(), 'watched': ElectrumX.watched_address_count(),
'cached': 0, 'cached': 0,
@ -430,7 +501,7 @@ class LocalRPC(JSONRPC):
return len(self.SESSIONS) return len(self.SESSIONS)
async def peers(self, params): async def peers(self, params):
return tuple(self.controller.get_peers().keys()) return tuple(ElectrumX.irc_peers().keys())
async def numpeers(self, params): async def numpeers(self, params):
return len(self.controller.get_peers()) return len(ElectrumX.irc_peers())

Loading…
Cancel
Save