From d87c3dedcf524578c9e32f5e20c79bd67a28014f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 07:15:37 +0900 Subject: [PATCH 01/14] Move assert_flushed DB logic to db.py --- electrumx/server/block_processor.py | 4 +--- electrumx/server/db.py | 7 +++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 97f93a0..3502c09 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -324,12 +324,10 @@ class BlockProcessor(electrumx.server.db.DB): def assert_flushed(self): '''Asserts state is fully flushed.''' - assert self.tx_count == self.fs_tx_count == self.db_tx_count - assert self.height == self.fs_height == self.db_height assert not self.undo_infos assert not self.utxo_cache assert not self.db_deletes - self.history.assert_flushed() + self.db_assert_flushed(self.tx_count, self.height) async def flush(self, flush_utxos): if self.height == self.db_height: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index b23f87c..3cb36dd 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -149,6 +149,13 @@ class DB(object): async def header_branch_and_root(self, length, height): return await self.header_mc.branch_and_root(length, height) + # Flushing + def db_assert_flushed(self, to_tx_count, to_height): + '''Asserts state is fully flushed.''' + assert to_tx_count == self.fs_tx_count == self.db_tx_count + assert to_height == self.fs_height == self.db_height + self.history.assert_flushed() + def fs_update_header_offsets(self, offset_start, height_start, headers): if self.coin.STATIC_BLOCK_HEADERS: return From 11c6c919a62ed7614f341af1cfb576bd4a9eb44f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 07:30:15 +0900 Subject: [PATCH 02/14] Move fs_flush to db.py and merge with fs_update --- electrumx/server/block_processor.py | 20 +++------ electrumx/server/db.py | 67 ++++++++++++++++------------- 2 files changed, 42 insertions(+), 45 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 3502c09..a2697fb 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -344,11 +344,12 @@ class BlockProcessor(electrumx.server.db.DB): tx_diff = self.tx_count - self.last_flush_tx_count # Flush to file system - self.fs_flush() + self.fs_flush(self.height, self.tx_count, self.headers, + self.tx_hashes) + self.tx_hashes = [] + self.headers = [] + fs_end = time.time() - if self.utxo_db.for_sync: - self.logger.info('flushed to FS in {:.1f}s' - .format(fs_end - flush_start)) # History next - it's fast and frees memory hashX_count = self.history.flush() @@ -395,17 +396,6 @@ class BlockProcessor(electrumx.server.db.DB): .format(formatted_time(self.wall_time), formatted_time(tx_est / this_tx_per_sec))) - def fs_flush(self): - '''Flush the things stored on the filesystem.''' - assert self.fs_height + len(self.headers) == self.height - assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0 - - self.fs_update(self.fs_height, self.headers, self.tx_hashes) - self.fs_height = self.height - self.fs_tx_count = self.tx_count - self.tx_hashes = [] - self.headers = [] - async def backup_flush(self): assert self.height < self.db_height assert not self.headers diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 3cb36dd..817f51e 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -150,6 +150,43 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing + def fs_flush(self, to_height, to_tx_count, headers, block_tx_hashes): + '''Write headers, tx counts and block tx hashes to the filesystem. + No LevelDB state is updated. + + The first height to write is self.fs_height + 1. The FS + metadata is all append-only, so in a crash we just pick up + again from the height stored in the DB. + ''' + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) + assert len(block_tx_hashes) == len(headers) + assert to_height == self.fs_height + len(headers) + assert to_tx_count == self.tx_counts[-1] if self.tx_counts else 0 + assert len(self.tx_counts) == to_height + 1 + hashes = b''.join(block_tx_hashes) + assert len(hashes) % 32 == 0 + assert len(hashes) // 32 == to_tx_count - prior_tx_count + + # Write the headers, tx counts, and tx hashes + start_time = time.time() + height_start = self.fs_height + 1 + offset = self.header_offset(height_start) + self.headers_file.write(offset, b''.join(headers)) + self.fs_update_header_offsets(offset, height_start, headers) + offset = height_start * self.tx_counts.itemsize + self.tx_counts_file.write(offset, + self.tx_counts[height_start:].tobytes()) + offset = prior_tx_count * 32 + self.hashes_file.write(offset, hashes) + + self.fs_height = to_height + self.fs_tx_count = to_tx_count + + if self.utxo_db.for_sync: + elapsed = time.time() - start_time + self.logger.info(f'flushed to FS in {elapsed:.2f}s') + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count @@ -185,36 +222,6 @@ class DB(object): # Truncate header_mc: header count is 1 more than the height. self.header_mc.truncate(height + 1) - def fs_update(self, fs_height, headers, block_tx_hashes): - '''Write headers, the tx_count array and block tx hashes to disk. - - Their first height is fs_height. No recorded DB state is - updated. These arrays are all append only, so in a crash we - just pick up again from the DB height. - ''' - blocks_done = len(headers) - height_start = fs_height + 1 - new_height = fs_height + blocks_done - prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0) - cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - txs_done = cur_tx_count - prior_tx_count - - assert len(block_tx_hashes) == blocks_done - assert len(self.tx_counts) == new_height + 1 - hashes = b''.join(block_tx_hashes) - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == txs_done - - # Write the headers, tx counts, and tx hashes - offset = self.header_offset(height_start) - self.headers_file.write(offset, b''.join(headers)) - self.fs_update_header_offsets(offset, height_start, headers) - offset = height_start * self.tx_counts.itemsize - self.tx_counts_file.write(offset, - self.tx_counts[height_start:].tobytes()) - offset = prior_tx_count * 32 - self.hashes_file.write(offset, hashes) - async def read_headers(self, start_height, count): '''Requires start_height >= 0, count >= 0. Reads as many headers as are available starting at start_height up to count. This From c9631f3438826ccc87068d31c07a8f02d50ac8e7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 07:38:51 +0900 Subject: [PATCH 03/14] Move history flushing to DB.flush_history() --- electrumx/server/block_processor.py | 11 +++-------- electrumx/server/db.py | 6 ++++-- electrumx/server/history.py | 9 ++++++++- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index a2697fb..b157a33 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -344,18 +344,13 @@ class BlockProcessor(electrumx.server.db.DB): tx_diff = self.tx_count - self.last_flush_tx_count # Flush to file system - self.fs_flush(self.height, self.tx_count, self.headers, + self.flush_fs(self.height, self.tx_count, self.headers, self.tx_hashes) self.tx_hashes = [] self.headers = [] - fs_end = time.time() - - # History next - it's fast and frees memory - hashX_count = self.history.flush() - if self.utxo_db.for_sync: - self.logger.info('flushed history in {:.1f}s for {:,d} addrs' - .format(time.time() - fs_end, hashX_count)) + # Then history + self.flush_history() # Flush state last as it reads the wall time. with self.utxo_db.write_batch() as batch: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 817f51e..f862af3 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -150,9 +150,8 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def fs_flush(self, to_height, to_tx_count, headers, block_tx_hashes): + def flush_fs(self, to_height, to_tx_count, headers, block_tx_hashes): '''Write headers, tx counts and block tx hashes to the filesystem. - No LevelDB state is updated. The first height to write is self.fs_height + 1. The FS metadata is all append-only, so in a crash we just pick up @@ -187,6 +186,9 @@ class DB(object): elapsed = time.time() - start_time self.logger.info(f'flushed to FS in {elapsed:.2f}s') + def flush_history(self): + self.history.flush() + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count diff --git a/electrumx/server/history.py b/electrumx/server/history.py index b525af8..b78e757 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -11,6 +11,7 @@ import array import ast import bisect +import time from collections import defaultdict from functools import partial from struct import pack, unpack @@ -119,6 +120,7 @@ class History(object): assert not self.unflushed def flush(self): + start_time = time.time() self.flush_count += 1 flush_id = pack('>H', self.flush_count) unflushed = self.unflushed @@ -132,7 +134,12 @@ class History(object): count = len(unflushed) unflushed.clear() self.unflushed_count = 0 - return count + + if self.db.for_sync: + elapsed = time.time() - start_time + self.logger.info(f'flushed history in {elapsed:.1f}s ' + f'for {count:,d} addrs') + def backup(self, hashXs, tx_count): # Not certain this is needed, but it doesn't hurt From aac84ade75de5035b542bd81d166cddae646f983 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 08:16:07 +0900 Subject: [PATCH 04/14] Sleep at shutdown --- electrumx/lib/server_base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index 06557b5..db7a213 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -100,6 +100,9 @@ class ServerBase(object): self.logger.info('shutting down') server_task.cancel() + # Prevent some silly logs + await asyncio.sleep(0.01) + self.logger.info('shutdown complete') def run(self): From 9515e1a1e4a23c73e3e862a4b4b4aa4f33aaf2ff Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 08:38:14 +0900 Subject: [PATCH 05/14] Improve flow for opening DBs --- compact_history.py | 2 +- contrib/query.py | 2 +- electrumx/server/block_processor.py | 14 +++----------- electrumx/server/db.py | 18 ++++++++++++++---- electrumx/server/history.py | 8 ++++++-- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/compact_history.py b/compact_history.py index 9186c64..0a4574b 100755 --- a/compact_history.py +++ b/compact_history.py @@ -48,7 +48,7 @@ async def compact_history(): environ['DAEMON_URL'] = '' # Avoid Env erroring out env = Env() db = DB(env) - await db.open_for_sync() + await db.open_for_compacting() assert not db.first_sync history = db.history diff --git a/contrib/query.py b/contrib/query.py index 60f1696..56dbda8 100755 --- a/contrib/query.py +++ b/contrib/query.py @@ -62,7 +62,7 @@ async def query(args): db = DB(env) coin = env.coin - await db._open_dbs(False) + await db.open_for_serving() if not args.scripts: await print_stats(db.hist_db, db.utxo_db) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index b157a33..b30a0a4 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -159,7 +159,6 @@ class BlockProcessor(electrumx.server.db.DB): self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) # Meta - self.cache_MB = env.cache_MB self.next_cache_check = 0 self.last_flush = time.time() self.touched = set() @@ -448,8 +447,9 @@ class BlockProcessor(electrumx.server.db.DB): # Flush history if it takes up over 20% of cache memory. # Flush UTXOs once they take up 80% of cache memory. - if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: - return utxo_MB >= self.cache_MB * 4 // 5 + cache_MB = self.env.cache_MB + if utxo_MB + hist_MB >= cache_MB or hist_MB >= cache_MB // 5: + return utxo_MB >= cache_MB * 4 // 5 return None def advance_blocks(self, blocks): @@ -755,18 +755,10 @@ class BlockProcessor(electrumx.server.db.DB): async def _first_open_dbs(self): await self.open_for_sync() - # An incomplete compaction needs to be cancelled otherwise - # restarting it will corrupt the history - self.history.cancel_compaction() - # These are our state as we move ahead of DB state - self.fs_height = self.db_height - self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count self.last_flush_tx_count = self.tx_count - if self.utxo_db.for_sync: - self.logger.info(f'flushing DB cache at {self.cache_MB:,d} MB') # --- External API diff --git a/electrumx/server/db.py b/electrumx/server/db.py index f862af3..b705258 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -90,7 +90,7 @@ class DB(object): else: assert self.db_tx_count == 0 - async def _open_dbs(self, for_sync): + async def _open_dbs(self, for_sync, compacting): assert self.utxo_db is None # First UTXO DB @@ -110,12 +110,16 @@ class DB(object): # Then history DB self.utxo_flush_count = self.history.open_db(self.db_class, for_sync, - self.utxo_flush_count) + self.utxo_flush_count, + compacting) self.clear_excess_undo_info() # Read TX counts (requires meta directory) await self._read_tx_counts() + async def open_for_compacting(self): + await self._open_dbs(True, True) + async def open_for_sync(self): '''Open the databases to sync to the daemon. @@ -123,7 +127,7 @@ class DB(object): synchronization. When serving clients we want the open files for serving network connections. ''' - await self._open_dbs(True) + await self._open_dbs(True, False) async def open_for_serving(self): '''Open the databases for serving. If they are already open they are @@ -134,7 +138,7 @@ class DB(object): self.utxo_db.close() self.history.close_db() self.utxo_db = None - await self._open_dbs(False) + await self._open_dbs(False, False) # Header merkle cache @@ -395,6 +399,10 @@ class DB(object): self.wall_time = state['wall_time'] self.first_sync = state['first_sync'] + # These are our state as we move ahead of DB state + self.fs_height = self.db_height + self.fs_tx_count = self.db_tx_count + # Log some stats self.logger.info('DB version: {:d}'.format(self.db_version)) self.logger.info('coin: {}'.format(self.coin.NAME)) @@ -402,6 +410,8 @@ class DB(object): self.logger.info('height: {:,d}'.format(self.db_height)) self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip))) self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) + if self.utxo_db.for_sync: + self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') if self.first_sync: self.logger.info('sync time so far: {}' .format(util.formatted_time(self.wall_time))) diff --git a/electrumx/server/history.py b/electrumx/server/history.py index b78e757..eaab4af 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -32,10 +32,14 @@ class History(object): self.unflushed_count = 0 self.db = None - def open_db(self, db_class, for_sync, utxo_flush_count): + def open_db(self, db_class, for_sync, utxo_flush_count, compacting): self.db = db_class('hist', for_sync) self.read_state() self.clear_excess(utxo_flush_count) + # An incomplete compaction needs to be cancelled otherwise + # restarting it will corrupt the history + if not compacting: + self._cancel_compaction() return self.flush_count def close_db(self): @@ -314,7 +318,7 @@ class History(object): 100 * cursor / 65536)) return write_size - def cancel_compaction(self): + def _cancel_compaction(self): if self.comp_cursor != -1: self.logger.warning('cancelling in-progress history compaction') self.comp_flush_count = -1 From d1510b1192513b958b193a3c1e1e29605fd4a1db Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 08:57:16 +0900 Subject: [PATCH 06/14] Move bulk of UTXO flush logic to db.py --- electrumx/server/block_processor.py | 39 +++-------------------------- electrumx/server/db.py | 39 +++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 36 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index b30a0a4..8b2015c 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -683,46 +683,13 @@ class BlockProcessor(electrumx.server.db.DB): def flush_utxos(self, batch): '''Flush the cached DB writes and UTXO set to the batch.''' - # Care is needed because the writes generated by flushing the - # UTXO state may have keys in common with our write cache or - # may be in the DB already. - flush_start = time.time() - delete_count = len(self.db_deletes) // 2 - utxo_cache_len = len(self.utxo_cache) - - # Spends - batch_delete = batch.delete - for key in sorted(self.db_deletes): - batch_delete(key) + self.flush_utxo_db(batch, self.db_deletes, self.utxo_cache, + self.undo_infos, self.height, self.tx_count, + self.tip) self.db_deletes = [] - - # New UTXOs - batch_put = batch.put - for cache_key, cache_value in self.utxo_cache.items(): - # suffix = tx_idx + tx_num - hashX = cache_value[:-12] - suffix = cache_key[-2:] + cache_value[-12:-8] - batch_put(b'h' + cache_key[:4] + suffix, hashX) - batch_put(b'u' + hashX + suffix, cache_value[-8:]) self.utxo_cache = {} - - # New undo information - self.flush_undo_infos(batch_put, self.undo_infos) self.undo_infos = [] - if self.utxo_db.for_sync: - self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO ' - 'adds, {:,d} spends in {:.1f}s, committing...' - .format(self.height - self.db_height, - self.tx_count - self.db_tx_count, - utxo_cache_len, delete_count, - time.time() - flush_start)) - - self.utxo_flush_count = self.history.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - self.db_tip = self.tip - async def _process_prefetched_blocks(self): '''Loop forever processing blocks as they arrive.''' while True: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index b705258..7beba1d 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -193,6 +193,45 @@ class DB(object): def flush_history(self): self.history.flush() + def flush_utxo_db(self, batch, deletes, adds, undo_infos, + to_height, to_tx_count, to_tip): + '''Flush the cached DB writes and UTXO set to the batch.''' + # Care is needed because the writes generated by flushing the + # UTXO state may have keys in common with our write cache or + # may be in the DB already. + start_time = time.time() + + # Spends + batch_delete = batch.delete + for key in sorted(deletes): + batch_delete(key) + + # New UTXOs + batch_put = batch.put + for key, value in adds.items(): + # suffix = tx_idx + tx_num + hashX = value[:-12] + suffix = key[-2:] + value[-12:-8] + batch_put(b'h' + key[:4] + suffix, hashX) + batch_put(b'u' + hashX + suffix, value[-8:]) + + # New undo information + self.flush_undo_infos(batch_put, undo_infos) + + if self.utxo_db.for_sync: + block_count = to_height - self.db_height + tx_count = to_tx_count - self.db_tx_count + elapsed = time.time() - start_time + self.logger.info(f'flushed {block_count:,d} blocks with ' + f'{tx_count:,d} txs, {len(adds):,d} UTXO adds, ' + f'{len(deletes) // 2:,d} spends in ' + f'{elapsed:.1f}s, committing...') + + self.utxo_flush_count = self.history.flush_count + self.db_height = to_height + self.db_tx_count = to_tx_count + self.db_tip = to_tip + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count From d3f9ba386cbe02f8d6d7ce06ff626fb74edee04b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 09:06:24 +0900 Subject: [PATCH 07/14] Move flush_state() to db.py --- electrumx/server/block_processor.py | 10 ---------- electrumx/server/db.py | 10 ++++++++++ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 8b2015c..e0ccf57 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -160,7 +160,6 @@ class BlockProcessor(electrumx.server.db.DB): # Meta self.next_cache_check = 0 - self.last_flush = time.time() self.touched = set() self.reorg_count = 0 @@ -313,14 +312,6 @@ class BlockProcessor(electrumx.server.db.DB): return start, count - def flush_state(self, batch): - '''Flush chain state to the batch.''' - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - self.last_flush_tx_count = self.tx_count - self.write_utxo_state(batch) - def assert_flushed(self): '''Asserts state is fully flushed.''' assert not self.undo_infos @@ -725,7 +716,6 @@ class BlockProcessor(electrumx.server.db.DB): self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count - self.last_flush_tx_count = self.tx_count # --- External API diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 7beba1d..f86bcd8 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -62,6 +62,7 @@ class DB(object): self.history = History() self.utxo_db = None self.tx_counts = None + self.last_flush = time.time() self.logger.info(f'using {self.env.db_engine} for DB backend') @@ -232,6 +233,14 @@ class DB(object): self.db_tx_count = to_tx_count self.db_tip = to_tip + def flush_state(self, batch): + '''Flush chain state to the batch.''' + now = time.time() + self.wall_time += now - self.last_flush + self.last_flush = now + self.last_flush_tx_count = self.fs_tx_count + self.write_utxo_state(batch) + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count @@ -441,6 +450,7 @@ class DB(object): # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count + self.last_flush_tx_count = self.fs_tx_count # Log some stats self.logger.info('DB version: {:d}'.format(self.db_version)) From 42c3a308dbf037ccd3e48d4bd00e190009961498 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 10:04:17 +0900 Subject: [PATCH 08/14] Move to flush_dbs in db.py --- electrumx/server/block_processor.py | 55 ++++++------------ electrumx/server/db.py | 89 +++++++++++++++++++++-------- 2 files changed, 83 insertions(+), 61 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index e0ccf57..a7ecc0a 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -21,7 +21,7 @@ import electrumx from electrumx.server.daemon import DaemonError from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN from electrumx.lib.util import chunks, formatted_time, class_logger -import electrumx.server.db +from electrumx.server.db import DB, FlushData class Prefetcher(object): @@ -142,7 +142,7 @@ class ChainError(Exception): '''Raised on error processing blocks.''' -class BlockProcessor(electrumx.server.db.DB): +class BlockProcessor(DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. @@ -325,37 +325,23 @@ class BlockProcessor(electrumx.server.db.DB): else: await self.run_in_thread_with_lock(self._flush_body, flush_utxos) - def _flush_body(self, flush_utxos): - '''Flush out cached state. + def flush_data(self): + return FlushData(self.height, self.tx_count, self.headers, + self.tx_hashes, self.undo_infos, self.utxo_cache, + self.db_deletes, self.tip) - History is always flushed. UTXOs are flushed if flush_utxos.''' - flush_start = time.time() + def _flush_body(self, flush_utxos): + '''Flush out cached state. UTXOs are flushed if flush_utxos.''' last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count - # Flush to file system - self.flush_fs(self.height, self.tx_count, self.headers, - self.tx_hashes) + self.flush_dbs(self.flush_data(), flush_utxos) self.tx_hashes = [] self.headers = [] - - # Then history - self.flush_history() - - # Flush state last as it reads the wall time. - with self.utxo_db.write_batch() as batch: - if flush_utxos: - self.flush_utxos(batch) - self.flush_state(batch) - - # Update and put the wall time again - otherwise we drop the - # time it took to commit the batch - self.flush_state(self.utxo_db) - - self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}' - .format(self.history.flush_count, - self.last_flush - flush_start, - self.height, self.tx_count)) + if flush_utxos: + self.db_deletes = [] + self.utxo_cache = {} + self.undo_infos = [] # Catch-up stats if self.utxo_db.for_sync: @@ -407,10 +393,14 @@ class BlockProcessor(electrumx.server.db.DB): .format(nremoves)) with self.utxo_db.write_batch() as batch: + self.flush_utxo_db(batch, self.flush_data()) # Flush state last as it reads the wall time. - self.flush_utxos(batch) self.flush_state(batch) + self.db_deletes = [] + self.utxo_cache = {} + self.undo_infos = [] + self.logger.info('backup flush #{:,d} took {:.1f}s. ' 'Height {:,d} txs: {:,d}' .format(self.history.flush_count, @@ -672,15 +662,6 @@ class BlockProcessor(electrumx.server.db.DB): raise ChainError('UTXO {} / {:,d} not found in "h" table' .format(hash_to_hex_str(tx_hash), tx_idx)) - def flush_utxos(self, batch): - '''Flush the cached DB writes and UTXO set to the batch.''' - self.flush_utxo_db(batch, self.db_deletes, self.utxo_cache, - self.undo_infos, self.height, self.tx_count, - self.tip) - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] - async def _process_prefetched_blocks(self): '''Loop forever processing blocks as they arrive.''' while True: diff --git a/electrumx/server/db.py b/electrumx/server/db.py index f86bcd8..d5d62ca 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -18,6 +18,7 @@ from collections import namedtuple from glob import glob from struct import pack, unpack +import attr from aiorpcx import run_in_thread import electrumx.lib.util as util @@ -29,6 +30,17 @@ from electrumx.server.history import History UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") +@attr.s(slots=True) +class FlushData(object): + height = attr.ib() + tx_count = attr.ib() + headers = attr.ib() + block_tx_hashes = attr.ib() + # The following are flushed to the UTXO DB if undo_infos is not None + undo_infos = attr.ib() + adds = attr.ib() + deletes = attr.ib() + tip = attr.ib() class DB(object): '''Simple wrapper of the backend database for querying. @@ -155,7 +167,34 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def flush_fs(self, to_height, to_tx_count, headers, block_tx_hashes): + def flush_dbs(self, flush_data, flush_utxos): + '''Flush out cached state. History is always flushed; UTXOs are + flushed if flush_utxos.''' + start_time = time.time() + tx_delta = flush_data.tx_count - self.last_flush_tx_count + + # Flush to file system + self.flush_fs(flush_data) + + # Then history + self.flush_history() + + # Flush state last as it reads the wall time. + with self.utxo_db.write_batch() as batch: + if flush_utxos: + self.flush_utxo_db(batch, flush_data) + self.flush_state(batch) + + # Update and put the wall time again - otherwise we drop the + # time it took to commit the batch + self.flush_state(self.utxo_db) + + elapsed = self.last_flush - start_time + self.logger.info(f'flush #{self.history.flush_count:,d} took ' + f'{elapsed:.1f}s. Height {flush_data.height:,d} ' + f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + + def flush_fs(self, flush_data): '''Write headers, tx counts and block tx hashes to the filesystem. The first height to write is self.fs_height + 1. The FS @@ -164,38 +203,38 @@ class DB(object): ''' prior_tx_count = (self.tx_counts[self.fs_height] if self.fs_height >= 0 else 0) - assert len(block_tx_hashes) == len(headers) - assert to_height == self.fs_height + len(headers) - assert to_tx_count == self.tx_counts[-1] if self.tx_counts else 0 - assert len(self.tx_counts) == to_height + 1 - hashes = b''.join(block_tx_hashes) + assert len(flush_data.block_tx_hashes) == len(flush_data.headers) + assert flush_data.height == self.fs_height + len(flush_data.headers) + assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts + else 0) + assert len(self.tx_counts) == flush_data.height + 1 + hashes = b''.join(flush_data.block_tx_hashes) assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == to_tx_count - prior_tx_count + assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count # Write the headers, tx counts, and tx hashes start_time = time.time() height_start = self.fs_height + 1 offset = self.header_offset(height_start) - self.headers_file.write(offset, b''.join(headers)) - self.fs_update_header_offsets(offset, height_start, headers) + self.headers_file.write(offset, b''.join(flush_data.headers)) + self.fs_update_header_offsets(offset, height_start, flush_data.headers) offset = height_start * self.tx_counts.itemsize self.tx_counts_file.write(offset, self.tx_counts[height_start:].tobytes()) offset = prior_tx_count * 32 self.hashes_file.write(offset, hashes) - self.fs_height = to_height - self.fs_tx_count = to_tx_count + self.fs_height = flush_data.height + self.fs_tx_count = flush_data.tx_count if self.utxo_db.for_sync: elapsed = time.time() - start_time - self.logger.info(f'flushed to FS in {elapsed:.2f}s') + self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') def flush_history(self): self.history.flush() - def flush_utxo_db(self, batch, deletes, adds, undo_infos, - to_height, to_tx_count, to_tip): + def flush_utxo_db(self, batch, flush_data): '''Flush the cached DB writes and UTXO set to the batch.''' # Care is needed because the writes generated by flushing the # UTXO state may have keys in common with our write cache or @@ -204,12 +243,12 @@ class DB(object): # Spends batch_delete = batch.delete - for key in sorted(deletes): + for key in sorted(flush_data.deletes): batch_delete(key) # New UTXOs batch_put = batch.put - for key, value in adds.items(): + for key, value in flush_data.adds.items(): # suffix = tx_idx + tx_num hashX = value[:-12] suffix = key[-2:] + value[-12:-8] @@ -217,21 +256,23 @@ class DB(object): batch_put(b'u' + hashX + suffix, value[-8:]) # New undo information - self.flush_undo_infos(batch_put, undo_infos) + self.flush_undo_infos(batch_put, flush_data.undo_infos) if self.utxo_db.for_sync: - block_count = to_height - self.db_height - tx_count = to_tx_count - self.db_tx_count + block_count = flush_data.height - self.db_height + tx_count = flush_data.tx_count - self.db_tx_count + add_count = len(flush_data.adds) + spend_count = len(flush_data.deletes) // 2 elapsed = time.time() - start_time self.logger.info(f'flushed {block_count:,d} blocks with ' - f'{tx_count:,d} txs, {len(adds):,d} UTXO adds, ' - f'{len(deletes) // 2:,d} spends in ' + f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' + f'{spend_count:,d} spends in ' f'{elapsed:.1f}s, committing...') self.utxo_flush_count = self.history.flush_count - self.db_height = to_height - self.db_tx_count = to_tx_count - self.db_tip = to_tip + self.db_height = flush_data.height + self.db_tx_count = flush_data.tx_count + self.db_tip = flush_data.tip def flush_state(self, batch): '''Flush chain state to the batch.''' From 891730e78fec13a63b93acaf65828b2002788422 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 11:10:37 +0900 Subject: [PATCH 09/14] Move flush_backup() to db.py --- electrumx/server/block_processor.py | 52 ++++++----------------------- electrumx/server/db.py | 22 ++++++++++++ electrumx/server/history.py | 5 ++- 3 files changed, 35 insertions(+), 44 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index a7ecc0a..457d30b 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -261,7 +261,7 @@ class BlockProcessor(DB): for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.backup_flush() + await self.flush_for_backup() last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -319,6 +319,16 @@ class BlockProcessor(DB): assert not self.db_deletes self.db_assert_flushed(self.tx_count, self.height) + async def flush_for_backup(self): + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + await self.run_in_thread_with_lock( + self.flush_backup, self.flush_data(), self.touched) + self.db_deletes = [] + self.utxo_cache = {} + self.undo_infos = [] + async def flush(self, flush_utxos): if self.height == self.db_height: self.assert_flushed() @@ -367,46 +377,6 @@ class BlockProcessor(DB): .format(formatted_time(self.wall_time), formatted_time(tx_est / this_tx_per_sec))) - async def backup_flush(self): - assert self.height < self.db_height - assert not self.headers - assert not self.tx_hashes - self.history.assert_flushed() - await self.run_in_thread_with_lock(self._backup_flush_body) - - def _backup_flush_body(self): - '''Like flush() but when backing up. All UTXOs are flushed. - - hashXs - sequence of hashXs which were touched by backing - up. Searched for history entries to remove after the backup - height. - ''' - flush_start = time.time() - - self.backup_fs(self.height, self.tx_count) - - # Backup history. self.touched can include other addresses - # which is harmless, but remove None. - self.touched.discard(None) - nremoves = self.history.backup(self.touched, self.tx_count) - self.logger.info('backing up removed {:,d} history entries' - .format(nremoves)) - - with self.utxo_db.write_batch() as batch: - self.flush_utxo_db(batch, self.flush_data()) - # Flush state last as it reads the wall time. - self.flush_state(batch) - - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] - - self.logger.info('backup flush #{:,d} took {:.1f}s. ' - 'Height {:,d} txs: {:,d}' - .format(self.history.flush_count, - self.last_flush - flush_start, - self.height, self.tx_count)) - def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and diff --git a/electrumx/server/db.py b/electrumx/server/db.py index d5d62ca..861845c 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -282,6 +282,28 @@ class DB(object): self.last_flush_tx_count = self.fs_tx_count self.write_utxo_state(batch) + def flush_backup(self, flush_data, touched): + '''Like flush_dbs() but when backing up. All UTXOs are flushed.''' + assert not flush_data.headers + assert not flush_data.block_tx_hashes + assert flush_data.height < self.db_height + self.history.assert_flushed() + + start_time = time.time() + tx_delta = flush_data.tx_count - self.last_flush_tx_count + + self.backup_fs(flush_data.height, flush_data.tx_count) + self.history.backup(touched, flush_data.tx_count) + with self.utxo_db.write_batch() as batch: + self.flush_utxo_db(batch, flush_data) + # Flush state last as it reads the wall time. + self.flush_state(batch) + + elapsed = self.last_flush - start_time + self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' + f'{elapsed:.1f}s. Height {flush_data.height:,d} ' + f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + def db_assert_flushed(self, to_tx_count, to_height): '''Asserts state is fully flushed.''' assert to_tx_count == self.fs_tx_count == self.db_tx_count diff --git a/electrumx/server/history.py b/electrumx/server/history.py index eaab4af..b42ca6c 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -85,7 +85,7 @@ class History(object): if flush_id > utxo_flush_count: keys.append(key) - self.logger.info('deleting {:,d} history entries'.format(len(keys))) + self.logger.info(f'deleting {len(keys):,d} history entries') self.flush_count = utxo_flush_count with self.db.write_batch() as batch: @@ -144,7 +144,6 @@ class History(object): self.logger.info(f'flushed history in {elapsed:.1f}s ' f'for {count:,d} addrs') - def backup(self, hashXs, tx_count): # Not certain this is needed, but it doesn't hurt self.flush_count += 1 @@ -172,7 +171,7 @@ class History(object): batch.put(key, value) self.write_state(batch) - return nremoves + self.logger.info(f'backing up removed {nremoves:,d} history entries') def get_txnums(self, hashX, limit=1000): '''Generator that returns an unpruned, sorted list of tx_nums in the From dc445e2a54ee7cbb0f6676677d13825cd6461730 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 13:47:54 +0900 Subject: [PATCH 10/14] Move catch-up stats to db.py --- electrumx/server/block_processor.py | 52 ++++++++++------------------- electrumx/server/db.py | 15 ++++++++- 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 457d30b..eb71d17 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -20,8 +20,8 @@ from aiorpcx import TaskGroup, run_in_thread import electrumx from electrumx.server.daemon import DaemonError from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN -from electrumx.lib.util import chunks, formatted_time, class_logger -from electrumx.server.db import DB, FlushData +from electrumx.lib.util import chunks, class_logger +from electrumx.server.db import FlushData class Prefetcher(object): @@ -312,6 +312,8 @@ class BlockProcessor(DB): return start, count + # - Flushing + def assert_flushed(self): '''Asserts state is fully flushed.''' assert not self.undo_infos @@ -319,6 +321,11 @@ class BlockProcessor(DB): assert not self.db_deletes self.db_assert_flushed(self.tx_count, self.height) + def flush_data(self): + return FlushData(self.height, self.tx_count, self.headers, + self.tx_hashes, self.undo_infos, self.utxo_cache, + self.db_deletes, self.tip) + async def flush_for_backup(self): # self.touched can include other addresses which is # harmless, but remove None. @@ -335,17 +342,18 @@ class BlockProcessor(DB): else: await self.run_in_thread_with_lock(self._flush_body, flush_utxos) - def flush_data(self): - return FlushData(self.height, self.tx_count, self.headers, - self.tx_hashes, self.undo_infos, self.utxo_cache, - self.db_deletes, self.tip) - def _flush_body(self, flush_utxos): '''Flush out cached state. UTXOs are flushed if flush_utxos.''' - last_flush = self.last_flush - tx_diff = self.tx_count - self.last_flush_tx_count + # Try to estimate how many txs there are to go + daemon_height = self.daemon.cached_height() + coin = self.coin + tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT) + # Damp the initial enthusiasm + factor = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0) + estimated_txs = (tail_count * coin.TX_PER_BLOCK + + max(coin.TX_COUNT - self.tx_count, 0)) * factor - self.flush_dbs(self.flush_data(), flush_utxos) + self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs) self.tx_hashes = [] self.headers = [] if flush_utxos: @@ -353,30 +361,6 @@ class BlockProcessor(DB): self.utxo_cache = {} self.undo_infos = [] - # Catch-up stats - if self.utxo_db.for_sync: - tx_per_sec = int(self.tx_count / self.wall_time) - this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) - self.logger.info('tx/sec since genesis: {:,d}, ' - 'since last flush: {:,d}' - .format(tx_per_sec, this_tx_per_sec)) - - daemon_height = self.daemon.cached_height() - if self.height > self.coin.TX_COUNT_HEIGHT: - tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK - else: - tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) - * self.coin.TX_PER_BLOCK - + (self.coin.TX_COUNT - self.tx_count)) - - # Damp the enthusiasm - realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT - tx_est *= max(realism, 1.0) - - self.logger.info('sync time: {} ETA: {}' - .format(formatted_time(self.wall_time), - formatted_time(tx_est / this_tx_per_sec))) - def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 861845c..d64183d 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -24,6 +24,7 @@ from aiorpcx import run_in_thread import electrumx.lib.util as util from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN from electrumx.lib.merkle import Merkle, MerkleCache +from electrumx.lib.util import formatted_time from electrumx.server.storage import db_class from electrumx.server.history import History @@ -167,10 +168,11 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def flush_dbs(self, flush_data, flush_utxos): + def flush_dbs(self, flush_data, flush_utxos, estimated_txs): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' start_time = time.time() + prior_flush = self.last_flush tx_delta = flush_data.tx_count - self.last_flush_tx_count # Flush to file system @@ -194,6 +196,17 @@ class DB(object): f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + # Catch-up stats + if self.utxo_db.for_sync: + flush_interval = self.last_flush - prior_flush + tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) + tx_per_sec_last = 1 + int(tx_delta / flush_interval) + eta = estimated_txs / tx_per_sec_last + self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' + f'since last flush: {tx_per_sec_last:,d}') + self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' + f'ETA: {formatted_time(eta)}') + def flush_fs(self, flush_data): '''Write headers, tx counts and block tx hashes to the filesystem. From a50d17c5b9781e69647e844639004afe08d664c3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 14:02:37 +0900 Subject: [PATCH 11/14] Clear data by reference as it's flushed --- electrumx/server/block_processor.py | 22 +++++----------------- electrumx/server/db.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index eb71d17..c2ad2aa 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -261,7 +261,11 @@ class BlockProcessor(DB): for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.flush_for_backup() + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + await self.run_in_thread_with_lock( + self.flush_backup, self.flush_data(), self.touched) last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -326,16 +330,6 @@ class BlockProcessor(DB): self.tx_hashes, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) - async def flush_for_backup(self): - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) - await self.run_in_thread_with_lock( - self.flush_backup, self.flush_data(), self.touched) - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] - async def flush(self, flush_utxos): if self.height == self.db_height: self.assert_flushed() @@ -354,12 +348,6 @@ class BlockProcessor(DB): max(coin.TX_COUNT - self.tx_count, 0)) * factor self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs) - self.tx_hashes = [] - self.headers = [] - if flush_utxos: - self.db_deletes = [] - self.utxo_cache = {} - self.undo_infos = [] def check_cache_size(self): '''Flush a cache if it gets too big.''' diff --git a/electrumx/server/db.py b/electrumx/server/db.py index d64183d..3b7019d 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -222,6 +222,7 @@ class DB(object): else 0) assert len(self.tx_counts) == flush_data.height + 1 hashes = b''.join(flush_data.block_tx_hashes) + flush_data.block_tx_hashes.clear() assert len(hashes) % 32 == 0 assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count @@ -231,6 +232,8 @@ class DB(object): offset = self.header_offset(height_start) self.headers_file.write(offset, b''.join(flush_data.headers)) self.fs_update_header_offsets(offset, height_start, flush_data.headers) + flush_data.headers.clear() + offset = height_start * self.tx_counts.itemsize self.tx_counts_file.write(offset, self.tx_counts[height_start:].tobytes()) @@ -253,11 +256,14 @@ class DB(object): # UTXO state may have keys in common with our write cache or # may be in the DB already. start_time = time.time() + add_count = len(flush_data.adds) + spend_count = len(flush_data.deletes) // 2 # Spends batch_delete = batch.delete for key in sorted(flush_data.deletes): batch_delete(key) + flush_data.deletes.clear() # New UTXOs batch_put = batch.put @@ -267,15 +273,15 @@ class DB(object): suffix = key[-2:] + value[-12:-8] batch_put(b'h' + key[:4] + suffix, hashX) batch_put(b'u' + hashX + suffix, value[-8:]) + flush_data.adds.clear() # New undo information self.flush_undo_infos(batch_put, flush_data.undo_infos) + flush_data.undo_infos.clear() if self.utxo_db.for_sync: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count - add_count = len(flush_data.adds) - spend_count = len(flush_data.deletes) // 2 elapsed = time.time() - start_time self.logger.info(f'flushed {block_count:,d} blocks with ' f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' From 27b31746f8a11afe36577af8eb1b2feeae95eead Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 14:13:09 +0900 Subject: [PATCH 12/14] Remove remaining flush-related logic to db.py --- electrumx/server/block_processor.py | 59 +++++++++++------------------ electrumx/server/db.py | 26 +++++++++---- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index c2ad2aa..2669526 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -187,17 +187,6 @@ class BlockProcessor(DB): return await run_in_thread(func, *args) return await asyncio.shield(run_in_thread_locked()) - async def _maybe_flush(self): - # If caught up, flush everything as client queries are - # performed on the DB. - if self._caught_up_event.is_set(): - await self.flush(True) - elif time.time() > self.next_cache_check: - flush_arg = self.check_cache_size() - if flush_arg is not None: - await self.flush(flush_arg) - self.next_cache_check = time.time() + 30 - async def check_and_advance_blocks(self, raw_blocks): '''Process the list of raw blocks passed. Detects and handles reorgs. @@ -316,38 +305,36 @@ class BlockProcessor(DB): return start, count - # - Flushing - - def assert_flushed(self): - '''Asserts state is fully flushed.''' - assert not self.undo_infos - assert not self.utxo_cache - assert not self.db_deletes - self.db_assert_flushed(self.tx_count, self.height) + def estimate_txs_remaining(self): + # Try to estimate how many txs there are to go + daemon_height = self.daemon.cached_height() + coin = self.coin + tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT) + # Damp the initial enthusiasm + realism = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0) + return (tail_count * coin.TX_PER_BLOCK + + max(coin.TX_COUNT - self.tx_count, 0)) * realism + # - Flushing def flush_data(self): return FlushData(self.height, self.tx_count, self.headers, self.tx_hashes, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) async def flush(self, flush_utxos): - if self.height == self.db_height: - self.assert_flushed() - else: - await self.run_in_thread_with_lock(self._flush_body, flush_utxos) + await self.run_in_thread_with_lock( + self.flush_dbs, self.flush_data(), flush_utxos) - def _flush_body(self, flush_utxos): - '''Flush out cached state. UTXOs are flushed if flush_utxos.''' - # Try to estimate how many txs there are to go - daemon_height = self.daemon.cached_height() - coin = self.coin - tail_count = daemon_height - max(self.height, coin.TX_COUNT_HEIGHT) - # Damp the initial enthusiasm - factor = max(2.0 - 0.9 * self.height / coin.TX_COUNT_HEIGHT, 1.0) - estimated_txs = (tail_count * coin.TX_PER_BLOCK + - max(coin.TX_COUNT - self.tx_count, 0)) * factor - - self.flush_dbs(self.flush_data(), flush_utxos, estimated_txs) + async def _maybe_flush(self): + # If caught up, flush everything as client queries are + # performed on the DB. + if self._caught_up_event.is_set(): + await self.flush(True) + elif time.time() > self.next_cache_check: + flush_arg = self.check_cache_size() + if flush_arg is not None: + await self.flush(flush_arg) + self.next_cache_check = time.time() + 30 def check_cache_size(self): '''Flush a cache if it gets too big.''' @@ -448,7 +435,7 @@ class BlockProcessor(DB): The blocks should be in order of decreasing height, starting at. self.height. A flush is performed once the blocks are backed up. ''' - self.assert_flushed() + self.assert_flushed(self.flush_data()) assert self.height >= len(raw_blocks) coin = self.coin diff --git a/electrumx/server/db.py b/electrumx/server/db.py index 3b7019d..ab57539 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -168,9 +168,25 @@ class DB(object): return await self.header_mc.branch_and_root(length, height) # Flushing - def flush_dbs(self, flush_data, flush_utxos, estimated_txs): + def assert_flushed(self, flush_data): + '''Asserts state is fully flushed.''' + assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count + assert flush_data.height == self.fs_height == self.db_height + assert flush_data.tip == self.tip + assert not flush_data.headers + assert not flush_data.block_tx_hashes + assert not flush_data.adds + assert not flush_data.deletes + assert not flush_data.undo_infos + self.history.assert_flushed() + + def flush_dbs(self, flush_data, flush_utxos): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' + if flush_data.height == self.db_height: + self.assert_flushed(flush_data) + return + start_time = time.time() prior_flush = self.last_flush tx_delta = flush_data.tx_count - self.last_flush_tx_count @@ -201,7 +217,7 @@ class DB(object): flush_interval = self.last_flush - prior_flush tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_last = 1 + int(tx_delta / flush_interval) - eta = estimated_txs / tx_per_sec_last + eta = self.estimate_txs_remaining() / tx_per_sec_last self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' f'since last flush: {tx_per_sec_last:,d}') self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' @@ -323,12 +339,6 @@ class DB(object): f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - def db_assert_flushed(self, to_tx_count, to_height): - '''Asserts state is fully flushed.''' - assert to_tx_count == self.fs_tx_count == self.db_tx_count - assert to_height == self.fs_height == self.db_height - self.history.assert_flushed() - def fs_update_header_offsets(self, offset_start, height_start, headers): if self.coin.STATIC_BLOCK_HEADERS: return From 967b2de60d08f720323c969ff2d78b1d8ffd88ad Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 9 Aug 2018 14:42:17 +0900 Subject: [PATCH 13/14] Separate the block processor from the DB - BP no longer inherits from the DB, but is passed it --- electrumx/server/block_processor.py | 61 +++++++++++++++-------------- electrumx/server/chain_state.py | 20 +++++----- electrumx/server/controller.py | 10 +++-- electrumx/server/db.py | 8 ++-- 4 files changed, 52 insertions(+), 47 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 2669526..6c6301f 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -142,21 +142,23 @@ class ChainError(Exception): '''Raised on error processing blocks.''' -class BlockProcessor(DB): +class BlockProcessor(object): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon, notifications): - super().__init__(env) - + def __init__(self, env, db, daemon, notifications): + self.env = env + self.db = db self.daemon = daemon self.notifications = notifications + self.coin = env.coin self.blocks_event = asyncio.Event() self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) + self.logger = class_logger(__name__, self.__class__.__name__) # Meta self.next_cache_check = 0 @@ -204,7 +206,7 @@ class BlockProcessor(DB): start = time.time() await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self._maybe_flush() - if not self.first_sync: + if not self.db.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s' .format(len(blocks), s, @@ -254,7 +256,7 @@ class BlockProcessor(DB): # harmless, but remove None. self.touched.discard(None) await self.run_in_thread_with_lock( - self.flush_backup, self.flush_data(), self.touched) + self.db.flush_backup, self.flush_data(), self.touched) last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -271,7 +273,7 @@ class BlockProcessor(DB): self.logger.info(f'chain was reorganised replacing {count:,d} ' f'block{s} at heights {start:,d}-{last:,d}') - return start, last, await self.fs_block_hashes(start, count) + return start, last, await self.db.fs_block_hashes(start, count) async def calc_reorg_range(self, count): '''Calculate the reorg range''' @@ -289,7 +291,7 @@ class BlockProcessor(DB): start = self.height - 1 count = 1 while start > 0: - hashes = await self.fs_block_hashes(start, count) + hashes = await self.db.fs_block_hashes(start, count) hex_hashes = [hash_to_hex_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = diff_pos(hex_hashes, d_hex_hashes) @@ -323,7 +325,8 @@ class BlockProcessor(DB): async def flush(self, flush_utxos): await self.run_in_thread_with_lock( - self.flush_dbs, self.flush_data(), flush_utxos) + self.db.flush_dbs, self.flush_data(), flush_utxos, + self.estimate_txs_remaining) async def _maybe_flush(self): # If caught up, flush everything as client queries are @@ -343,10 +346,10 @@ class BlockProcessor(DB): one_MB = 1000*1000 utxo_cache_size = len(self.utxo_cache) * 205 db_deletes_size = len(self.db_deletes) * 57 - hist_cache_size = self.history.unflushed_memsize() + hist_cache_size = self.db.history.unflushed_memsize() # Roughly ntxs * 32 + nblocks * 42 - tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32 - + (self.height - self.fs_height) * 42) + tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 + + (self.height - self.db.fs_height) * 42) utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB hist_MB = (hist_cache_size + tx_hash_size) // one_MB @@ -367,7 +370,7 @@ class BlockProcessor(DB): It is already verified they correctly connect onto our tip. ''' - min_height = self.min_undo_height(self.daemon.cached_height()) + min_height = self.db.min_undo_height(self.daemon.cached_height()) height = self.height for block in blocks: @@ -375,7 +378,7 @@ class BlockProcessor(DB): undo_info = self.advance_txs(block.transactions) if height >= min_height: self.undo_infos.append((undo_info, height)) - self.write_raw_block(block.raw, height) + self.db.write_raw_block(block.raw, height) headers = [block.header for block in blocks] self.height = height @@ -422,10 +425,10 @@ class BlockProcessor(DB): update_touched(hashXs) tx_num += 1 - self.history.add_unflushed(hashXs_by_tx, self.tx_count) + self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) self.tx_count = tx_num - self.tx_counts.append(tx_num) + self.db.tx_counts.append(tx_num) return undo_info @@ -435,7 +438,7 @@ class BlockProcessor(DB): The blocks should be in order of decreasing height, starting at. self.height. A flush is performed once the blocks are backed up. ''' - self.assert_flushed(self.flush_data()) + self.db.assert_flushed(self.flush_data()) assert self.height >= len(raw_blocks) coin = self.coin @@ -451,14 +454,14 @@ class BlockProcessor(DB): self.tip = coin.header_prevhash(block.header) self.backup_txs(block.transactions) self.height -= 1 - self.tx_counts.pop() + self.db.tx_counts.pop() self.logger.info('backed up to height {:,d}'.format(self.height)) def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order - undo_info = self.read_undo_info(self.height) + undo_info = self.db.read_undo_info(self.height) if undo_info is None: raise ChainError('no undo information found for height {:,d}' .format(self.height)) @@ -566,14 +569,14 @@ class BlockProcessor(DB): # Value: hashX prefix = b'h' + tx_hash[:4] + idx_packed candidates = {db_key: hashX for db_key, hashX - in self.utxo_db.iterator(prefix=prefix)} + in self.db.utxo_db.iterator(prefix=prefix)} for hdb_key, hashX in candidates.items(): tx_num_packed = hdb_key[-4:] if len(candidates) > 1: tx_num, = unpack('False state. - first_sync = self.first_sync - self.first_sync = False + first_sync = self.db.first_sync + self.db.first_sync = False await self.flush(True) if first_sync: self.logger.info(f'{electrumx.version} synced to ' @@ -619,13 +622,13 @@ class BlockProcessor(DB): # Initialise the notification framework await self.notifications.on_block(set(), self.height) # Reopen for serving - await self.open_for_serving() + await self.db.open_for_serving() async def _first_open_dbs(self): - await self.open_for_sync() - self.height = self.db_height - self.tip = self.db_tip - self.tx_count = self.db_tx_count + await self.db.open_for_sync() + self.height = self.db.db_height + self.tip = self.db.db_tip + self.tx_count = self.db.db_tx_count # --- External API diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 8e33830..135d42a 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -14,18 +14,18 @@ class ChainState(object): blocks, transaction history, UTXOs and the mempool. ''' - def __init__(self, env, daemon, bp): + def __init__(self, env, db, daemon, bp): self._env = env + self._db = db self._daemon = daemon - self._bp = bp # External interface pass-throughs for session.py - self.force_chain_reorg = self._bp.force_chain_reorg - self.tx_branch_and_root = self._bp.merkle.branch_and_root - self.read_headers = self._bp.read_headers - self.all_utxos = self._bp.all_utxos - self.limited_history = self._bp.limited_history - self.header_branch_and_root = self._bp.header_branch_and_root + self.force_chain_reorg = bp.force_chain_reorg + self.tx_branch_and_root = db.merkle.branch_and_root + self.read_headers = db.read_headers + self.all_utxos = db.all_utxos + self.limited_history = db.limited_history + self.header_branch_and_root = db.header_branch_and_root async def broadcast_transaction(self, raw_tx): return await self._daemon.sendrawtransaction([raw_tx]) @@ -34,7 +34,7 @@ class ChainState(object): return await getattr(self._daemon, method)(*args) def db_height(self): - return self._bp.db_height + return self._db.db_height def get_info(self): '''Chain state info for LocalRPC and logs.''' @@ -57,7 +57,7 @@ class ChainState(object): async def query(self, args, limit): coin = self._env.coin - db = self._bp + db = self._db lines = [] def arg_to_hashX(arg): diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index e3115c6..665d39c 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -13,6 +13,7 @@ import electrumx from electrumx.lib.server_base import ServerBase from electrumx.lib.util import version_string from electrumx.server.chain_state import ChainState +from electrumx.server.db import DB from electrumx.server.mempool import MemPool from electrumx.server.session import SessionManager @@ -93,10 +94,11 @@ class Controller(ServerBase): notifications = Notifications() daemon = env.coin.DAEMON(env) + db = DB(env) BlockProcessor = env.coin.BLOCK_PROCESSOR - bp = BlockProcessor(env, daemon, notifications) - mempool = MemPool(env.coin, daemon, notifications, bp.lookup_utxos) - chain_state = ChainState(env, daemon, bp) + bp = BlockProcessor(env, db, daemon, notifications) + mempool = MemPool(env.coin, daemon, notifications, db.lookup_utxos) + chain_state = ChainState(env, db, daemon, bp) session_mgr = SessionManager(env, chain_state, mempool, notifications, shutdown_event) @@ -108,7 +110,7 @@ class Controller(ServerBase): await group.spawn(session_mgr.serve(serve_externally_event)) await group.spawn(bp.fetch_and_process_blocks(caught_up_event)) await caught_up_event.wait() - await group.spawn(bp.populate_header_merkle_cache()) + await group.spawn(db.populate_header_merkle_cache()) await group.spawn(mempool.keep_synchronized(synchronized_event)) await synchronized_event.wait() serve_externally_event.set() diff --git a/electrumx/server/db.py b/electrumx/server/db.py index ab57539..6e82a52 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -158,7 +158,7 @@ class DB(object): async def populate_header_merkle_cache(self): self.logger.info('populating header merkle cache...') - length = max(1, self.height - self.env.reorg_limit) + length = max(1, self.db_height - self.env.reorg_limit) start = time.time() await self.header_mc.initialize(length) elapsed = time.time() - start @@ -172,7 +172,7 @@ class DB(object): '''Asserts state is fully flushed.''' assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count assert flush_data.height == self.fs_height == self.db_height - assert flush_data.tip == self.tip + assert flush_data.tip == self.db_tip assert not flush_data.headers assert not flush_data.block_tx_hashes assert not flush_data.adds @@ -180,7 +180,7 @@ class DB(object): assert not flush_data.undo_infos self.history.assert_flushed() - def flush_dbs(self, flush_data, flush_utxos): + def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' if flush_data.height == self.db_height: @@ -217,7 +217,7 @@ class DB(object): flush_interval = self.last_flush - prior_flush tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_last = 1 + int(tx_delta / flush_interval) - eta = self.estimate_txs_remaining() / tx_per_sec_last + eta = estimate_txs_remaining() / tx_per_sec_last self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' f'since last flush: {tx_per_sec_last:,d}') self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' From 635ffed42b410ea9bd87036326ed028793f47d22 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 10 Aug 2018 11:41:02 +0900 Subject: [PATCH 14/14] flush_data() must be called with the lock held --- electrumx/server/block_processor.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 6c6301f..c9bde57 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -246,17 +246,19 @@ class BlockProcessor(object): except Exception: return await self.daemon.raw_blocks(hex_hashes) + def flush_backup(): + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + self.db.flush_backup(self.flush_data(), self.touched) + start, last, hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): raw_blocks = await get_raw_blocks(last, hex_hashes) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) - await self.run_in_thread_with_lock( - self.db.flush_backup, self.flush_data(), self.touched) + await self.run_in_thread_with_lock(flush_backup) last -= len(raw_blocks) await self.prefetcher.reset_height(self.height) @@ -319,14 +321,17 @@ class BlockProcessor(object): # - Flushing def flush_data(self): + '''The data for a flush. The lock must be taken.''' + assert self.state_lock.locked() return FlushData(self.height, self.tx_count, self.headers, self.tx_hashes, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) async def flush(self, flush_utxos): - await self.run_in_thread_with_lock( - self.db.flush_dbs, self.flush_data(), flush_utxos, - self.estimate_txs_remaining) + def flush(): + self.db.flush_dbs(self.flush_data(), flush_utxos, + self.estimate_txs_remaining) + await self.run_in_thread_with_lock(flush) async def _maybe_flush(self): # If caught up, flush everything as client queries are