From a50d17c5b9781e69647e844639004afe08d664c3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 14:02:37 +0900 Subject: [PATCH] Clear data by reference as it's flushed --- electrumx/server/block_processor.py | 22 +++++----------------- electrumx/server/db.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index eb71d17..c2ad2aa 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -261,7 +261,11 @@ class BlockProcessor(DB): for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.flush_for_backup() + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + await self.run_in_thread_with_lock( + self.flush_backup, self.flush_data(), self.touched) last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -326,16 +330,6 @@ class BlockProcessor(DB): self.tx_hashes, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) - async def flush_for_backup(self): - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) - await self.run_in_thread_with_lock( - self.flush_backup, self.flush_data(), self.touched) - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] - async def flush(self, flush_utxos): if self.height == self.db_height: self.assert_flushed() @@ -354,12 +348,6 @@ class BlockProcessor(DB): max(coin.TX_COUNT - self.tx_count, 0)) * factor self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs) - self.tx_hashes = [] - self.headers = [] - if flush_utxos: - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] def check_cache_size(self): '''Flush a cache if it gets too big.''' diff --git a/electrumx/server/db.py b/electrumx/server/db.py index d64183d..3b7019d 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -222,6 +222,7 @@ class DB(object): else 0) assert len(self.tx_counts) == flush_data.height + 1 hashes = b''.join(flush_data.block_tx_hashes) + flush_data.block_tx_hashes.clear() assert len(hashes) % 32 == 0 assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count @@ -231,6 +232,8 @@ class DB(object): offset = self.header_offset(height_start) self.headers_file.write(offset, b''.join(flush_data.headers)) self.fs_update_header_offsets(offset, height_start, flush_data.headers) + flush_data.headers.clear() + offset = height_start * self.tx_counts.itemsize self.tx_counts_file.write(offset, self.tx_counts[height_start:].tobytes()) @@ -253,11 +256,14 @@ class DB(object): # UTXO state may have keys in common with our write cache or # may be in the DB already. start_time = time.time() + add_count = len(flush_data.adds) + spend_count = len(flush_data.deletes) // 2 # Spends batch_delete = batch.delete for key in sorted(flush_data.deletes): batch_delete(key) + flush_data.deletes.clear() # New UTXOs batch_put = batch.put @@ -267,15 +273,15 @@ class DB(object): suffix = key[-2:] + value[-12:-8] batch_put(b'h' + key[:4] + suffix, hashX) batch_put(b'u' + hashX + suffix, value[-8:]) + flush_data.adds.clear() # New undo information self.flush_undo_infos(batch_put, flush_data.undo_infos) + flush_data.undo_infos.clear() if self.utxo_db.for_sync: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count - add_count = len(flush_data.adds) - spend_count = len(flush_data.deletes) // 2 elapsed = time.time() - start_time self.logger.info(f'flushed {block_count:,d} blocks with ' f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '