diff --git a/compact_history.py b/compact_history.py
new file mode 100755
index 0000000..32e0cbb
--- /dev/null
+++ b/compact_history.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2017, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.
+
+'''Script to compact the history database. This should save space and
+will reset the flush counter to a low number, avoiding overflow when
+the flush count reaches 65,536.
+
+This needs to lock the database so ElectrumX must not be running -
+shut it down cleanly first.
+
+It is recommended you run this script with the same environment as
+ElectrumX. However it is intended to be runnable with just
+DB_DIRECTORY and COIN set (COIN defaults as for ElectrumX).
+
+If you use daemon tools, you might run this script like so:
+
+    envdir /path/to/the/environment/directory ./compact_history.py
+
+Depending on your hardware this script may take up to 6 hours to
+complete; it logs progress regularly.
+
+Compaction can be interrupted and restarted harmlessly and will pick
+up where it left off. However, if you restart ElectrumX without
+running the compaction to completion, it will not benefit and
+subsequent compactions will restart from the beginning.
+'''
+
+import logging
+import sys
+import traceback
+from os import environ
+
+from server.env import Env
+from server.db import DB
+
+
+def compact_history():
+    if sys.version_info < (3, 5, 3):
+        raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX')
+
+    environ['DAEMON_URL'] = ''   # Avoid Env erroring out
+    env = Env()
+    db = DB(env)
+
+    assert not db.first_sync
+    # Continue where we left off, if interrupted
+    if db.comp_cursor == -1:
+        db.comp_cursor = 0
+
+    db.comp_flush_count = max(db.comp_flush_count, 1)
+    limit = 8 * 1000 * 1000
+
+    while db.comp_cursor != -1:
+        db._compact_history(limit)
+
+
+def main():
+    logging.basicConfig(level=logging.INFO)
+    logging.info('Starting history compaction...')
+    try:
+        compact_history()
+    except Exception:
+        traceback.print_exc()
+        logging.critical('History compaction terminated abnormally')
+    else:
+        logging.info('History compaction complete')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/server/block_processor.py b/server/block_processor.py
index fe26f63..63e2b1a 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -142,6 +142,11 @@ class BlockProcessor(server.db.DB):
 
     def __init__(self, env, controller, daemon):
         super().__init__(env)
+
+        # An incomplete compaction needs to be cancelled otherwise
+        # restarting it will corrupt the history
+        self.cancel_history_compaction()
+
         self.daemon = daemon
         self.controller = controller
 
@@ -332,7 +337,7 @@ class BlockProcessor(server.db.DB):
         self.wall_time += now - self.last_flush
         self.last_flush = now
         self.last_flush_tx_count = self.tx_count
-        self.write_state(batch)
+        self.utxo_write_state(batch)
 
     def assert_flushed(self):
         '''Asserts state is fully flushed.'''
diff --git a/server/db.py b/server/db.py
index 4958c2b..b802fcf 100644
--- a/server/db.py
+++ b/server/db.py
@@ -60,6 +60,9 @@ class DB(util.LoggedClass):
         self.db_class = db_class(self.env.db_engine)
         self.logger.info('using {} for DB backend'.format(self.env.db_engine))
 
+        # For history compaction
+        self.max_hist_row_entries = 12500
+
         self.utxo_db = None
         self.open_dbs()
         self.clean_db()
@@ -134,6 +137,7 @@ class DB(util.LoggedClass):
         self.logger.info('height: {:,d}'.format(self.db_height))
         self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
         self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+        self.logger.info('flush count: {:,d}'.format(self.flush_count))
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(util.formatted_time(self.wall_time)))
@@ -172,7 +176,7 @@ class DB(util.LoggedClass):
             self.wall_time = state['wall_time']
             self.first_sync = state['first_sync']
 
-    def write_state(self, batch):
+    def utxo_write_state(self, batch):
         '''Write (UTXO) state to the batch.'''
         state = {
             'genesis': self.coin.GENESIS_HASH,
@@ -194,7 +198,9 @@ class DB(util.LoggedClass):
         undo information.
         '''
         if self.flush_count < self.utxo_flush_count:
-            raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
+            # Might happen at end of compaction as both DBs cannot be
+            # updated atomically
+            self.utxo_flush_count = self.flush_count
         if self.flush_count > self.utxo_flush_count:
             self.clear_excess_history(self.utxo_flush_count)
             self.clear_excess_undo_info()
@@ -417,7 +423,12 @@ class DB(util.LoggedClass):
         self.logger.info('deleted excess history entries')
 
     def write_history_state(self, batch):
-        state = {'flush_count': self.flush_count}
+        '''Write state to hist_db.'''
+        state = {
+            'flush_count': self.flush_count,
+            'comp_flush_count': self.comp_flush_count,
+            'comp_cursor': self.comp_cursor,
+        }
         # History entries are not prefixed; the suffix \0\0 ensures we
         # look similar to other entries and aren't interfered with
         batch.put(b'state\0\0', repr(state).encode())
@@ -429,8 +440,12 @@ class DB(util.LoggedClass):
             if not isinstance(state, dict):
                 raise self.DBError('failed reading state from history DB')
             self.flush_count = state['flush_count']
+            self.comp_flush_count = state.get('comp_flush_count', -1)
+            self.comp_cursor = state.get('comp_cursor', -1)
         else:
             self.flush_count = 0
+            self.comp_flush_count = -1
+            self.comp_cursor = -1
 
     def flush_history(self, history):
         self.flush_count += 1
@@ -495,3 +510,162 @@ class DB(util.LoggedClass):
         '''
         for tx_num in self.get_history_txnums(hashX, limit):
             yield self.fs_tx_hash(tx_num)
+
+    #
+    # History compaction
+    #
+
+    # comp_cursor is a cursor into compaction progress.
+    # -1: no compaction in progress
+    # 0-65535: Compaction in progress; all prefixes < comp_cursor have
+    #     been compacted, and later ones have not.
+    # 65536: compaction complete in-memory but not flushed
+    #
+    # comp_flush_count applies during compaction, and is a flush count
+    # for history with prefix < comp_cursor. flush_count applies
+    # to still uncompacted history. It is -1 when no compaction is
+    # taking place. Key suffixes up to and including comp_flush_count
+    # are used, so a parallel history flush must first increment this
+    #
+    # When compaction is complete and the final flush takes place,
+    # flush_count is reset to comp_flush_count, and comp_flush_count to -1
+
+    def _flush_compaction(self, cursor, write_items, keys_to_delete):
+        '''Flush a single compaction pass as a batch.'''
+        # Update compaction state
+        if cursor == 65536:
+            self.flush_count = self.comp_flush_count
+            self.comp_cursor = -1
+            self.comp_flush_count = -1
+        else:
+            self.comp_cursor = cursor
+
+        # History DB. Flush compacted history and updated state
+        with self.hist_db.write_batch() as batch:
+            # Important: delete first! The keyspace may overlap.
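+            # (Old rows are keyed by hashX + flush_count and rewritten
+            # rows by hashX + row_number, so a newly written key can
+            # equal an old key that is queued for deletion.)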
+            for key in keys_to_delete:
+                batch.delete(key)
+            for key, value in write_items:
+                batch.put(key, value)
+            self.write_history_state(batch)
+
+        # If compaction was completed also update the UTXO flush count
+        if cursor == 65536:
+            self.utxo_flush_count = self.flush_count
+            with self.utxo_db.write_batch() as batch:
+                self.utxo_write_state(batch)
+
+    def _compact_hashX(self, hashX, hist_map, hist_list,
+                       write_items, keys_to_delete):
+        '''Compress history for a hashX. hist_list is an ordered list of
+        the histories to be compressed.'''
+        # History entries (tx numbers) are 4 bytes each. Distribute
+        # over rows of up to 50KB in size. A fixed row size means
+        # future compactions will not need to update the first N - 1
+        # rows.
+        max_row_size = self.max_hist_row_entries * 4
+        full_hist = b''.join(hist_list)
+        nrows = (len(full_hist) + max_row_size - 1) // max_row_size
+        if nrows > 4:
+            self.log_info('hashX {} is large: {:,d} entries across {:,d} rows'
+                          .format(hash_to_str(hashX), len(full_hist) // 4,
+                                  nrows))
+
+        # Find what history needs to be written, and what keys need to
+        # be deleted. Start by assuming all keys are to be deleted,
+        # and then remove those that are the same on-disk as when
+        # compacted.
+        write_size = 0
+        keys_to_delete.update(hist_map)
+        for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
+            key = hashX + pack('>H', n)
+            if hist_map.get(key) == chunk:
+                keys_to_delete.remove(key)
+            else:
+                write_items.append((key, chunk))
+                write_size += len(chunk)
+
+        assert n + 1 == nrows
+        self.comp_flush_count = max(self.comp_flush_count, n)
+
+        return write_size
+
+    def _compact_prefix(self, prefix, write_items, keys_to_delete):
+        '''Compact all history entries for hashXs beginning with the
+        given prefix. Update keys_to_delete and write.'''
+        prior_hashX = None
+        hist_map = {}
+        hist_list = []
+
+        key_len = self.coin.HASHX_LEN + 2
+        write_size = 0
+        for key, hist in self.hist_db.iterator(prefix=prefix):
+            # Ignore non-history entries
+            if len(key) != key_len:
+                continue
+            hashX = key[:-2]
+            if hashX != prior_hashX and prior_hashX:
+                write_size += self._compact_hashX(prior_hashX, hist_map,
+                                                  hist_list, write_items,
+                                                  keys_to_delete)
+                hist_map.clear()
+                hist_list.clear()
+            prior_hashX = hashX
+            hist_map[key] = hist
+            hist_list.append(hist)
+
+        if prior_hashX:
+            write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
+                                              write_items, keys_to_delete)
+        return write_size
+
+    def _compact_history(self, limit):
+        '''Inner loop of history compaction. Loops until limit bytes have
+        been processed.
+        '''
+        keys_to_delete = set()
+        write_items = []   # A list of (key, value) pairs
+        write_size = 0
+
+        # Loop over 2-byte prefixes
+        cursor = self.comp_cursor
+        while write_size < limit and cursor < 65536:
+            prefix = pack('>H', cursor)
+            write_size += self._compact_prefix(prefix, write_items,
+                                               keys_to_delete)
+            cursor += 1
+
+        max_rows = self.comp_flush_count + 1
+        self._flush_compaction(cursor, write_items, keys_to_delete)
+
+        self.log_info('history compaction: wrote {:,d} rows ({:.1f} MB), '
+                      'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
+                      .format(len(write_items), write_size / 1000000,
+                              len(keys_to_delete), max_rows,
+                              100 * cursor / 65536))
+        return write_size
+
+    async def compact_history(self, loop):
+        '''Start a background history compaction and reset the flush count if
+        it's getting high.
+        '''
+        # Do nothing during the initial sync or if a compaction hasn't
+        # been initiated
+        if self.first_sync or self.comp_cursor == -1:
+            return
+
+        self.comp_flush_count = max(self.comp_flush_count, 1)
+        limit = 50 * 1000 * 1000
+
+        while self.comp_cursor != -1:
+            locked = self.semaphore.locked()
+            if locked:
+                self.log_info('compact_history: waiting on semaphore...')
+            with await self.semaphore:
+                await loop.run_in_executor(None, self._compact_history, limit)
+
+    def cancel_history_compaction(self):
+        if self.comp_cursor != -1:
+            self.logger.warning('cancelling in-progress history compaction')
+            self.comp_flush_count = -1
+            self.comp_cursor = -1
diff --git a/tests/server/test_compaction.py b/tests/server/test_compaction.py
new file mode 100644
index 0000000..d1974b5
--- /dev/null
+++ b/tests/server/test_compaction.py
@@ -0,0 +1,131 @@
+# Test of compaction code in server/db.py
+
+import array
+from collections import defaultdict
+from os import environ, urandom
+from struct import pack
+import random
+
+from lib.hash import hash_to_str
+from server.env import Env
+from server.db import DB
+
+
+def create_histories(db, hashX_count=100):
+    '''Creates a bunch of random transaction histories, and writes them
+    to disk in a series of small flushes.'''
+    hashXs = [urandom(db.coin.HASHX_LEN) for n in range(hashX_count)]
+    mk_array = lambda: array.array('I')
+    histories = {hashX: mk_array() for hashX in hashXs}
+    this_history = defaultdict(mk_array)
+    tx_num = 0
+    while hashXs:
+        hash_indexes = set(random.randrange(len(hashXs))
+                           for n in range(1 + random.randrange(4)))
+        for index in hash_indexes:
+            histories[hashXs[index]].append(tx_num)
+            this_history[hashXs[index]].append(tx_num)
+
+        tx_num += 1
+        # Occasionally flush and drop a random hashX if non-empty
+        if random.random() < 0.1:
+            db.flush_history(this_history)
+            this_history.clear()
+            index = random.randrange(0, len(hashXs))
+            if histories[hashXs[index]]:
+                del hashXs[index]
+
+    return histories
+
+
+def check_hashX_compaction(db):
+    db.max_hist_row_entries = 40
+    row_size = db.max_hist_row_entries * 4
+    full_hist = array.array('I', range(100)).tobytes()
+    hashX = urandom(db.coin.HASHX_LEN)
+    pairs = ((1, 20), (26, 50), (56, 30))
+
+    cum = 0
+    hist_list = []
+    hist_map = {}
+    for flush_count, count in pairs:
+        key = hashX + pack('>H', flush_count)
+        hist = full_hist[cum * 4: (cum + count) * 4]
+        hist_map[key] = hist
+        hist_list.append(hist)
+        cum += count
+
+    write_items = []
+    keys_to_delete = set()
+    write_size = db._compact_hashX(hashX, hist_map, hist_list,
+                                   write_items, keys_to_delete)
+    # Check results for sanity
+    assert write_size == len(full_hist)
+    assert len(write_items) == 3
+    assert len(keys_to_delete) == 3
+    assert len(hist_map) == len(pairs)
+    for n, item in enumerate(write_items):
+        assert item == (hashX + pack('>H', n),
+                        full_hist[n * row_size: (n + 1) * row_size])
+    for flush_count, count in pairs:
+        assert hashX + pack('>H', flush_count) in keys_to_delete
+
+    # Check re-compaction is null
+    hist_map = {key: value for key, value in write_items}
+    hist_list = [value for key, value in write_items]
+    write_items.clear()
+    keys_to_delete.clear()
+    write_size = db._compact_hashX(hashX, hist_map, hist_list,
+                                   write_items, keys_to_delete)
+    assert write_size == 0
+    assert len(write_items) == 0
+    assert len(keys_to_delete) == 0
+    assert len(hist_map) == len(pairs)
+
+    # Check re-compaction adding a single tx writes the one row
+    hist_list[-1] += array.array('I', [100]).tobytes()
+    write_size = db._compact_hashX(hashX, hist_map, hist_list,
+                                   write_items, keys_to_delete)
+    assert write_size == len(hist_list[-1])
+    assert write_items == [(hashX + pack('>H', 2), hist_list[-1])]
+    assert len(keys_to_delete) == 1
+    assert write_items[0][0] in keys_to_delete
+    assert len(hist_map) == len(pairs)
+
+
+def check_written(db, histories):
+    for hashX, hist in histories.items():
+        db_hist = array.array('I', db.get_history_txnums(hashX, limit=None))
+        assert hist == db_hist
+
+def compact_history(db):
+    '''Synchronously compact the DB history.'''
+    db.first_sync = False
+    db.comp_cursor = 0
+
+    db.comp_flush_count = max(db.comp_flush_count, 1)
+    limit = 5 * 1000
+
+    write_size = 0
+    while db.comp_cursor != -1:
+        write_size += db._compact_history(limit)
+    assert write_size != 0
+
+def run_test(db_dir):
+    environ.clear()
+    environ['DB_DIRECTORY'] = db_dir
+    environ['DAEMON_URL'] = ''
+    env = Env()
+    db = DB(env)
+    # Test abstract compaction
+    check_hashX_compaction(db)
+    # Now test with random data
+    histories = create_histories(db)
+    check_written(db, histories)
+    compact_history(db)
+    check_written(db, histories)
+
+def test_compaction(tmpdir):
+    db_dir = str(tmpdir)
+    print('Temp dir: {}'.format(db_dir))
+    run_test(db_dir)
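
For reference, a rough standalone sketch (not part of the patch) of the row-chunking scheme _compact_hashX implements: uncompacted rows keyed by hashX + flush_count are concatenated in order and re-split into fixed-size rows keyed by hashX + row_number. The names below are illustrative only.

    from struct import pack

    MAX_HIST_ROW_ENTRIES = 12500              # mirrors DB.max_hist_row_entries
    MAX_ROW_SIZE = MAX_HIST_ROW_ENTRIES * 4   # each history entry is a 4-byte tx number

    def compacted_rows(hashX, hist_rows):
        '''hist_rows: ordered history blobs for one hashX, oldest first.
        Returns the (key, value) pairs a compaction pass would produce.'''
        full_hist = b''.join(hist_rows)
        return [(hashX + pack('>H', n), full_hist[i:i + MAX_ROW_SIZE])
                for n, i in enumerate(range(0, len(full_hist), MAX_ROW_SIZE))]

Because only the final row of a hashX can grow, later compactions rewrite at most that last row, which is why the patch can skip keys whose on-disk value already matches the recomputed chunk.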