diff --git a/README.rst b/README.rst
index bb28936..253b23a 100644
--- a/README.rst
+++ b/README.rst
@@ -68,7 +68,6 @@ Roadmap
 =======
 
 - test a few more performance improvement ideas
-- handle the mempool
 - implement light caching of client responses
 - yield during expensive requests and/or penalize the connection
 - improve DB abstraction so LMDB is not penalized
diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index 6ffc948..eba9cdf 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,14 @@
+Version 0.06
+------------
+
+- mempool support.  ElectrumX maintains a representation of the daemon's
+  mempool and serves unconfirmed transactions and balances to clients.
+
+Version 0.05
+------------
+
+- fixed a bug in 0.04 that stopped ElectrumX serving once synced
+
 Version 0.04
 ------------
 
diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index 7da8c62..e56c4ec 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -22,8 +22,10 @@ REORG_LIMIT - maximum number of blocks to be able to handle in a chain
              reorganisation.  ElectrumX retains some fairly compact
              undo information for this many blocks in levelDB.
              Default is 200.
-TCP_PORT - if set will serve Electrum clients on that port
-SSL_PORT - if set will serve Electrum clients over SSL on that port.
+HOST - the host that the TCP and SSL servers will use.  Defaults to
+       localhost.
+TCP_PORT - if set will serve Electrum TCP clients on HOST:TCP_PORT
+SSL_PORT - if set will serve Electrum SSL clients on HOST:SSL_PORT.
            If set, SSL_CERTFILE and SSL_KEYFILE must be filesystem paths.
 RPC_PORT - Listen on this port for local RPC connections, defaults to
            8000.
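As an aside, a minimal sketch of how a server process might read these settings.  The os.environ approach and the exact defaults are assumptions for illustration, not the project's actual startup code:

    # Sketch only: names follow samples/scripts/NOTES; defaults are
    # illustrative assumptions.
    from os import environ

    host = environ.get('HOST', 'localhost')
    tcp_port = environ.get('TCP_PORT')             # TCP serving is optional
    ssl_port = environ.get('SSL_PORT')             # SSL serving is optional
    rpc_port = int(environ.get('RPC_PORT', 8000))

    if ssl_port is not None:
        ssl_certfile = environ['SSL_CERTFILE']     # required with SSL_PORT
        ssl_keyfile = environ['SSL_KEYFILE']
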
diff --git a/server/block_processor.py b/server/block_processor.py
index 742b536..34c9bce 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -21,6 +21,7 @@ from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
 from server.daemon import DaemonError
 from lib.hash import hash_to_str
 from lib.script import ScriptPubKey
+from lib.tx import Deserializer
 from lib.util import chunks, LoggedClass
 from server.storage import open_db
 
@@ -50,7 +51,7 @@ class Prefetcher(LoggedClass):
         self.queue = asyncio.Queue()
         self.queue_size = 0
         self.fetched_height = height
-        self.mempool = []
+        self.mempool_hashes = []
         # Target cache size.  Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
         # First fetch to be 10 blocks
@@ -74,7 +75,7 @@ class Prefetcher(LoggedClass):
         blocks, height, size = await self.queue.get()
         self.queue_size -= size
         if height == self.daemon.cached_height():
-            return blocks, self.mempool
+            return blocks, self.mempool_hashes
         else:
             return blocks, None
 
@@ -99,7 +100,7 @@ class Prefetcher(LoggedClass):
             self.fetched_height += len(blocks)
             caught_up = self.fetched_height == self.daemon.cached_height()
             if caught_up:
-                self.mempool = await self.daemon.mempool_hashes()
+                self.mempool_hashes = await self.daemon.mempool_hashes()
 
             # Wake up block processor if we have something
             if blocks or caught_up:
@@ -137,6 +138,143 @@ class Prefetcher(LoggedClass):
         return blocks, size
 
 
+class MissingUTXOError(Exception):
+    pass
+
+
+class MemPool(LoggedClass):
+    '''Representation of the daemon's mempool.
+
+    Updated regularly in caught-up state.  Goal is to enable efficient
+    response to the value() and transactions() calls.
+
+    To that end we maintain the following maps:
+
+       tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
+       hash168 -> set of all tx hashes in which the hash168 appears
+
+    A pair is a (hash168, value) tuple.  Unconfirmed is true if any of
+    the tx's txins are unconfirmed.  tx hashes are hex strings.
+    '''
+
+    def __init__(self, bp):
+        super().__init__()
+        self.txs = {}
+        self.hash168s = defaultdict(set)  # None can be a key
+        self.bp = bp
+
+    async def update(self, hex_hashes):
+        '''Update state given the current mempool to the passed set of hashes.
+
+        Remove transactions that are no longer in our mempool.
+        Request new transactions we don't have then add to our mempool.
+        '''
+        hex_hashes = set(hex_hashes)
+        touched = set()
+
+        if not self.txs:
+            self.logger.info('initial fetch of {:,d} daemon mempool txs'
+                             .format(len(hex_hashes)))
+
+        # Remove gone items
+        gone = set(self.txs).difference(hex_hashes)
+        for hex_hash in gone:
+            txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
+            hash168s = set(hash168 for hash168, value in txin_pairs)
+            hash168s.update(hash168 for hash168, value in txout_pairs)
+            for hash168 in hash168s:
+                self.hash168s[hash168].remove(hex_hash)
+            touched.update(hash168s)
+        if gone:
+            self.logger.info('{:,d} entries removed from mempool'
+                             .format(len(gone)))
+
+        # Get the raw transactions for the new hashes.  Ignore the
+        # ones the daemon no longer has (it will return None).  Put
+        # them into a dictionary of hex hash to deserialized tx.
+        hex_hashes.difference_update(self.txs)
+        raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
+        new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
+                   for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
+        del raw_txs, hex_hashes
+
+        # The mempool is unordered, so process all outputs first so
+        # that looking for inputs has full info.
+        parse_script = ScriptPubKey.from_script
+        coin = self.bp.coin
+        utxo_lookup = self.bp.utxo_cache.lookup
+
+        def txout_pair(txout):
+            return (parse_script(txout.pk_script, coin).hash168, txout.value)
+
+        for hex_hash, tx in new_txs.items():
+            txout_pairs = tuple(txout_pair(txout) for txout in tx.outputs)
+            self.txs[hex_hash] = [None, txout_pairs, None]
+
+        def txin_info(txin):
+            hex_hash = hash_to_str(txin.prev_hash)
+            mempool_entry = self.txs.get(hex_hash)
+            if mempool_entry:
+                return mempool_entry[1][txin.prev_idx], True
+            entry = utxo_lookup(txin.prev_hash, txin.prev_idx)
+            if entry == NO_CACHE_ENTRY:
+                # Not possible unless daemon is lying or we're corrupted?
+                self.logger.warning('no UTXO found for {} / {}'
+                                    .format(hash_to_str(txin.prev_hash),
+                                            txin.prev_idx))
+                raise MissingUTXOError
+            value, = struct.unpack('<Q', entry[-8:])
+            return (entry[:21], value), False
+
+        # Now add the inputs
+        for hex_hash, tx in new_txs.items():
+            txout_pairs = self.txs[hex_hash][1]
+            try:
+                infos = (txin_info(txin) for txin in tx.inputs)
+                txin_pairs, unconfs = zip(*infos)
+            except MissingUTXOError:
+                # Drop this TX.  If other mempool txs depend on it
+                # they are harmless - next time the mempool is
+                # refreshed they'll either be cleaned up or the
+                # UTXOs will be found.
+                del self.txs[hex_hash]
+                continue
+            self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
+
+            # Update touched and self.hash168s for the new tx
+            for hash168, value in txin_pairs:
+                self.hash168s[hash168].add(hex_hash)
+                touched.add(hash168)
+            for hash168, value in txout_pairs:
+                self.hash168s[hash168].add(hex_hash)
+                touched.add(hash168)
+
+        self.logger.info('{:,d} entries in mempool for {:,d} addresses'
+                         .format(len(self.txs), len(self.hash168s)))
+
+        return touched
+
+    def transactions(self, hash168):
+        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
+        entries for the hash168.
+
+        unconfirmed is True if any txin is unconfirmed.
+        '''
+        for hex_hash in self.hash168s[hash168]:
+            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
+            tx_fee = (sum(value for hash168, value in txin_pairs)
+                      - sum(value for hash168, value in txout_pairs))
+            yield (hex_hash, tx_fee, unconfirmed)
+
+    def value(self, hash168):
+        '''Return the unconfirmed amount in the mempool for hash168.
+
+        Can be positive or negative.
+        '''
+        value = 0
+        for hex_hash in self.hash168s[hash168]:
+            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
+            value -= sum(v for h168, v in txin_pairs if h168 == hash168)
+            value += sum(v for h168, v in txout_pairs if h168 == hash168)
+        return value
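To make the two maps concrete, here is a small worked sketch with hand-built data (the hashes, values and 21-byte hash168 keys are made up for illustration) showing how fees and net unconfirmed value fall out of them, mirroring what transactions() and value() compute:

    from collections import defaultdict

    txs = {}                     # tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
    hash168s = defaultdict(set)  # hash168 -> set of tx hashes touching it

    addr = b'A' * 21             # hypothetical hash168
    tx_hash = 'aa' * 32          # hypothetical hex tx hash
    txin_pairs = ((addr, 50000),)                      # spends 50,000 from addr
    txout_pairs = ((addr, 30000), (b'B' * 21, 15000))  # 30,000 back as change
    txs[tx_hash] = [txin_pairs, txout_pairs, False]
    for h168, value in txin_pairs + txout_pairs:
        hash168s[h168].add(tx_hash)

    # Fee is total inputs minus total outputs: 50,000 - 45,000 = 5,000
    tx_fee = (sum(v for _, v in txin_pairs)
              - sum(v for _, v in txout_pairs))

    # Net unconfirmed value for addr: 30,000 received - 50,000 spent = -20,000
    net = (sum(v for h, v in txout_pairs if h == addr)
           - sum(v for h, v in txin_pairs if h == addr))
    print(tx_fee, net)  # 5000 -20000
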
@@ ... @@
             if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
                 self.flush(utxo_MB >= self.utxo_MB)
 
-        return touched
+        if update_touched:
+            self.touched.update(touched)
 
     def advance_txs(self, tx_hashes, txs, touched):
         put_utxo = self.utxo_cache.put
@@ -661,9 +801,9 @@
 
         self.logger.info('backed up to height {:,d}'.format(self.height))
 
+        self.touched.update(touched)
         flush_history = partial(self.backup_history, hash168s=touched)
         self.flush(True, flush_history=flush_history)
-        return touched
 
     def backup_txs(self, tx_hashes, txs, touched):
         # Prevout values, in order down the block (coinbase first if present)
@@ -704,9 +844,24 @@
         assert isinstance(limit, int) and limit >= 0
         return limit
 
+    def mempool_transactions(self, hash168):
+        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
+        entries for the hash168.
+
+        unconfirmed is True if any txin is unconfirmed.
+        '''
+        return self.mempool.transactions(hash168)
+
+    def mempool_value(self, hash168):
+        '''Return the unconfirmed amount in the mempool for hash168.
+
+        Can be positive or negative.
+        '''
+        return self.mempool.value(hash168)
+
     def get_history(self, hash168, limit=1000):
         '''Generator that returns an unpruned, sorted list of (tx_hash,
-        height) tuples of transactions that touched the address,
+        height) tuples of confirmed transactions that touched the address,
         earliest in the blockchain first.  Includes both spending and
         receiving transactions.  By default yields at most 1000 entries.
         Set limit to None to get them all.
@@ -756,7 +911,7 @@
         hash168 = None
         if 0 <= index <= 65535:
             idx_packed = struct.pack('<H', index)
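Finally, a sketch of how the serving layer might consume the new BlockProcessor calls.  bp is assumed to be a caught-up BlockProcessor from the diff above; the function names and the height-0 convention for unconfirmed entries are illustrative assumptions, not the actual Electrum protocol handlers:

    def address_history(bp, hash168):
        # Confirmed transactions first, earliest in the chain
        history = list(bp.get_history(hash168))      # (tx_hash, height) tuples
        # Append mempool entries; 0 is an arbitrary marker for unconfirmed
        for hex_hash, tx_fee, unconfirmed in bp.mempool_transactions(hash168):
            history.append((hex_hash, 0))
        return history

    def unconfirmed_balance(bp, hash168):
        # Net effect of the mempool on this address; may be negative
        return bp.mempool_value(hash168)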