@@ -152,170 +152,6 @@ class ChainReorg(Exception):
    '''Raised on a blockchain reorganisation.'''


class MemPool(LoggedClass):
    '''Representation of the daemon's mempool.

    Updated regularly in caught-up state.  Goal is to enable efficient
    response to the value() and transactions() calls.

    To that end we maintain the following maps:

       tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
       hash168 -> set of all tx hashes in which the hash168 appears

    A pair is a (hash168, value) tuple.  Unconfirmed is true if any of the
    tx's txins are unconfirmed.  tx hashes are hex strings.
    '''

    def __init__(self, bp):
        super().__init__()
        self.txs = {}
        self.hash168s = defaultdict(set)  # None can be a key
        self.bp = bp
        self.count = -1
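
    # Illustrative shape of the two maps (the hash and the amounts below
    # are made-up example values, not real data):
    #
    #   self.txs['be9f...'] = ([(h168_a, 50000)],   # txin_pairs
    #                          [(h168_b, 49000)],   # txout_pairs
    #                          False)               # unconfirmed
    #   self.hash168s[h168_a] = {'be9f...'}
    #   self.hash168s[h168_b] = {'be9f...'}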

    async def update(self, hex_hashes):
        '''Update state given the current mempool to the passed set of hashes.

        Remove transactions that are no longer in our mempool.
        Request new transactions we don't have then add to our mempool.
        '''
        hex_hashes = set(hex_hashes)
        touched = set()
        missing_utxos = []

        initial = self.count < 0
        if initial:
            self.logger.info('beginning import of {:,d} mempool txs'
                             .format(len(hex_hashes)))

        # Remove gone items
        gone = set(self.txs).difference(hex_hashes)
        for hex_hash in gone:
            txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
            hash168s = set(hash168 for hash168, value in txin_pairs)
            hash168s.update(hash168 for hash168, value in txout_pairs)
            for hash168 in hash168s:
                self.hash168s[hash168].remove(hex_hash)
                if not self.hash168s[hash168]:
                    del self.hash168s[hash168]
            touched.update(hash168s)

        # Get the raw transactions for the new hashes.  Ignore the
        # ones the daemon no longer has (it will return None).  Put
        # them into a dictionary of hex hash to deserialized tx.
        hex_hashes.difference_update(self.txs)
        raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
        if initial:
            self.logger.info('analysing {:,d} mempool txs'
                             .format(len(raw_txs)))
        new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
                   for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
        del raw_txs, hex_hashes

        # The mempool is unordered, so process all outputs first so
        # that looking for inputs has full info.
        script_hash168 = self.bp.coin.hash168_from_script()
        db_utxo_lookup = self.bp.db_utxo_lookup

        def txout_pair(txout):
            return (script_hash168(txout.pk_script), txout.value)

        for n, (hex_hash, tx) in enumerate(new_txs.items()):
            # Yield to process e.g. signals
            if n % 100 == 0:
                await asyncio.sleep(0)
            txout_pairs = [txout_pair(txout) for txout in tx.outputs]
            self.txs[hex_hash] = (None, txout_pairs, None)
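
        # At this point each new tx has a placeholder entry of the form
        # (None, txout_pairs, None); the input pass below replaces it with
        # the full (txin_pairs, txout_pairs, unconfirmed) triple.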

        def txin_info(txin):
            hex_hash = hash_to_str(txin.prev_hash)
            mempool_entry = self.txs.get(hex_hash)
            if mempool_entry:
                return mempool_entry[1][txin.prev_idx], True
            pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
            return pair, False

        if initial:
            next_log = time.time()
            self.logger.info('processed outputs, now examining inputs. '
                             'This can take some time...')

        # Now add the inputs
        for n, (hex_hash, tx) in enumerate(new_txs.items()):
            # Yield to process e.g. signals
            if n % 10 == 0:
                await asyncio.sleep(0)

            if initial and time.time() > next_log:
                next_log = time.time() + 20
                self.logger.info('{:,d} done ({:d}%)'
                                 .format(n, int(n / len(new_txs) * 100)))

            txout_pairs = self.txs[hex_hash][1]
            try:
                # Use an explicit loop rather than a generator so that, if
                # a lookup fails, the offending txin is still bound and can
                # be recorded below.
                infos = []
                for txin in tx.inputs:
                    infos.append(txin_info(txin))
                txin_pairs, unconfs = zip(*infos)
            except self.bp.MissingUTXOError:
                # Drop this TX.  If other mempool txs depend on it
                # it's harmless - next time the mempool is refreshed
                # they'll either be cleaned up or the UTXOs will no
                # longer be missing.
                missing_utxos.append(txin)
                del self.txs[hex_hash]
                continue
            self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))

            # Update touched and self.hash168s for the new tx
            for hash168, value in txin_pairs:
                self.hash168s[hash168].add(hex_hash)
                touched.add(hash168)
            for hash168, value in txout_pairs:
                self.hash168s[hash168].add(hex_hash)
                touched.add(hash168)

        if missing_utxos:
            self.logger.info('{:,d} txs had missing UTXOs; probably the '
                             'daemon is a block or two ahead of us.'
                             .format(len(missing_utxos)))
            first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
                                                  txin.prev_idx)
                              for txin in sorted(missing_utxos)[:3])
            self.logger.info('first ones are {}'.format(first))

        self.count += 1
        if self.count % 25 == 0 or gone:
            self.count = 0
            self.logger.info('{:,d} txs touching {:,d} addresses'
                             .format(len(self.txs), len(self.hash168s)))

        # Might include a None
        return touched
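
    # Typical call pattern (a sketch; the actual caller is
    # BlockProcessor.caught_up() further below, which passes the hashes it
    # received from the daemon and folds the result into self.touched):
    #
    #   touched = await self.mempool.update(mempool_hashes)
    #   self.touched.update(touched)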

    def transactions(self, hash168):
        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
        entries for the hash168.

        unconfirmed is True if any txin is unconfirmed.
        '''
        for hex_hash in self.hash168s[hash168]:
            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
            tx_fee = (sum(v for hash168, v in txin_pairs)
                      - sum(v for hash168, v in txout_pairs))
            yield (hex_hash, tx_fee, unconfirmed)

    def value(self, hash168):
        '''Return the unconfirmed amount in the mempool for hash168.

        Can be positive or negative.
        '''
        value = 0
        for hex_hash in self.hash168s[hash168]:
            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
            value -= sum(v for h168, v in txin_pairs if h168 == hash168)
            value += sum(v for h168, v in txout_pairs if h168 == hash168)
        return value
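
    # Worked example of the arithmetic above (hypothetical numbers): a
    # mempool tx spends 100,000 from h168_a and pays 60,000 back to h168_a
    # and 39,000 to h168_b.  Then transactions(h168_a) yields
    # (hex_hash, 1000, unconfirmed), since tx_fee = 100,000 - 99,000,
    # while value(h168_a) == -40,000 and value(h168_b) == +39,000.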


class BlockProcessor(server.db.DB):
    '''Process blocks and update the DB state to match.
@@ -335,7 +171,6 @@ class BlockProcessor(server.db.DB):
        self.daemon = Daemon(env.daemon_url, env.debug)
        self.daemon.debug_set_height(self.height)
        self.mempool = MemPool(self)
        self.touched = set()
        self.futures = []
@@ -423,12 +258,11 @@ class BlockProcessor(server.db.DB):
        '''Called after each daemon poll if caught up.'''
        # Caught up to daemon height.  Flush everything as queries
        # are performed on the DB and not in-memory.
        self.flush(True)
        if self.first_sync:
            self.first_sync = False
            self.logger.info('{} synced to height {:,d}.  DB version: {:d}'
                             .format(VERSION, self.height, self.db_version))
        self.touched.update(await self.mempool.update(mempool_hashes))
        self.flush(True)

    async def handle_chain_reorg(self):
        # First get all state on disk
@@ -1025,18 +859,3 @@ class BlockProcessor(server.db.DB):
        tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
        return tx_hash, tx_height

    def mempool_transactions(self, hash168):
        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
        entries for the hash168.

        unconfirmed is True if any txin is unconfirmed.
        '''
        return self.mempool.transactions(hash168)

    def mempool_value(self, hash168):
        '''Return the unconfirmed amount in the mempool for hash168.

        Can be positive or negative.
        '''
        return self.mempool.value(hash168)