diff --git a/server/block_processor.py b/server/block_processor.py index 9a89314..9b5a8e9 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -319,6 +319,8 @@ class BlockProcessor(server.db.DB): super().__init__(env) # These are our state as we move ahead of DB state + self.fs_height = self.db_height + self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count @@ -526,7 +528,8 @@ class BlockProcessor(server.db.DB): def assert_flushed(self): '''Asserts state is fully flushed.''' - assert self.tx_count == self.db_tx_count + assert self.tx_count == self.fs_tx_count == self.db_tx_count + assert self.height == self.fs_height == self.db_height assert not self.history assert not self.utxo_cache assert not self.db_cache @@ -563,9 +566,10 @@ class BlockProcessor(server.db.DB): # time it took to commit the batch self.flush_state(self.db) - self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' + self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} ' + 'took {:,.1f}s' .format(self.flush_count, self.height, self.tx_count, - int(self.last_flush - flush_start))) + self.last_flush - flush_start)) # Catch-up stats if show_stats: @@ -591,7 +595,12 @@ class BlockProcessor(server.db.DB): formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): - flush_start = time.time() + fs_flush_start = time.time() + self.fs_flush() + fs_flush_end = time.time() + self.logger.info('FS flush took {:.1f} seconds' + .format(fs_flush_end - fs_flush_start)) + flush_id = pack('>H', self.flush_count) for hash168, hist in self.history.items(): @@ -599,21 +608,21 @@ class BlockProcessor(server.db.DB): batch.put(key, hist.tobytes()) self.logger.info('flushed {:,d} history entries for {:,d} addrs ' - 'in {:,d}s' + 'in {:.1f}s' .format(self.history_size, len(self.history), - int(time.time() - flush_start))) + time.time() - fs_flush_end)) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): '''Flush the things stored on the filesystem.''' blocks_done = len(self.headers) - prior_tx_count = (self.tx_counts[self.db_height] - if self.db_height >= 0 else 0) + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count - assert self.db_height + blocks_done == self.height + assert self.fs_height + blocks_done == self.height assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == self.height + 1 assert cur_tx_count == self.tx_count, \ @@ -622,13 +631,13 @@ class BlockProcessor(server.db.DB): # First the headers headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.db_height + 1) * header_len) + self.headers_file.seek((self.fs_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() # Then the tx counts - self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.db_height + 1:]) + self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) self.txcount_file.flush() # Finally the hashes @@ -648,7 +657,8 @@ class BlockProcessor(server.db.DB): file_pos += size os.sync() - + self.fs_height = self.height + self.fs_tx_count = self.tx_count self.tx_hashes = [] self.headers = [] @@ -692,9 +702,9 @@ class BlockProcessor(server.db.DB): utxo_cache_size = len(self.utxo_cache) * 187 db_cache_size = len(self.db_cache) * 105 hist_cache_size = len(self.history) * 180 + self.history_size * 4 - tx_hash_size = (self.tx_count - self.db_tx_count) * 74 - utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB - hist_MB = hist_cache_size // one_MB + tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 + utxo_MB = (db_cache_size + utxo_cache_size) // one_MB + hist_MB = (hist_cache_size + tx_hash_size) // one_MB self.logger.info('UTXOs: {:,d} deletes: {:,d} ' 'UTXOs {:,d}MB hist {:,d}MB' @@ -978,6 +988,7 @@ class BlockProcessor(server.db.DB): # Care is needed because the writes generated by flushing the # UTXO state may have keys in common with our write cache or # may be in the DB already. + flush_start = time.time() self.logger.info('flushing {:,d} blocks with {:,d} txs' .format(self.height - self.db_height, self.tx_count - self.db_tx_count)) @@ -987,12 +998,6 @@ class BlockProcessor(server.db.DB): self.utxo_cache_spends, self.db_deletes)) - fs_flush_start = time.time() - self.fs_flush() - fs_flush_end = time.time() - self.logger.info('FS flush took {:.1f} seconds' - .format(fs_flush_end - fs_flush_start)) - collisions = 0 new_utxos = len(self.utxo_cache) @@ -1031,18 +1036,18 @@ class BlockProcessor(server.db.DB): self.db_tip = self.tip self.logger.info('UTXO flush took {:.1f} seconds' - .format(time.time() - fs_flush_end)) + .format(time.time() - flush_start)) def read_headers(self, start, count): # Read some from disk - disk_count = min(count, self.db_height + 1 - start) + disk_count = min(count, self.fs_height + 1 - start) result = self.fs_read_headers(start, disk_count) count -= disk_count start += disk_count # The rest from memory if count: - start -= self.db_height + 1 + start -= self.fs_height + 1 if not (count >= 0 and start + count <= len(self.headers)): raise ChainError('{:,d} headers starting at {:,d} not on disk' .format(count, start)) @@ -1056,7 +1061,7 @@ class BlockProcessor(server.db.DB): # Is this unflushed? if tx_hash is None: - tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)] + tx_hashes = self.tx_hashes[tx_height - (self.fs_height + 1)] tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] return tx_hash, tx_height