diff --git a/server/block_processor.py b/server/block_processor.py index c9c86be..aaf1f2c 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -241,7 +241,6 @@ class BlockProcessor(server.db.DB): a real reorg.''' self.logger.info('chain reorg detected') self.flush(True) - self.logger.info('finding common height...') hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. @@ -305,12 +304,11 @@ class BlockProcessor(server.db.DB): assert not self.utxo_cache assert not self.db_deletes - def flush(self, flush_utxos=False, flush_history=None): + def flush(self, flush_utxos=False): '''Flush out cached state. History is always flushed. UTXOs are flushed if flush_utxos.''' if self.height == self.db_height: - assert flush_history is None self.assert_flushed() return @@ -319,14 +317,10 @@ class BlockProcessor(server.db.DB): last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count - if self.height > self.db_height: - assert flush_history is None - flush_history = self.flush_history - with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. - flush_history(batch) + self.flush_history(batch) if flush_utxos: self.flush_utxos(batch) self.flush_state(batch) @@ -394,12 +388,36 @@ class BlockProcessor(server.db.DB): self.tx_hashes = [] self.headers = [] - def backup_history(self, batch, hash168s): - self.logger.info('backing up history to height {:,d} tx_count {:,d}' - .format(self.height, self.tx_count)) + def backup_flush(self, hash168s): + '''Like flush() but when backing up. All UTXOs are flushed. + hash168s - sequence of hash168s which were touched by backing + up. Searched for history entries to remove after the backup + height. + ''' + assert self.height < self.db_height assert not self.history + self.flush_count += 1 + flush_start = time.time() + + with self.db.write_batch() as batch: + # Flush state last as it reads the wall time. + self.backup_history(batch, hash168s) + self.flush_utxos(batch) + self.flush_state(batch) + + # Update and put the wall time again - otherwise we drop the + # time it took to commit the batch + self.flush_state(self.db) + + self.logger.info('backup flush #{:,d} took {:.1f}s. ' + 'Height {:,d} txs: {:,d}' + .format(self.flush_count, + self.last_flush - flush_start, + self.height, self.tx_count)) + + def backup_history(self, batch, hash168s): nremoves = 0 for hash168 in sorted(hash168s): prefix = b'H' + hash168 @@ -426,8 +444,8 @@ class BlockProcessor(server.db.DB): assert not self.headers assert not self.tx_hashes - self.logger.info('removed {:,d} history entries from {:,d} addresses' - .format(nremoves, len(hash168s))) + self.logger.info('backing up removed {:,d} history entries from ' + '{:,d} addresses'.format(nremoves, len(hash168s))) def check_cache_size(self): '''Flush a cache if it gets too big.''' @@ -520,7 +538,6 @@ class BlockProcessor(server.db.DB): The blocks should be in order of decreasing height. A flush is performed once the blocks are backed up. ''' - self.logger.info('backing up {:,d} blocks'.format(len(blocks))) self.assert_flushed() for block in blocks: @@ -541,8 +558,7 @@ class BlockProcessor(server.db.DB): # touched includes those passed into this function. That will # generally be empty but is harmless if not. - flush_history = partial(self.backup_history, hash168s=touched) - self.flush(True, flush_history=flush_history) + self.backup_flush(touched) def backup_txs(self, tx_hashes, txs, touched): # Prevout values, in order down the block (coinbase first if present)