From fb43712869accdd044187775c7a67bdd950f3cb9 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 14:03:19 +0900 Subject: [PATCH] Controller clean up --- server/controller.py | 80 +------------------- server/protocol.py | 171 ++++++++++++++++++++++++++++++------------- 2 files changed, 125 insertions(+), 126 deletions(-) diff --git a/server/controller.py b/server/controller.py index 7519a17..cbbfaf1 100644 --- a/server/controller.py +++ b/server/controller.py @@ -20,7 +20,6 @@ from functools import partial from server.daemon import Daemon from server.block_processor import BlockProcessor from server.protocol import ElectrumX, LocalRPC, JSONRPC -from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.util import LoggedClass @@ -38,11 +37,10 @@ class Controller(LoggedClass): self.daemon = Daemon(env.daemon_url) self.block_processor = BlockProcessor(env, self.daemon, on_update=self.on_update) - JSONRPC.init(self.block_processor, self.coin) + JSONRPC.init(self.block_processor, self.daemon, self.coin, + self.add_job) self.servers = [] - self.addresses = {} self.jobs = asyncio.Queue() - self.peers = {} def start(self): '''Prime the event loop with asynchronous jobs.''' @@ -72,7 +70,7 @@ class Controller(LoggedClass): env = self.env loop = self.loop - protocol = partial(LocalRPC, self) + protocol = LocalRPC if env.rpc_port is not None: host = 'localhost' rpc_server = loop.create_server(protocol, host, env.rpc_port) @@ -80,7 +78,7 @@ class Controller(LoggedClass): self.logger.info('RPC server listening on {}:{:d}' .format(host, env.rpc_port)) - protocol = partial(ElectrumX, self, self.daemon, env) + protocol = partial(ElectrumX, env) if env.tcp_port is not None: tcp_server = loop.create_server(protocol, env.host, env.tcp_port) servers.append(await tcp_server) @@ -127,73 +125,3 @@ class Controller(LoggedClass): except Exception: # Getting here should probably be considered a bug and fixed traceback.print_exc() - - def address_status(self, hash168): - '''Returns status as 32 bytes.''' - status = self.addresses.get(hash168) - if status is None: - history = self.block_processor.get_history(hash168) - status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) - for tx_hash, height in history) - if status: - status = sha256(status.encode()) - self.addresses[hash168] = status - - return status - - async def get_merkle(self, tx_hash, height): - '''tx_hash is a hex string.''' - block_hash = await self.daemon.send_single('getblockhash', (height,)) - block = await self.daemon.send_single('getblock', (block_hash, True)) - tx_hashes = block['tx'] - # This will throw if the tx_hash is bad - pos = tx_hashes.index(tx_hash) - - idx = pos - hashes = [hex_str_to_hash(txh) for txh in tx_hashes] - merkle_branch = [] - while len(hashes) > 1: - if len(hashes) & 1: - hashes.append(hashes[-1]) - idx = idx - 1 if (idx & 1) else idx + 1 - merkle_branch.append(hash_to_str(hashes[idx])) - idx //= 2 - hashes = [double_sha256(hashes[n] + hashes[n + 1]) - for n in range(0, len(hashes), 2)] - - return {"block_height": height, "merkle": merkle_branch, "pos": pos} - - def get_peers(self): - '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one - per peer.''' - return self.peers - - def height(self): - return self.block_processor.height - - def get_history(self, hash168): - history = self.block_processor.get_history(hash168, limit=None) - return [ - {'tx_hash': hash_to_str(tx_hash), 'height': height} - for tx_hash, height in history - ] - - def get_chunk(self, index): - '''Return header chunk as hex. Index is a non-negative integer.''' - chunk_size = self.coin.CHUNK_SIZE - next_height = self.height() + 1 - start_height = min(index * chunk_size, next_height) - count = min(next_height - start_height, chunk_size) - return self.block_processor.read_headers(start_height, count).hex() - - def get_balance(self, hash168): - confirmed = self.block_processor.get_balance(hash168) - unconfirmed = -1 # FIXME - return {'confirmed': confirmed, 'unconfirmed': unconfirmed} - - def list_unspent(self, hash168): - utxos = self.block_processor.get_utxos_sorted(hash168) - return tuple({'tx_hash': hash_to_str(utxo.tx_hash), - 'tx_pos': utxo.tx_pos, 'height': utxo.height, - 'value': utxo.value} - for utxo in utxos) diff --git a/server/protocol.py b/server/protocol.py index 7eef090..23f8333 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -16,7 +16,7 @@ import traceback from functools import partial from server.daemon import DaemonError -from lib.hash import hex_str_to_hash +from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.util import LoggedClass from server.version import VERSION @@ -33,19 +33,13 @@ def json_notification(method, params): class JSONRPC(asyncio.Protocol, LoggedClass): '''Base class that manages a JSONRPC connection.''' SESSIONS = set() - BLOCK_PROCESSOR = None - COIN = None - def __init__(self, controller): + def __init__(self): super().__init__() - self.controller = controller self.parts = [] self.send_count = 0 self.send_size = 0 self.error_count = 0 - self.subscribe_headers = False - self.subscribe_height = False - self.notified_height = None def connection_made(self, transport): '''Handle an incoming client connection.''' @@ -85,7 +79,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except Exception as e: self.logger.info('error decoding JSON message'.format(e)) else: - self.controller.add_job(self.request_handler(message)) + self.ADD_JOB(self.request_handler(message)) async def request_handler(self, request): '''Called asynchronously.''' @@ -174,9 +168,11 @@ class JSONRPC(asyncio.Protocol, LoggedClass): raise RPCError('params should be empty: {}'.format(params)) @classmethod - def init(cls, block_processor, coin): + def init(cls, block_processor, daemon, coin, add_job): cls.BLOCK_PROCESSOR = block_processor + cls.DAEMON = daemon cls.COIN = coin + cls.ADD_JOB = add_job @classmethod def height(cls): @@ -196,6 +192,37 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Used as response to a headers subscription request.''' return cls.electrum_header(cls.height()) + +class ElectrumX(JSONRPC): + '''A TCP server that handles incoming Electrum connections.''' + + def __init__(self, env): + super().__init__() + self.env = env + self.hash168s = set() + self.subscribe_headers = False + self.subscribe_height = False + self.notified_height = None + rpcs = [ + ('blockchain', + 'address.get_balance address.get_history address.get_mempool ' + 'address.get_proof address.listunspent address.subscribe ' + 'block.get_header block.get_chunk estimatefee headers.subscribe ' + 'numblocks.subscribe relayfee transaction.broadcast ' + 'transaction.get transaction.get_merkle utxo.get_address'), + ('server', + 'banner donation_address peers.subscribe version'), + ] + self.handlers = {'.'.join([prefix, suffix]): + getattr(self.__class__, suffix.replace('.', '_')) + for prefix, suffixes in rpcs + for suffix in suffixes.split()} + + @classmethod + def watched_address_count(cls): + return sum(len(session.hash168s) for session in self.SESSIONS + if isinstance(session, cls)) + @classmethod def notify(cls, height, touched): '''Notify electrum clients about height changes and touched @@ -220,49 +247,94 @@ class JSONRPC(asyncio.Protocol, LoggedClass): for hash168 in session.hash168s.intersection(touched): address = hash168_to_address(hash168) + status = cls.address_status(hash168) payload = json_notification('blockchain.address.subscribe', - (address, )) + (address, status)) session.json_send(payload) + @classmethod + def address_status(cls, hash168): + '''Returns status as 32 bytes.''' + history = cls.BLOCK_PROCESSOR.get_history(hash168) + status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) + for tx_hash, height in history) + if status: + return sha256(status.encode()).hex() + return None -class ElectrumX(JSONRPC): - '''A TCP server that handles incoming Electrum connections.''' + @classmethod + async def tx_merkle(cls, tx_hash, height): + '''tx_hash is a hex string.''' + block_hash = await cls.DAEMON.send_single('getblockhash', (height,)) + block = await cls.DAEMON.send_single('getblock', (block_hash, True)) + tx_hashes = block['tx'] + # This will throw if the tx_hash is bad + pos = tx_hashes.index(tx_hash) + + idx = pos + hashes = [hex_str_to_hash(txh) for txh in tx_hashes] + merkle_branch = [] + while len(hashes) > 1: + if len(hashes) & 1: + hashes.append(hashes[-1]) + idx = idx - 1 if (idx & 1) else idx + 1 + merkle_branch.append(hash_to_str(hashes[idx])) + idx //= 2 + hashes = [double_sha256(hashes[n] + hashes[n + 1]) + for n in range(0, len(hashes), 2)] + + return {"block_height": height, "merkle": merkle_branch, "pos": pos} - def __init__(self, controller, daemon, env): - super().__init__(controller) - self.daemon = daemon - self.env = env - self.hash168s = set() - rpcs = [( - 'blockchain', - 'address.get_balance address.get_history address.get_mempool ' - 'address.get_proof address.listunspent address.subscribe ' - 'block.get_header block.get_chunk estimatefee headers.subscribe ' - 'numblocks.subscribe relayfee transaction.broadcast ' - 'transaction.get transaction.get_merkle utxo.get_address'), - ( - 'server', - 'banner donation_address peers.subscribe version'), + @classmethod + def irc_peers(cls): + '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one + per peer.''' + return {} + + @classmethod + def height(cls): + return cls.BLOCK_PROCESSOR.height + + @classmethod + def get_history(cls, hash168): + history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) + return [ + {'tx_hash': hash_to_str(tx_hash), 'height': height} + for tx_hash, height in history ] - self.handlers = {'.'.join([prefix, suffix]): - getattr(self.__class__, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} @classmethod - def watched_address_count(cls): - return sum(len(session.hash168s) for session in self.SESSIONS - if isinstance(session, cls)) + def get_chunk(cls, index): + '''Return header chunk as hex. Index is a non-negative integer.''' + chunk_size = cls.COIN.CHUNK_SIZE + next_height = cls.height() + 1 + start_height = min(index * chunk_size, next_height) + count = min(next_height - start_height, chunk_size) + return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() + + @classmethod + def get_balance(cls, hash168): + confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) + unconfirmed = -1 # FIXME + return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + + @classmethod + def list_unspent(cls, hash168): + utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168) + return tuple({'tx_hash': hash_to_str(utxo.tx_hash), + 'tx_pos': utxo.tx_pos, 'height': utxo.height, + 'value': utxo.value} + for utxo in utxos) # --- blockchain commands async def address_get_balance(self, params): hash168 = self.extract_hash168(params) - return self.controller.get_balance(hash168) + return self.get_balance(hash168) async def address_get_history(self, params): hash168 = self.extract_hash168(params) - return self.controller.get_history(hash168) + return self.get_history(hash168) async def address_get_mempool(self, params): hash168 = self.extract_hash168(params) @@ -274,24 +346,23 @@ class ElectrumX(JSONRPC): async def address_listunspent(self, params): hash168 = self.extract_hash168(params) - return self.controller.list_unspent(hash168) + return self.list_unspent(hash168) async def address_subscribe(self, params): hash168 = self.extract_hash168(params) self.hash168s.add(hash168) - status = self.controller.address_status(hash168) - return status.hex() if status else None + return self.address_status(hash168) async def block_get_chunk(self, params): index = self.extract_non_negative_integer(params) - return self.controller.get_chunk(index) + return self.get_chunk(index) async def block_get_header(self, params): height = self.extract_non_negative_integer(params) return self.electrum_header(height) async def estimatefee(self, params): - return await self.daemon.estimatefee(params) + return await self.DAEMON.estimatefee(params) async def headers_subscribe(self, params): self.require_empty_params(params) @@ -307,7 +378,7 @@ class ElectrumX(JSONRPC): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' self.require_empty_params(params) - return await self.daemon.relayfee() + return await self.DAEMON.relayfee() async def transaction_broadcast(self, params): '''Pass through the parameters to the daemon. @@ -318,7 +389,7 @@ class ElectrumX(JSONRPC): user interface job here. ''' try: - tx_hash = await self.daemon.sendrawtransaction(params) + tx_hash = await self.DAEMON.sendrawtransaction(params) self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: @@ -344,7 +415,7 @@ class ElectrumX(JSONRPC): # in anticipation it might be dropped in the future. if 1 <= len(params) <= 2: tx_hash = self.tx_hash_from_param(params[0]) - return await self.daemon.getrawtransaction(tx_hash) + return await self.DAEMON.getrawtransaction(tx_hash) raise RPCError('params wrong length: {}'.format(params)) @@ -352,7 +423,7 @@ class ElectrumX(JSONRPC): if len(params) == 2: tx_hash = self.tx_hash_from_param(params[0]) height = self.non_negative_integer_from_param(params[1]) - return await self.controller.get_merkle(tx_hash, height) + return await self.tx_merkle(tx_hash, height) raise RPCError('params should contain a transaction hash and height') @@ -398,7 +469,7 @@ class ElectrumX(JSONRPC): subscription. ''' self.require_empty_params(params) - peers = self.controller.get_peers() + peers = ElectrumX.irc_peers() return tuple(peers.values()) async def version(self, params): @@ -417,7 +488,7 @@ class LocalRPC(JSONRPC): async def getinfo(self, params): return { 'blocks': self.height(), - 'peers': len(self.controller.get_peers()), + 'peers': len(ElectrumX.irc_peers()), 'sessions': len(self.SESSIONS), 'watched': ElectrumX.watched_address_count(), 'cached': 0, @@ -430,7 +501,7 @@ class LocalRPC(JSONRPC): return len(self.SESSIONS) async def peers(self, params): - return tuple(self.controller.get_peers().keys()) + return tuple(ElectrumX.irc_peers().keys()) async def numpeers(self, params): - return len(self.controller.get_peers()) + return len(ElectrumX.irc_peers())