@@ -21,6 +21,7 @@ from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
 from server.daemon import DaemonError
 from lib.hash import hash_to_str
 from lib.script import ScriptPubKey
+from lib.tx import Deserializer
 from lib.util import chunks, LoggedClass
 from server.storage import open_db

@@ -50,7 +51,7 @@ class Prefetcher(LoggedClass):
         self.queue = asyncio.Queue()
         self.queue_size = 0
         self.fetched_height = height
-        self.mempool = []
+        self.mempool_hashes = []
         # Target cache size.  Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
         # First fetch to be 10 blocks
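
Renaming self.mempool to self.mempool_hashes makes the prefetcher's narrower role explicit: it only tracks the daemon's current mempool tx hashes, while the new MemPool class introduced below owns the deserialized transactions built from them.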
@@ -74,7 +75,7 @@ class Prefetcher(LoggedClass):
         blocks, height, size = await self.queue.get()
         self.queue_size -= size
         if height == self.daemon.cached_height():
-            return blocks, self.mempool
+            return blocks, self.mempool_hashes
         else:
             return blocks, None

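
The catch-up protocol is unchanged: a non-None second element from get_blocks() means the fetched height matches the daemon's. The block processor turns that into an explicit caught_up flag below.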
@@ -99,7 +100,7 @@ class Prefetcher(LoggedClass):
             self.fetched_height += len(blocks)
             caught_up = self.fetched_height == self.daemon.cached_height()
             if caught_up:
-                self.mempool = await self.daemon.mempool_hashes()
+                self.mempool_hashes = await self.daemon.mempool_hashes()

             # Wake up block processor if we have something
             if blocks or caught_up:
@@ -137,6 +138,143 @@ class Prefetcher(LoggedClass):

         return blocks, size

+
+class MissingUTXOError(Exception):
+    pass
+
+
+class MemPool(LoggedClass):
+    '''Representation of the daemon's mempool.
+
+    Updated regularly in caught-up state.  Goal is to enable efficient
+    response to the value() and transactions() calls.
+
+    To that end we maintain the following maps:
+
+        tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
+        hash168 -> set of all tx hashes in which the hash168 appears
+
+    A pair is a (hash168, value) tuple.  Unconfirmed is true if any of the
+    tx's txins are unconfirmed.  tx hashes are hex strings.
+    '''
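
To make the two maps concrete, here is a toy instance with made-up data; in the real maps the hash168 keys are 21-byte bytes values and the amounts are satoshis:

    # Toy contents after one update: a single tx spending 5000 from
    # addrA and creating outputs of 4000 to addrB and 900 back to addrA.
    txs = {
        'f00dbabe01': [
            ((b'addrA', 5000),),                  # txin_pairs
            ((b'addrB', 4000), (b'addrA', 900)),  # txout_pairs
            False,                                # no unconfirmed txins
        ],
    }
    hash168s = {
        b'addrA': {'f00dbabe01'},
        b'addrB': {'f00dbabe01'},
    }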
+
+    def __init__(self, bp):
+        super().__init__()
+        self.txs = {}
+        self.hash168s = defaultdict(set)  # None can be a key
+        self.bp = bp
+
+    async def update(self, hex_hashes):
+        '''Update state given the current mempool to the passed set of hashes.
+
+        Remove transactions that are no longer in our mempool.
+        Request new transactions we don't have, then add them to our mempool.
+        '''
+        hex_hashes = set(hex_hashes)
+        touched = set()
+
+        if not self.txs:
+            self.logger.info('initial fetch of {:,d} daemon mempool txs'
+                             .format(len(hex_hashes)))
+
+        # Remove gone items
+        gone = set(self.txs).difference(hex_hashes)
+        for hex_hash in gone:
+            txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
+            hash168s = set(hash168 for hash168, value in txin_pairs)
+            hash168s.update(hash168 for hash168, value in txout_pairs)
+            for hash168 in hash168s:
+                self.hash168s[hash168].remove(hex_hash)
+            touched.update(hash168s)
+        if gone:
+            self.logger.info('{:,d} entries removed from mempool'
+                             .format(len(gone)))
+
+        # Get the raw transactions for the new hashes.  Ignore the
+        # ones the daemon no longer has (it will return None).  Put
+        # them into a dictionary of hex hash to deserialized tx.
+        hex_hashes.difference_update(self.txs)
+        raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
+        new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
+                   for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
+        del raw_txs, hex_hashes
+
+        # The mempool is unordered, so process all outputs first so
+        # that looking for inputs has full info.
+        parse_script = ScriptPubKey.from_script
+        coin = self.bp.coin
+        utxo_lookup = self.bp.utxo_cache.lookup
+
+        def txout_pair(txout):
+            return (parse_script(txout.pk_script, coin).hash168, txout.value)
+
+        for hex_hash, tx in new_txs.items():
+            txout_pairs = tuple(txout_pair(txout) for txout in tx.outputs)
+            self.txs[hex_hash] = [None, txout_pairs, None]
+
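
Processing all outputs before any inputs matters when new mempool transactions chain off each other: if tx B spends an output of tx A and both arrive in the same update, the first pass records A's output pairs in self.txs, so the input pass below can resolve B's prevout from the mempool and flag it unconfirmed.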
+        def txin_info(txin):
+            hex_hash = hash_to_str(txin.prev_hash)
+            mempool_entry = self.txs.get(hex_hash)
+            if mempool_entry:
+                return mempool_entry[1][txin.prev_idx], True
+            entry = utxo_lookup(txin.prev_hash, txin.prev_idx)
+            if entry == NO_CACHE_ENTRY:
+                # Not possible unless daemon is lying or we're corrupted?
+                self.logger.warning('no UTXO found for {} / {}'
+                                    .format(hash_to_str(txin.prev_hash),
+                                            txin.prev_idx))
+                raise MissingUTXOError
+            value, = struct.unpack('<Q', entry[-8:])
+            return (entry[:21], value), False
+
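
The slicing above is all this code assumes about a UTXO cache entry's layout: it starts with the 21-byte hash168 and ends with the value as a little-endian uint64, with any bytes in between ignored here. A minimal sketch with fabricated bytes (not part of the patch):

    import struct

    hash168 = bytes(21)                    # stand-in 21-byte hash168
    middle = b'\xff' * 4                   # whatever else the entry holds
    entry = hash168 + middle + struct.pack('<Q', 150000)

    value, = struct.unpack('<Q', entry[-8:])
    assert (entry[:21], value) == (hash168, 150000)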
+        # Now add the inputs
+        for hex_hash, tx in new_txs.items():
+            txout_pairs = self.txs[hex_hash][1]
+            try:
+                infos = (txin_info(txin) for txin in tx.inputs)
+                txin_pairs, unconfs = zip(*infos)
+            except MissingUTXOError:
+                # If we were missing a UTXO for some reason drop this tx
+                del self.txs[hex_hash]
+                continue
+            self.txs[hex_hash] = [txin_pairs, txout_pairs, any(unconfs)]
+
+            # Update touched and self.hash168s for the new tx
+            for hash168, value in txin_pairs:
+                self.hash168s[hash168].add(hex_hash)
+                touched.add(hash168)
+            for hash168, value in txout_pairs:
+                self.hash168s[hash168].add(hex_hash)
+                touched.add(hash168)
+
+        self.logger.info('{:,d} entries in mempool for {:,d} addresses'
+                         .format(len(self.txs), len(self.hash168s)))
+
+        # Might include a None
+        return touched
+
+    def transactions(self, hash168):
+        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
+        entries for the hash168.
+
+        unconfirmed is True if any txin is unconfirmed.
+        '''
+        for hex_hash in self.hash168s[hash168]:
+            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
+            tx_fee = (sum(v for hash168, v in txin_pairs)
+                      - sum(v for hash168, v in txout_pairs))
+            yield (hex_hash, tx_fee, unconfirmed)
+
+    def value(self, hash168):
+        '''Return the unconfirmed amount in the mempool for hash168.
+
+        Can be positive or negative.
+        '''
+        value = 0
+        for tx_hash in self.hash168s[hash168]:
+            txin_pairs, txout_pairs, unconfirmed = self.txs[tx_hash]
+            value -= sum(v for h168, v in txin_pairs if h168 == hash168)
+            value += sum(v for h168, v in txout_pairs if h168 == hash168)
+        return value
+
+
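A quick sketch of how the two query methods use that structure, with the same made-up numbers as the toy maps above (not code from the patch):

    txin_pairs = ((b'addrA', 5000),)
    txout_pairs = ((b'addrB', 4000), (b'addrA', 900))

    # transactions(): the fee is total inputs minus total outputs
    tx_fee = (sum(v for _, v in txin_pairs)
              - sum(v for _, v in txout_pairs))
    assert tx_fee == 100

    # value() for addrA: spends 5000, gets 900 back in change
    assert 900 - 5000 == -4100

addrA's unconfirmed balance change is negative because it spends more than it receives; addrB's is +4000.
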
 class BlockProcessor(LoggedClass):
     '''Process blocks and update the DB state to match.
@@ -153,6 +291,8 @@ class BlockProcessor(LoggedClass):

         self.daemon = daemon
         self.on_update = on_update
+        self.mempool = MemPool(self)
+        self.touched = set()

         # Meta
         self.utxo_MB = env.utxo_MB
@@ -232,27 +372,29 @@ class BlockProcessor(LoggedClass):
         Blocks are only processed in the forward direction.  The
         prefetcher only provides a non-None mempool when caught up.
         '''
-        all_touched = [set()]
-        blocks, mempool = await self.prefetcher.get_blocks()
+        blocks, mempool_hashes = await self.prefetcher.get_blocks()
+        caught_up = mempool_hashes is not None
         for block in blocks:
-            touched = self.advance_block(block)
-            if touched is None:
-                all_touched.append(await self.handle_chain_reorg())
-                mempool = None
-                break
-            all_touched.append(touched)
+            if self.advance_block(block, caught_up):
+                await self.handle_chain_reorg()
+                return
             await asyncio.sleep(0)  # Yield

-        if mempool is not None:
-            # Caught up to daemon height.  Flush everything as queries
-            # are performed on the DB and not in-memory.
-            self.flush(True)
-            if self.first_sync:
-                self.first_sync = False
-                self.logger.info('synced to height {:,d}'.format(self.height))
-            if self.on_update:
-                await self.on_update(self.height, set.union(*all_touched))
+        if caught_up:
+            await self.caught_up(mempool_hashes)
+
+    async def caught_up(self, mempool_hashes):
+        '''Called after each daemon poll if caught up.'''
+        # Caught up to daemon height.  Flush everything as queries
+        # are performed on the DB and not in-memory.
+        self.flush(True)
+        if self.first_sync:
+            self.first_sync = False
+            self.logger.info('synced to height {:,d}'.format(self.height))
+        if self.on_update:
+            self.touched.update(await self.mempool.update(mempool_hashes))
+            await self.on_update(self.height, self.touched)
+            self.touched = set()

     async def force_chain_reorg(self, to_genesis):
         try:
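
This restructuring replaces the per-call all_touched list with a persistent self.touched set on the processor: advance_block() and backup_blocks() now accumulate into it, and caught_up() folds in the addresses touched by the mempool update, fires on_update, and resets the set. Note that advance_block() is presumably expected to return a truthy value when a block does not connect (that part of it falls outside these hunks); a reorg now simply returns from the polling loop, and the next iteration resumes from the prefetcher's reset height.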
@@ -266,20 +408,17 @@ class BlockProcessor(LoggedClass):
         self.flush(True)
         self.logger.info('finding common height...')

-        touched = set()
         hashes = await self.reorg_hashes(to_genesis)
         # Reverse and convert to hex strings.
         hashes = [hash_to_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             blocks = await self.daemon.raw_blocks(hex_hashes)
-            touched.update(self.backup_blocks(blocks))
+            self.backup_blocks(blocks)

         self.logger.info('backed up to height {:,d}'.format(self.height))
         await self.prefetcher.clear(self.height)
         self.logger.info('prefetcher reset')

-        return touched
-
     async def reorg_hashes(self, to_genesis):
         '''Return the list of hashes to back up because of a reorg.
@@ -565,7 +704,7 @@ class BlockProcessor(LoggedClass):
         '''Read undo information from a file for the current height.'''
         return self.db.get(self.undo_key(height))

-    def advance_block(self, block):
+    def advance_block(self, block, update_touched):
         # We must update the fs_cache before calling advance_txs() as
         # the UTXO cache uses the fs_cache via get_tx_hash() to
         # resolve compressed key collisions
@@ -590,7 +729,8 @@ class BlockProcessor(LoggedClass):
         if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
             self.flush(utxo_MB >= self.utxo_MB)

-        return touched
+        if update_touched:
+            self.touched.update(touched)

     def advance_txs(self, tx_hashes, txs, touched):
         put_utxo = self.utxo_cache.put
@@ -661,9 +801,9 @@ class BlockProcessor(LoggedClass):

         self.logger.info('backed up to height {:,d}'.format(self.height))

+        self.touched.update(touched)
         flush_history = partial(self.backup_history, hash168s=touched)
         self.flush(True, flush_history=flush_history)
-        return touched

     def backup_txs(self, tx_hashes, txs, touched):
         # Prevout values, in order down the block (coinbase first if present)
@@ -704,9 +844,24 @@ class BlockProcessor(LoggedClass):
         assert isinstance(limit, int) and limit >= 0
         return limit

+    def mempool_transactions(self, hash168):
+        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
+        entries for the hash168.
+
+        unconfirmed is True if any txin is unconfirmed.
+        '''
+        return self.mempool.transactions(hash168)
+
+    def mempool_value(self, hash168):
+        '''Return the unconfirmed amount in the mempool for hash168.
+
+        Can be positive or negative.
+        '''
+        return self.mempool.value(hash168)
+
     def get_history(self, hash168, limit=1000):
         '''Generator that returns an unpruned, sorted list of (tx_hash,
-        height) tuples of transactions that touched the address,
+        height) tuples of confirmed transactions that touched the address,
         earliest in the blockchain first.  Includes both spending and
         receiving transactions.  By default yields at most 1000 entries.
         Set limit to None to get them all.
@@ -756,7 +911,7 @@ class BlockProcessor(LoggedClass):
         hash168 = None
         if 0 <= index <= 65535:
             idx_packed = struct.pack('<H', index)
-            hash168 = self.utxo_cache.hash168(tx_hash, idx_packed)
+            hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
             if hash168 == NO_CACHE_ENTRY:
                 hash168 = None
         return hash168
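
The extra False argument to utxo_cache.hash168() presumably suppresses the cache's complaint about a missing entry: a lookup for an arbitrary client-supplied (tx_hash, index) can legitimately miss, and the NO_CACHE_ENTRY check that follows already maps a miss to None.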