
Split out history into its own DB.

Neil Booth committed 8 years ago
Branch: master
Commit: 91ca5fc14a
6 changed files:

  docs/ENVIRONMENT.rst (52)
  query.py (12)
  server/block_processor.py (116)
  server/db.py (281)
  server/env.py (2)
  server/storage.py (16)

docs/ENVIRONMENT.rst (52)

@@ -260,34 +260,29 @@ Cache
 -----

 If synchronizing from the Genesis block your performance might change
-by tweaking the following cache variables. Cache size is only checked
-roughly every minute, so the caches can grow beyond the specified
-size. Also the Python process is often quite a bit fatter than the
-combined cache size, because of Python overhead and also because
-leveldb consumes a lot of memory during UTXO flushing. So I recommend
-you set the sum of these to nothing over half your available physical
-RAM:
+by tweaking the cache size. Cache size is only checked roughly every
+minute, so the cache can grow beyond the specified size. Moreover,
+the Python process is often quite a bit fatter than the cache size,
+because of Python overhead and also because leveldb consumes a lot of
+memory when flushing. So I recommend you do not set this over 60% of
+your available physical RAM:

-* **HIST_MB**
+* **CACHE_MB**

-  The amount of history cache, in MB, to retain before flushing to
-  disk. Default is 300; probably no benefit being much larger as
-  history is append-only and not searched.
+  The amount of cache, in MB, to use. The default is 1,200.

-  I do not recommend setting this above 500.
+  A portion of the cache is reserved for unflushed history, which is
+  written out frequently. The bulk is used to cache UTXOs.

-* **UTXO_MB**
+  Larger caches probably increase performance a little as there is
+  significant searching of the UTXO cache during indexing. However, I
+  don't see much benefit in my tests pushing this too high, and in
+  fact performance begins to fall, probably because LevelDB already
+  caches, and also because of Python GC.

-  The amount of UTXO and history cache, in MB, to retain before
-  flushing to disk. Default is 1000. This may be too large for small
-  boxes or too small for machines with lots of RAM. Larger caches
-  generally perform better as there is significant searching of the
-  UTXO cache during indexing. However, I don't see much benefit in my
-  tests pushing this too high, and in fact performance begins to fall.
-  My machine has 24GB RAM; the slow down is probably because of
-  leveldb caching and Python GC effects.
-
-  I do not recommend setting this above 2000.
+  I do not recommend raising this above 2000. If upgrading from prior
+  versions, a value of 90% of the sum of the old UTXO_MB and HIST_MB
+  variables is roughly equivalent.

 Debugging
 ---------

@@ -297,9 +292,12 @@ The following are for debugging purposes:
 * **FORCE_REORG**

   If set to a positive integer, it will simulate a reorg of the
-  blockchain for that number of blocks on startup. Although it should
-  fail gracefully if set to a value greater than **REORG_LIMIT**, I do
-  not recommend it as I have not tried it and there is a chance your
-  DB might corrupt.
+  blockchain for that number of blocks on startup. You must have
+  synced before using this, otherwise there will be no undo
+  information.
+
+  Although it should fail gracefully if set to a value greater than
+  **REORG_LIMIT**, I do not recommend it as I have not tried it and
+  there is a chance your DB might corrupt.

 .. _lib/coins.py: https://github.com/kyuupichan/electrumx/blob/master/lib/coins.py
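The upgrade note above implies a simple rule of thumb. Here is a minimal sketch (the helper name and the old default values are mine, not part of the commit) of deriving a starting CACHE_MB from a previous installation's settings:

```python
# Hypothetical helper: the docs suggest roughly 90% of the old
# UTXO_MB + HIST_MB values, and recommend not raising CACHE_MB above 2000.
def suggested_cache_mb(old_utxo_mb=1000, old_hist_mb=300):
    value = int(0.9 * (old_utxo_mb + old_hist_mb))
    return min(value, 2000)

print(suggested_cache_mb())  # 1170 with the old defaults
```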

query.py (12)

@@ -20,23 +20,23 @@ from server.db import DB
 from lib.hash import hash_to_str


-def count_entries(db):
+def count_entries(hist_db, utxo_db):
     utxos = 0
-    for key in db.iterator(prefix=b'u', include_value=False):
+    for key in utxo_db.iterator(prefix=b'u', include_value=False):
         utxos += 1
     print("UTXO count:", utxos)

     hashX = 0
-    for key in db.iterator(prefix=b'h', include_value=False):
+    for key in utxo_db.iterator(prefix=b'h', include_value=False):
         hashX += 1
     print("HashX count:", hashX)

     hist = 0
     hist_len = 0
-    for key, value in db.iterator(prefix=b'H'):
+    for key, value in hist_db.iterator(prefix=b'H'):
         hist += 1
         hist_len += len(value) // 4
-    print("History rows {:,d} entries {:,d}", hist, hist_len)
+    print("History rows {:,d} entries {:,d}".format(hist, hist_len))


 def main():
@@ -44,7 +44,7 @@ def main():
     bp = DB(env)
     coin = env.coin
     if len(sys.argv) == 1:
-        count_entries(bp.db)
+        count_entries(bp.hist_db, bp.utxo_db)
         return
     argc = 1
     try:
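For reference, a minimal sketch of how the reworked count_entries is driven, assuming the usual DB_DIRECTORY/COIN/DAEMON_URL environment is already set up; importing count_entries from the script is illustrative only, since query.py is normally run directly:

```python
from server.env import Env
from server.db import DB
from query import count_entries  # illustrative import; query.py is a script

env = Env()
db = DB(env)
# The UTXO index and the history index are now separate database handles.
count_entries(db.hist_db, db.utxo_db)
```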

server/block_processor.py (116)

@@ -12,7 +12,6 @@ import array
 import asyncio
 from struct import pack, unpack
 import time
-from bisect import bisect_left
 from collections import defaultdict
 from functools import partial

@@ -192,7 +191,7 @@ class BlockProcessor(server.db.DB):
         self.db_deletes = []

         # Log state
-        if self.first_sync:
+        if self.utxo_db.for_sync:
             self.logger.info('flushing DB cache at {:,d} MB'
                              .format(self.cache_MB))

@@ -250,13 +249,12 @@ class BlockProcessor(server.db.DB):
     def caught_up(self):
         '''Called when first caught up after starting.'''
         if not self.caught_up_event.is_set():
+            self.first_sync = False
             self.flush(True)
-            if self.first_sync:
+            if self.utxo_db.for_sync:
                 self.logger.info('{} synced to height {:,d}'
                                  .format(VERSION, self.height))
-                self.first_sync = False
-                self.flush_state(self.db)
-                self.open_db(for_sync=False)
+                self.open_dbs()
             self.caught_up_event.set()

     async def handle_chain_reorg(self, touched, count=None):
@@ -336,22 +334,34 @@ class BlockProcessor(server.db.DB):
             self.assert_flushed()
             return

-        self.flush_count += 1
         flush_start = time.time()
         last_flush = self.last_flush
         tx_diff = self.tx_count - self.last_flush_tx_count

-        with self.db.write_batch() as batch:
-            # History first - fast and frees memory. Flush state last
-            # as it reads the wall time.
-            self.flush_history(batch)
+        # Flush to file system
+        self.fs_flush()
+        fs_end = time.time()
+        if self.utxo_db.for_sync:
+            self.logger.info('flushed to FS in {:.1f}s'
+                             .format(fs_end - flush_start))
+
+        # History next - it's fast and frees memory
+        self.flush_history(self.history)
+        if self.utxo_db.for_sync:
+            self.logger.info('flushed history in {:.1f}s for {:,d} addrs'
+                             .format(time.time() - fs_end, len(self.history)))
+        self.history = defaultdict(partial(array.array, 'I'))
+        self.history_size = 0
+
+        # Flush state last as it reads the wall time.
+        with self.utxo_db.write_batch() as batch:
             if flush_utxos:
                 self.flush_utxos(batch)
             self.flush_state(batch)

         # Update and put the wall time again - otherwise we drop the
         # time it took to commit the batch
-        self.flush_state(self.db)
+        self.flush_state(self.utxo_db)

         self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}'
                          .format(self.flush_count,
@@ -359,7 +369,7 @@ class BlockProcessor(server.db.DB):
                                  self.height, self.tx_count))

         # Catch-up stats
-        if self.first_sync:
+        if self.utxo_db.for_sync:
             daemon_height = self.daemon.cached_height()
             tx_per_sec = int(self.tx_count / self.wall_time)
             this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
@@ -381,32 +391,12 @@ class BlockProcessor(server.db.DB):
                          .format(formatted_time(self.wall_time),
                                  formatted_time(tx_est / this_tx_per_sec)))

-    def flush_history(self, batch):
-        fs_start = time.time()
-        self.fs_flush()
-        fs_end = time.time()
-
-        flush_id = pack('>H', self.flush_count)
-
-        for hashX, hist in self.history.items():
-            key = b'H' + hashX + flush_id
-            batch.put(key, hist.tobytes())
-
-        if self.first_sync:
-            self.logger.info('flushed to FS in {:.1f}s, history in {:.1f}s '
-                             'for {:,d} addrs'
-                             .format(fs_end - fs_start, time.time() - fs_end,
-                                     len(self.history)))
-
-        self.history = defaultdict(partial(array.array, 'I'))
-        self.history_size = 0
-
     def fs_flush(self):
         '''Flush the things stored on the filesystem.'''
         assert self.fs_height + len(self.headers) == self.height
         assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
         self.fs_update(self.fs_height, self.headers, self.tx_hashes)
         self.fs_height = self.height
         self.fs_tx_count = self.tx_count
         self.tx_hashes = []

@@ -422,55 +412,30 @@ class BlockProcessor(server.db.DB):
         assert self.height < self.db_height
         assert not self.history

-        self.flush_count += 1
         flush_start = time.time()

-        with self.db.write_batch() as batch:
+        # Backup FS (just move the pointers back)
+        self.fs_height = self.height
+        self.fs_tx_count = self.tx_count
+        assert not self.headers
+        assert not self.tx_hashes
+
+        # Backup history
+        nremoves = self.backup_history(hashXs)
+        self.logger.info('backing up removed {:,d} history entries from '
+                         '{:,d} addresses'.format(nremoves, len(hashXs)))
+
+        with self.utxo_db.write_batch() as batch:
             # Flush state last as it reads the wall time.
-            self.backup_history(batch, hashXs)
             self.flush_utxos(batch)
             self.flush_state(batch)

-        # Update and put the wall time again - otherwise we drop the
-        # time it took to commit the batch
-        self.flush_state(self.db)
-
         self.logger.info('backup flush #{:,d} took {:.1f}s. '
                          'Height {:,d} txs: {:,d}'
                          .format(self.flush_count,
                                  self.last_flush - flush_start,
                                  self.height, self.tx_count))

-    def backup_history(self, batch, hashXs):
-        nremoves = 0
-        for hashX in sorted(hashXs):
-            prefix = b'H' + hashX
-            deletes = []
-            puts = {}
-            for key, hist in self.db.iterator(prefix=prefix, reverse=True):
-                a = array.array('I')
-                a.frombytes(hist)
-                # Remove all history entries >= self.tx_count
-                idx = bisect_left(a, self.tx_count)
-                nremoves += len(a) - idx
-                if idx > 0:
-                    puts[key] = a[:idx].tobytes()
-                    break
-                deletes.append(key)
-
-            for key in deletes:
-                batch.delete(key)
-            for key, value in puts.items():
-                batch.put(key, value)
-
-        self.fs_height = self.height
-        self.fs_tx_count = self.tx_count
-        assert not self.headers
-        assert not self.tx_hashes
-
-        self.logger.info('backing up removed {:,d} history entries from '
-                         '{:,d} addresses'.format(nremoves, len(hashXs)))
-
     def check_cache_size(self):
         '''Flush a cache if it gets too big.'''
         # Good average estimates based on traversal of subobjects and
@@ -701,7 +666,7 @@ class BlockProcessor(server.db.DB):
         # Value: hashX
         prefix = b'h' + tx_hash[:4] + idx_packed
         candidates = {db_key: hashX for db_key, hashX
-                      in self.db.iterator(prefix=prefix)}
+                      in self.utxo_db.iterator(prefix=prefix)}

         for hdb_key, hashX in candidates.items():
             tx_num_packed = hdb_key[-4:]
@@ -716,7 +681,7 @@ class BlockProcessor(server.db.DB):
                 # Key: b'u' + address_hashX + tx_idx + tx_num
                 # Value: the UTXO value as a 64-bit unsigned integer
                 udb_key = b'u' + hashX + hdb_key[-6:]
-                utxo_value_packed = self.db.get(udb_key)
+                utxo_value_packed = self.utxo_db.get(udb_key)
                 if utxo_value_packed:
                     # Remove both entries for this UTXO
                     self.db_deletes.append(hdb_key)
@@ -733,9 +698,10 @@ class BlockProcessor(server.db.DB):
         # may be in the DB already.
         flush_start = time.time()
         delete_count = len(self.db_deletes) // 2
+        utxo_cache_len = len(self.utxo_cache)

         batch_delete = batch.delete
-        for key in self.db_deletes:
+        for key in sorted(self.db_deletes):
             batch_delete(key)
         self.db_deletes = []

@@ -747,12 +713,12 @@ class BlockProcessor(server.db.DB):
             batch_put(b'h' + cache_key[:4] + suffix, hashX)
             batch_put(b'u' + hashX + suffix, cache_value[-8:])

-        if self.first_sync:
+        if self.utxo_db.for_sync:
             self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO '
                              'adds, {:,d} spends in {:.1f}s, committing...'
                              .format(self.height - self.db_height,
                                      self.tx_count - self.db_tx_count,
-                                     len(self.utxo_cache), delete_count,
+                                     utxo_cache_len, delete_count,
                                      time.time() - flush_start))

         self.utxo_cache = {}
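The hunks above keep the existing UTXO key layout and merely redirect it at utxo_db. The following sketch shows that layout for orientation; the 2-byte little-endian output index and the 11-byte hashX length are assumptions of mine, not spelled out in this diff:

```python
from struct import pack

hashX = bytes(11)             # placeholder address hash (length assumed)
tx_hash = bytes(32)           # placeholder transaction hash
idx_packed = pack('<H', 0)    # output index (packing format assumed)
tx_num_packed = pack('<I', 123456)

# Key: b'h' + compressed tx hash + tx_idx + tx_num  ->  hashX
hdb_key = b'h' + tx_hash[:4] + idx_packed + tx_num_packed
# Key: b'u' + address hashX + tx_idx + tx_num  ->  64-bit value
udb_key = b'u' + hashX + idx_packed + tx_num_packed
```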

server/db.py (281)

@@ -13,12 +13,12 @@ import ast
 import itertools
 import os
 from struct import pack, unpack
-from bisect import bisect_right
+from bisect import bisect_left, bisect_right
 from collections import namedtuple

 import lib.util as util
 from lib.hash import hash_to_str
-from server.storage import open_db
+from server.storage import db_class
 from server.version import VERSION

@@ -31,7 +31,7 @@ class DB(util.LoggedClass):
     it was shutdown uncleanly.
     '''

-    DB_VERSIONS = [4]
+    DB_VERSIONS = [5]

     class MissingUTXOError(Exception):
         '''Raised if a mempool tx input UTXO couldn't be found.'''

@@ -48,8 +48,13 @@ class DB(util.LoggedClass):
                          .format(env.db_dir))
         os.chdir(env.db_dir)

-        self.db = None
-        self.open_db(for_sync=False)
+        self.db_class = db_class(self.env.db_engine)
+        self.logger.info('using {} for DB backend'.format(self.env.db_engine))
+        self.utxo_db = None
+        self.open_dbs()
+        self.clean_db()

         self.logger.info('reorg limit is {:,d} blocks'
                          .format(self.env.reorg_limit))

@@ -67,67 +72,68 @@ class DB(util.LoggedClass):
             assert self.db_tx_count == self.tx_counts[-1]
         else:
             assert self.db_tx_count == 0
-        self.clean_db()

-    def open_db(self, for_sync):
-        '''Open the database. If the database is already open, it is
-        closed and re-opened.
+    def open_dbs(self):
+        '''Open the databases. If already open they are closed and re-opened.

-        If for_sync is True, it is opened for sync (high number of open
-        file, etc.)
-
-        Re-open to set the maximum number of open files appropriately.
+        When syncing we want to reserve a lot of open files for the
+        synchtonization. When serving clients we want the open files for
+        serving network connections.
         '''
         def log_reason(message, is_for_sync):
             reason = 'sync' if is_for_sync else 'serving'
             self.logger.info('{} for {}'.format(message, reason))

-        if self.db:
-            if self.db.for_sync == for_sync:
-                return
-            log_reason('closing DB to re-open', for_sync)
-            self.db.close()
-
-        # Open DB and metadata files. Record some of its state.
-        self.db = open_db('db', self.env.db_engine, for_sync)
-        if self.db.is_new:
-            self.logger.info('created new {} database'
-                             .format(self.env.db_engine))
-            self.logger.info('creating metadata diretcory')
-            os.mkdir('meta')
-            with self.open_file('COIN', create=True) as f:
-                f.write('ElectrumX DB and metadata files for {} {}'
-                        .format(self.coin.NAME, self.coin.NET).encode())
-        else:
-            log_reason('opened {} database'.format(self.env.db_engine),
-                       self.db.for_sync)
-
-        self.read_state()
-        if self.first_sync == self.db.for_sync:
-            self.logger.info('software version: {}'.format(VERSION))
-            self.logger.info('DB version: {:d}'.format(self.db_version))
-            self.logger.info('coin: {}'.format(self.coin.NAME))
-            self.logger.info('network: {}'.format(self.coin.NET))
-            self.logger.info('height: {:,d}'.format(self.db_height))
-            self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
-            self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-            if self.first_sync:
-                self.logger.info('sync time so far: {}'
-                                 .format(util.formatted_time(self.wall_time)))
-        else:
-            self.open_db(self.first_sync)
+        # Assume we're serving until we find out otherwise
+        for for_sync in [False, True]:
+            if self.utxo_db:
+                if self.utxo_db.for_sync == for_sync:
+                    return
+                log_reason('closing DB to re-open', for_sync)
+                self.utxo_db.close()
+                self.hist_db.close()
+
+            # Open DB and metadata files. Record some of its state.
+            self.utxo_db = self.db_class('utxo', for_sync)
+            self.hist_db = self.db_class('hist', for_sync)
+            if self.utxo_db.is_new:
+                self.logger.info('created new database')
+                self.logger.info('creating metadata diretcory')
+                os.mkdir('meta')
+                with self.open_file('COIN', create=True) as f:
+                    f.write('ElectrumX databases and metadata for {} {}'
+                            .format(self.coin.NAME, self.coin.NET).encode())
+            else:
+                log_reason('opened DB', self.utxo_db.for_sync)
+
+            self.read_utxo_state()
+            if self.first_sync == self.utxo_db.for_sync:
+                break
+
+        self.read_history_state()
+
+        self.logger.info('software version: {}'.format(VERSION))
+        self.logger.info('DB version: {:d}'.format(self.db_version))
+        self.logger.info('coin: {}'.format(self.coin.NAME))
+        self.logger.info('network: {}'.format(self.coin.NET))
+        self.logger.info('height: {:,d}'.format(self.db_height))
+        self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
+        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        if self.first_sync:
+            self.logger.info('sync time so far: {}'
+                             .format(util.formatted_time(self.wall_time)))

-    def read_state(self):
-        if self.db.is_new:
+    def read_utxo_state(self):
+        if self.utxo_db.is_new:
             self.db_height = -1
             self.db_tx_count = 0
             self.db_tip = b'\0' * 32
             self.db_version = max(self.DB_VERSIONS)
-            self.flush_count = 0
             self.utxo_flush_count = 0
             self.wall_time = 0
             self.first_sync = True
         else:
-            state = self.db.get(b'state')
+            state = self.utxo_db.get(b'state')
             if state:
                 state = ast.literal_eval(state.decode())
                 if not isinstance(state, dict):
@@ -144,22 +150,17 @@ class DB(util.LoggedClass):
                 self.db_height = state['height']
                 self.db_tx_count = state['tx_count']
                 self.db_tip = state['tip']
-                self.flush_count = state['flush_count']
                 self.utxo_flush_count = state['utxo_flush_count']
                 self.wall_time = state['wall_time']
                 self.first_sync = state['first_sync']

-        if self.flush_count < self.utxo_flush_count:
-            raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
-
     def write_state(self, batch):
-        '''Write chain state to the batch.'''
+        '''Write (UTXO) state to the batch.'''
         state = {
             'genesis': self.coin.GENESIS_HASH,
             'height': self.db_height,
             'tx_count': self.db_tx_count,
             'tip': self.db_tip,
-            'flush_count': self.flush_count,
             'utxo_flush_count': self.utxo_flush_count,
             'wall_time': self.wall_time,
             'first_sync': self.first_sync,
@@ -174,48 +175,28 @@ class DB(util.LoggedClass):
         recent UTXO flush (only happens on unclean shutdown), and aged
         undo information.
         '''
+        if self.flush_count < self.utxo_flush_count:
+            raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
         if self.flush_count > self.utxo_flush_count:
-            self.utxo_flush_count = self.flush_count
-            self.logger.info('DB shut down uncleanly. Scanning for '
-                             'excess history flushes...')
-            history_keys = self.excess_history_keys()
-            self.logger.info('deleting {:,d} history entries'
-                             .format(len(history_keys)))
-        else:
-            history_keys = []
-
-        undo_keys = self.stale_undo_keys()
-        if undo_keys:
-            self.logger.info('deleting {:,d} stale undo entries'
-                             .format(len(undo_keys)))
-
-        with self.db.write_batch() as batch:
-            batch_delete = batch.delete
-            for key in history_keys:
-                batch_delete(key)
-            for key in undo_keys:
-                batch_delete(key)
-            self.write_state(batch)
-
-    def excess_history_keys(self):
-        prefix = b'H'
-        keys = []
-        for key, hist in self.db.iterator(prefix=prefix):
-            flush_id, = unpack('>H', key[-2:])
-            if flush_id > self.utxo_flush_count:
-                keys.append(key)
-        return keys
+            self.clear_excess_history(self.utxo_flush_count)

-    def stale_undo_keys(self):
+        # Remove stale undo information
         prefix = b'U'
         cutoff = self.db_height - self.env.reorg_limit
         keys = []
-        for key, hist in self.db.iterator(prefix=prefix):
+        for key, hist in self.utxo_db.iterator(prefix=prefix):
             height, = unpack('>I', key[-4:])
             if height > cutoff:
                 break
             keys.append(key)
-        return keys
+
+        if keys:
+            self.logger.info('deleting {:,d} stale undo entries'
+                             .format(len(keys)))
+
+        with self.utxo_db.write_batch() as batch:
+            for key in keys:
+                batch.delete(key)
+            self.write_state(batch)

     def undo_key(self, height):
         '''DB key for undo information at the given height.'''
@@ -223,11 +204,11 @@ class DB(util.LoggedClass):

     def write_undo_info(self, height, undo_info):
         '''Write out undo information for the current height.'''
-        self.db.put(self.undo_key(height), undo_info)
+        self.utxo_db.put(self.undo_key(height), undo_info)

     def read_undo_info(self, height):
         '''Read undo information from a file for the current height.'''
-        return self.db.get(self.undo_key(height))
+        return self.utxo_db.get(self.undo_key(height))

     def open_file(self, filename, create=False):
         '''Open the file name. Return its handle.'''
@@ -308,24 +289,6 @@ class DB(util.LoggedClass):
         assert isinstance(limit, int) and limit >= 0
         return limit

-    def get_history(self, hashX, limit=1000):
-        '''Generator that returns an unpruned, sorted list of (tx_hash,
-        height) tuples of confirmed transactions that touched the address,
-        earliest in the blockchain first. Includes both spending and
-        receiving transactions. By default yields at most 1000 entries.
-        Set limit to None to get them all.
-        '''
-        limit = self._resolve_limit(limit)
-        prefix = b'H' + hashX
-        for key, hist in self.db.iterator(prefix=prefix):
-            a = array.array('I')
-            a.frombytes(hist)
-            for tx_num in a:
-                if limit == 0:
-                    return
-                yield self.fs_tx_hash(tx_num)
-                limit -= 1
-
     def get_balance(self, hashX):
         '''Returns the confirmed balance of an address.'''
         return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None))
@@ -340,7 +303,7 @@ class DB(util.LoggedClass):
         # Key: b'u' + address_hashX + tx_idx + tx_num
         # Value: the UTXO value as a 64-bit unsigned integer
         prefix = b'u' + hashX
-        for db_key, db_value in self.db.iterator(prefix=prefix):
+        for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
             if limit == 0:
                 return
             limit -= 1
@@ -358,7 +321,7 @@ class DB(util.LoggedClass):
         prefix = b'h' + tx_hash[:4] + idx_packed

         # Find which entry, if any, the TX_HASH matches.
-        for db_key, hashX in self.db.iterator(prefix=prefix):
+        for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
             tx_num_packed = db_key[-4:]
             tx_num, = unpack('<I', tx_num_packed)
             hash, height = self.fs_tx_hash(tx_num)
@@ -383,9 +346,103 @@ class DB(util.LoggedClass):
             # Key: b'u' + address_hashX + tx_idx + tx_num
             # Value: the UTXO value as a 64-bit unsigned integer
             key = b'u' + hashX + idx_packed + tx_num_packed
-            db_value = self.db.get(key)
+            db_value = self.utxo_db.get(key)
             if not db_value:
                 raise self.DBError('UTXO {} / {:,d} in one table only'
                                    .format(hash_to_str(tx_hash), tx_idx))
             value, = unpack('<Q', db_value)
             return hashX, value
+
+    # -- History database
+
+    def clear_excess_history(self, flush_count):
+        self.logger.info('DB shut down uncleanly. Scanning for '
+                         'excess history flushes...')
+
+        keys = []
+        for key, hist in self.hist_db.iterator(prefix=b''):
+            flush_id, = unpack('>H', key[-2:])
+            if flush_id > flush_count:
+                keys.append(key)
+
+        self.logger.info('deleting {:,d} history entries'.format(len(keys)))
+
+        self.flush_count = flush_count
+        with self.hist_db.write_batch() as batch:
+            for key in keys:
+                batch.delete(key)
+            self.write_history_state(batch)
+
+        self.logger.info('deleted excess history entries')
+
+    def write_history_state(self, batch):
+        state = {'flush_count': self.flush_count}
+        # History entries are not prefixed; the suffix \0\0 ensures we
+        # look similar to other entries and aren't interfered with
+        batch.put(b'state\0\0', repr(state).encode())
+
+    def read_history_state(self):
+        state = self.hist_db.get(b'state\0\0')
+        if state:
+            state = ast.literal_eval(state.decode())
+            if not isinstance(state, dict):
+                raise self.DBError('failed reading state from history DB')
+            self.flush_count = state['flush_count']
+        else:
+            self.flush_count = 0
+
+    def flush_history(self, history):
+        self.flush_count += 1
+        flush_id = pack('>H', self.flush_count)
+
+        with self.hist_db.write_batch() as batch:
+            for hashX in sorted(history):
+                key = hashX + flush_id
+                batch.put(key, history[hashX].tobytes())
+            self.write_history_state(batch)
+
+    def backup_history(self, hashXs):
+        # Not certain this is needed, but it doesn't hurt
+        self.flush_count += 1
+
+        nremoves = 0
+        with self.hist_db.write_batch() as batch:
+            for hashX in sorted(hashXs):
+                deletes = []
+                puts = {}
+                for key, hist in self.hist_db.iterator(prefix=hashX,
+                                                       reverse=True):
+                    a = array.array('I')
+                    a.frombytes(hist)
+                    # Remove all history entries >= self.tx_count
+                    idx = bisect_left(a, self.tx_count)
+                    nremoves += len(a) - idx
+                    if idx > 0:
+                        puts[key] = a[:idx].tobytes()
+                        break
+                    deletes.append(key)
+
+                for key in deletes:
+                    batch.delete(key)
+                for key, value in puts.items():
+                    batch.put(key, value)
+            self.write_history_state(batch)
+
+        return nremoves
+
+    def get_history(self, hashX, limit=1000):
+        '''Generator that returns an unpruned, sorted list of (tx_hash,
+        height) tuples of confirmed transactions that touched the address,
+        earliest in the blockchain first. Includes both spending and
+        receiving transactions. By default yields at most 1000 entries.
+        Set limit to None to get them all.
+        '''
+        limit = self._resolve_limit(limit)
+        for key, hist in self.hist_db.iterator(prefix=hashX):
+            a = array.array('I')
+            a.frombytes(hist)
+            for tx_num in a:
+                if limit == 0:
+                    return
+                yield self.fs_tx_hash(tx_num)
+                limit -= 1
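To summarize the new history database introduced above: keys are hashX plus a 2-byte big-endian flush id, values are packed arrays of uint32 transaction numbers, and the flush counter lives under b'state\0\0'. A small sketch of writing and reading one entry, using placeholder values:

```python
import array
from struct import pack

hashX = bytes(11)            # placeholder address hash
flush_id = pack('>H', 7)     # as written by DB.flush_history
key = hashX + flush_id
value = array.array('I', [1000, 1001, 1002]).tobytes()

# Decoding, as DB.get_history does:
a = array.array('I')
a.frombytes(value)
print(list(a))               # [1000, 1001, 1002]
```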

server/env.py (2)

@@ -27,7 +27,7 @@ class Env(LoggedClass):
         network = self.default('NETWORK', 'mainnet')
         self.coin = Coin.lookup_coin_class(coin_name, network)
         self.db_dir = self.required('DB_DIRECTORY')
-        self.cache_MB = self.integer('CACHE_MB', 1250)
+        self.cache_MB = self.integer('CACHE_MB', 1200)
         self.host = self.default('HOST', 'localhost')
         self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
         self.daemon_url = self.required('DAEMON_URL')

server/storage.py (16)

@@ -15,15 +15,13 @@ from functools import partial
 from lib.util import subclasses, increment_byte_string


-def open_db(name, db_engine, for_sync):
-    '''Returns a database handle.'''
+def db_class(name):
+    '''Returns a DB engine class.'''
     for db_class in subclasses(Storage):
-        if db_class.__name__.lower() == db_engine.lower():
+        if db_class.__name__.lower() == name.lower():
             db_class.import_module()
-            return db_class(name, for_sync)
-
-    raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))
+            return db_class
+    raise RuntimeError('unrecognised DB engine "{}"'.format(name))


 class Storage(object):
@@ -81,7 +79,7 @@ class LevelDB(Storage):
         cls.module = plyvel

     def open(self, name, create):
-        mof = 1024 if self.for_sync else 256
+        mof = 512 if self.for_sync else 128
         self.db = self.module.DB(name, create_if_missing=create,
                                  max_open_files=mof, compression=None)
         self.close = self.db.close
@@ -101,7 +99,7 @@ class RocksDB(Storage):
         cls.module = rocksdb

     def open(self, name, create):
-        mof = 1024 if self.for_sync else 256
+        mof = 512 if self.for_sync else 128
         compression = "no"
         compression = getattr(self.module.CompressionType,
                               compression + "_compression")
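A short sketch of how the reworked storage API is consumed in server/db.py above: db_class() now returns an engine class rather than an open handle, and the caller opens each database by name:

```python
from server.storage import db_class

engine = db_class('leveldb')              # or 'rocksdb'
utxo_db = engine('utxo', for_sync=True)   # as DB.open_dbs does
hist_db = engine('hist', for_sync=True)
```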
