
Implement history compression with tests.

Still to do: running compression in the background when the flush
count reaches a certain level.
master
Neil Booth, 8 years ago
commit 2f26e81629
 compact_history.py              |  76
 server/block_processor.py       |   7
 server/db.py                    | 180
 tests/server/test_compaction.py | 131
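
The remaining TODO could be wired up roughly as follows. This is a minimal sketch, not part of this commit: the threshold value, the name maybe_schedule_compaction, and the asyncio.ensure_future call are illustrative assumptions; db.compact_history and db.comp_cursor are the APIs this commit adds.

    import asyncio

    # Hypothetical follow-up to this commit: kick off background
    # compaction once the flush count nears the 2-byte limit of 65,536.
    FLUSH_COUNT_THRESHOLD = 50000   # assumed threshold, below 65,536

    def maybe_schedule_compaction(db, loop):
        # comp_cursor == -1 means no compaction is currently in progress
        if db.flush_count > FLUSH_COUNT_THRESHOLD and db.comp_cursor == -1:
            db.comp_cursor = 0   # begin with the first 2-byte prefix
            asyncio.ensure_future(db.compact_history(loop))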

compact_history.py (new file)

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
#
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.

'''Script to compact the history database.  This should save space and
will reset the flush counter to a low number, avoiding overflow when
the flush count reaches 65,536.

This needs to lock the database so ElectrumX must not be running -
shut it down cleanly first.

It is recommended you run this script with the same environment as
ElectrumX.  However it is intended to be runnable with just
DB_DIRECTORY and COIN set (COIN defaults as for ElectrumX).

If you use daemon tools, you might run this script like so:

    envdir /path/to/the/environment/directory ./compact_history.py

Depending on your hardware this script may take up to 6 hours to
complete; it logs progress regularly.

Compaction can be interrupted and restarted harmlessly and will pick
up where it left off.  However, if you restart ElectrumX without
running the compaction to completion, it will not benefit and
subsequent compactions will restart from the beginning.
'''
import logging
import sys
import traceback
from os import environ

from server.env import Env
from server.db import DB


def compact_history():
    if sys.version_info < (3, 5, 3):
        raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX')

    environ['DAEMON_URL'] = ''   # Avoid Env erroring out
    env = Env()
    db = DB(env)
    assert not db.first_sync
    # Continue where we left off, if interrupted
    if db.comp_cursor == -1:
        db.comp_cursor = 0

    db.comp_flush_count = max(db.comp_flush_count, 1)
    limit = 8 * 1000 * 1000

    while db.comp_cursor != -1:
        db._compact_history(limit)


def main():
    logging.basicConfig(level=logging.INFO)
    logging.info('Starting history compaction...')
    try:
        compact_history()
    except Exception:
        traceback.print_exc()
        logging.critical('History compaction terminated abnormally')
    else:
        logging.info('History compaction complete')


if __name__ == '__main__':
    main()
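
The 65,536 figure in the docstring comes from the 2-byte big-endian flush-count suffix on history keys. A quick standalone illustration of the overflow (nothing below is ElectrumX-specific):

    from struct import error as struct_error, pack

    # History key suffixes are packed with '>H', so 65,535 is the
    # largest flush count that can be encoded.
    assert pack('>H', 65535) == b'\xff\xff'
    try:
        pack('>H', 65536)            # one past the limit
    except struct_error as exc:
        print('overflow:', exc)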

server/block_processor.py

@@ -142,6 +142,11 @@ class BlockProcessor(server.db.DB):
     def __init__(self, env, controller, daemon):
         super().__init__(env)
 
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        self.cancel_history_compaction()
+
         self.daemon = daemon
         self.controller = controller
@@ -332,7 +337,7 @@ class BlockProcessor(server.db.DB):
         self.wall_time += now - self.last_flush
         self.last_flush = now
         self.last_flush_tx_count = self.tx_count
-        self.write_state(batch)
+        self.utxo_write_state(batch)
 
     def assert_flushed(self):
         '''Asserts state is fully flushed.'''

server/db.py

@@ -60,6 +60,9 @@ class DB(util.LoggedClass):
         self.db_class = db_class(self.env.db_engine)
         self.logger.info('using {} for DB backend'.format(self.env.db_engine))
 
+        # For history compaction
+        self.max_hist_row_entries = 12500
+
         self.utxo_db = None
         self.open_dbs()
         self.clean_db()
@@ -134,6 +137,7 @@ class DB(util.LoggedClass):
         self.logger.info('height: {:,d}'.format(self.db_height))
         self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
         self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        self.logger.info('flush count: {:,d}'.format(self.flush_count))
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(util.formatted_time(self.wall_time)))
@@ -172,7 +176,7 @@ class DB(util.LoggedClass):
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']
 
-    def write_state(self, batch):
+    def utxo_write_state(self, batch):
         '''Write (UTXO) state to the batch.'''
         state = {
             'genesis': self.coin.GENESIS_HASH,
@@ -194,7 +198,9 @@ class DB(util.LoggedClass):
         undo information.
         '''
         if self.flush_count < self.utxo_flush_count:
-            raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
+            # Might happen at end of compaction as both DBs cannot be
+            # updated atomically
+            self.utxo_flush_count = self.flush_count
         if self.flush_count > self.utxo_flush_count:
             self.clear_excess_history(self.utxo_flush_count)
             self.clear_excess_undo_info()
@@ -417,7 +423,12 @@ class DB(util.LoggedClass):
         self.logger.info('deleted excess history entries')
 
     def write_history_state(self, batch):
-        state = {'flush_count': self.flush_count}
+        '''Write state to hist_db.'''
+        state = {
+            'flush_count': self.flush_count,
+            'comp_flush_count': self.comp_flush_count,
+            'comp_cursor': self.comp_cursor,
+        }
         # History entries are not prefixed; the suffix \0\0 ensures we
         # look similar to other entries and aren't interfered with
         batch.put(b'state\0\0', repr(state).encode())
@@ -429,8 +440,12 @@ class DB(util.LoggedClass):
             if not isinstance(state, dict):
                 raise self.DBError('failed reading state from history DB')
             self.flush_count = state['flush_count']
+            self.comp_flush_count = state.get('comp_flush_count', -1)
+            self.comp_cursor = state.get('comp_cursor', -1)
         else:
             self.flush_count = 0
+            self.comp_flush_count = -1
+            self.comp_cursor = -1
 
     def flush_history(self, history):
         self.flush_count += 1
@@ -495,3 +510,162 @@ class DB(util.LoggedClass):
         '''
         for tx_num in self.get_history_txnums(hashX, limit):
             yield self.fs_tx_hash(tx_num)
+
+    #
+    # History compaction
+    #
+
+    # comp_cursor is a cursor into compaction progress.
+    # -1: no compaction in progress
+    # 0-65535: Compaction in progress; all prefixes < comp_cursor have
+    #     been compacted, and later ones have not.
+    # 65536: compaction complete in-memory but not flushed
+    #
+    # comp_flush_count applies during compaction, and is a flush count
+    # for history with prefix < comp_cursor.  flush_count applies
+    # to still uncompacted history.  It is -1 when no compaction is
+    # taking place.  Key suffixes up to and including comp_flush_count
+    # are used, so a parallel history flush must first increment this
+    #
+    # When compaction is complete and the final flush takes place,
+    # flush_count is reset to comp_flush_count, and comp_flush_count to -1
+
+    def _flush_compaction(self, cursor, write_items, keys_to_delete):
+        '''Flush a single compaction pass as a batch.'''
+        # Update compaction state
+        if cursor == 65536:
+            self.flush_count = self.comp_flush_count
+            self.comp_cursor = -1
+            self.comp_flush_count = -1
+        else:
+            self.comp_cursor = cursor
+
+        # History DB.  Flush compacted history and updated state
+        with self.hist_db.write_batch() as batch:
+            # Important: delete first!  The keyspace may overlap.
+            for key in keys_to_delete:
+                batch.delete(key)
+            for key, value in write_items:
+                batch.put(key, value)
+            self.write_history_state(batch)
+
+        # If compaction was completed also update the UTXO flush count
+        if cursor == 65536:
+            self.utxo_flush_count = self.flush_count
+            with self.utxo_db.write_batch() as batch:
+                self.utxo_write_state(batch)
+
+    def _compact_hashX(self, hashX, hist_map, hist_list,
+                       write_items, keys_to_delete):
+        '''Compress history for a hashX.  hist_list is an ordered list of
+        the histories to be compressed.'''
+        # History entries (tx numbers) are 4 bytes each.  Distribute
+        # over rows of up to 50KB in size.  A fixed row size means
+        # future compactions will not need to update the first N - 1
+        # rows.
+        max_row_size = self.max_hist_row_entries * 4
+        full_hist = b''.join(hist_list)
+        nrows = (len(full_hist) + max_row_size - 1) // max_row_size
+        if nrows > 4:
+            self.log_info('hashX {} is large: {:,d} entries across {:,d} rows'
+                          .format(hash_to_str(hashX), len(full_hist) // 4,
+                                  nrows))
+
+        # Find what history needs to be written, and what keys need to
+        # be deleted.  Start by assuming all keys are to be deleted,
+        # and then remove those that are the same on-disk as when
+        # compacted.
+        write_size = 0
+        keys_to_delete.update(hist_map)
+        for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
+            key = hashX + pack('>H', n)
+            if hist_map.get(key) == chunk:
+                keys_to_delete.remove(key)
+            else:
+                write_items.append((key, chunk))
+                write_size += len(chunk)
+
+        assert n + 1 == nrows
+        self.comp_flush_count = max(self.comp_flush_count, n)
+        return write_size
+
+    def _compact_prefix(self, prefix, write_items, keys_to_delete):
+        '''Compact all history entries for hashXs beginning with the
+        given prefix.  Update keys_to_delete and write.'''
+        prior_hashX = None
+        hist_map = {}
+        hist_list = []
+
+        key_len = self.coin.HASHX_LEN + 2
+        write_size = 0
+        for key, hist in self.hist_db.iterator(prefix=prefix):
+            # Ignore non-history entries
+            if len(key) != key_len:
+                continue
+            hashX = key[:-2]
+            if hashX != prior_hashX and prior_hashX:
+                write_size += self._compact_hashX(prior_hashX, hist_map,
+                                                  hist_list, write_items,
+                                                  keys_to_delete)
+                hist_map.clear()
+                hist_list.clear()
+            prior_hashX = hashX
+            hist_map[key] = hist
+            hist_list.append(hist)
+
+        if prior_hashX:
+            write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
+                                              write_items, keys_to_delete)
+        return write_size
def _compact_history(self, limit):
'''Inner loop of history compaction. Loops until limit bytes have
been processed.
'''
keys_to_delete = set()
write_items = [] # A list of (key, value) pairs
write_size = 0
# Loop over 2-byte prefixes
cursor = self.comp_cursor
while write_size < limit and cursor < 65536:
prefix = pack('>H', cursor)
write_size += self._compact_prefix(prefix, write_items,
keys_to_delete)
cursor += 1
max_rows = self.comp_flush_count + 1
self._flush_compaction(cursor, write_items, keys_to_delete)
self.log_info('history compaction: wrote {:,d} rows ({:.1f} MB), '
'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
.format(len(write_items), write_size / 1000000,
len(keys_to_delete), max_rows,
100 * cursor / 65536))
return write_size
+
+    async def compact_history(self, loop):
+        '''Start a background history compaction and reset the flush count
+        if it's getting high.
+        '''
+        # Do nothing if during initial sync or if a compaction hasn't
+        # been initiated
+        if self.first_sync or self.comp_cursor == -1:
+            return
+
+        self.comp_flush_count = max(self.comp_flush_count, 1)
+        limit = 50 * 1000 * 1000
+
+        while self.comp_cursor != -1:
+            if self.semaphore.locked():
+                self.log_info('compact_history: waiting on semaphore...')
+            with await self.semaphore:
+                await loop.run_in_executor(None, self._compact_history, limit)
+
+    def cancel_history_compaction(self):
+        if self.comp_cursor != -1:
+            self.logger.warning('cancelling in-progress history compaction')
+            self.comp_flush_count = -1
+            self.comp_cursor = -1
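
To make the row layout in _compact_hashX concrete, here is its arithmetic as a standalone sketch; the 30,000-entry history is a made-up example:

    # Each tx number is 4 bytes; a row holds up to 12,500 entries
    # (50,000 bytes), matching max_hist_row_entries in DB.__init__.
    max_hist_row_entries = 12500
    max_row_size = max_hist_row_entries * 4     # 50,000 bytes per row

    n_entries = 30000                           # hypothetical history
    full_hist_len = n_entries * 4               # 120,000 bytes
    nrows = (full_hist_len + max_row_size - 1) // max_row_size
    assert nrows == 3                           # two full rows + one partial

    # Every row except the last is always full, so appending history
    # later changes only the final row; a re-compaction then rewrites
    # one row rather than the whole history.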

tests/server/test_compaction.py (new file)

@@ -0,0 +1,131 @@
# Test of compaction code in server/db.py

import array
from collections import defaultdict
from os import environ, urandom
from struct import pack
import random

from lib.hash import hash_to_str
from server.env import Env
from server.db import DB


def create_histories(db, hashX_count=100):
    '''Creates a bunch of random transaction histories, and writes them
    to disk in a series of small flushes.'''
    hashXs = [urandom(db.coin.HASHX_LEN) for n in range(hashX_count)]
    mk_array = lambda: array.array('I')
    histories = {hashX: mk_array() for hashX in hashXs}
    this_history = defaultdict(mk_array)
    tx_num = 0
    while hashXs:
        hash_indexes = set(random.randrange(len(hashXs))
                           for n in range(1 + random.randrange(4)))
        for index in hash_indexes:
            histories[hashXs[index]].append(tx_num)
            this_history[hashXs[index]].append(tx_num)
        tx_num += 1
        # Occasionally flush and drop a random hashX if non-empty
        if random.random() < 0.1:
            db.flush_history(this_history)
            this_history.clear()
            index = random.randrange(0, len(hashXs))
            if histories[hashXs[index]]:
                del hashXs[index]

    return histories
def check_hashX_compaction(db):
    db.max_hist_row_entries = 40
    row_size = db.max_hist_row_entries * 4
    full_hist = array.array('I', range(100)).tobytes()
    hashX = urandom(db.coin.HASHX_LEN)
    pairs = ((1, 20), (26, 50), (56, 30))

    cum = 0
    hist_list = []
    hist_map = {}
    for flush_count, count in pairs:
        key = hashX + pack('>H', flush_count)
        hist = full_hist[cum * 4: (cum + count) * 4]
        hist_map[key] = hist
        hist_list.append(hist)
        cum += count

    write_items = []
    keys_to_delete = set()
    write_size = db._compact_hashX(hashX, hist_map, hist_list,
                                   write_items, keys_to_delete)
    # Check results for sanity
    assert write_size == len(full_hist)
    assert len(write_items) == 3
    assert len(keys_to_delete) == 3
    assert len(hist_map) == len(pairs)
    for n, item in enumerate(write_items):
        assert item == (hashX + pack('>H', n),
                        full_hist[n * row_size: (n + 1) * row_size])
    for flush_count, count in pairs:
        assert hashX + pack('>H', flush_count) in keys_to_delete

    # Check re-compaction is null
    hist_map = {key: value for key, value in write_items}
    hist_list = [value for key, value in write_items]
    write_items.clear()
    keys_to_delete.clear()
    write_size = db._compact_hashX(hashX, hist_map, hist_list,
                                   write_items, keys_to_delete)
    assert write_size == 0
    assert len(write_items) == 0
    assert len(keys_to_delete) == 0
    assert len(hist_map) == len(pairs)

    # Check re-compaction adding a single tx writes the one row
    hist_list[-1] += array.array('I', [100]).tobytes()
    write_size = db._compact_hashX(hashX, hist_map, hist_list,
                                   write_items, keys_to_delete)
    assert write_size == len(hist_list[-1])
    assert write_items == [(hashX + pack('>H', 2), hist_list[-1])]
    assert len(keys_to_delete) == 1
    assert write_items[0][0] in keys_to_delete
    assert len(hist_map) == len(pairs)
def check_written(db, histories):
    for hashX, hist in histories.items():
        db_hist = array.array('I', db.get_history_txnums(hashX, limit=None))
        assert hist == db_hist


def compact_history(db):
    '''Synchronously compact the DB history.'''
    db.first_sync = False
    db.comp_cursor = 0

    db.comp_flush_count = max(db.comp_flush_count, 1)
    limit = 5 * 1000
    write_size = 0
    while db.comp_cursor != -1:
        write_size += db._compact_history(limit)
    assert write_size != 0
def run_test(db_dir):
    environ.clear()
    environ['DB_DIRECTORY'] = db_dir
    environ['DAEMON_URL'] = ''
    env = Env()
    db = DB(env)
    # Test abstract compaction
    check_hashX_compaction(db)
    # Now test with random data
    histories = create_histories(db)
    check_written(db, histories)
    compact_history(db)
    check_written(db, histories)


def test_compaction(tmpdir):
    db_dir = str(tmpdir)
    print('Temp dir: {}'.format(db_dir))
    run_test(db_dir)
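
The test is driven by pytest's tmpdir fixture, but run_test only needs a directory path, so the round trip can also be exercised directly. A minimal sketch; the tempfile usage is illustrative, not part of the commit:

    import tempfile

    from tests.server.test_compaction import run_test

    # Run the compaction round trip against a scratch database directory;
    # pytest normally supplies this via its tmpdir fixture.
    with tempfile.TemporaryDirectory() as db_dir:
        run_test(db_dir)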