From 5e1ed3ffa69b5ed0d0a5a0478bb1908ef6d9ed35 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 21 May 2018 19:19:32 +0800 Subject: [PATCH] Separate history management into its own object - the object also manages unflushed history - the history DB has its own version, starting at 0 This is the first step to making history management into a separate service and to larger block sizes. The next step is an improved history format, which I hope will both save space and be a solution to issue #185. The DB should be able to upgrade in-place without re-syncing the chain. --- compact_history.py | 13 +- lib/util.py | 7 + server/block_processor.py | 47 +++-- server/db.py | 317 +++----------------------------- server/history.py | 311 +++++++++++++++++++++++++++++++ tests/server/test_compaction.py | 2 +- 6 files changed, 371 insertions(+), 326 deletions(-) create mode 100644 server/history.py diff --git a/compact_history.py b/compact_history.py index ab21464..ae2e339 100755 --- a/compact_history.py +++ b/compact_history.py @@ -49,16 +49,19 @@ def compact_history(): db = DB(env) assert not db.first_sync + history = db.history # Continue where we left off, if interrupted - if db.comp_cursor == -1: - db.comp_cursor = 0 + if history.comp_cursor == -1: + history.comp_cursor = 0 - db.comp_flush_count = max(db.comp_flush_count, 1) + history.comp_flush_count = max(history.comp_flush_count, 1) limit = 8 * 1000 * 1000 - while db.comp_cursor != -1: - db._compact_history(limit) + while history.comp_cursor != -1: + history._compact_history(limit) + # When completed also update the UTXO flush count + db.set_flush_count(history.flush_count) def main(): logging.basicConfig(level=logging.INFO) diff --git a/lib/util.py b/lib/util.py index 88d06d3..0da31ca 100644 --- a/lib/util.py +++ b/lib/util.py @@ -128,6 +128,13 @@ def chunks(items, size): yield items[i: i + size] +def resolve_limit(limit): + if limit is None: + return -1 + assert isinstance(limit, int) and limit >= 0 + return limit + + def bytes_to_int(be_bytes): '''Interprets a big-endian sequence of bytes as an integer''' return int.from_bytes(be_bytes, 'big') diff --git a/server/block_processor.py b/server/block_processor.py index c7f23d9..2575bd3 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -145,7 +145,7 @@ class BlockProcessor(server.db.DB): # An incomplete compaction needs to be cancelled otherwise # restarting it will corrupt the history - self.cancel_history_compaction() + self.history.cancel_compaction() self.daemon = daemon self.controller = controller @@ -171,8 +171,6 @@ class BlockProcessor(server.db.DB): self.headers = [] self.tx_hashes = [] self.undo_infos = [] - self.history = defaultdict(partial(array.array, 'I')) - self.history_size = 0 # UTXO cache self.utxo_cache = {} @@ -348,9 +346,9 @@ class BlockProcessor(server.db.DB): assert self.tx_count == self.fs_tx_count == self.db_tx_count assert self.height == self.fs_height == self.db_height assert not self.undo_infos - assert not self.history assert not self.utxo_cache assert not self.db_deletes + self.history.assert_flushed() def flush(self, flush_utxos=False): '''Flush out cached state. 
@@ -372,12 +370,10 @@ class BlockProcessor(server.db.DB): .format(fs_end - flush_start)) # History next - it's fast and frees memory - self.flush_history(self.history) + hashX_count = self.history.flush() if self.utxo_db.for_sync: self.logger.info('flushed history in {:.1f}s for {:,d} addrs' - .format(time.time() - fs_end, len(self.history))) - self.history = defaultdict(partial(array.array, 'I')) - self.history_size = 0 + .format(time.time() - fs_end, hashX_count)) # Flush state last as it reads the wall time. with self.utxo_db.write_batch() as batch: @@ -390,7 +386,7 @@ class BlockProcessor(server.db.DB): self.flush_state(self.utxo_db) self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}' - .format(self.flush_count, + .format(self.history.flush_count, self.last_flush - flush_start, self.height, self.tx_count)) @@ -437,7 +433,7 @@ class BlockProcessor(server.db.DB): height. ''' assert self.height < self.db_height - assert not self.history + self.history.assert_flushed() flush_start = time.time() @@ -450,7 +446,7 @@ class BlockProcessor(server.db.DB): # Backup history. self.touched can include other addresses # which is harmless, but remove None. self.touched.discard(None) - nremoves = self.backup_history(self.touched) + nremoves = self.history.backup(self.touched, self.tx_count) self.logger.info('backing up removed {:,d} history entries' .format(nremoves)) @@ -461,7 +457,7 @@ class BlockProcessor(server.db.DB): self.logger.info('backup flush #{:,d} took {:.1f}s. ' 'Height {:,d} txs: {:,d}' - .format(self.flush_count, + .format(self.history.flush_count, self.last_flush - flush_start, self.height, self.tx_count)) @@ -472,7 +468,7 @@ class BlockProcessor(server.db.DB): one_MB = 1000*1000 utxo_cache_size = len(self.utxo_cache) * 205 db_deletes_size = len(self.db_deletes) * 57 - hist_cache_size = len(self.history) * 180 + self.history_size * 4 + hist_cache_size = self.history.unflushed_memsize() # Roughly ntxs * 32 + nblocks * 42 tx_hash_size = ((self.tx_count - self.fs_tx_count) * 32 + (self.height - self.fs_height) * 42) @@ -522,19 +518,19 @@ class BlockProcessor(server.db.DB): # Use local vars for speed in the loops undo_info = [] - history = self.history - history_size = self.history_size tx_num = self.tx_count script_hashX = self.coin.hashX_from_script s_pack = pack put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info_append = undo_info.append - touched = self.touched + update_touched = self.touched.update + hashXs_by_tx = [] + append_hashXs = hashXs_by_tx.append for tx, tx_hash in txs: - hashXs = set() - add_hashX = hashXs.add + hashXs = [] + append_hashX = hashXs.append tx_numb = s_pack(' self.utxo_flush_count: - self.clear_excess_history(self.utxo_flush_count) + self.utxo_flush_count = self.history.clear_excess(self.utxo_flush_count) self.clear_excess_undo_info() def fs_update_header_offsets(self, offset_start, height_start, headers): @@ -253,12 +244,15 @@ class DB(object): return [self.coin.header_hash(header) for header in headers] - @staticmethod - def _resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit + def get_history(self, hashX, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. + Set limit to None to get them all. 
+ ''' + for tx_num in self.history.get_txnums(hashX, limit): + yield self.fs_tx_hash(tx_num) # -- Undo information @@ -315,8 +309,8 @@ class DB(object): raise self.DBError('failed reading state from DB') self.db_version = state['db_version'] if self.db_version not in self.DB_VERSIONS: - raise self.DBError('your DB version is {} but this software ' - 'only handles versions {}' + raise self.DBError('your UTXO DB version is {} but this ' + 'software only handles versions {}' .format(self.db_version, self.DB_VERSIONS)) # backwards compat genesis_hash = state['genesis'] @@ -347,6 +341,11 @@ class DB(object): } batch.put(b'state', repr(state).encode()) + def set_flush_count(self, count): + self.utxo_flush_count = count + with self.utxo_db.write_batch() as batch: + self.write_utxo_state(batch) + def get_balance(self, hashX): '''Returns the confirmed balance of an address.''' return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None)) @@ -356,7 +355,7 @@ class DB(object): particular order. By default yields at most 1000 entries. Set limit to None to get them all. ''' - limit = self._resolve_limit(limit) + limit = util.resolve_limit(limit) s_unpack = unpack # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer @@ -410,273 +409,3 @@ class DB(object): return hashX, tx_num_packed return None, None - - # -- History database - - def clear_excess_history(self, flush_count): - self.logger.info('DB shut down uncleanly. Scanning for ' - 'excess history flushes...') - - keys = [] - for key, hist in self.hist_db.iterator(prefix=b''): - flush_id, = unpack('>H', key[-2:]) - if flush_id > flush_count: - keys.append(key) - - self.logger.info('deleting {:,d} history entries'.format(len(keys))) - - self.flush_count = flush_count - with self.hist_db.write_batch() as batch: - for key in keys: - batch.delete(key) - self.write_history_state(batch) - - self.logger.info('deleted excess history entries') - - def write_history_state(self, batch): - '''Write state to hist_db.''' - state = { - 'flush_count': self.flush_count, - 'comp_flush_count': self.comp_flush_count, - 'comp_cursor': self.comp_cursor, - } - # History entries are not prefixed; the suffix \0\0 ensures we - # look similar to other entries and aren't interfered with - batch.put(b'state\0\0', repr(state).encode()) - - def read_history_state(self): - state = self.hist_db.get(b'state\0\0') - if state: - state = ast.literal_eval(state.decode()) - if not isinstance(state, dict): - raise self.DBError('failed reading state from history DB') - self.flush_count = state['flush_count'] - self.comp_flush_count = state.get('comp_flush_count', -1) - self.comp_cursor = state.get('comp_cursor', -1) - else: - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 - - def flush_history(self, history): - self.flush_count += 1 - flush_id = pack('>H', self.flush_count) - - with self.hist_db.write_batch() as batch: - for hashX in sorted(history): - key = hashX + flush_id - batch.put(key, history[hashX].tobytes()) - self.write_history_state(batch) - - def backup_history(self, hashXs): - # Not certain this is needed, but it doesn't hurt - self.flush_count += 1 - nremoves = 0 - - with self.hist_db.write_batch() as batch: - for hashX in sorted(hashXs): - deletes = [] - puts = {} - for key, hist in self.hist_db.iterator(prefix=hashX, - reverse=True): - a = array.array('I') - a.frombytes(hist) - # Remove all history entries >= self.tx_count - idx = bisect_left(a, self.tx_count) - nremoves += len(a) - idx - if idx > 
0: - puts[key] = a[:idx].tobytes() - break - deletes.append(key) - - for key in deletes: - batch.delete(key) - for key, value in puts.items(): - batch.put(key, value) - self.write_history_state(batch) - - return nremoves - - def get_history_txnums(self, hashX, limit=1000): - '''Generator that returns an unpruned, sorted list of tx_nums in the - history of a hashX. Includes both spending and receiving - transactions. By default yields at most 1000 entries. Set - limit to None to get them all. ''' - limit = self._resolve_limit(limit) - for key, hist in self.hist_db.iterator(prefix=hashX): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield tx_num - limit -= 1 - - def get_history(self, hashX, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - for tx_num in self.get_history_txnums(hashX, limit): - yield self.fs_tx_hash(tx_num) - - # - # History compaction - # - - # comp_cursor is a cursor into compaction progress. - # -1: no compaction in progress - # 0-65535: Compaction in progress; all prefixes < comp_cursor have - # been compacted, and later ones have not. - # 65536: compaction complete in-memory but not flushed - # - # comp_flush_count applies during compaction, and is a flush count - # for history with prefix < comp_cursor. flush_count applies - # to still uncompacted history. It is -1 when no compaction is - # taking place. Key suffixes up to and including comp_flush_count - # are used, so a parallel history flush must first increment this - # - # When compaction is complete and the final flush takes place, - # flush_count is reset to comp_flush_count, and comp_flush_count to -1 - - def _flush_compaction(self, cursor, write_items, keys_to_delete): - '''Flush a single compaction pass as a batch.''' - # Update compaction state - if cursor == 65536: - self.flush_count = self.comp_flush_count - self.comp_cursor = -1 - self.comp_flush_count = -1 - else: - self.comp_cursor = cursor - - # History DB. Flush compacted history and updated state - with self.hist_db.write_batch() as batch: - # Important: delete first! The keyspace may overlap. - for key in keys_to_delete: - batch.delete(key) - for key, value in write_items: - batch.put(key, value) - self.write_history_state(batch) - - # If compaction was completed also update the UTXO flush count - if cursor == 65536: - self.utxo_flush_count = self.flush_count - with self.utxo_db.write_batch() as batch: - self.write_utxo_state(batch) - - def _compact_hashX(self, hashX, hist_map, hist_list, - write_items, keys_to_delete): - '''Compres history for a hashX. hist_list is an ordered list of - the histories to be compressed.''' - # History entries (tx numbers) are 4 bytes each. Distribute - # over rows of up to 50KB in size. A fixed row size means - # future compactions will not need to update the first N - 1 - # rows. - max_row_size = self.max_hist_row_entries * 4 - full_hist = b''.join(hist_list) - nrows = (len(full_hist) + max_row_size - 1) // max_row_size - if nrows > 4: - self.logger.info('hashX {} is large: {:,d} entries across ' - '{:,d} rows' - .format(hash_to_str(hashX), len(full_hist) // 4, - nrows)) - - # Find what history needs to be written, and what keys need to - # be deleted. 
Start by assuming all keys are to be deleted, - # and then remove those that are the same on-disk as when - # compacted. - write_size = 0 - keys_to_delete.update(hist_map) - for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack('>H', n) - if hist_map.get(key) == chunk: - keys_to_delete.remove(key) - else: - write_items.append((key, chunk)) - write_size += len(chunk) - - assert n + 1 == nrows - self.comp_flush_count = max(self.comp_flush_count, n) - - return write_size - - def _compact_prefix(self, prefix, write_items, keys_to_delete): - '''Compact all history entries for hashXs beginning with the - given prefix. Update keys_to_delete and write.''' - prior_hashX = None - hist_map = {} - hist_list = [] - - key_len = HASHX_LEN + 2 - write_size = 0 - for key, hist in self.hist_db.iterator(prefix=prefix): - # Ignore non-history entries - if len(key) != key_len: - continue - hashX = key[:-2] - if hashX != prior_hashX and prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, - hist_list, write_items, - keys_to_delete) - hist_map.clear() - hist_list.clear() - prior_hashX = hashX - hist_map[key] = hist - hist_list.append(hist) - - if prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, - write_items, keys_to_delete) - return write_size - - def _compact_history(self, limit): - '''Inner loop of history compaction. Loops until limit bytes have - been processed. - ''' - keys_to_delete = set() - write_items = [] # A list of (key, value) pairs - write_size = 0 - - # Loop over 2-byte prefixes - cursor = self.comp_cursor - while write_size < limit and cursor < 65536: - prefix = pack('>H', cursor) - write_size += self._compact_prefix(prefix, write_items, - keys_to_delete) - cursor += 1 - - max_rows = self.comp_flush_count + 1 - self._flush_compaction(cursor, write_items, keys_to_delete) - - self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), ' - 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete' - .format(len(write_items), write_size / 1000000, - len(keys_to_delete), max_rows, - 100 * cursor / 65536)) - return write_size - - async def compact_history(self, loop): - '''Start a background history compaction and reset the flush count if - its getting high. - ''' - # Do nothing if during initial sync or if a compaction hasn't - # been initiated - if self.first_sync or self.comp_cursor == -1: - return - - self.comp_flush_count = max(self.comp_flush_count, 1) - limit = 50 * 1000 * 1000 - - while self.comp_cursor != -1: - if self.semaphore.locked: - self.logger.info('compact_history: waiting on semaphore...') - async with self.semaphore: - await loop.run_in_executor(None, self._compact_history, limit) - - def cancel_history_compaction(self): - if self.comp_cursor != -1: - self.logger.warning('cancelling in-progress history compaction') - self.comp_flush_count = -1 - self.comp_cursor = -1 diff --git a/server/history.py b/server/history.py new file mode 100644 index 0000000..81ed496 --- /dev/null +++ b/server/history.py @@ -0,0 +1,311 @@ +# Copyright (c) 2016-2018, Neil Booth +# Copyright (c) 2017, the ElectrumX authors +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. 
+ +'''History by script hash (address).''' + +import array +import ast +import bisect +from collections import defaultdict +from functools import partial +import logging +from struct import pack, unpack + +import lib.util as util +from lib.hash import hash_to_str, HASHX_LEN + + +class History(object): + + DB_VERSIONS = [0] + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + # For history compaction + self.max_hist_row_entries = 12500 + self.unflushed = defaultdict(partial(array.array, 'I')) + self.unflushed_count = 0 + + def open_db(self, db_class, for_sync): + self.db = db_class('hist', for_sync) + self.read_state() + + def close_db(self): + self.db.close() + + def read_state(self): + state = self.db.get(b'state\0\0') + if state: + state = ast.literal_eval(state.decode()) + if not isinstance(state, dict): + raise RuntimeError('failed reading state from history DB') + self.flush_count = state['flush_count'] + self.comp_flush_count = state.get('comp_flush_count', -1) + self.comp_cursor = state.get('comp_cursor', -1) + self.db_version = state.get('db_version', 0) + else: + self.flush_count = 0 + self.comp_flush_count = -1 + self.comp_cursor = -1 + self.db_version = max(self.DB_VERSIONS) + + self.logger.info(f'flush count: {self.flush_count:,d}') + self.logger.info(f'history DB version: {self.db_version}') + if self.db_version not in self.DB_VERSIONS: + msg = f'this software only handles DB versions {self.DB_VERSIONS}' + self.logger.error(msg) + raise RuntimeError(msg) + + def clear_excess(self, flush_count): + # < might happen at end of compaction as both DBs cannot be + # updated atomically + if self.flush_count <= flush_count: + return self.flush_count + + self.logger.info('DB shut down uncleanly. Scanning for ' + 'excess history flushes...') + + keys = [] + for key, hist in self.db.iterator(prefix=b''): + flush_id, = unpack('>H', key[-2:]) + if flush_id > flush_count: + keys.append(key) + + self.logger.info('deleting {:,d} history entries'.format(len(keys))) + + self.flush_count = flush_count + with self.db.write_batch() as batch: + for key in keys: + batch.delete(key) + self.write_state(batch) + + self.logger.info('deleted excess history entries') + return self.flush_count + + def write_state(self, batch): + '''Write state to the history DB.''' + state = { + 'flush_count': self.flush_count, + 'comp_flush_count': self.comp_flush_count, + 'comp_cursor': self.comp_cursor, + 'db_version': self.db_version, + } + # History entries are not prefixed; the suffix \0\0 ensures we + # look similar to other entries and aren't interfered with + batch.put(b'state\0\0', repr(state).encode()) + + def add_unflushed(self, hashXs_by_tx, first_tx_num): + unflushed = self.unflushed + count = 0 + for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): + hashXs = set(hashXs) + for hashX in hashXs: + unflushed[hashX].append(tx_num) + count += len(hashXs) + self.unflushed_count += count + + def unflushed_memsize(self): + return len(self.unflushed) * 180 + self.unflushed_count * 4 + + def assert_flushed(self): + assert not self.unflushed + + def flush(self): + self.flush_count += 1 + flush_id = pack('>H', self.flush_count) + unflushed = self.unflushed + + with self.db.write_batch() as batch: + for hashX in sorted(unflushed): + key = hashX + flush_id + batch.put(key, unflushed[hashX].tobytes()) + self.write_state(batch) + + count = len(unflushed) + unflushed.clear() + self.unflushed_count = 0 + return count + + def backup(self, hashXs, tx_count): + # Not certain this is 
needed, but it doesn't hurt
+        self.flush_count += 1
+        nremoves = 0
+        bisect_left = bisect.bisect_left
+
+        with self.db.write_batch() as batch:
+            for hashX in sorted(hashXs):
+                deletes = []
+                puts = {}
+                for key, hist in self.db.iterator(prefix=hashX, reverse=True):
+                    a = array.array('I')
+                    a.frombytes(hist)
+                    # Remove all history entries >= tx_count
+                    idx = bisect_left(a, tx_count)
+                    nremoves += len(a) - idx
+                    if idx > 0:
+                        puts[key] = a[:idx].tobytes()
+                        break
+                    deletes.append(key)
+
+                for key in deletes:
+                    batch.delete(key)
+                for key, value in puts.items():
+                    batch.put(key, value)
+            self.write_state(batch)
+
+        return nremoves
+
+    def get_txnums(self, hashX, limit=1000):
+        '''Generator that returns an unpruned, sorted list of tx_nums in the
+        history of a hashX. Includes both spending and receiving
+        transactions. By default yields at most 1000 entries. Set
+        limit to None to get them all. '''
+        limit = util.resolve_limit(limit)
+        for key, hist in self.db.iterator(prefix=hashX):
+            a = array.array('I')
+            a.frombytes(hist)
+            for tx_num in a:
+                if limit == 0:
+                    return
+                yield tx_num
+                limit -= 1
+
+    #
+    # History compaction
+    #
+
+    # comp_cursor is a cursor into compaction progress.
+    # -1: no compaction in progress
+    # 0-65535: Compaction in progress; all prefixes < comp_cursor have
+    #     been compacted, and later ones have not.
+    # 65536: compaction complete in-memory but not flushed
+    #
+    # comp_flush_count applies during compaction, and is a flush count
+    # for history with prefix < comp_cursor. flush_count applies
+    # to still uncompacted history. It is -1 when no compaction is
+    # taking place. Key suffixes up to and including comp_flush_count
+    # are used, so a parallel history flush must first increment this
+    #
+    # When compaction is complete and the final flush takes place,
+    # flush_count is reset to comp_flush_count, and comp_flush_count to -1
+
+    def _flush_compaction(self, cursor, write_items, keys_to_delete):
+        '''Flush a single compaction pass as a batch.'''
+        # Update compaction state
+        if cursor == 65536:
+            self.flush_count = self.comp_flush_count
+            self.comp_cursor = -1
+            self.comp_flush_count = -1
+        else:
+            self.comp_cursor = cursor
+
+        # History DB. Flush compacted history and updated state
+        with self.db.write_batch() as batch:
+            # Important: delete first! The keyspace may overlap.
+            for key in keys_to_delete:
+                batch.delete(key)
+            for key, value in write_items:
+                batch.put(key, value)
+            self.write_state(batch)
+
+    def _compact_hashX(self, hashX, hist_map, hist_list,
+                       write_items, keys_to_delete):
+        '''Compress history for a hashX. hist_list is an ordered list of
+        the histories to be compressed.'''
+        # History entries (tx numbers) are 4 bytes each. Distribute
+        # over rows of up to 50KB in size. A fixed row size means
+        # future compactions will not need to update the first N - 1
+        # rows.
+        max_row_size = self.max_hist_row_entries * 4
+        full_hist = b''.join(hist_list)
+        nrows = (len(full_hist) + max_row_size - 1) // max_row_size
+        if nrows > 4:
+            self.logger.info('hashX {} is large: {:,d} entries across '
+                             '{:,d} rows'
+                             .format(hash_to_str(hashX), len(full_hist) // 4,
+                                     nrows))
+
+        # Find what history needs to be written, and what keys need to
+        # be deleted. Start by assuming all keys are to be deleted,
+        # and then remove those that are the same on-disk as when
+        # compacted.
+ write_size = 0 + keys_to_delete.update(hist_map) + for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): + key = hashX + pack('>H', n) + if hist_map.get(key) == chunk: + keys_to_delete.remove(key) + else: + write_items.append((key, chunk)) + write_size += len(chunk) + + assert n + 1 == nrows + self.comp_flush_count = max(self.comp_flush_count, n) + + return write_size + + def _compact_prefix(self, prefix, write_items, keys_to_delete): + '''Compact all history entries for hashXs beginning with the + given prefix. Update keys_to_delete and write.''' + prior_hashX = None + hist_map = {} + hist_list = [] + + key_len = HASHX_LEN + 2 + write_size = 0 + for key, hist in self.db.iterator(prefix=prefix): + # Ignore non-history entries + if len(key) != key_len: + continue + hashX = key[:-2] + if hashX != prior_hashX and prior_hashX: + write_size += self._compact_hashX(prior_hashX, hist_map, + hist_list, write_items, + keys_to_delete) + hist_map.clear() + hist_list.clear() + prior_hashX = hashX + hist_map[key] = hist + hist_list.append(hist) + + if prior_hashX: + write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, + write_items, keys_to_delete) + return write_size + + def _compact_history(self, limit): + '''Inner loop of history compaction. Loops until limit bytes have + been processed. + ''' + keys_to_delete = set() + write_items = [] # A list of (key, value) pairs + write_size = 0 + + # Loop over 2-byte prefixes + cursor = self.comp_cursor + while write_size < limit and cursor < 65536: + prefix = pack('>H', cursor) + write_size += self._compact_prefix(prefix, write_items, + keys_to_delete) + cursor += 1 + + max_rows = self.comp_flush_count + 1 + self._flush_compaction(cursor, write_items, keys_to_delete) + + self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), ' + 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete' + .format(len(write_items), write_size / 1000000, + len(keys_to_delete), max_rows, + 100 * cursor / 65536)) + return write_size + + def cancel_compaction(self): + if self.comp_cursor != -1: + self.logger.warning('cancelling in-progress history compaction') + self.comp_flush_count = -1 + self.comp_cursor = -1 diff --git a/tests/server/test_compaction.py b/tests/server/test_compaction.py index 109658b..4807844 100644 --- a/tests/server/test_compaction.py +++ b/tests/server/test_compaction.py @@ -95,7 +95,7 @@ def check_hashX_compaction(db): def check_written(db, histories): for hashX, hist in histories.items(): - db_hist = array.array('I', db.get_history_txnums(hashX, limit=None)) + db_hist = array.array('I', db.history.get_txnums(hashX, limit=None)) assert hist == db_hist def compact_history(db):
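
Note for reviewers: the sketch below is illustrative only and is not part of the patch. It walks the new History object through its write/read/backup cycle against a fresh 'hist' database. It assumes the LevelDB backend is available (db_class is the same helper server/db.py imports from server/storage.py); the hashX values are dummy HASHX_LEN-byte strings and the tx numbers are made up.

    from lib.hash import HASHX_LEN
    from server.history import History
    from server.storage import db_class

    history = History()
    history.open_db(db_class('leveldb'), for_sync=True)  # opens the 'hist' DB and reads its state

    hashX_a = b'\x01' * HASHX_LEN   # dummy script hashes
    hashX_b = b'\x02' * HASHX_LEN

    # While processing a block, collect the hashXs touched by each tx in
    # order, then queue them against the first tx number of the batch.
    history.add_unflushed([[hashX_a], [hashX_a, hashX_b]], first_tx_num=1000)

    # flush() bumps flush_count and writes one row per hashX, keyed by
    # hashX + 2-byte flush id; it returns the number of addresses written.
    n_addrs = history.flush()                                # -> 2

    # Reading history back yields tx numbers in confirmation order.
    txnums = list(history.get_txnums(hashX_a, limit=None))   # [1000, 1001] on a fresh DB

    # On a reorg, backup() trims entries whose tx_num is >= the new tx_count.
    nremoves = history.backup({hashX_a, hashX_b}, tx_count=1001)  # -> 2 entries removed

    history.close_db()

As in the compact_history.py change above, whoever drives a compaction is now responsible for persisting the matching UTXO flush count afterwards via db.set_flush_count(history.flush_count). After a full compaction, a hashX's rows are re-keyed by a 2-byte row number rather than a flush id, in fixed rows of up to 12,500 tx numbers (50 KB) each, so later compactions leave the first N - 1 rows untouched.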