From 063a5469cc13dd18a48bddacf5e74ed2b1604311 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 08:02:13 +0900 Subject: [PATCH 1/2] Move mempool to be with BlockServer BlockProcessor doesn't really need a mempool; mempools are only useful for servers. Set first_sync before flushing, so it goes into DB state. Start servers immediately on catchup; mempool fills asynchronously. Fixes #31 --- server/block_processor.py | 183 +---------------------------------- server/protocol.py | 195 +++++++++++++++++++++++++++++++++++++- 2 files changed, 191 insertions(+), 187 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 95a4fb8..60299ac 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -152,170 +152,6 @@ class ChainReorg(Exception): '''Raised on a blockchain reorganisation.''' -class MemPool(LoggedClass): - '''Representation of the daemon's mempool. - - Updated regularly in caught-up state. Goal is to enable efficient - response to the value() and transactions() calls. - - To that end we maintain the following maps: - - tx_hash -> [txin_pairs, txout_pairs, unconfirmed] - hash168 -> set of all tx hashes in which the hash168 appears - - A pair is a (hash168, value) tuple. Unconfirmed is true if any of the - tx's txins are unconfirmed. tx hashes are hex strings. - ''' - - def __init__(self, bp): - super().__init__() - self.txs = {} - self.hash168s = defaultdict(set) # None can be a key - self.bp = bp - self.count = -1 - - async def update(self, hex_hashes): - '''Update state given the current mempool to the passed set of hashes. - - Remove transactions that are no longer in our mempool. - Request new transactions we don't have then add to our mempool. - ''' - hex_hashes = set(hex_hashes) - touched = set() - missing_utxos = [] - - initial = self.count < 0 - if initial: - self.logger.info('beginning import of {:,d} mempool txs' - .format(len(hex_hashes))) - - # Remove gone items - gone = set(self.txs).difference(hex_hashes) - for hex_hash in gone: - txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash) - hash168s = set(hash168 for hash168, value in txin_pairs) - hash168s.update(hash168 for hash168, value in txout_pairs) - for hash168 in hash168s: - self.hash168s[hash168].remove(hex_hash) - if not self.hash168s[hash168]: - del self.hash168s[hash168] - touched.update(hash168s) - - # Get the raw transactions for the new hashes. Ignore the - # ones the daemon no longer has (it will return None). Put - # them into a dictionary of hex hash to deserialized tx. - hex_hashes.difference_update(self.txs) - raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) - if initial: - self.logger.info('analysing {:,d} mempool txs' - .format(len(raw_txs))) - new_txs = {hex_hash: Deserializer(raw_tx).read_tx() - for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} - del raw_txs, hex_hashes - - # The mempool is unordered, so process all outputs first so - # that looking for inputs has full info. - script_hash168 = self.bp.coin.hash168_from_script() - db_utxo_lookup = self.bp.db_utxo_lookup - - def txout_pair(txout): - return (script_hash168(txout.pk_script), txout.value) - - for n, (hex_hash, tx) in enumerate(new_txs.items()): - # Yield to process e.g. 
signals - if n % 100 == 0: - await asyncio.sleep(0) - txout_pairs = [txout_pair(txout) for txout in tx.outputs] - self.txs[hex_hash] = (None, txout_pairs, None) - - def txin_info(txin): - hex_hash = hash_to_str(txin.prev_hash) - mempool_entry = self.txs.get(hex_hash) - if mempool_entry: - return mempool_entry[1][txin.prev_idx], True - pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx) - return pair, False - - if initial: - next_log = time.time() - self.logger.info('processed outputs, now examining inputs. ' - 'This can take some time...') - - # Now add the inputs - for n, (hex_hash, tx) in enumerate(new_txs.items()): - # Yield to process e.g. signals - if n % 10 == 0: - await asyncio.sleep(0) - - if initial and time.time() > next_log: - next_log = time.time() + 20 - self.logger.info('{:,d} done ({:d}%)' - .format(n, int(n / len(new_txs) * 100))) - - txout_pairs = self.txs[hex_hash][1] - try: - infos = (txin_info(txin) for txin in tx.inputs) - txin_pairs, unconfs = zip(*infos) - except self.bp.MissingUTXOError: - # Drop this TX. If other mempool txs depend on it - # it's harmless - next time the mempool is refreshed - # they'll either be cleaned up or the UTXOs will no - # longer be missing. - del self.txs[hex_hash] - continue - self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs)) - - # Update touched and self.hash168s for the new tx - for hash168, value in txin_pairs: - self.hash168s[hash168].add(hex_hash) - touched.add(hash168) - for hash168, value in txout_pairs: - self.hash168s[hash168].add(hex_hash) - touched.add(hash168) - - if missing_utxos: - self.logger.info('{:,d} txs had missing UTXOs; probably the ' - 'daemon is a block or two ahead of us.' - .format(len(missing_utxos))) - first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash), - txin.prev_idx) - for txin in sorted(missing_utxos)[:3]) - self.logger.info('first ones are {}'.format(first)) - - self.count += 1 - if self.count % 25 == 0 or gone: - self.count = 0 - self.logger.info('{:,d} txs touching {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) - - # Might include a None - return touched - - def transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - for hex_hash in self.hash168s[hash168]: - txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] - tx_fee = (sum(v for hash168, v in txin_pairs) - - sum(v for hash168, v in txout_pairs)) - yield (hex_hash, tx_fee, unconfirmed) - - def value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. - ''' - value = 0 - for hex_hash in self.hash168s[hash168]: - txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] - value -= sum(v for h168, v in txin_pairs if h168 == hash168) - value += sum(v for h168, v in txout_pairs if h168 == hash168) - return value - - class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. @@ -335,7 +171,6 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(env.daemon_url, env.debug) self.daemon.debug_set_height(self.height) - self.mempool = MemPool(self) self.touched = set() self.futures = [] @@ -423,12 +258,11 @@ class BlockProcessor(server.db.DB): '''Called after each deamon poll if caught up.''' # Caught up to daemon height. Flush everything as queries # are performed on the DB and not in-memory. - self.flush(True) if self.first_sync: self.first_sync = False self.logger.info('{} synced to height {:,d}. 
DB version:' .format(VERSION, self.height, self.db_version)) - self.touched.update(await self.mempool.update(mempool_hashes)) + self.flush(True) async def handle_chain_reorg(self): # First get all state on disk @@ -1025,18 +859,3 @@ class BlockProcessor(server.db.DB): tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] return tx_hash, tx_height - - def mempool_transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - return self.mempool.transactions(hash168) - - def mempool_value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. - ''' - return self.mempool.value(hash168) diff --git a/server/protocol.py b/server/protocol.py index c221a6d..63460a2 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -27,19 +27,25 @@ from server.version import VERSION class BlockServer(BlockProcessor): - '''Like BlockProcessor but also has a server manager and starts - servers when caught up.''' + '''Like BlockProcessor but also has a mempool and a server manager. + + Servers are started immediately the block processor first catches + up with the daemon. + ''' def __init__(self, env): super().__init__(env) self.server_mgr = ServerManager(self, env) - self.bs_caught_up = False + self.mempool = MemPool(self) + self.caught_up_yet = False async def caught_up(self, mempool_hashes): + # Call the base class to flush before doing anything else. await super().caught_up(mempool_hashes) - if not self.bs_caught_up: + if not self.caught_up_yet: await self.server_mgr.start_servers() - self.bs_caught_up = True + self.caught_up_yet = True + self.touched.update(await self.mempool.update(mempool_hashes)) self.server_mgr.notify(self.height, self.touched) def on_cancel(self): @@ -47,6 +53,185 @@ class BlockServer(BlockProcessor): self.server_mgr.stop() super().on_cancel() + def mempool_transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + return self.mempool.transactions(hash168) + + def mempool_value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + return self.mempool.value(hash168) + + +class MemPool(LoggedClass): + '''Representation of the daemon's mempool. + + Updated regularly in caught-up state. Goal is to enable efficient + response to the value() and transactions() calls. + + To that end we maintain the following maps: + + tx_hash -> [txin_pairs, txout_pairs, unconfirmed] + hash168 -> set of all tx hashes in which the hash168 appears + + A pair is a (hash168, value) tuple. Unconfirmed is true if any of the + tx's txins are unconfirmed. tx hashes are hex strings. + ''' + + def __init__(self, bp): + super().__init__() + self.txs = {} + self.hash168s = defaultdict(set) # None can be a key + self.bp = bp + self.count = -1 + + async def update(self, hex_hashes): + '''Update state given the current mempool to the passed set of hashes. + + Remove transactions that are no longer in our mempool. + Request new transactions we don't have then add to our mempool. 
+ ''' + hex_hashes = set(hex_hashes) + touched = set() + missing_utxos = [] + + initial = self.count < 0 + if initial: + self.logger.info('beginning import of {:,d} mempool txs' + .format(len(hex_hashes))) + + # Remove gone items + gone = set(self.txs).difference(hex_hashes) + for hex_hash in gone: + txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash) + hash168s = set(hash168 for hash168, value in txin_pairs) + hash168s.update(hash168 for hash168, value in txout_pairs) + for hash168 in hash168s: + self.hash168s[hash168].remove(hex_hash) + if not self.hash168s[hash168]: + del self.hash168s[hash168] + touched.update(hash168s) + + # Get the raw transactions for the new hashes. Ignore the + # ones the daemon no longer has (it will return None). Put + # them into a dictionary of hex hash to deserialized tx. + hex_hashes.difference_update(self.txs) + raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) + if initial: + self.logger.info('analysing {:,d} mempool txs' + .format(len(raw_txs))) + new_txs = {hex_hash: Deserializer(raw_tx).read_tx() + for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} + del raw_txs, hex_hashes + + # The mempool is unordered, so process all outputs first so + # that looking for inputs has full info. + script_hash168 = self.bp.coin.hash168_from_script() + db_utxo_lookup = self.bp.db_utxo_lookup + + def txout_pair(txout): + return (script_hash168(txout.pk_script), txout.value) + + for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. signals + if n % 100 == 0: + await asyncio.sleep(0) + txout_pairs = [txout_pair(txout) for txout in tx.outputs] + self.txs[hex_hash] = (None, txout_pairs, None) + + def txin_info(txin): + hex_hash = hash_to_str(txin.prev_hash) + mempool_entry = self.txs.get(hex_hash) + if mempool_entry: + return mempool_entry[1][txin.prev_idx], True + pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx) + return pair, False + + if initial: + next_log = time.time() + self.logger.info('processed outputs, now examining inputs. ' + 'This can take some time...') + + # Now add the inputs + for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. signals + if n % 10 == 0: + await asyncio.sleep(0) + + if initial and time.time() > next_log: + next_log = time.time() + 20 + self.logger.info('{:,d} done ({:d}%)' + .format(n, int(n / len(new_txs) * 100))) + + txout_pairs = self.txs[hex_hash][1] + try: + infos = (txin_info(txin) for txin in tx.inputs) + txin_pairs, unconfs = zip(*infos) + except self.bp.MissingUTXOError: + # Drop this TX. If other mempool txs depend on it + # it's harmless - next time the mempool is refreshed + # they'll either be cleaned up or the UTXOs will no + # longer be missing. + del self.txs[hex_hash] + continue + self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs)) + + # Update touched and self.hash168s for the new tx + for hash168, value in txin_pairs: + self.hash168s[hash168].add(hex_hash) + touched.add(hash168) + for hash168, value in txout_pairs: + self.hash168s[hash168].add(hex_hash) + touched.add(hash168) + + if missing_utxos: + self.logger.info('{:,d} txs had missing UTXOs; probably the ' + 'daemon is a block or two ahead of us.' 
+ .format(len(missing_utxos))) + first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash), + txin.prev_idx) + for txin in sorted(missing_utxos)[:3]) + self.logger.info('first ones are {}'.format(first)) + + self.count += 1 + if self.count % 25 == 0 or gone: + self.count = 0 + self.logger.info('{:,d} txs touching {:,d} addresses' + .format(len(self.txs), len(self.hash168s))) + + # Might include a None + return touched + + def transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + for hex_hash in self.hash168s[hash168]: + txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] + tx_fee = (sum(v for hash168, v in txin_pairs) + - sum(v for hash168, v in txout_pairs)) + yield (hex_hash, tx_fee, unconfirmed) + + def value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + value = 0 + for hex_hash in self.hash168s[hash168]: + txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] + value -= sum(v for h168, v in txin_pairs if h168 == hash168) + value += sum(v for h168, v in txout_pairs if h168 == hash168) + return value + class ServerManager(LoggedClass): '''Manages the servers.''' From 942d5d6b00627ffc209e6f8670081a543f7d33a7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 08:07:37 +0900 Subject: [PATCH 2/2] Fix import --- server/block_processor.py | 1 - server/protocol.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 60299ac..55a28f0 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -21,7 +21,6 @@ from functools import partial from server.daemon import Daemon, DaemonError from server.version import VERSION from lib.hash import hash_to_str -from lib.tx import Deserializer from lib.util import chunks, LoggedClass import server.db from server.storage import open_db diff --git a/server/protocol.py b/server/protocol.py index 63460a2..6e1051e 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -14,11 +14,12 @@ import json import ssl import time import traceback -from collections import namedtuple +from collections import defaultdict, namedtuple from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC, json_notification_payload +from lib.tx import Deserializer from lib.util import LoggedClass from server.block_processor import BlockProcessor from server.daemon import DaemonError
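
For reference, here is a minimal, self-contained sketch (not part of either patch; the string address labels and satoshi amounts are invented placeholders for real hash168 bytes and values) showing how the two maps documented in the moved MemPool class, tx_hash -> (txin_pairs, txout_pairs, unconfirmed) and hash168 -> set of tx hashes, combine to answer a value() query once the mempool lives alongside BlockServer:

    from collections import defaultdict

    # Toy state mirroring the two MemPool maps; keys and amounts are invented.
    # txs:      tx_hash -> (txin_pairs, txout_pairs, unconfirmed)
    # hash168s: hash168 -> set of all tx hashes in which the hash168 appears
    txs = {
        'aa' * 32: ([('alice', 50000)], [('bob', 40000), ('alice', 9000)], False),
        'bb' * 32: ([('bob', 40000)], [('carol', 39000)], True),
    }
    hash168s = defaultdict(set)
    for tx_hash, (txin_pairs, txout_pairs, unconfirmed) in txs.items():
        for hash168, value in txin_pairs + txout_pairs:
            hash168s[hash168].add(tx_hash)

    def mempool_value(hash168):
        '''Net unconfirmed amount for hash168, as in MemPool.value().'''
        total = 0
        for tx_hash in hash168s[hash168]:
            txin_pairs, txout_pairs, unconfirmed = txs[tx_hash]
            total -= sum(v for h168, v in txin_pairs if h168 == hash168)
            total += sum(v for h168, v in txout_pairs if h168 == hash168)
        return total

    print(mempool_value('alice'))   # -41000: spends 50,000, gets 9,000 back as change
    print(mempool_value('carol'))   # 39000: receives a still-unconfirmed output

transactions() walks the same maps, yielding (hex_hash, tx_fee, unconfirmed) per entry, with the fee computed as the sum of input values minus the sum of output values.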