From c7f930a18a33ff7f026cfb91752ce8f8d60d9c22 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 21:20:04 +0900 Subject: [PATCH] Clean up db initialization and state writing --- server/db.py | 185 +++++++++++++++++++++++------------------------ server/server.py | 2 +- 2 files changed, 93 insertions(+), 94 deletions(-) diff --git a/server/db.py b/server/db.py index e7eae6f..6b71daf 100644 --- a/server/db.py +++ b/server/db.py @@ -2,6 +2,7 @@ # and warranty status of this software. import array +import ast import itertools import os import struct @@ -275,13 +276,6 @@ class UTXOCache(object): class DB(object): - HEIGHT_KEY = b'height' - TIP_KEY = b'tip' - GENESIS_KEY = b'genesis' - TX_COUNT_KEY = b'tx_count' - FLUSH_COUNT_KEY = b'flush_count' - WALL_TIME_KEY = b'wall_time' - class Error(Exception): pass @@ -289,75 +283,90 @@ class DB(object): self.logger = logging.getLogger('DB') self.logger.setLevel(logging.INFO) - self.coin = env.coin + # Meta + self.tx_hash_file_size = 16 * 1024 * 1024 self.flush_MB = env.flush_MB self.next_cache_check = 0 - self.logger.info('flushing after cache reaches {:,d} MB' - .format(self.flush_MB)) + self.last_flush = time.time() + self.coin = env.coin - self.tx_counts = array.array('I') - self.tx_hash_file_size = 4*1024*1024 - # Unflushed items. Headers and tx_hashes have one entry per block + # Chain state (initialize to genesis in case of new DB) + self.height = -1 + self.tx_count = 0 + self.flush_count = 0 + self.wall_time = 0 + self.tip = self.coin.GENESIS_HASH + + # Open DB and metadata files. Record some of its state. + self.db = self.open_db(self.coin) + self.db_tx_count = self.tx_count + self.db_height = self.height + + # Caches to be flushed later. 
Headers and tx_hashes have one + # entry per block self.headers = [] self.tx_hashes = [] self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - - db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - try: - self.db = self.open_db(db_name, False) - except: - self.db = self.open_db(db_name, True) - self.headers_file = self.open_file('headers', True) - self.txcount_file = self.open_file('txcount', True) - self.init_db() - self.logger.info('created new database {}'.format(db_name)) - else: - self.logger.info('successfully opened database {}'.format(db_name)) - self.headers_file = self.open_file('headers') - self.txcount_file = self.open_file('txcount') - self.read_db() - self.utxo_cache = UTXOCache(self, self.db, self.coin) + self.tx_counts = array.array('I') + self.txcount_file.seek(0) + self.tx_counts.fromfile(self.txcount_file, self.height + 1) - # Note that DB_HEIGHT is the height of the next block to be written. - # So an empty DB has a DB_HEIGHT of 0 not -1. 
- self.tx_count = self.db_tx_count - self.height = self.db_height - 1 - self.tx_counts.fromfile(self.txcount_file, self.db_height) - self.last_flush = time.time() - # FIXME: this sucks and causes issues with exceptions in init_db() - if self.tx_count == 0: - self.flush() - - def open_db(self, db_name, create): - return plyvel.DB(db_name, create_if_missing=create, - error_if_exists=create, compression=None) - - def init_db(self): - self.db_height = 0 - self.db_tx_count = 0 - self.flush_count = 0 - self.wall_time = 0 - self.tip = self.coin.GENESIS_HASH - self.db.put(self.GENESIS_KEY, unhexlify(self.tip)) - - def read_db(self): - db = self.db - genesis_hash = hexlify(db.get(self.GENESIS_KEY)) - if genesis_hash != self.coin.GENESIS_HASH: - raise self.Error('DB genesis hash {} does not match coin {}' - .format(genesis_hash, self.coin.GENESIS_HASH)) - self.db_height = from_4_bytes(db.get(self.HEIGHT_KEY)) - self.db_tx_count = from_4_bytes(db.get(self.TX_COUNT_KEY)) - self.flush_count = from_4_bytes(db.get(self.FLUSH_COUNT_KEY)) - self.wall_time = from_4_bytes(db.get(self.WALL_TIME_KEY)) - self.tip = hexlify(db.get(self.TIP_KEY)) + # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} sync time: {}' .format(self.coin.NAME, self.coin.NET, - self.db_height - 1, self.db_tx_count, + self.height, self.tx_count, self.flush_count, self.formatted_wall_time())) + self.logger.info('flushing after cache reaches {:,d} MB' + .format(self.flush_MB)) + + def open_db(self, coin): + self.headers_file = self.open_file('headers', True) + self.txcount_file = self.open_file('txcount', True) + is_new = self.headers_file.seek(0, 2) == 0 + if is_new != (self.txcount_file.seek(0, 2) == 0): + raise self.Error('just one metadata file is zero-length') + + db_name = '{}-{}'.format(coin.NAME, coin.NET) + db = plyvel.DB(db_name, create_if_missing=is_new, + error_if_exists=is_new, compression=None) + if is_new: + self.logger.info('created new database 
{}'.format(db_name)) + self.flush_state(db) + else: + self.logger.info('successfully opened database {}'.format(db_name)) + self.read_state(db) + return db + + def flush_state(self, batch): + '''Flush chain state to the batch.''' + now = time.time() + self.wall_time += now - self.last_flush + self.last_flush = now + state = { + 'genesis': self.coin.GENESIS_HASH, + 'height': self.height, + 'tx_count': self.tx_count, + 'tip': self.tip, + 'flush_count': self.flush_count, + 'wall_time': self.wall_time, + } + batch.put(b'state', repr(state).encode('ascii')) + + def read_state(self, db): + state = db.get(b'state') + state = ast.literal_eval(state.decode('ascii')) + if state['genesis'] != self.coin.GENESIS_HASH: + raise self.Error('DB genesis hash {} does not match coin {}' + .format(state['genesis'], + self.coin.GENESIS_HASH)) + self.height = state['height'] + self.tx_count = state['tx_count'] + self.tip = state['tip'] + self.flush_count = state['flush_count'] + self.wall_time = state['wall_time'] def formatted_wall_time(self): wall_time = int(self.wall_time) @@ -365,12 +374,12 @@ class DB(object): wall_time // 86400, (wall_time % 86400) // 3600, (wall_time % 3600) // 60, wall_time % 60) - def flush(self): + def flush_all(self): '''Flush out all cached state.''' flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.db_tx_count - height_diff = self.height + 1 - self.db_height + height_diff = self.height - self.db_height self.logger.info('starting flush {:,d} txs and {:,d} blocks' .format(tx_diff, height_diff)) @@ -386,9 +395,13 @@ class DB(object): self.flush_state(batch) self.logger.info('committing transaction...') + # The flush succeeded, so update our record of DB state + self.db_tx_count = self.tx_count + self.db_height = self.height + # Update and put the wall time again - otherwise we drop the - # time it takes leveldb to commit the batch - self.update_wall_time(self.db) + # time it took leveldb to commit the batch + 
self.flush_state(self.db) flush_time = int(self.last_flush - flush_start) self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} ' @@ -410,27 +423,11 @@ class DB(object): self.write_tx_hashes() os.sync() - def update_wall_time(self, dest): - '''Put the wall time to dest - a DB or batch.''' - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time))) - - def flush_state(self, batch): - self.db_tx_count = self.tx_count - self.db_height = self.height + 1 - batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height)) - batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count)) - batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count)) - batch.put(self.TIP_KEY, unhexlify(self.tip)) - self.update_wall_time(batch) - self.flush_count += 1 - def flush_history(self, batch): # Drop any None entry self.history.pop(None, None) + self.flush_count += 1 flush_id = struct.pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id @@ -442,9 +439,10 @@ class DB(object): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - def open_file(self, filename, truncate=False, create=False): + def open_file(self, filename, create=False): + '''Open the file name. 
Return its handle.''' try: - return open(filename, 'wb+' if truncate else 'rb+') + return open(filename, 'rb+') except FileNotFoundError: if create: return open(filename, 'wb+') @@ -459,14 +457,15 @@ class DB(object): headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN assert len(headers) % header_len == 0 - self.headers_file.seek(self.db_height * header_len) + self.headers_file.seek((self.db_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() self.headers = [] def write_tx_counts(self): - self.txcount_file.seek(self.db_height * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.db_height: self.height + 1]) + self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.db_height + 1: + self.height + 1]) self.txcount_file.flush() def write_tx_hashes(self): @@ -529,7 +528,7 @@ class DB(object): if now > self.next_cache_check: self.next_cache_check = now + 60 if self.cache_MB() > self.flush_MB: - self.flush() + self.flush_all() def process_tx(self, tx_hash, tx): cache = self.utxo_cache @@ -552,13 +551,13 @@ class DB(object): height = bisect_right(self.tx_counts, tx_num) # Is this on disk or unflushed? - if height >= self.db_height: - tx_hashes = self.tx_hashes[height - self.db_height] + if height > self.db_height: + tx_hashes = self.tx_hashes[height - (self.db_height + 1)] tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] else: file_pos = tx_num * 32 file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:05d}'.format(file_num) + filename = 'hashes{:04d}'.format(file_num) with self.open_file(filename) as f: f.seek(offset) tx_hash = f.read(32) diff --git a/server/server.py b/server/server.py index 91117f9..50a9e35 100644 --- a/server/server.py +++ b/server/server.py @@ -43,7 +43,7 @@ class BlockCache(object): # Cache target size is in MB. Has little effect on sync time. 
self.cache_limit = 10 self.daemon_height = 0 - self.fetched_height = db.db_height + self.fetched_height = db.height # Blocks stored in reverse order. Next block is at end of list. self.blocks = [] self.recent_sizes = []