Browse Source

Merge branch 'better_logs' into develop

master
Neil Booth 8 years ago
parent
commit
4fe7e7dab2
  1. 83
      server/block_processor.py

83
server/block_processor.py

@ -82,7 +82,8 @@ class Prefetcher(LoggedClass):
async def main_loop(self):
'''Loop forever polling for more blocks.'''
self.logger.info('starting daemon poll loop')
self.logger.info('catching up to daemon height {:,d}...'
.format(await self.daemon.height()))
while True:
try:
if await self._caught_up():
@ -117,6 +118,7 @@ class Prefetcher(LoggedClass):
if self.queue_size >= self.target_cache_size:
return [], 0
caught_up = self.daemon.cached_height() == self.fetched_height
daemon_height = await self.daemon.height()
cache_room = self.target_cache_size // self.ave_size
@ -129,6 +131,11 @@ class Prefetcher(LoggedClass):
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1,
hash_to_str(hex_hashes[-1])))
blocks = await self.daemon.raw_blocks(hex_hashes)
size = sum(len(block) for block in blocks)
@ -353,24 +360,22 @@ class BlockProcessor(server.db.DB):
# UTXO cache
self.utxo_cache = {}
self.utxo_cache_spends = 0
self.db_deletes = []
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
'flush count: {:,d} utxo flush count: {:,d} '
'sync time: {}'
.format(self.coin.NAME, self.coin.NET, self.height,
self.tx_count, self.flush_count,
self.utxo_flush_count,
formatted_time(self.wall_time)))
self.logger.info('reorg limit of {:,d} blocks'
self.logger.info('coin: {}'.format(self.coin.NAME))
self.logger.info('network: {}'.format(self.coin.NET))
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
self.logger.info('reorg limit is {:,d} blocks'
.format(self.reorg_limit))
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
self.clean_db()
async def main_loop(self):
@ -554,7 +559,6 @@ class BlockProcessor(server.db.DB):
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count
show_stats = self.first_sync
if self.height > self.db_height:
assert flush_history is None
@ -567,19 +571,18 @@ class BlockProcessor(server.db.DB):
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
self.logger.info('committing transaction...')
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.db)
self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} '
'took {:,.1f}s'
.format(self.flush_count, self.height, self.tx_count,
self.last_flush - flush_start))
self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}'
.format(self.flush_count,
self.last_flush - flush_start,
self.height, self.tx_count))
# Catch-up stats
if show_stats:
if self.first_sync:
daemon_height = self.daemon.cached_height()
tx_per_sec = int(self.tx_count / self.wall_time)
this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
@ -602,11 +605,9 @@ class BlockProcessor(server.db.DB):
formatted_time(tx_est / this_tx_per_sec)))
def flush_history(self, batch):
fs_flush_start = time.time()
fs_start = time.time()
self.fs_flush()
fs_flush_end = time.time()
self.logger.info('FS flush took {:.1f} seconds'
.format(fs_flush_end - fs_flush_start))
fs_end = time.time()
flush_id = pack('>H', self.flush_count)
@ -614,10 +615,11 @@ class BlockProcessor(server.db.DB):
key = b'H' + hash168 + flush_id
batch.put(key, hist.tobytes())
self.logger.info('flushed {:,d} history entries for {:,d} addrs '
'in {:.1f}s'
.format(self.history_size, len(self.history),
time.time() - fs_flush_end))
if self.first_sync:
self.logger.info('flushed to FS in {:.1f}s, history in {:.1f}s '
'for {:,d} addrs'
.format(fs_end - fs_start, time.time() - fs_end,
len(self.history)))
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
@ -713,13 +715,10 @@ class BlockProcessor(server.db.DB):
utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
hist_MB = (hist_cache_size + tx_hash_size) // one_MB
self.logger.info('UTXOs: {:,d} deletes: {:,d} '
self.logger.info('our height: {:,d} daemon: {:,d} '
'UTXOs {:,d}MB hist {:,d}MB'
.format(len(self.utxo_cache),
len(self.db_deletes) // 2,
.format(self.height, self.daemon.cached_height(),
utxo_MB, hist_MB))
self.logger.info('our height: {:,d} daemon height: {:,d}'
.format(self.height, self.daemon.cached_height()))
return utxo_MB, hist_MB
def undo_key(self, height):
@ -940,7 +939,6 @@ class BlockProcessor(server.db.DB):
idx_packed = pack('<H', tx_idx)
cache_value = self.utxo_cache.pop(tx_hash + idx_packed, None)
if cache_value:
self.utxo_cache_spends += 1
return cache_value
# Spend it from the DB.
@ -979,14 +977,6 @@ class BlockProcessor(server.db.DB):
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
flush_start = time.time()
self.logger.info('flushing {:,d} blocks with {:,d} txs'
.format(self.height - self.db_height,
self.tx_count - self.db_tx_count))
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
'DB spends: {:,d}'
.format(len(self.utxo_cache) + self.utxo_cache_spends,
self.utxo_cache_spends,
len(self.db_deletes) // 2))
batch_delete = batch.delete
for key in self.db_deletes:
@ -1001,17 +991,22 @@ class BlockProcessor(server.db.DB):
batch_put(b'h' + cache_key[:4] + suffix, hash168)
batch_put(b'u' + hash168 + suffix, cache_value[25:])
if self.first_sync:
self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO '
'adds, {:,d} spends in {:.1f}s, committing...'
.format(self.height - self.db_height,
self.tx_count - self.db_tx_count,
len(self.utxo_cache),
len(self.db_deletes) // 2,
time.time() - flush_start))
self.utxo_cache = {}
self.db_deletes = []
self.utxo_cache_spends = 0
self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
self.logger.info('UTXO flush took {:.1f} seconds'
.format(time.time() - flush_start))
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, self.fs_height + 1 - start)

Loading…
Cancel
Save