diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 444f9e4..94e59be 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,12 @@ +version 0.6.2 +------------- + +- handle daemon errors properly that result from client requests; pass the + error onto the client +- start serving immediately on catchup; don't wait for the mempool +- logging improvements, in particular logging software and DB versions +- issues closed: #29, #31, #32 + version 0.6.1 ------------- diff --git a/server/block_processor.py b/server/block_processor.py index f792e36..55a28f0 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -19,8 +19,8 @@ from collections import defaultdict from functools import partial from server.daemon import Daemon, DaemonError +from server.version import VERSION from lib.hash import hash_to_str -from lib.tx import Deserializer from lib.util import chunks, LoggedClass import server.db from server.storage import open_db @@ -151,170 +151,6 @@ class ChainReorg(Exception): '''Raised on a blockchain reorganisation.''' -class MemPool(LoggedClass): - '''Representation of the daemon's mempool. - - Updated regularly in caught-up state. Goal is to enable efficient - response to the value() and transactions() calls. - - To that end we maintain the following maps: - - tx_hash -> [txin_pairs, txout_pairs, unconfirmed] - hash168 -> set of all tx hashes in which the hash168 appears - - A pair is a (hash168, value) tuple. Unconfirmed is true if any of the - tx's txins are unconfirmed. tx hashes are hex strings. - ''' - - def __init__(self, bp): - super().__init__() - self.txs = {} - self.hash168s = defaultdict(set) # None can be a key - self.bp = bp - self.count = -1 - - async def update(self, hex_hashes): - '''Update state given the current mempool to the passed set of hashes. - - Remove transactions that are no longer in our mempool. - Request new transactions we don't have then add to our mempool. 
- ''' - hex_hashes = set(hex_hashes) - touched = set() - missing_utxos = [] - - initial = self.count < 0 - if initial: - self.logger.info('beginning import of {:,d} mempool txs' - .format(len(hex_hashes))) - - # Remove gone items - gone = set(self.txs).difference(hex_hashes) - for hex_hash in gone: - txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash) - hash168s = set(hash168 for hash168, value in txin_pairs) - hash168s.update(hash168 for hash168, value in txout_pairs) - for hash168 in hash168s: - self.hash168s[hash168].remove(hex_hash) - if not self.hash168s[hash168]: - del self.hash168s[hash168] - touched.update(hash168s) - - # Get the raw transactions for the new hashes. Ignore the - # ones the daemon no longer has (it will return None). Put - # them into a dictionary of hex hash to deserialized tx. - hex_hashes.difference_update(self.txs) - raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) - if initial: - self.logger.info('analysing {:,d} mempool txs' - .format(len(raw_txs))) - new_txs = {hex_hash: Deserializer(raw_tx).read_tx() - for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} - del raw_txs, hex_hashes - - # The mempool is unordered, so process all outputs first so - # that looking for inputs has full info. - script_hash168 = self.bp.coin.hash168_from_script() - db_utxo_lookup = self.bp.db_utxo_lookup - - def txout_pair(txout): - return (script_hash168(txout.pk_script), txout.value) - - for n, (hex_hash, tx) in enumerate(new_txs.items()): - # Yield to process e.g. 
signals - if n % 100 == 0: - await asyncio.sleep(0) - txout_pairs = [txout_pair(txout) for txout in tx.outputs] - self.txs[hex_hash] = (None, txout_pairs, None) - - def txin_info(txin): - hex_hash = hash_to_str(txin.prev_hash) - mempool_entry = self.txs.get(hex_hash) - if mempool_entry: - return mempool_entry[1][txin.prev_idx], True - pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx) - return pair, False - - if initial: - next_log = time.time() - self.logger.info('processed outputs, now examining inputs. ' - 'This can take some time...') - - # Now add the inputs - for n, (hex_hash, tx) in enumerate(new_txs.items()): - # Yield to process e.g. signals - if n % 10 == 0: - await asyncio.sleep(0) - - if initial and time.time() > next_log: - next_log = time.time() + 20 - self.logger.info('{:,d} done ({:d}%)' - .format(n, int(n / len(new_txs) * 100))) - - txout_pairs = self.txs[hex_hash][1] - try: - infos = (txin_info(txin) for txin in tx.inputs) - txin_pairs, unconfs = zip(*infos) - except self.bp.MissingUTXOError: - # Drop this TX. If other mempool txs depend on it - # it's harmless - next time the mempool is refreshed - # they'll either be cleaned up or the UTXOs will no - # longer be missing. - del self.txs[hex_hash] - continue - self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs)) - - # Update touched and self.hash168s for the new tx - for hash168, value in txin_pairs: - self.hash168s[hash168].add(hex_hash) - touched.add(hash168) - for hash168, value in txout_pairs: - self.hash168s[hash168].add(hex_hash) - touched.add(hash168) - - if missing_utxos: - self.logger.info('{:,d} txs had missing UTXOs; probably the ' - 'daemon is a block or two ahead of us.' 
- .format(len(missing_utxos))) - first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash), - txin.prev_idx) - for txin in sorted(missing_utxos)[:3]) - self.logger.info('first ones are {}'.format(first)) - - self.count += 1 - if self.count % 25 == 0 or gone: - self.count = 0 - self.logger.info('{:,d} txs touching {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) - - # Might include a None - return touched - - def transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - for hex_hash in self.hash168s[hash168]: - txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] - tx_fee = (sum(v for hash168, v in txin_pairs) - - sum(v for hash168, v in txout_pairs)) - yield (hex_hash, tx_fee, unconfirmed) - - def value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. - ''' - value = 0 - for hex_hash in self.hash168s[hash168]: - txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] - value -= sum(v for h168, v in txin_pairs if h168 == hash168) - value += sum(v for h168, v in txout_pairs if h168 == hash168) - return value - - class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. 
@@ -334,7 +170,6 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(env.daemon_url, env.debug) self.daemon.debug_set_height(self.height) - self.mempool = MemPool(self) self.touched = set() self.futures = [] @@ -361,15 +196,9 @@ class BlockProcessor(server.db.DB): self.db_deletes = [] # Log state - self.logger.info('coin: {}'.format(self.coin.NAME)) - self.logger.info('network: {}'.format(self.coin.NET)) - self.logger.info('height: {:,d}'.format(self.db_height)) - self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) self.logger.info('reorg limit is {:,d} blocks' .format(self.reorg_limit)) if self.first_sync: - self.logger.info('sync time so far: {}' - .format(formatted_time(self.wall_time))) self.logger.info('flushing UTXO cache at {:,d} MB' .format(self.utxo_MB)) self.logger.info('flushing history cache at {:,d} MB' @@ -428,11 +257,11 @@ class BlockProcessor(server.db.DB): '''Called after each deamon poll if caught up.''' # Caught up to daemon height. Flush everything as queries # are performed on the DB and not in-memory. - self.flush(True) if self.first_sync: self.first_sync = False - self.logger.info('synced to height {:,d}'.format(self.height)) - self.touched.update(await self.mempool.update(mempool_hashes)) + self.logger.info('{} synced to height {:,d}. DB version: {:d}' .format(VERSION, self.height, self.db_version)) + self.flush(True) async def handle_chain_reorg(self): # First get all state on disk @@ -1029,18 +858,3 @@ class BlockProcessor(server.db.DB): tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] return tx_hash, tx_height - - def mempool_transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - return self.mempool.transactions(hash168) - - def mempool_value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. 
- ''' - return self.mempool.value(hash168) diff --git a/server/db.py b/server/db.py index 2427b99..c44b17c 100644 --- a/server/db.py +++ b/server/db.py @@ -18,6 +18,7 @@ from collections import namedtuple from lib.util import chunks, LoggedClass from lib.hash import double_sha256, hash_to_str from server.storage import open_db +from server.version import VERSION UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") @@ -29,7 +30,7 @@ class DB(LoggedClass): it was shutdown uncleanly. ''' - VERSIONS = [3] + DB_VERSIONS = [3] class MissingUTXOError(Exception): '''Raised if a mempool tx input UTXO couldn't be found.''' @@ -77,6 +78,7 @@ class DB(LoggedClass): self.db_height = -1 self.db_tx_count = 0 self.db_tip = b'\0' * 32 + self.db_version = max(self.DB_VERSIONS) self.flush_count = 0 self.utxo_flush_count = 0 self.wall_time = 0 @@ -87,11 +89,11 @@ class DB(LoggedClass): state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise self.DBError('failed reading state from DB') - db_version = state.get('db_version', 0) - if db_version not in self.VERSIONS: + self.db_version = state['db_version'] + if self.db_version not in self.DB_VERSIONS: raise self.DBError('your DB version is {} but this software ' 'only handles versions {}' - .format(db_version, self.DB_VERSIONS) + .format(self.db_version, self.DB_VERSIONS)) if state['genesis'] != self.coin.GENESIS_HASH: raise self.DBError('DB genesis hash {} does not match coin {}' .format(state['genesis_hash'], @@ -104,6 +106,18 @@ class DB(LoggedClass): self.wall_time = state['wall_time'] self.first_sync = state['first_sync'] + self.logger.info('software version: {}'.format(VERSION)) + self.logger.info('DB version: {:d}'.format(self.db_version)) + self.logger.info('coin: {}'.format(self.coin.NAME)) + self.logger.info('network: {}'.format(self.coin.NET)) + self.logger.info('height: {:,d}'.format(self.db_height)) + self.logger.info('tip: {}'.format(hash_to_str(self.db_tip))) + self.logger.info('tx count: 
{:,d}'.format(self.db_tx_count)) + if self.first_sync: + self.logger.info('sync time so far: {}' + .format(formatted_time(self.wall_time))) + + def write_state(self, batch): '''Write chain state to the batch.''' state = { @@ -115,7 +129,7 @@ class DB(LoggedClass): 'utxo_flush_count': self.utxo_flush_count, 'wall_time': self.wall_time, 'first_sync': self.first_sync, - 'db_version': max(self.VERSIONS), + 'db_version': self.db_version, } batch.put(b'state', repr(state).encode()) diff --git a/server/protocol.py b/server/protocol.py index e21fd45..c0f062e 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -14,11 +14,12 @@ import json import ssl import time import traceback -from collections import namedtuple +from collections import defaultdict, namedtuple from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC, json_notification_payload +from lib.tx import Deserializer from lib.util import LoggedClass from server.block_processor import BlockProcessor from server.daemon import DaemonError @@ -27,19 +28,25 @@ from server.version import VERSION class BlockServer(BlockProcessor): - '''Like BlockProcessor but also has a server manager and starts - servers when caught up.''' + '''Like BlockProcessor but also has a mempool and a server manager. + + Servers are started immediately the block processor first catches + up with the daemon. + ''' def __init__(self, env): super().__init__(env) self.server_mgr = ServerManager(self, env) - self.bs_caught_up = False + self.mempool = MemPool(self) + self.caught_up_yet = False async def caught_up(self, mempool_hashes): + # Call the base class to flush before doing anything else. 
await super().caught_up(mempool_hashes) - if not self.bs_caught_up: + if not self.caught_up_yet: await self.server_mgr.start_servers() - self.bs_caught_up = True + self.caught_up_yet = True + self.touched.update(await self.mempool.update(mempool_hashes)) self.server_mgr.notify(self.height, self.touched) def on_cancel(self): @@ -47,6 +54,185 @@ class BlockServer(BlockProcessor): self.server_mgr.stop() super().on_cancel() + def mempool_transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + return self.mempool.transactions(hash168) + + def mempool_value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + return self.mempool.value(hash168) + + +class MemPool(LoggedClass): + '''Representation of the daemon's mempool. + + Updated regularly in caught-up state. Goal is to enable efficient + response to the value() and transactions() calls. + + To that end we maintain the following maps: + + tx_hash -> [txin_pairs, txout_pairs, unconfirmed] + hash168 -> set of all tx hashes in which the hash168 appears + + A pair is a (hash168, value) tuple. Unconfirmed is true if any of the + tx's txins are unconfirmed. tx hashes are hex strings. + ''' + + def __init__(self, bp): + super().__init__() + self.txs = {} + self.hash168s = defaultdict(set) # None can be a key + self.bp = bp + self.count = -1 + + async def update(self, hex_hashes): + '''Update state given the current mempool to the passed set of hashes. + + Remove transactions that are no longer in our mempool. + Request new transactions we don't have then add to our mempool. 
+ ''' + hex_hashes = set(hex_hashes) + touched = set() + missing_utxos = [] + + initial = self.count < 0 + if initial: + self.logger.info('beginning import of {:,d} mempool txs' + .format(len(hex_hashes))) + + # Remove gone items + gone = set(self.txs).difference(hex_hashes) + for hex_hash in gone: + txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash) + hash168s = set(hash168 for hash168, value in txin_pairs) + hash168s.update(hash168 for hash168, value in txout_pairs) + for hash168 in hash168s: + self.hash168s[hash168].remove(hex_hash) + if not self.hash168s[hash168]: + del self.hash168s[hash168] + touched.update(hash168s) + + # Get the raw transactions for the new hashes. Ignore the + # ones the daemon no longer has (it will return None). Put + # them into a dictionary of hex hash to deserialized tx. + hex_hashes.difference_update(self.txs) + raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) + if initial: + self.logger.info('analysing {:,d} mempool txs' + .format(len(raw_txs))) + new_txs = {hex_hash: Deserializer(raw_tx).read_tx() + for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} + del raw_txs, hex_hashes + + # The mempool is unordered, so process all outputs first so + # that looking for inputs has full info. + script_hash168 = self.bp.coin.hash168_from_script() + db_utxo_lookup = self.bp.db_utxo_lookup + + def txout_pair(txout): + return (script_hash168(txout.pk_script), txout.value) + + for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. 
signals + if n % 100 == 0: + await asyncio.sleep(0) + txout_pairs = [txout_pair(txout) for txout in tx.outputs] + self.txs[hex_hash] = (None, txout_pairs, None) + + def txin_info(txin): + hex_hash = hash_to_str(txin.prev_hash) + mempool_entry = self.txs.get(hex_hash) + if mempool_entry: + return mempool_entry[1][txin.prev_idx], True + pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx) + return pair, False + + if initial: + next_log = time.time() + self.logger.info('processed outputs, now examining inputs. ' + 'This can take some time...') + + # Now add the inputs + for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. signals + if n % 10 == 0: + await asyncio.sleep(0) + + if initial and time.time() > next_log: + next_log = time.time() + 20 + self.logger.info('{:,d} done ({:d}%)' + .format(n, int(n / len(new_txs) * 100))) + + txout_pairs = self.txs[hex_hash][1] + try: + infos = (txin_info(txin) for txin in tx.inputs) + txin_pairs, unconfs = zip(*infos) + except self.bp.MissingUTXOError: + # Drop this TX. If other mempool txs depend on it + # it's harmless - next time the mempool is refreshed + # they'll either be cleaned up or the UTXOs will no + # longer be missing. + del self.txs[hex_hash] + continue + self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs)) + + # Update touched and self.hash168s for the new tx + for hash168, value in txin_pairs: + self.hash168s[hash168].add(hex_hash) + touched.add(hash168) + for hash168, value in txout_pairs: + self.hash168s[hash168].add(hex_hash) + touched.add(hash168) + + if missing_utxos: + self.logger.info('{:,d} txs had missing UTXOs; probably the ' + 'daemon is a block or two ahead of us.' 
+ .format(len(missing_utxos))) + first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash), + txin.prev_idx) + for txin in sorted(missing_utxos)[:3]) + self.logger.info('first ones are {}'.format(first)) + + self.count += 1 + if self.count % 25 == 0 or gone: + self.count = 0 + self.logger.info('{:,d} txs touching {:,d} addresses' + .format(len(self.txs), len(self.hash168s))) + + # Might include a None + return touched + + def transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + for hex_hash in self.hash168s[hash168]: + txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] + tx_fee = (sum(v for hash168, v in txin_pairs) + - sum(v for hash168, v in txout_pairs)) + yield (hex_hash, tx_fee, unconfirmed) + + def value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + value = 0 + for hex_hash in self.hash168s[hash168]: + txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] + value -= sum(v for h168, v in txin_pairs if h168 == hash168) + value += sum(v for h168, v in txout_pairs if h168 == hash168) + return value + class ServerManager(LoggedClass): '''Manages the servers.''' @@ -163,8 +349,8 @@ class ServerManager(LoggedClass): now = time.time() return [(session.kind, session.peername(for_log=False), - len(session.hash168s), - 'RPC' if isinstance(session, LocalRPC) else session.client, + session.sub_count(), + session.client, session.recv_count, session.recv_size, session.send_count, session.send_size, session.error_count, @@ -197,9 +383,7 @@ class Session(JSONRPC): self.daemon = bp.daemon self.coin = bp.coin self.kind = kind - self.hash168s = set() self.jobs = asyncio.Queue() - self.current_task = None self.client = 'unknown' def connection_made(self, transport): @@ -251,6 +435,16 @@ class Session(JSONRPC): return 'xx.xx.xx.xx:xx' return 
'{}:{}'.format(self.peer_info[0], self.peer_info[1]) + def sub_count(self): + return 0 + + async def daemon_request(self, method, *args): + '''Catch a DaemonError and convert it to an RPCError.''' + try: + return await getattr(self.daemon, method)(*args) + except DaemonError as e: + raise self.RPCError('daemon error: {}'.format(e)) + def tx_hash_from_param(self, param): '''Raise an RPCError if the parameter is not a valid transaction hash.''' @@ -309,6 +503,7 @@ class ElectrumX(Session): self.subscribe_headers = False self.subscribe_height = False self.notified_height = None + self.hash168s = set() rpcs = [ ('blockchain', 'address.get_balance address.get_history address.get_mempool ' @@ -324,6 +519,9 @@ for prefix, suffixes in rpcs for suffix in suffixes.split()} + def sub_count(self): + return len(self.hash168s) + async def notify(self, height, touched, cache): '''Notify the client about changes in height and touched addresses. @@ -392,8 +590,8 @@ async def tx_merkle(self, tx_hash, height): '''tx_hash is a hex string.''' - hex_hashes = await self.daemon.block_hex_hashes(height, 1) - block = await self.daemon.deserialised_block(hex_hashes[0]) + hex_hashes = await self.daemon_request('block_hex_hashes', height, 1) + block = await self.daemon_request('deserialised_block', hex_hashes[0]) tx_hashes = block['tx'] try: pos = tx_hashes.index(tx_hash) @@ -503,7 +701,7 @@ class ElectrumX(Session): return self.electrum_header(height) async def estimatefee(self, params): - return await self.daemon.estimatefee(params) + return await self.daemon_request('estimatefee', params) async def headers_subscribe(self, params): self.require_empty_params(params) @@ -519,7 +717,7 @@ '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' self.require_empty_params(params) - return await self.daemon.relayfee() + return await self.daemon_request('relayfee') async def 
transaction_broadcast(self, params): '''Pass through the parameters to the daemon. @@ -555,7 +753,7 @@ class ElectrumX(Session): # in anticipation it might be dropped in the future. if 1 <= len(params) <= 2: tx_hash = self.tx_hash_from_param(params[0]) - return await self.daemon.getrawtransaction(tx_hash) + return await self.daemon_request('getrawtransaction', tx_hash) raise self.RPCError('params wrong length: {}'.format(params)) @@ -629,3 +827,4 @@ class LocalRPC(Session): cmds = 'getinfo sessions numsessions peers numpeers'.split() self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) for cmd in cmds} + self.client = 'RPC' diff --git a/server/version.py b/server/version.py index fb32e04..9cd7c85 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.6.1" +VERSION = "ElectrumX 0.6.2"