Browse Source

Implement reorg logic

master
Neil Booth 8 years ago
parent
commit
5c63cd40de
  1. 20
      query.py
  2. 372
      server/block_processor.py
  3. 108
      server/cache.py
  4. 1
      server/env.py

20
query.py

@ -11,11 +11,31 @@ from server.block_processor import BlockProcessor
from lib.hash import hash_to_str from lib.hash import hash_to_str
def count_entries(db):
utxos = 0
for key in db.iterator(prefix=b'u', include_value=False):
utxos += 1
print("UTXO count:", utxos)
hash168 = 0
for key in db.iterator(prefix=b'h', include_value=False):
hash168 += 1
print("Hash168 count:", hash168)
hist = 0
for key in db.iterator(prefix=b'H', include_value=False):
hist += 1
print("History addresses:", hist)
def main(): def main():
env = Env() env = Env()
coin = env.coin coin = env.coin
os.chdir(env.db_dir) os.chdir(env.db_dir)
bp = BlockProcessor(env, None) bp = BlockProcessor(env, None)
if len(sys.argv) == 1:
count_entries(bp.db)
return
argc = 1 argc = 1
try: try:
limit = int(sys.argv[argc]) limit = int(sys.argv[argc])

372
server/block_processor.py

@ -6,15 +6,17 @@ import ast
import asyncio import asyncio
import struct import struct
import time import time
from bisect import bisect_left
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from functools import partial from functools import partial
import plyvel import plyvel
from server.cache import FSCache, UTXOCache from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError from server.daemon import DaemonError
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.util import LoggedClass from lib.script import ScriptPubKey
from lib.util import chunks, LoggedClass
def formatted_time(t): def formatted_time(t):
@ -124,24 +126,28 @@ class BlockProcessor(LoggedClass):
self.next_cache_check = 0 self.next_cache_check = 0
self.last_flush = time.time() self.last_flush = time.time()
self.coin = env.coin self.coin = env.coin
self.caught_up = False
self.reorg_limit = env.reorg_limit
# Chain state (initialize to genesis in case of new DB) # Chain state (initialize to genesis in case of new DB)
self.db_height = -1 self.db_height = -1
self.db_tx_count = 0 self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0 self.flush_count = 0
self.utxo_flush_count = 0 self.utxo_flush_count = 0
self.wall_time = 0 self.wall_time = 0
self.tip = b'\0' * 32
# Open DB and metadata files. Record some of its state. # Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin) self.db = self.open_db(self.coin)
self.tx_count = self.db_tx_count self.tx_count = self.db_tx_count
self.height = self.db_height self.height = self.db_height
self.tip = self.db_tip
# Caches to be flushed later. Headers and tx_hashes have one # Caches to be flushed later. Headers and tx_hashes have one
# entry per block # entry per block
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0 self.history_size = 0
self.backup_hash168s = set()
self.utxo_cache = UTXOCache(self, self.db, self.coin) self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height) self.prefetcher = Prefetcher(daemon, self.height)
@ -157,13 +163,20 @@ class BlockProcessor(LoggedClass):
self.tx_count, self.flush_count, self.tx_count, self.flush_count,
self.utxo_flush_count, self.utxo_flush_count,
formatted_time(self.wall_time))) formatted_time(self.wall_time)))
self.logger.info('reorg limit of {:,d} blocks'
.format(self.reorg_limit))
self.logger.info('flushing UTXO cache at {:,d} MB' self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB)) .format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB' self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB)) .format(self.hist_MB))
def coros(self): self.clean_db()
return [self.start(), self.prefetcher.start()]
def coros(self, force_backup=False):
if force_backup:
return [self.force_chain_reorg(True), self.prefetcher.start()]
else:
return [self.start(), self.prefetcher.start()]
async def start(self): async def start(self):
'''External entry point for block processing. '''External entry point for block processing.
@ -178,30 +191,49 @@ class BlockProcessor(LoggedClass):
async def advance_blocks(self): async def advance_blocks(self):
'''Loop forever processing blocks in the forward direction.''' '''Loop forever processing blocks in the forward direction.'''
caught_up = False
while True: while True:
blocks = await self.prefetcher.get_blocks() blocks = await self.prefetcher.get_blocks()
for block in blocks: for block in blocks:
if not self.advance_block(block): if not self.advance_block(block):
await self.handle_chain_reorg() await self.handle_chain_reorg()
caught_up = False self.caught_up = False
break break
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
if not caught_up and self.height == self.daemon.cached_height(): if self.height != self.daemon.cached_height():
caught_up = True continue
if not self.caught_up:
self.caught_up = True
self.logger.info('caught up to height {:,d}' self.logger.info('caught up to height {:,d}'
.format(self.height)) .format(self.height))
async def handle_chain_reorg(self): # Flush everything when in caught-up state as queries
hashes = await self.reorg_hashes(self) # are performed on DB not in-memory
hex_hashes = [hash_to_str(hash) for hash in hashes] self.flush(True)
blocks = await self.daemon.raw_blocks(hex_hashes)
for block in reversed(blocks):
self.backup_block(block)
await self.prefetcher.clear()
async def reorg_hashes(self): async def force_chain_reorg(self, to_genesis):
try:
await self.handle_chain_reorg(to_genesis)
finally:
self.flush(True)
async def handle_chain_reorg(self, to_genesis=False):
# First get all state on disk
self.logger.info('chain reorg detected')
self.flush(True)
self.logger.info('finding common height...')
hashes = await self.reorg_hashes(to_genesis)
# Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks)
self.logger.info('backed up to height {:,d}'.format(self.height))
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
async def reorg_hashes(self, to_genesis):
'''Return the list of hashes to back up beacuse of a reorg. '''Return the list of hashes to back up beacuse of a reorg.
The hashes are returned in order of increasing height.''' The hashes are returned in order of increasing height.'''
@ -211,27 +243,26 @@ class BlockProcessor(LoggedClass):
return n return n
return -1 return -1
self.logger.info('chain reorg detected; finding common height...')
start = self.height - 1 start = self.height - 1
count = 1 count = 1
while True: while start > 0:
self.logger.info('start: {:,d} count: {:,d}'.format(start, count))
hashes = self.fs_cache.block_hashes(start, count) hashes = self.fs_cache.block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count) d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
d_hashes = [bytes.fromhex(hex_hash) for hex_hash in d_hex_hashes] n = match_pos(hex_hashes, d_hex_hashes)
n = match_pos(hashes, d_hashes) if n >= 0 and not to_genesis:
if n >= 0: start += n + 1
break break
assert start > 0
count = min(count * 2, start) count = min(count * 2, start)
start -= count start -= count
# Hashes differ from height 'start' # Hashes differ from height 'start'
start += n + 1
count = (self.height - start) + 1 count = (self.height - start) + 1
self.logger.info('chain was reorganised for {:,d} blocks starting ' self.logger.info('chain was reorganised for {:,d} blocks from '
'at height {:,d}', start, count) 'height {:,d} to height {:,d}'
.format(count, start, start + count - 1))
return self.fs_cache.block_hashes(start, count) return self.fs_cache.block_hashes(start, count)
@ -244,11 +275,9 @@ class BlockProcessor(LoggedClass):
db = plyvel.DB(db_name, create_if_missing=True, db = plyvel.DB(db_name, create_if_missing=True,
error_if_exists=True, compression=None) error_if_exists=True, compression=None)
self.logger.info('created new database {}'.format(db_name)) self.logger.info('created new database {}'.format(db_name))
self.flush_state(db)
else: else:
self.logger.info('successfully opened database {}'.format(db_name)) self.logger.info('successfully opened database {}'.format(db_name))
self.read_state(db) self.read_state(db)
self.delete_excess_history(db)
return db return db
@ -261,37 +290,58 @@ class BlockProcessor(LoggedClass):
self.coin.GENESIS_HASH)) self.coin.GENESIS_HASH))
self.db_height = state['height'] self.db_height = state['height']
self.db_tx_count = state['tx_count'] self.db_tx_count = state['tx_count']
self.tip = state['tip'] self.db_tip = state['tip']
self.flush_count = state['flush_count'] self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count'] self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time'] self.wall_time = state['wall_time']
def delete_excess_history(self, db): def clean_db(self):
'''Clear history flushed since the most recent UTXO flush.''' '''Clean out stale DB items.
utxo_flush_count = self.utxo_flush_count
diff = self.flush_count - utxo_flush_count Stale DB items are excess history flushed since the most
if diff == 0: recent UTXO flush (only happens on unclean shutdown), and aged
return undo information.
if diff < 0: '''
if self.flush_count < self.utxo_flush_count:
raise ChainError('DB corrupt: flush_count < utxo_flush_count') raise ChainError('DB corrupt: flush_count < utxo_flush_count')
with self.db.write_batch(transaction=True) as batch:
if self.flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
self.remove_excess_history(batch)
self.utxo_flush_count = self.flush_count
self.remove_stale_undo_items(batch)
self.flush_state(batch)
self.logger.info('DB not shut down cleanly. Scanning for most ' def remove_excess_history(self, batch):
'recent {:,d} history flushes'.format(diff))
prefix = b'H' prefix = b'H'
unpack = struct.unpack unpack = struct.unpack
keys = [] keys = []
for key, hist in db.iterator(prefix=prefix): for key, hist in self.db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:]) flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count: if flush_id > self.utxo_flush_count:
keys.append(key) keys.append(key)
self.logger.info('deleting {:,d} history entries'.format(len(keys))) self.logger.info('deleting {:,d} history entries'
with db.write_batch(transaction=True) as batch: .format(len(keys)))
for key in keys: for key in keys:
db.delete(key) batch.delete(key)
self.utxo_flush_count = self.flush_count
self.flush_state(batch) def remove_stale_undo_items(self, batch):
self.logger.info('deletion complete') prefix = b'U'
unpack = struct.unpack
cutoff = self.db_height - self.reorg_limit
keys = []
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
self.logger.info('deleting {:,d} stale undo entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def flush_state(self, batch): def flush_state(self, batch):
'''Flush chain state to the batch.''' '''Flush chain state to the batch.'''
@ -302,7 +352,7 @@ class BlockProcessor(LoggedClass):
'genesis': self.coin.GENESIS_HASH, 'genesis': self.coin.GENESIS_HASH,
'height': self.db_height, 'height': self.db_height,
'tx_count': self.db_tx_count, 'tx_count': self.db_tx_count,
'tip': self.tip, 'tip': self.db_tip,
'flush_count': self.flush_count, 'flush_count': self.flush_count,
'utxo_flush_count': self.utxo_flush_count, 'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time, 'wall_time': self.wall_time,
@ -317,63 +367,83 @@ class BlockProcessor(LoggedClass):
self.utxo_flush_count = self.flush_count self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count self.db_tx_count = self.tx_count
self.db_height = self.height self.db_height = self.height
self.db_tip = self.tip
def assert_flushed(self):
'''Asserts state is fully flushed.'''
assert self.tx_count == self.db_tx_count
assert not self.history
assert not self.utxo_cache.cache
assert not self.utxo_cache.db_cache
assert not self.backup_hash168s
def flush(self, flush_utxos=False): def flush(self, flush_utxos=False):
'''Flush out cached state. '''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.''' History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height:
self.logger.info('nothing to flush')
self.assert_flushed()
return
flush_start = time.time() flush_start = time.time()
last_flush = self.last_flush last_flush = self.last_flush
tx_diff = self.tx_count - self.db_tx_count
# Write out the files to the FS before flushing to the DB. If # Write out the files to the FS before flushing to the DB. If
# the DB transaction fails, the files being too long doesn't # the DB transaction fails, the files being too long doesn't
# matter. But if writing the files fails we do not want to # matter. But if writing the files fails we do not want to
# have updated the DB. # have updated the DB.
tx_diff = self.fs_cache.flush(self.height, self.tx_count) if self.height > self.db_height:
self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch(transaction=True) as batch: with self.db.write_batch(transaction=True) as batch:
# History first - fast and frees memory. Flush state last # History first - fast and frees memory. Flush state last
# as it reads the wall time. # as it reads the wall time.
self.flush_history(batch) if self.height > self.db_height:
self.flush_history(batch)
else:
self.backup_history(batch)
if flush_utxos: if flush_utxos:
self.flush_utxos(batch) self.flush_utxos(batch)
self.flush_state(batch) self.flush_state(batch)
self.logger.info('committing transaction...') self.logger.info('committing transaction...')
# Update and put the wall time again - otherwise we drop the # Update and put the wall time again - otherwise we drop the
# time it took leveldb to commit the batch # time it took to commit the batch
self.flush_state(self.db) self.flush_state(self.db)
flush_time = int(self.last_flush - flush_start) flush_time = int(self.last_flush - flush_start)
self.logger.info('flush #{:,d} to height {:,d} took {:,d}s' self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s'
.format(self.flush_count, self.height, flush_time)) .format(self.flush_count, self.height, self.tx_count,
flush_time))
# Log handy stats
daemon_height = self.daemon.cached_height() # Catch-up stats
txs_per_sec = int(self.tx_count / self.wall_time) if not self.caught_up and tx_diff > 0:
this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) daemon_height = self.daemon.cached_height()
if self.height > self.coin.TX_COUNT_HEIGHT: txs_per_sec = int(self.tx_count / self.wall_time)
tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
else: if self.height > self.coin.TX_COUNT_HEIGHT:
tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
* self.coin.TX_PER_BLOCK else:
+ (self.coin.TX_COUNT - self.tx_count)) tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
* self.coin.TX_PER_BLOCK
self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, ' + (self.coin.TX_COUNT - self.tx_count))
'since last flush: {:,d}'
.format(self.tx_count, txs_per_sec, this_txs_per_sec)) self.logger.info('tx/sec since genesis: {:,d}, '
self.logger.info('sync time: {} ETA: {}' 'since last flush: {:,d}'
.format(formatted_time(self.wall_time), .format(txs_per_sec, this_txs_per_sec))
formatted_time(tx_est / this_txs_per_sec))) self.logger.info('sync time: {} ETA: {}'
.format(formatted_time(self.wall_time),
formatted_time(tx_est / this_txs_per_sec)))
def flush_history(self, batch): def flush_history(self, batch):
self.logger.info('flushing history') self.logger.info('flushing history')
assert not self.backup_hash168s
# Drop any None entry
self.history.pop(None, None)
self.flush_count += 1 self.flush_count += 1
flush_id = struct.pack('>H', self.flush_count) flush_id = struct.pack('>H', self.flush_count)
for hash168, hist in self.history.items(): for hash168, hist in self.history.items():
key = b'H' + hash168 + flush_id key = b'H' + hash168 + flush_id
batch.put(key, hist.tobytes()) batch.put(key, hist.tobytes())
@ -384,6 +454,39 @@ class BlockProcessor(LoggedClass):
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0 self.history_size = 0
def backup_history(self, batch):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count))
# Drop any NO_CACHE entry
self.backup_hash168s.discard(NO_CACHE_ENTRY)
assert not self.history
nremoves = 0
for hash168 in sorted(self.backup_hash168s):
prefix = b'H' + hash168
deletes = []
puts = {}
for key, hist in self.db.iterator(reverse=True, prefix=prefix):
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= self.tx_count
idx = bisect_left(a, self.tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[key] = a[:idx].tobytes()
break
deletes.append(key)
for key in deletes:
batch.delete(key)
for key, value in puts.items():
batch.put(key, value)
self.logger.info('removed {:,d} history entries from {:,d} addresses'
.format(nremoves, len(self.backup_hash168s)))
self.backup_hash168s = set()
def cache_sizes(self): def cache_sizes(self):
'''Returns the approximate size of the cache, in MB.''' '''Returns the approximate size of the cache, in MB.'''
# Good average estimates based on traversal of subobjects and # Good average estimates based on traversal of subobjects and
@ -400,15 +503,27 @@ class BlockProcessor(LoggedClass):
self.logger.info('cache stats at height {:,d} daemon height: {:,d}' self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
.format(self.height, self.daemon.cached_height())) .format(self.height, self.daemon.cached_height()))
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} ' self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
'hist addrs: {:,d} hist size: {:,d}' 'hist addrs: {:,d} hist size {:,d}'
.format(len(self.utxo_cache.cache), .format(len(self.utxo_cache.cache),
len(self.utxo_cache.db_cache), len(self.utxo_cache.db_cache),
len(self.history), self.history_size,
self.history_size)) len(self.history)))
self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)' self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)'
.format(utxo_MB + hist_MB, utxo_MB, hist_MB)) .format(utxo_MB + hist_MB, utxo_MB, hist_MB))
return utxo_MB, hist_MB return utxo_MB, hist_MB
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + struct.pack('>I', height)
def write_undo_info(self, height, undo_info):
'''Write out undo information for the current height.'''
self.db.put(self.undo_key(height), undo_info)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def advance_block(self, block): def advance_block(self, block):
# We must update the fs_cache before calling advance_txs() as # We must update the fs_cache before calling advance_txs() as
# the UTXO cache uses the fs_cache via get_tx_hash() to # the UTXO cache uses the fs_cache via get_tx_hash() to
@ -421,7 +536,9 @@ class BlockProcessor(LoggedClass):
self.tip = header_hash self.tip = header_hash
self.height += 1 self.height += 1
self.advance_txs(tx_hashes, txs) undo_info = self.advance_txs(tx_hashes, txs)
if self.daemon.cached_height() - self.height <= self.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info))
# Check if we're getting full and time to flush? # Check if we're getting full and time to flush?
now = time.time() now = time.time()
@ -434,28 +551,105 @@ class BlockProcessor(LoggedClass):
return True return True
def advance_txs(self, tx_hashes, txs): def advance_txs(self, tx_hashes, txs):
cache = self.utxo_cache put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend
undo_info = []
# Use local vars for speed in the loops
history = self.history
tx_num = self.tx_count tx_num = self.tx_count
coin = self.coin
parse_script = ScriptPubKey.from_script
pack = struct.pack
for tx_hash, tx in zip(tx_hashes, txs): for tx, tx_hash in zip(txs, tx_hashes):
# Add the outputs as new UTXOs; spend the inputs hash168s = set()
hash168s = cache.add_many(tx_hash, tx_num, tx.outputs) tx_numb = pack('<I', tx_num)
# Spend the inputs
if not tx.is_coinbase: if not tx.is_coinbase:
for txin in tx.inputs: for txin in tx.inputs:
hash168s.add(cache.spend(txin)) cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
undo_info.append(cache_value)
hash168s.add(cache_value[:21])
# Add the new UTXOs
for idx, txout in enumerate(tx.outputs):
# Get the hash168. Ignore scripts we can't grok.
hash168 = parse_script(txout.pk_script, coin).hash168
if hash168:
hash168s.add(hash168)
put_utxo(tx_hash + pack('<H', idx),
hash168 + tx_numb + pack('<Q', txout.value))
# Drop any NO_CACHE entry
hash168s.discard(NO_CACHE_ENTRY)
for hash168 in hash168s: for hash168 in hash168s:
self.history[hash168].append(tx_num) history[hash168].append(tx_num)
self.history_size += len(hash168s) self.history_size += len(hash168s)
tx_num += 1 tx_num += 1
self.tx_count = tx_num self.tx_count = tx_num
def backup_block(self, block): return undo_info
pass
def backup_blocks(self, blocks):
'''Backup the blocks and flush.
def undo_txs(self, tx_hashes, txs): The blocks should be in order of decreasing height.
pass A flush is performed once the blocks are backed up.
'''
self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed()
for block in blocks:
header, tx_hashes, txs = self.coin.read_block(block)
prev_hash, header_hash = self.coin.header_hashes(header)
if header_hash != self.tip:
raise ChainError('backup block {} is not tip {} at height {:,d}'
.format(hash_to_str(header_hash),
hash_to_str(self.tip), self.height))
self.backup_txs(tx_hashes, txs)
self.fs_cache.backup_block()
self.tip = prev_hash
self.height -= 1
self.logger.info('backed up to height {:,d}'.format(self.height))
self.flush(True)
def backup_txs(self, tx_hashes, txs):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
n = len(undo_info)
# Use local vars for speed in the loops
pack = struct.pack
put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend
hash168s = self.backup_hash168s
rtxs = reversed(txs)
rtx_hashes = reversed(tx_hashes)
for tx_hash, tx in zip(rtx_hashes, rtxs):
# Spend the outputs
for idx, txout in enumerate(tx.outputs):
cache_value = spend_utxo(tx_hash, idx)
hash168s.add(cache_value[:21])
# Restore the inputs
if not tx.is_coinbase:
for txin in reversed(tx.inputs):
n -= 33
undo_item = undo_info[n:n+33]
put_utxo(txin.prev_hash + pack('<H', txin.prev_idx),
undo_item)
hash168s.add(undo_item[:21])
assert n == 0
self.tx_count -= len(txs)
@staticmethod @staticmethod
def resolve_limit(limit): def resolve_limit(limit):

108
server/cache.py

@ -7,7 +7,6 @@ import os
import struct import struct
from bisect import bisect_right from bisect import bisect_right
from lib.script import ScriptPubKey
from lib.util import chunks, LoggedClass from lib.util import chunks, LoggedClass
from lib.hash import double_sha256, hash_to_str from lib.hash import double_sha256, hash_to_str
@ -17,6 +16,8 @@ HIST_ENTRIES_PER_KEY = 1024
HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
ADDR_TX_HASH_LEN = 4 ADDR_TX_HASH_LEN = 4
UTXO_TX_HASH_LEN = 4 UTXO_TX_HASH_LEN = 4
NO_HASH_168 = bytes([255]) * 21
NO_CACHE_ENTRY = NO_HASH_168 + bytes(12)
class UTXOCache(LoggedClass): class UTXOCache(LoggedClass):
@ -76,84 +77,47 @@ class UTXOCache(LoggedClass):
self.parent = parent self.parent = parent
self.coin = coin self.coin = coin
self.cache = {} self.cache = {}
self.put = self.cache.__setitem__
self.db = db self.db = db
self.db_cache = {} self.db_cache = {}
# Statistics # Statistics
self.adds = 0 self.cache_spends = 0
self.cache_hits = 0
self.db_deletes = 0 self.db_deletes = 0
def add_many(self, tx_hash, tx_num, txouts): def spend(self, prev_hash, prev_idx):
'''Add a sequence of UTXOs to the cache, return the set of hash168s '''Spend a UTXO and return the cache's value.
seen.
Pass the hash of the TX it appears in, its TX number, and the
TX outputs.
'''
parse_script = ScriptPubKey.from_script
pack = struct.pack
tx_numb = pack('<I', tx_num)
hash168s = set()
self.adds += len(txouts)
for idx, txout in enumerate(txouts):
# Get the hash168. Ignore scripts we can't grok.
pk = parse_script(txout.pk_script, self.coin)
hash168 = pk.hash168
if not hash168:
continue
hash168s.add(hash168)
key = tx_hash + pack('<H', idx)
# Well-known duplicate coinbases from heights 91722-91880
# that destoyed 100 BTC forever:
# e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468
# d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599
#if key in self.cache:
# self.logger.info('duplicate tx hash {}'
# .format(hash_to_str(tx_hash)))
self.cache[key] = hash168 + tx_numb + pack('<Q', txout.value)
return hash168s
def spend(self, txin):
'''Spend a UTXO and return the address spent.
If the UTXO is not in the cache it must be on disk. If the UTXO is not in the cache it must be on disk.
''' '''
# Fast track is it's in the cache # Fast track is it's in the cache
pack = struct.pack pack = struct.pack
key = txin.prev_hash + pack('<H', txin.prev_idx) idx_packed = pack('<H', prev_idx)
value = self.cache.pop(key, None) value = self.cache.pop(prev_hash + idx_packed, None)
if value: if value:
self.cache_hits += 1 self.cache_spends += 1
return value[:21] return value
# Oh well. Find and remove it from the DB. # Oh well. Find and remove it from the DB.
hash168 = self.hash168(txin.prev_hash, txin.prev_idx) hash168 = self.hash168(prev_hash, idx_packed)
if not hash168: if not hash168:
return None return NO_CACHE_ENTRY
self.db_deletes += 1 self.db_deletes += 1
# Read the UTXO through the cache from the disk. We have to # Read the UTXO through the cache from the disk. We have to
# go through the cache because compressed keys can collide. # go through the cache because compressed keys can collide.
key = (b'u' + hash168 + txin.prev_hash[:UTXO_TX_HASH_LEN] key = b'u' + hash168 + prev_hash[:UTXO_TX_HASH_LEN] + idx_packed
+ pack('<H', txin.prev_idx))
data = self.cache_get(key) data = self.cache_get(key)
if data is None: if data is None:
# Uh-oh, this should not happen... # Uh-oh, this should not happen...
self.logger.error('found no UTXO for {} / {:d} key {}' self.logger.error('found no UTXO for {} / {:d} key {}'
.format(hash_to_str(txin.prev_hash), .format(hash_to_str(prev_hash), prev_idx,
txin.prev_idx, bytes(key).hex())) bytes(key).hex()))
return hash168 return NO_CACHE_ENTRY
if len(data) == 12: if len(data) == 12:
(tx_num, ) = struct.unpack('<I', data[:4])
self.cache_delete(key) self.cache_delete(key)
return hash168 return hash168 + data
# Resolve the compressed key collison. These should be # Resolve the compressed key collison. These should be
# extremely rare. # extremely rare.
@ -161,26 +125,25 @@ class UTXOCache(LoggedClass):
for n in range(0, len(data), 12): for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4]) (tx_num, ) = struct.unpack('<I', data[n:n+4])
tx_hash, height = self.parent.get_tx_hash(tx_num) tx_hash, height = self.parent.get_tx_hash(tx_num)
if txin.prev_hash == tx_hash: if prev_hash == tx_hash:
data = data[:n] + data[n + 12:] result = hash168 + data[n: n+12]
data = data[:n] + data[n+12:]
self.cache_write(key, data) self.cache_write(key, data)
return hash168 return result
raise Exception('could not resolve UTXO key collision') raise Exception('could not resolve UTXO key collision')
def hash168(self, tx_hash, idx): def hash168(self, tx_hash, idx_packed):
'''Return the hash168 paid to by the given TXO. '''Return the hash168 paid to by the given TXO.
Refers to the database. Returns None if not found (which is Refers to the database. Returns None if not found (which is
indicates a non-standard script). indicates a non-standard script).
''' '''
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + struct.pack('<H', idx) key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idx_packed
data = self.cache_get(key) data = self.cache_get(key)
if data is None: if data is None:
# Assuming the DB is not corrupt, this indicates a # Assuming the DB is not corrupt, this indicates a
# successful spend of a non-standard script # successful spend of a non-standard script
# self.logger.info('ignoring spend of non-standard UTXO {} / {:d}'
# .format(hash_to_str(tx_hash), idx)))
return None return None
if len(data) == 25: if len(data) == 25:
@ -222,6 +185,7 @@ class UTXOCache(LoggedClass):
# may be in the DB already. # may be in the DB already.
hcolls = ucolls = 0 hcolls = ucolls = 0
new_utxos = len(self.cache) new_utxos = len(self.cache)
for cache_key, cache_value in self.cache.items(): for cache_key, cache_value in self.cache.items():
# Frist write to the hash168 lookup table # Frist write to the hash168 lookup table
key = b'h' + cache_key[:ADDR_TX_HASH_LEN] + cache_key[-2:] key = b'h' + cache_key[:ADDR_TX_HASH_LEN] + cache_key[-2:]
@ -244,6 +208,7 @@ class UTXOCache(LoggedClass):
# GC-ing this now can only help the levelDB write. # GC-ing this now can only help the levelDB write.
self.cache = {} self.cache = {}
self.put = self.cache.__setitem__
# Now we can update to the batch. # Now we can update to the batch.
for key, value in self.db_cache.items(): for key, value in self.db_cache.items():
@ -254,13 +219,15 @@ class UTXOCache(LoggedClass):
self.db_cache = {} self.db_cache = {}
adds = new_utxos + self.cache_spends
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} ' self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
.format(self.adds, self.cache_hits)) .format(adds, self.cache_spends))
self.logger.info('UTXO DB adds: {:,d} spends: {:,d}. ' self.logger.info('UTXO DB adds: {:,d} spends: {:,d}. '
'Collisions: hash168: {:,d} UTXO: {:,d}' 'Collisions: hash168: {:,d} UTXO: {:,d}'
.format(new_utxos, self.db_deletes, .format(new_utxos, self.db_deletes,
hcolls, ucolls)) hcolls, ucolls))
self.adds = self.cache_hits = self.db_deletes = 0 self.cache_spends = self.db_deletes = 0
class FSCache(LoggedClass): class FSCache(LoggedClass):
@ -311,9 +278,15 @@ class FSCache(LoggedClass):
self.tx_hashes.append(tx_hashes) self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs)) self.tx_counts.append(prior_tx_count + len(txs))
def backup_block(self, block): def backup_block(self):
'''Revert a block and return (header, tx_hashes, txs)''' '''Revert a block.'''
pass assert not self.headers
assert not self.tx_hashes
assert self.height >= 0
# Just update in-memory. It doesn't matter if disk files are
# too long, they will be overwritten when advancing.
self.height -= 1
self.tx_counts.pop()
def flush(self, new_height, new_tx_count): def flush(self, new_height, new_tx_count):
'''Flush the things stored on the filesystem. '''Flush the things stored on the filesystem.
@ -326,9 +299,10 @@ class FSCache(LoggedClass):
txs_done = cur_tx_count - prior_tx_count txs_done = cur_tx_count - prior_tx_count
assert self.height + blocks_done == new_height assert self.height + blocks_done == new_height
assert cur_tx_count == new_tx_count
assert len(self.tx_hashes) == blocks_done assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1 assert len(self.tx_counts) == new_height + 1
assert cur_tx_count == new_tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count)
# First the headers # First the headers
headers = b''.join(self.headers) headers = b''.join(self.headers)
@ -364,8 +338,6 @@ class FSCache(LoggedClass):
self.headers = [] self.headers = []
self.height += blocks_done self.height += blocks_done
return txs_done
def read_headers(self, height, count): def read_headers(self, height, count):
read_count = min(count, self.height + 1 - height) read_count = min(count, self.height + 1 - height)

1
server/env.py

@ -25,6 +25,7 @@ class Env(LoggedClass):
self.tcp_port = self.integer('TCP_PORT', None) self.tcp_port = self.integer('TCP_PORT', None)
self.ssl_port = self.integer('SSL_PORT', None) self.ssl_port = self.integer('SSL_PORT', None)
self.rpc_port = self.integer('RPC_PORT', 8000) self.rpc_port = self.integer('RPC_PORT', 8000)
self.reorg_limit = self.integer('REORG_LIMIT', 200)
self.daemon_url = self.build_daemon_url() self.daemon_url = self.build_daemon_url()
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None) self.banner_file = self.default('BANNER_FILE', None)

Loading…
Cancel
Save