From 7b36a1431cfd5bdd20ba5b99524881aa2f405e7a Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 9 Oct 2016 13:00:46 +0900
Subject: [PATCH 1/3] Add instrumentation

---
 server/db.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/server/db.py b/server/db.py
index 35c7e47..9eacc54 100644
--- a/server/db.py
+++ b/server/db.py
@@ -191,13 +191,16 @@ class DB(object):
         with self.db.write_batch(transaction=True) as batch:
             # Flush the state, then the cache, then the history
             self.put_state()
-            for key, value in self.write_cache.items():
+            for n, (key, value) in enumerate(self.write_cache.items()):
                 if value is None:
                     batch.delete(key)
                     deletes += 1
                 else:
                     batch.put(key, value)
                     writes += 1
+                if n % 1000 == 0:
+                    pct = n // len(self.write_cache) * 100
+                    self.logger.info('{:d} {:d}% done...'.format(n, pct))
 
         self.flush_history()
 
@@ -220,7 +223,7 @@ class DB(object):
         # Drop any None entry
         self.history.pop(None, None)
 
-        for hash160, hist in self.history.items():
+        for m, (hash160, hist) in enumerate(self.history.items()):
             prefix = b'H' + hash160
             for key, v in self.db.iterator(reverse=True, prefix=prefix,
                                            fill_cache=False):
@@ -245,6 +248,10 @@ class DB(object):
                                          .format(addr, idx))
                     self.db.put(key, v[n:n + HIST_ENTRY_LEN])
 
+            if m % 1000 == 0:
+                pct = m // len(self.history) * 100
+                self.logger.info('{:d} {:d}% done...'.format(m, pct))
+
         self.history = defaultdict(list)
 
     def get_hash160(self, tx_hash, idx, delete=True):

From 329f41164049da35f70a69e321df059e68c4e90e Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 9 Oct 2016 13:27:02 +0900
Subject: [PATCH 2/3] Fix percentages

---
 server/db.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/server/db.py b/server/db.py
index 9eacc54..40a9e02 100644
--- a/server/db.py
+++ b/server/db.py
@@ -198,9 +198,9 @@ class DB(object):
                 else:
                     batch.put(key, value)
                     writes += 1
-                if n % 1000 == 0:
-                    pct = n // len(self.write_cache) * 100
-                    self.logger.info('{:d} {:d}% done...'.format(n, pct))
+                if n % 20000 == 0:
+                    pct = n * 100 // len(self.write_cache)
+                    self.logger.info('U {:d} {:d}% done...'.format(n, pct))
 
         self.flush_history()
 
@@ -248,9 +248,9 @@ class DB(object):
                                          .format(addr, idx))
                     self.db.put(key, v[n:n + HIST_ENTRY_LEN])
 
-            if m % 1000 == 0:
-                pct = m // len(self.history) * 100
-                self.logger.info('{:d} {:d}% done...'.format(m, pct))
+            if m % 20000 == 0:
+                pct = m * 100 // len(self.history)
+                self.logger.info('H {:d} {:d}% done...'.format(m, pct))
 
         self.history = defaultdict(list)
 

From 4879422e92edd611b10f5100fc6173740324d214 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 9 Oct 2016 16:06:45 +0900
Subject: [PATCH 3/3] Improve the leveldb flush; it should be a lot faster
 now.

More useful logging and stats.
---
 README.rst       |   5 +-
 server/db.py     | 191 +++++++++++++++++++++++++----------------------
 server/server.py |  15 ++--
 3 files changed, 112 insertions(+), 99 deletions(-)

diff --git a/README.rst b/README.rst
index be2b7ca..0364edb 100644
--- a/README.rst
+++ b/README.rst
@@ -100,9 +100,8 @@
 Database Format
 ===============
 
 The database and metadata formats of ElectrumX are very likely to
-change in the future. If so old DBs would not be usable. However it
-should be easy to write short Python script to do any necessary
-conversions in-place without having to start afresh.
+change in the future which will render old DBs unusable. For now I do
+not intend to provide converters as the rate of flux is high.
 
 Miscellany
diff --git a/server/db.py b/server/db.py
index 40a9e02..a166cac 100644
--- a/server/db.py
+++ b/server/db.py
@@ -17,9 +17,11 @@ import plyvel
 from lib.coins import Bitcoin
 from lib.script import ScriptPubKey
 
-ADDR_TX_HASH_LEN=6
-UTXO_TX_HASH_LEN=4
-HIST_ENTRY_LEN=256*4  # Admits 65536 * HIST_ENTRY_LEN/4 entries
+# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
+HIST_ENTRIES_PER_KEY = 1024
+HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
+ADDR_TX_HASH_LEN = 4
+UTXO_TX_HASH_LEN = 4
 
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 
@@ -36,6 +38,7 @@ class DB(object):
     TIP_KEY = b'tip'
     GENESIS_KEY = b'genesis'
     TX_COUNT_KEY = b'tx_count'
+    FLUSH_COUNT_KEY = b'flush_count'
     WALL_TIME_KEY = b'wall_time'
 
     class Error(Exception):
@@ -59,8 +62,7 @@ class DB(object):
         self.writes_avoided = 0
         self.read_cache_hits = 0
         self.write_cache_hits = 0
-        self.last_writes = 0
-        self.last_time = time.time()
+        self.hcolls = 0
 
         # Things put in a batch are not visible until the batch is written,
         # so use a cache.
@@ -95,6 +97,8 @@ class DB(object):
             self.tx_count = self.db_tx_count
             self.height = self.db_height - 1
             self.tx_counts.fromfile(self.txcount_file, self.db_height)
+            self.last_flush = time.time()
+            # FIXME: this sucks and causes issues with exceptions in init_db()
             if self.tx_count == 0:
                 self.flush()
 
@@ -107,6 +111,7 @@ class DB(object):
     def init_db(self):
         self.db_height = 0
         self.db_tx_count = 0
+        self.flush_count = 0
         self.wall_time = 0
         self.tip = self.coin.GENESIS_HASH
         self.put(self.GENESIS_KEY, unhexlify(self.tip))
@@ -118,12 +123,14 @@ class DB(object):
                             .format(genesis_hash, self.coin.GENESIS_HASH))
         self.db_height = from_4_bytes(self.get(self.HEIGHT_KEY))
         self.db_tx_count = from_4_bytes(self.get(self.TX_COUNT_KEY))
+        self.flush_count = from_4_bytes(self.get(self.FLUSH_COUNT_KEY))
         self.wall_time = from_4_bytes(self.get(self.WALL_TIME_KEY))
         self.tip = hexlify(self.get(self.TIP_KEY))
-        self.logger.info('{}/{} height: {:,d} tx count: {:,d} sync time: {}'
+        self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
+                         'flush count: {:,d} sync time: {}'
                          .format(self.coin.NAME, self.coin.NET,
                                  self.db_height - 1, self.db_tx_count,
-                                 self.formatted_wall_time()))
+                                 self.flush_count, self.formatted_wall_time()))
 
     def formatted_wall_time(self):
         wall_time = int(self.wall_time)
@@ -158,57 +165,43 @@ class DB(object):
         else:
             self.write_cache[key] = None
 
-    def put_state(self):
-        now = time.time()
-        self.wall_time += now - self.last_time
-        self.last_time = now
-        self.db_tx_count = self.tx_count
-        self.db_height = self.height + 1
-        self.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
-        self.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
-        self.put(self.TIP_KEY, unhexlify(self.tip))
-        self.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
-
     def flush(self):
+        '''Flush out all cached state.'''
+        flush_start = time.time()
+        last_flush = self.last_flush
+        tx_diff = self.tx_count - self.db_tx_count
+        height_diff = self.height + 1 - self.db_height
+        self.logger.info('starting flush {:,d} txs and {:,d} blocks'
+                         .format(tx_diff, height_diff))
+
         # Write out the files to the FS before flushing to the DB. If
         # the DB transaction fails, the files being too long doesn't
         # matter. But if writing the files fails we do not want to
-        # have updated the DB. This disk flush is fast.
-        self.write_headers()
-        self.write_tx_counts()
-        self.write_tx_hashes()
-
-        tx_diff = self.tx_count - self.db_tx_count
-        height_diff = self.height + 1 - self.db_height
-        self.logger.info('flushing to levelDB {:,d} txs and {:,d} blocks '
-                         'to height {:,d} tx count: {:,d}'
-                         .format(tx_diff, height_diff, self.height,
-                                 self.tx_count))
-
-        # This LevelDB flush is slow
-        deletes = 0
-        writes = 0
+        # have updated the DB. Flush state last as it reads the wall
+        # time.
+        self.flush_to_fs()
         with self.db.write_batch(transaction=True) as batch:
-            # Flush the state, then the cache, then the history
-            self.put_state()
-            for n, (key, value) in enumerate(self.write_cache.items()):
-                if value is None:
-                    batch.delete(key)
-                    deletes += 1
-                else:
-                    batch.put(key, value)
-                    writes += 1
-                if n % 20000 == 0:
-                    pct = n * 100 // len(self.write_cache)
-                    self.logger.info('U {:d} {:d}% done...'.format(n, pct))
-
-        self.flush_history()
-
-        self.logger.info('flushed. Cache hits: {:,d}/{:,d} writes: {:,d} '
-                         'deletes: {:,d} elided: {:,d} sync: {}'
-                         .format(self.write_cache_hits,
-                                 self.read_cache_hits, writes, deletes,
-                                 self.writes_avoided,
+            self.flush_cache(batch)
+            self.flush_history(batch)
+            self.logger.info('flushed history...')
+            self.flush_state(batch)
+            self.logger.info('committing transaction...')
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it takes leveldb to commit the batch
+        self.update_wall_time(self.db)
+
+        flush_time = int(self.last_flush - flush_start)
+        self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} '
+                         'flush count {:,d}'
+                         .format(flush_time, self.height, self.tx_count,
+                                 self.flush_count))
+
+        txs_per_sec = int(self.tx_count / self.wall_time)
+        this_txs_per_sec = int(tx_diff / (self.last_flush - last_flush))
+        self.logger.info('tx/s since genesis: {:,d} since last flush: {:,d} '
+                         'sync time {}'
+                         .format(txs_per_sec, this_txs_per_sec,
                                  self.formatted_wall_time()))
 
         # Note this preserves semantics and hopefully saves time
@@ -217,40 +210,55 @@ class DB(object):
         self.writes_avoided = 0
         self.read_cache_hits = 0
         self.write_cache_hits = 0
-        self.last_writes = writes
 
-    def flush_history(self):
+    def flush_to_fs(self):
+        '''Flush the things stored on the filesystem.'''
+        self.write_headers()
+        self.write_tx_counts()
+        self.write_tx_hashes()
+        os.sync()
+
+    def update_wall_time(self, dest):
+        now = time.time()
+        self.wall_time += now - self.last_flush
+        self.last_flush = now
+        dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
+
+    def flush_state(self, batch):
+        self.db_tx_count = self.tx_count
+        self.db_height = self.height + 1
+        batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
+        batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
+        batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count))
+        batch.put(self.TIP_KEY, unhexlify(self.tip))
+        self.update_wall_time(batch)
+        self.flush_count += 1
+
+    def flush_cache(self, batch):
+        '''Flushes the UTXO write cache.'''
+        deletes = writes = 0
+        for n, (key, value) in enumerate(self.write_cache.items()):
+            if value is None:
+                batch.delete(key)
+                deletes += 1
+            else:
+                batch.put(key, value)
+                writes += 1
+
+        self.logger.info('flushed UTXO cache. Hits: {:,d}/{:,d} '
+                         'writes: {:,d} deletes: {:,d} elided: {:,d}'
+                         .format(self.write_cache_hits,
+                                 self.read_cache_hits, writes, deletes,
+                                 self.writes_avoided))
+
+    def flush_history(self, batch):
         # Drop any None entry
         self.history.pop(None, None)
 
-        for m, (hash160, hist) in enumerate(self.history.items()):
-            prefix = b'H' + hash160
-            for key, v in self.db.iterator(reverse=True, prefix=prefix,
-                                           fill_cache=False):
-                assert len(key) == 23
-                v += array.array('I', hist).tobytes()
-                break
-            else:
-                key = prefix + bytes(2)
-                v = array.array('I', hist).tobytes()
-
-            # db.put doesn't accept a memoryview!
-            self.db.put(key, v[:HIST_ENTRY_LEN])
-            if len(v) > HIST_ENTRY_LEN:
-                # must be big-endian
-                (idx, ) = struct.unpack('>H', key[-2:])
-                for n in range(HIST_ENTRY_LEN, len(v), HIST_ENTRY_LEN):
-                    idx += 1
-                    key = prefix + struct.pack('>H', idx)
-                    if idx % 500 == 0:
-                        addr = self.coin.P2PKH_address_from_hash160(hash160)
-                        self.logger.info('address {} hist moving to idx {:d}'
-                                         .format(addr, idx))
-                    self.db.put(key, v[n:n + HIST_ENTRY_LEN])
-
-            if m % 20000 == 0:
-                pct = m * 100 // len(self.history)
-                self.logger.info('H {:d} {:d}% done...'.format(m, pct))
+        flush_id = struct.pack('>H', self.flush_count)
+        for hash160, hist in self.history.items():
+            key = b'H' + hash160 + flush_id
+            batch.put(key, array.array('I', hist).tobytes())
 
         self.history = defaultdict(list)
 
@@ -265,10 +273,11 @@ class DB(object):
             self.delete(key)
             return data[:20]
 
-        # This should almost never happen
         assert len(data) % 24 == 0
-        self.logger.info('hash160 compressed key collision {}'
-                         .format(key.hex()))
+        self.hcolls += 1
+        if self.hcolls % 1000 == 0:
+            self.logger.info('{} total hash160 compressed key collisions'
+                             .format(self.hcolls))
 
         for n in range(0, len(data), 24):
             (tx_num, ) = struct.unpack('<I', data[n:n + 4])

diff --git a/server/server.py b/server/server.py
[...]
+            if now > last_log + 15:
+                last_log = now
+                self.logger.info('prefilled blocks to height {:,d} '
+                                 'daemon height: {:,d}'
+                                 .format(self.fetched_height,
+                                         self.daemon_height))
 
             await asyncio.sleep(1)
 
         if not self.stop:
@@ -108,7 +117,6 @@ class BlockCache(object):
         if not count or self.stop:
             return False  # Done catching up
 
-#        self.logger.info('requesting {:,d} blocks'.format(count))
         first = self.fetched_height + 1
         param_lists = [[height] for height in range(first, first + count)]
         hashes = await self.rpc.rpc_multi('getblockhash', param_lists)
@@ -129,11 +137,6 @@ class BlockCache(object):
         # Reverse order and place at front of list
         self.blocks = list(reversed(blocks)) + self.blocks
 
-        self.logger.info('prefilled {:,d} blocks to height {:,d} '
-                         'daemon height: {:,d} block cache size: {:,d}'
-                         .format(count, self.fetched_height,
-                                 self.daemon_height, self.cache_used()))
-
         # Keep 50 most recent block sizes for fetch count estimation
         sizes = [len(block) for block in blocks]
         self.recent_sizes.extend(sizes)
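
Some notes on the series.

The bug patch 2 fixes is operator order under floor division:
n // len(self.write_cache) * 100 floors first, so the result is 0 for
any n below the total and the progress lines added in patch 1 always
read "0% done". Scaling before dividing keeps the precision. A quick
illustration in plain Python, with made-up counts:

    >>> n, total = 35000, 1400000
    >>> n // total * 100    # patch 1: floors to 0 for any n < total
    0
    >>> n * 100 // total    # patch 2: scale first, then floor
    2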
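
The flush_history rewrite in patch 3 is what makes the flush append-only:
the old code read each address's last row back from the DB and rewrote it
in fixed HIST_ENTRY_LEN chunks, whereas the new code writes one fresh key
per address, b'H' + hash160 + flush_id, into the same batch as everything
else and never reads the DB during the flush. Since flush_id is the flush
count packed big-endian, an address's keys sort in flush order, and its
full history is the concatenation of the values under the 21-byte prefix.
A minimal reader sketch under those assumptions; read_history is a
hypothetical helper, not part of the patch, taking a plyvel handle:

    import array

    def read_history(db, hash160):
        # Each value is a packed array of 4-byte tx numbers.  Keys under
        # the b'H' + hash160 prefix sort by big-endian flush_id, i.e. in
        # flush order, so concatenating values preserves history order.
        hist = array.array('I')
        for _key, value in db.iterator(prefix=b'H' + hash160):
            hist.frombytes(value)
        return hist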
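
Patch 3 also cuts ADDR_TX_HASH_LEN from 6 to 4 bytes, so truncated-hash
keys collide far more often; that is consistent with get_hash160 trading
its per-collision log (and the "This should almost never happen" comment)
for the hcolls counter reported every 1000 hits. With only 2**32 distinct
4-byte prefixes, the birthday approximation puts the expected number of
colliding pairs among n keys at roughly n * (n - 1) / 2 / 2**32, already
over a million at an illustrative hundred million keys:

    >>> n = 10**8                     # illustrative key count only
    >>> n * (n - 1) // 2 // 2**32     # expected colliding pairs
    1164153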