Browse Source

Add support for RocksDB and LMDB

master
Johann Bauer 8 years ago
parent
commit
78edab6faa
  1. 1
      AUTHORS
  2. 13
      HOWTO.rst
  3. 33
      samples/scripts/NOTES
  4. 41
      server/block_processor.py
  5. 1
      server/env.py
  6. 152
      server/storage.py

1
AUTHORS

@ -1 +1,2 @@
Neil Booth: creator and maintainer
Johann Bauer: backend DB abstraction

13
HOWTO.rst

@ -29,6 +29,19 @@ metadata comes to just over 17GB. Leveldb needs a bit more for brief
periods, and the block chain is only getting longer, so I would
recommend having at least 30-40GB free space.
Database Engine
===============
You can choose between either RocksDB, LevelDB or LMDB to store transaction
information on disk. Currently, the fastest seems to be RocksDB with LevelDB
being about 10% slower. LMDB seems to be the slowest but maybe that's because
of bad implementation or configuration.
You will need to install either:
+ `plyvel <https://plyvel.readthedocs.io/en/latest/installation.html>`_ for LevelDB
+ `pyrocksdb <http://pyrocksdb.readthedocs.io/en/v0.4/installation.html>`_ for RocksDB
+ `lmdb <https://lmdb.readthedocs.io/en/release/#installation-unix>`_ for LMDB
Running
=======

33
samples/scripts/NOTES

@ -31,18 +31,21 @@ bit bigger than the combine cache size, because of Python overhead and
also because leveldb can consume quite a lot of memory during UTXO
flushing. So these are rough numbers only:
HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger
as history is append-only and not searched.
UTXO_MB - amount of UTXO and history cache, in MB, to retain before
flushing to disk. Default is 1000. This may be too large
for small boxes or too small for machines with lots of RAM.
Larger caches generally perform better as there is
significant searching of the UTXO cache during indexing.
However, I don't see much benefit in my tests pushing this
too high, and in fact performance begins to fall. My
machine has 24GB RAM; the slow down is probably because of
leveldb caching and Python GC effects. However this may be
very dependent on hardware and you may have different
results.
HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger
as history is append-only and not searched.
UTXO_MB - amount of UTXO and history cache, in MB, to retain before
flushing to disk. Default is 1000. This may be too large
for small boxes or too small for machines with lots of RAM.
Larger caches generally perform better as there is
significant searching of the UTXO cache during indexing.
However, I don't see much benefit in my tests pushing this
too high, and in fact performance begins to fall. My
machine has 24GB RAM; the slow down is probably because of
leveldb caching and Python GC effects. However this may be
very dependent on hardware and you may have different
results.
DB_ENGINE - database engine for the transaction database, either rocksdb,
leveldb or lmdb

41
server/block_processor.py

@ -10,13 +10,12 @@ from bisect import bisect_left
from collections import defaultdict, namedtuple
from functools import partial
import plyvel
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError
from lib.hash import hash_to_str
from lib.script import ScriptPubKey
from lib.util import chunks, LoggedClass
from server.storage import LMDB, RocksDB, LevelDB, NoDatabaseException
def formatted_time(t):
@ -137,7 +136,7 @@ class BlockProcessor(LoggedClass):
self.wall_time = 0
# Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin)
self.db = self.open_db(self.coin, env.db_engine)
self.tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
@ -200,7 +199,7 @@ class BlockProcessor(LoggedClass):
await self.handle_chain_reorg()
self.caught_up = False
break
await asyncio.sleep(0) # Yield
await asyncio.sleep(0) # Yield
if self.height != self.daemon.cached_height():
continue
@ -239,6 +238,7 @@ class BlockProcessor(LoggedClass):
'''Return the list of hashes to back up beacuse of a reorg.
The hashes are returned in order of increasing height.'''
def match_pos(hashes1, hashes2):
for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
if hash1 == hash2:
@ -268,17 +268,22 @@ class BlockProcessor(LoggedClass):
return self.fs_cache.block_hashes(start, count)
def open_db(self, coin):
def open_db(self, coin, db_engine):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
db_engine_class = {
"leveldb": LevelDB,
"rocksdb": RocksDB,
"lmdb": LMDB
}[db_engine.lower()]
try:
db = plyvel.DB(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except:
db = plyvel.DB(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
self.logger.info('created new database {}'.format(db_name))
db = db_engine_class(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except NoDatabaseException:
db = db_engine_class(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
self.logger.info('created new {} database {}'.format(db_engine, db_name))
else:
self.logger.info('successfully opened database {}'.format(db_name))
self.logger.info('successfully opened {} database {}'.format(db_engine, db_name))
self.read_state(db)
return db
@ -306,7 +311,7 @@ class BlockProcessor(LoggedClass):
'''
if self.flush_count < self.utxo_flush_count:
raise ChainError('DB corrupt: flush_count < utxo_flush_count')
with self.db.write_batch(transaction=True) as batch:
with self.db.write_batch() as batch:
if self.flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
@ -400,7 +405,7 @@ class BlockProcessor(LoggedClass):
if self.height > self.db_height:
self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch(transaction=True) as batch:
with self.db.write_batch() as batch:
# History first - fast and frees memory. Flush state last
# as it reads the wall time.
if self.height > self.db_height:
@ -646,7 +651,7 @@ class BlockProcessor(LoggedClass):
if not tx.is_coinbase:
for txin in reversed(tx.inputs):
n -= 33
undo_item = undo_info[n:n+33]
undo_item = undo_info[n:n + 33]
put_utxo(txin.prev_hash + pack('<H', txin.prev_idx),
undo_item)
hash168s.add(undo_item[:21])
@ -693,13 +698,13 @@ class BlockProcessor(LoggedClass):
prefix = b'u' + hash168
utxos = []
for k, v in self.db.iterator(prefix=prefix):
(tx_pos, ) = unpack('<H', k[-2:])
(tx_pos,) = unpack('<H', k[-2:])
for n in range(0, len(v), 12):
if limit == 0:
return
(tx_num, ) = unpack('<I', v[n:n+4])
(value, ) = unpack('<Q', v[n+4:n+12])
(tx_num,) = unpack('<I', v[n:n + 4])
(value,) = unpack('<Q', v[n + 4:n + 12])
tx_hash, height = self.get_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
limit -= 1

1
server/env.py

@ -32,6 +32,7 @@ class Env(LoggedClass):
self.banner_file = self.default('BANNER_FILE', None)
# The electrum client takes the empty string as unspecified
self.donation_address = self.default('DONATION_ADDRESS', '')
self.db_engine = self.default('DB_ENGINE', 'leveldb')
def default(self, envvar, default):
return environ.get(envvar, default)

152
server/storage.py

@ -0,0 +1,152 @@
import os
class Storage(object):
def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
if not create_if_missing and not os.path.exists(name):
raise NoDatabaseException
def get(self, key):
raise NotImplementedError()
def put(self, key, value):
raise NotImplementedError()
def write_batch(self):
"""
Returns a context manager that provides `put` and `delete`.
Changes should only be committed when the context manager closes without an exception.
"""
raise NotImplementedError()
def iterator(self, prefix=b""):
"""
Returns an iterator that yields (key, value) pairs from the database sorted by key.
If `prefix` is set, only keys starting with `prefix` will be included.
"""
raise NotImplementedError()
class NoDatabaseException(Exception):
pass
class LevelDB(Storage):
def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
super().__init__(name, create_if_missing, error_if_exists, compression)
import plyvel
self.db = plyvel.DB(name, create_if_missing=create_if_missing,
error_if_exists=error_if_exists, compression=compression)
def get(self, key):
return self.db.get(key)
def write_batch(self):
return self.db.write_batch(transaction=True)
def iterator(self, prefix=b""):
return self.db.iterator(prefix=prefix)
def put(self, key, value):
self.db.put(key, value)
class RocksDB(Storage):
rocksdb = None
def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
super().__init__(name, create_if_missing, error_if_exists, compression)
import rocksdb
RocksDB.rocksdb = rocksdb
if not compression:
compression = "no"
compression = getattr(rocksdb.CompressionType, compression + "_compression")
self.db = rocksdb.DB(name, rocksdb.Options(create_if_missing=create_if_missing,
compression=compression,
target_file_size_base=33554432,
max_open_files=1024))
def get(self, key):
return self.db.get(key)
class WriteBatch(object):
def __init__(self, db):
self.batch = RocksDB.rocksdb.WriteBatch()
self.db = db
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch)
def write_batch(self):
return RocksDB.WriteBatch(self.db)
class Iterator(object):
def __init__(self, db, prefix):
self.it = db.iteritems()
self.prefix = prefix
def __iter__(self):
self.it.seek(self.prefix)
return self
def __next__(self):
k, v = self.it.__next__()
if not k.startswith(self.prefix):
# We're already ahead of the prefix
raise StopIteration
return k, v
def iterator(self, prefix=b""):
return RocksDB.Iterator(self.db, prefix)
def put(self, key, value):
return self.db.put(key, value)
class LMDB(Storage):
lmdb = None
def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
super().__init__(name, create_if_missing, error_if_exists, compression)
import lmdb
LMDB.lmdb = lmdb
self.env = lmdb.Environment(".", subdir=True, create=create_if_missing, max_dbs=32, map_size=5 * 10 ** 10)
self.db = self.env.open_db(create=create_if_missing)
def get(self, key):
with self.env.begin(db=self.db) as tx:
return tx.get(key)
def put(self, key, value):
with self.env.begin(db=self.db, write=True) as tx:
tx.put(key, value)
def write_batch(self):
return self.env.begin(db=self.db, write=True)
def iterator(self, prefix=b""):
return LMDB.lmdb.Iterator(self.db, self.env, prefix)
class Iterator:
def __init__(self, db, env, prefix):
self.transaction = env.begin(db=db)
self.transaction.__enter__()
self.db = db
self.prefix = prefix
def __iter__(self):
self.iterator = LMDB.lmdb.Cursor(self.db, self.transaction)
self.iterator.set_range(self.prefix)
return self
def __next__(self):
k, v = self.iterator.item()
if not k.startswith(self.prefix) or not self.iterator.next():
# We're already ahead of the prefix
self.transaction.__exit__()
raise StopIteration
return k, v
Loading…
Cancel
Save