diff --git a/server/block_processor.py b/server/block_processor.py
index aaf1f2c..362f76f 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -229,9 +229,12 @@ class BlockProcessor(server.db.DB):
         self.caught_up = True
         if self.first_sync:
             self.first_sync = False
-            self.logger.info('{} synced to height {:,d}. DB version:'
-                             .format(VERSION, self.height, self.db_version))
-        self.flush(True)
+            self.logger.info('{} synced to height {:,d}'
+                             .format(VERSION, self.height))
+            self.flush(True)
+            self.reopen_db(False)
+        else:
+            self.flush(True)
         self.event.set()
 
     async def handle_chain_reorg(self, count, touched):
diff --git a/server/db.py b/server/db.py
index f6672bc..677ca72 100644
--- a/server/db.py
+++ b/server/db.py
@@ -50,16 +50,8 @@ class DB(LoggedClass):
         self.logger.info('reorg limit is {:,d} blocks'
                          .format(self.env.reorg_limit))
 
-        # Open DB and metadata files. Record some of its state.
-        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
-        self.db = open_db(db_name, env.db_engine)
-        if self.db.is_new:
-            self.logger.info('created new {} database {}'
-                             .format(env.db_engine, db_name))
-        else:
-            self.logger.info('successfully opened {} database {}'
-                             .format(env.db_engine, db_name))
-        self.read_state()
+        self.db = None
+        self.reopen_db(True)
 
         create = self.db_height == -1
         self.headers_file = self.open_file('headers', create)
@@ -77,6 +69,41 @@
             assert self.db_tx_count == 0
             self.clean_db()
 
+    def reopen_db(self, first_sync):
+        '''Open the database. If the database is already open, it is
+        closed (implicitly via GC) and re-opened.
+
+        Re-open to set the maximum number of open files appropriately.
+        '''
+        if self.db:
+            self.logger.info('closing DB to re-open')
+            self.db.close()
+
+        # Open DB and metadata files. Record some of its state.
+        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
+        self.db = open_db(db_name, self.env.db_engine, first_sync)
+        if self.db.is_new:
+            self.logger.info('created new {} database {}'
+                             .format(self.env.db_engine, db_name))
+        else:
+            self.logger.info('successfully opened {} database {} for sync: {}'
+                             .format(self.env.db_engine, db_name, first_sync))
+        self.read_state()
+
+        if self.first_sync == first_sync:
+            self.logger.info('software version: {}'.format(VERSION))
+            self.logger.info('DB version: {:d}'.format(self.db_version))
+            self.logger.info('coin: {}'.format(self.coin.NAME))
+            self.logger.info('network: {}'.format(self.coin.NET))
+            self.logger.info('height: {:,d}'.format(self.db_height))
+            self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
+            self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+            if self.first_sync:
+                self.logger.info('sync time so far: {}'
+                                 .format(formatted_time(self.wall_time)))
+        else:
+            self.reopen_db(self.first_sync)
+
     def read_state(self):
         if self.db.is_new:
             self.db_height = -1
@@ -110,16 +137,6 @@ class DB(LoggedClass):
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']
 
-        self.logger.info('software version: {}'.format(VERSION))
-        self.logger.info('DB version: {:d}'.format(self.db_version))
-        self.logger.info('coin: {}'.format(self.coin.NAME))
-        self.logger.info('network: {}'.format(self.coin.NET))
-        self.logger.info('height: {:,d}'.format(self.db_height))
-        self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
-        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-        if self.first_sync:
-            self.logger.info('sync time so far: {}'
-                             .format(formatted_time(self.wall_time)))
         if self.flush_count < self.utxo_flush_count:
             raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
 
diff --git a/server/storage.py b/server/storage.py
index b8a0785..12d7d33 100644
--- a/server/storage.py
+++ b/server/storage.py
@@ -16,12 +16,12 @@ from functools import partial
 from lib.util import subclasses, increment_byte_string
 
 
-def open_db(name, db_engine):
+def open_db(name, db_engine, for_sync):
     '''Returns a database handle.'''
     for db_class in subclasses(Storage):
         if db_class.__name__.lower() == db_engine.lower():
             db_class.import_module()
-            return db_class(name)
+            return db_class(name, for_sync)
 
     raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))
 
@@ -29,9 +29,9 @@
 class Storage(object):
     '''Abstract base class of the DB backend abstraction.'''
 
-    def __init__(self, name):
+    def __init__(self, name, for_sync):
         self.is_new = not os.path.exists(name)
-        self.open(name, create=self.is_new)
+        self.open(name, create=self.is_new, for_sync=for_sync)
 
     @classmethod
     def import_module(cls):
@@ -42,6 +42,10 @@
         '''Open an existing database or create a new one.'''
         raise NotImplementedError
 
+    def close(self):
+        '''Close an existing database.'''
+        raise NotImplementedError
+
     def get(self, key):
         raise NotImplementedError
 
@@ -75,9 +79,11 @@
         import plyvel
         cls.module = plyvel
 
-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        mof = 1024 if for_sync else 256
         self.db = self.module.DB(name, create_if_missing=create,
-                                 max_open_files=256, compression=None)
+                                 max_open_files=mof, compression=None)
+        self.close = self.db.close
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
@@ -92,18 +98,25 @@
         import rocksdb
         cls.module = rocksdb
 
-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        mof = 1024 if for_sync else 256
         compression = "no"
         compression = getattr(self.module.CompressionType,
                               compression + "_compression")
         options = self.module.Options(create_if_missing=create,
                                       compression=compression,
                                       target_file_size_base=33554432,
-                                      max_open_files=1024)
+                                      max_open_files=mof)
         self.db = self.module.DB(name, options)
         self.get = self.db.get
         self.put = self.db.put
 
+    def close(self):
+        # PyRocksDB doesn't provide a close method; hopefully this is enough
+        self.db = None
+        import gc
+        gc.collect()
+
     class WriteBatch(object):
         def __init__(self, db):
             self.batch = RocksDB.module.WriteBatch()
@@ -157,11 +170,15 @@
         import lmdb
         cls.module = lmdb
 
-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        # I don't see anything equivalent to max_open_files for for_sync
         self.env = LMDB.module.Environment('.', subdir=True, create=create,
                                            max_dbs=32, map_size=5 * 10 ** 10)
         self.db = self.env.open_db(create=create)
 
+    def close(self):
+        self.env.close()
+
     def get(self, key):
         with self.env.begin(db=self.db) as tx:
             return tx.get(key)
diff --git a/tests/test_storage.py b/tests/test_storage.py
index b765c27..2c3d429 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -21,7 +21,7 @@ for c in subclasses(Storage):
 def db(tmpdir, request):
     cwd = os.getcwd()
     os.chdir(str(tmpdir))
-    db = open_db("db", request.param)
+    db = open_db("db", request.param, False)
     os.chdir(cwd)
     yield db
     # Make sure all the locks and handles are closed
@@ -66,4 +66,4 @@ def test_iterator_reverse(db):
     assert list(db.iterator(prefix=b"abc", reverse=True)) == [
         (b"abc" + str.encode(str(i)), str.encode(str(i)))
         for i in reversed(range(5))
-    ]
\ No newline at end of file
+    ]
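For reference, a minimal sketch of the open/close cycle this diff introduces, using the `open_db()` signature from `server/storage.py` above. It assumes a LevelDB backend; the `'Bitcoin-mainnet'` name is illustrative only (real names come from `'{}-{}'.format(coin.NAME, coin.NET)`):

```python
from server.storage import open_db

# During initial sync, for_sync=True gives the backend a large file budget
# (max_open_files=1024 for LevelDB and RocksDB).
db = open_db('Bitcoin-mainnet', 'leveldb', True)  # hypothetical DB name

# ... catch up to the chain tip, then flush ...

# Once caught up, BlockProcessor calls reopen_db(False): the handle is
# closed and re-opened with max_open_files=256, leaving more file
# descriptors free for serving client connections.
db.close()
db = open_db('Bitcoin-mainnet', 'leveldb', False)
```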