From 4e777b5861624b9e5d0bc19430406222f25b7106 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 10 Oct 2016 15:33:52 +0900 Subject: [PATCH 01/17] Let's try a smarter UTXO cache. --- query.py | 2 +- server/db.py | 457 ++++++++++++++++++++++++++++++-------------------- server/env.py | 2 +- 3 files changed, 277 insertions(+), 184 deletions(-) diff --git a/query.py b/query.py index 23ed768..1d8b462 100644 --- a/query.py +++ b/query.py @@ -33,7 +33,7 @@ def main(): n = None for n, utxo in enumerate(db.get_utxos(hash168, limit)): print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}' - .format(n, bytes(reversed(utxo.tx_hash)).hex(), + .format(n + 1, bytes(reversed(utxo.tx_hash)).hex(), utxo.tx_pos, utxo.height, utxo.value)) if n is None: print('No UTXOs') diff --git a/server/db.py b/server/db.py index d053975..4135a7a 100644 --- a/server/db.py +++ b/server/db.py @@ -31,6 +31,250 @@ def to_4_bytes(value): def from_4_bytes(b): return struct.unpack(' self.flush_size: + if self.utxo_cache.size_MB() + hist_MB > self.flush_MB: self.flush() def process_tx(self, tx_hash, tx): - hash168s = set() + cache = self.utxo_cache + tx_num = self.tx_count + + # Add the outputs as new UTXOs; spend the inputs + hash168s = cache.add_many(tx_hash, tx_num, tx.outputs) if not tx.is_coinbase: for txin in tx.inputs: - hash168s.add(self.spend_utxo(txin.prevout)) - - for idx, txout in enumerate(tx.outputs): - hash168s.add(self.put_utxo(tx_hash, idx, txout)) + hash168s.add(cache.spend(txin.prevout)) for hash168 in hash168s: - self.history[hash168].append(self.tx_count) + self.history[hash168].append(tx_num) + self.history_size += len(hash168s) self.tx_count += 1 diff --git a/server/env.py b/server/env.py index 42dbe1a..ec0e15c 100644 --- a/server/env.py +++ b/server/env.py @@ -20,7 +20,7 @@ class Env(object): network = self.default('NETWORK', 'mainnet') self.coin = Coin.lookup_coin_class(coin_name, network) self.db_dir = self.required('DB_DIRECTORY') - self.flush_size = self.integer('FLUSH_SIZE', 1000000) + self.flush_MB = self.integer('FLUSH_MB', 1000) self.rpc_url = self.build_rpc_url() def default(self, envvar, default): From 14f4228af77046be603b1de03e39ec0139b55709 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 00:11:10 +0900 Subject: [PATCH 02/17] Add missing line --- server/db.py | 1 + 1 file changed, 1 insertion(+) diff --git a/server/db.py b/server/db.py index 4135a7a..2e4f619 100644 --- a/server/db.py +++ b/server/db.py @@ -507,6 +507,7 @@ class DB(object): self.process_tx(tx_hash, tx) # Flush if we're getting full + hist_MB = self.history_size * 4 // 1048576 if self.utxo_cache.size_MB() + hist_MB > self.flush_MB: self.flush() From d11c60f6adf9c77767c97c45eec5e66627f8505f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 06:43:59 +0900 Subject: [PATCH 03/17] Cache size logging --- server/db.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/server/db.py b/server/db.py index 2e4f619..3625e88 100644 --- a/server/db.py +++ b/server/db.py @@ -95,10 +95,6 @@ class UTXOCache(object): self.cache_hits = 0 self.db_deletes = 0 - def size_MB(self): - '''Returns the approximate size of the cache, in MB.''' - return (len(self.cache) + len(self.db_cache)) * 100 // 1048576 - def add_many(self, tx_hash, tx_num, txouts): '''Add a sequence of UTXOs to the cache, return the set of hash168s seen. @@ -122,8 +118,8 @@ class UTXOCache(object): hash168s.add(hash168) key = tx_hash + pack(' self.flush_MB: + if self.cache_MB() > self.flush_MB: self.flush() def process_tx(self, tx_hash, tx): From 682cc8ff86cf3240876e9a6489181953201aab19 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 06:49:50 +0900 Subject: [PATCH 04/17] Better size est. --- server/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/db.py b/server/db.py index 3625e88..74f04ff 100644 --- a/server/db.py +++ b/server/db.py @@ -490,7 +490,7 @@ class DB(object): '''Returns the approximate size of the cache, in MB.''' utxo_MB = ((len(self.utxo_cache.cache) + len(self.utxo_cache.db_cache)) * 100 // 1048576) - hist_MB = (len(self.history) * 48 + self.history_size * 4) // 1048576 + hist_MB = (len(self.history) * 48 + self.history_size * 20) // 1048576 if self.height % 200 == 0: self.logger.info('cache size at height {:,d}: ' 'UTXOs: {:,d} MB history: {:,d} MB' From d8e9eb7796d20df5cfdfcc586fa933783459cc37 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 07:33:30 +0900 Subject: [PATCH 05/17] Accurate cache accounting --- lib/coins.py | 1 - lib/util.py | 43 ++++++++++++++++++++++++++++++++- server/db.py | 62 +++++++++++++++++++++++++++--------------------- server/server.py | 11 --------- 4 files changed, 77 insertions(+), 40 deletions(-) diff --git a/lib/coins.py b/lib/coins.py index 8afed4d..699fa5f 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -137,7 +137,6 @@ class Coin(object): @classmethod def read_block(cls, block): - assert isinstance(block, memoryview) d = Deserializer(block[cls.HEADER_LEN:]) return d.read_block() diff --git a/lib/util.py b/lib/util.py index 40f9e2f..81eca15 100644 --- a/lib/util.py +++ b/lib/util.py @@ -1,8 +1,9 @@ # See the file "LICENSE" for information about the copyright # and warranty status of this software. - +import array import sys +from collections import Container, Mapping # Method decorator. To be used for calculations that will always @@ -25,6 +26,46 @@ class cachedproperty(object): .format(self.f.__name__, obj)) +def deep_getsizeof(obj): + """Find the memory footprint of a Python object. + + Based on from code.tutsplus.com: http://goo.gl/fZ0DXK + + This is a recursive function that drills down a Python object graph + like a dictionary holding nested dictionaries with lists of lists + and tuples and sets. + + The sys.getsizeof function does a shallow size of only. It counts each + object inside a container as pointer only regardless of how big it + really is. + + :param o: the object + + :return: + """ + + ids = set() + + def size(o): + if id(o) in ids: + return 0 + + r = sys.getsizeof(o) + ids.add(id(o)) + + if isinstance(o, (str, bytes, bytearray, array.array)): + return r + + if isinstance(o, Mapping): + return r + sum(size(k) + size(v) for k, v in o.items()) + + if isinstance(o, Container): + return r + sum(size(x) for x in o) + + return r + + return size(obj) + def chunks(items, size): for i in range(0, len(items), size): yield items[i: i + size] diff --git a/server/db.py b/server/db.py index 74f04ff..e7eae6f 100644 --- a/server/db.py +++ b/server/db.py @@ -45,10 +45,10 @@ class UTXOCache(object): Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes) Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes) - That's 67 bytes of raw data. Assume 100 bytes per UTXO accounting - for Python datastructure overhead, then perhaps 20 million UTXOs - can fit in 2GB of RAM. There are approximately 42 million UTXOs - on bitcoin mainnet at height 433,000. + That's 67 bytes of raw data. Python dictionary overhead means + each entry actually uses about 187 bytes of memory. So almost + 11.5 million UTXOs can fit in 2GB of RAM. There are approximately + 42 million UTXOs on bitcoin mainnet at height 433,000. Semantics: @@ -80,6 +80,7 @@ class UTXOCache(object): tx_num is stored to resolve them. The collision rate is around 0.02% for the hash168 table, and almost zero for the UTXO table (there are around 100 collisions in the whole bitcoin blockchain). + ''' def __init__(self, parent, db, coin): @@ -290,6 +291,7 @@ class DB(object): self.coin = env.coin self.flush_MB = env.flush_MB + self.next_cache_check = 0 self.logger.info('flushing after cache reaches {:,d} MB' .format(self.flush_MB)) @@ -298,7 +300,7 @@ class DB(object): # Unflushed items. Headers and tx_hashes have one entry per block self.headers = [] self.tx_hashes = [] - self.history = defaultdict(list) + self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) @@ -432,13 +434,12 @@ class DB(object): flush_id = struct.pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id - batch.put(key, array.array('I', hist).tobytes()) + batch.put(key, hist.tobytes()) - self.logger.info('flushed {:,d} history entries ({:,d} MB)...' - .format(self.history_size, - self.history_size * 4 // 1048576)) + self.logger.info('flushed {:,d} history entries in {:,d} addrs...' + .format(self.history_size, len(self.history))) - self.history = defaultdict(list) + self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def open_file(self, filename, truncate=False, create=False): @@ -488,20 +489,24 @@ class DB(object): def cache_MB(self): '''Returns the approximate size of the cache, in MB.''' - utxo_MB = ((len(self.utxo_cache.cache) + len(self.utxo_cache.db_cache)) - * 100 // 1048576) - hist_MB = (len(self.history) * 48 + self.history_size * 20) // 1048576 - if self.height % 200 == 0: - self.logger.info('cache size at height {:,d}: ' - 'UTXOs: {:,d} MB history: {:,d} MB' - .format(self.height, utxo_MB, hist_MB)) - self.logger.info('cache entries: UTXOs: {:,d}/{:,d} ' - 'history: {:,d}/{:,d}' - .format(len(self.utxo_cache.cache), - len(self.utxo_cache.db_cache), - len(self.history), - self.history_size)) - return utxo_MB + hist_MB + # Good average estimates + utxo_cache_size = len(self.utxo_cache.cache) * 187 + db_cache_size = len(self.utxo_cache.db_cache) * 105 + hist_cache_size = len(self.history) * 180 + self.history_size * 4 + utxo_MB = (db_cache_size + utxo_cache_size) // 1048576 + hist_MB = hist_cache_size // 1048576 + cache_MB = utxo_MB + hist_MB + + self.logger.info('cache entries: UTXO: {:,d} DB: {:,d} ' + 'hist count: {:,d} hist size: {:,d}' + .format(len(self.utxo_cache.cache), + len(self.utxo_cache.db_cache), + len(self.history), + self.history_size)) + self.logger.info('cache size at height {:,d}: {:,d}MB ' + '(UTXOs {:,d}MB hist {:,d}MB)' + .format(self.height, cache_MB, utxo_MB, hist_MB)) + return cache_MB def process_block(self, block): self.headers.append(block[:self.coin.HEADER_LEN]) @@ -519,9 +524,12 @@ class DB(object): for tx_hash, tx in zip(tx_hashes, txs): self.process_tx(tx_hash, tx) - # Flush if we're getting full - if self.cache_MB() > self.flush_MB: - self.flush() + # Check if we're getting full and time to flush? + now = time.time() + if now > self.next_cache_check: + self.next_cache_check = now + 60 + if self.cache_MB() > self.flush_MB: + self.flush() def process_tx(self, tx_hash, tx): cache = self.utxo_cache diff --git a/server/server.py b/server/server.py index e58f457..91117f9 100644 --- a/server/server.py +++ b/server/server.py @@ -74,18 +74,7 @@ class BlockCache(object): self.logger.info('catching up, block cache limit {:d}MB...' .format(self.cache_limit)) - last_log = 0 - prior_height = self.db.height while await self.maybe_prefill(): - now = time.time() - count = self.fetched_height - prior_height - if now > last_log + 15 and count: - last_log = now - prior_height = self.fetched_height - self.logger.info('prefilled {:,d} blocks to height {:,d} ' - 'daemon height: {:,d}' - .format(count, self.fetched_height, - self.daemon_height)) await asyncio.sleep(1) if not self.stop: From c7f930a18a33ff7f026cfb91752ce8f8d60d9c22 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 21:20:04 +0900 Subject: [PATCH 06/17] Clean up db initialization and state writing --- server/db.py | 185 +++++++++++++++++++++++------------------------ server/server.py | 2 +- 2 files changed, 93 insertions(+), 94 deletions(-) diff --git a/server/db.py b/server/db.py index e7eae6f..6b71daf 100644 --- a/server/db.py +++ b/server/db.py @@ -2,6 +2,7 @@ # and warranty status of this software. import array +import ast import itertools import os import struct @@ -275,13 +276,6 @@ class UTXOCache(object): class DB(object): - HEIGHT_KEY = b'height' - TIP_KEY = b'tip' - GENESIS_KEY = b'genesis' - TX_COUNT_KEY = b'tx_count' - FLUSH_COUNT_KEY = b'flush_count' - WALL_TIME_KEY = b'wall_time' - class Error(Exception): pass @@ -289,75 +283,90 @@ class DB(object): self.logger = logging.getLogger('DB') self.logger.setLevel(logging.INFO) - self.coin = env.coin + # Meta + self.tx_hash_file_size = 16 * 1024 * 1024 self.flush_MB = env.flush_MB self.next_cache_check = 0 - self.logger.info('flushing after cache reaches {:,d} MB' - .format(self.flush_MB)) + self.last_flush = time.time() + self.coin = env.coin - self.tx_counts = array.array('I') - self.tx_hash_file_size = 4*1024*1024 - # Unflushed items. Headers and tx_hashes have one entry per block + # Chain state (initialize to genesis in case of new DB) + self.height = -1 + self.tx_count = 0 + self.flush_count = 0 + self.wall_time = 0 + self.tip = self.coin.GENESIS_HASH + + # Open DB and metadata files. Record some of its state. + self.db = self.open_db(self.coin) + self.db_tx_count = self.tx_count + self.db_height = self.height + + # Caches to be flushed later. Headers and tx_hashes have one + # entry per block self.headers = [] self.tx_hashes = [] self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - - db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - try: - self.db = self.open_db(db_name, False) - except: - self.db = self.open_db(db_name, True) - self.headers_file = self.open_file('headers', True) - self.txcount_file = self.open_file('txcount', True) - self.init_db() - self.logger.info('created new database {}'.format(db_name)) - else: - self.logger.info('successfully opened database {}'.format(db_name)) - self.headers_file = self.open_file('headers') - self.txcount_file = self.open_file('txcount') - self.read_db() - self.utxo_cache = UTXOCache(self, self.db, self.coin) + self.tx_counts = array.array('I') + self.txcount_file.seek(0) + self.tx_counts.fromfile(self.txcount_file, self.height + 1) - # Note that DB_HEIGHT is the height of the next block to be written. - # So an empty DB has a DB_HEIGHT of 0 not -1. - self.tx_count = self.db_tx_count - self.height = self.db_height - 1 - self.tx_counts.fromfile(self.txcount_file, self.db_height) - self.last_flush = time.time() - # FIXME: this sucks and causes issues with exceptions in init_db() - if self.tx_count == 0: - self.flush() - - def open_db(self, db_name, create): - return plyvel.DB(db_name, create_if_missing=create, - error_if_exists=create, compression=None) - - def init_db(self): - self.db_height = 0 - self.db_tx_count = 0 - self.flush_count = 0 - self.wall_time = 0 - self.tip = self.coin.GENESIS_HASH - self.db.put(self.GENESIS_KEY, unhexlify(self.tip)) - - def read_db(self): - db = self.db - genesis_hash = hexlify(db.get(self.GENESIS_KEY)) - if genesis_hash != self.coin.GENESIS_HASH: - raise self.Error('DB genesis hash {} does not match coin {}' - .format(genesis_hash, self.coin.GENESIS_HASH)) - self.db_height = from_4_bytes(db.get(self.HEIGHT_KEY)) - self.db_tx_count = from_4_bytes(db.get(self.TX_COUNT_KEY)) - self.flush_count = from_4_bytes(db.get(self.FLUSH_COUNT_KEY)) - self.wall_time = from_4_bytes(db.get(self.WALL_TIME_KEY)) - self.tip = hexlify(db.get(self.TIP_KEY)) + # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} sync time: {}' .format(self.coin.NAME, self.coin.NET, - self.db_height - 1, self.db_tx_count, + self.height, self.tx_count, self.flush_count, self.formatted_wall_time())) + self.logger.info('flushing after cache reaches {:,d} MB' + .format(self.flush_MB)) + + def open_db(self, coin): + self.headers_file = self.open_file('headers', True) + self.txcount_file = self.open_file('txcount', True) + is_new = self.headers_file.seek(0, 2) == 0 + if is_new != (self.txcount_file.seek(0, 2) == 0): + raise self.Error('just one metadata file is zero-length') + + db_name = '{}-{}'.format(coin.NAME, coin.NET) + db = plyvel.DB(db_name, create_if_missing=is_new, + error_if_exists=is_new, compression=None) + if is_new: + self.logger.info('created new database {}'.format(db_name)) + self.flush_state(db) + else: + self.logger.info('successfully opened database {}'.format(db_name)) + self.read_state(db) + return db + + def flush_state(self, batch): + '''Flush chain state to the batch.''' + now = time.time() + self.wall_time += now - self.last_flush + self.last_flush = now + state = { + 'genesis': self.coin.GENESIS_HASH, + 'height': self.height, + 'tx_count': self.tx_count, + 'tip': self.tip, + 'flush_count': self.flush_count, + 'wall_time': self.wall_time, + } + batch.put(b'state', repr(state).encode('ascii')) + + def read_state(self, db): + state = db.get(b'state') + state = ast.literal_eval(state.decode('ascii')) + if state['genesis'] != self.coin.GENESIS_HASH: + raise self.Error('DB genesis hash {} does not match coin {}' + .format(state['genesis_hash'], + self.coin.GENESIS_HASH)) + self.height = state['height'] + self.tx_count = state['tx_count'] + self.tip = state['tip'] + self.flush_count = state['flush_count'] + self.wall_time = state['wall_time'] def formatted_wall_time(self): wall_time = int(self.wall_time) @@ -365,12 +374,12 @@ class DB(object): wall_time // 86400, (wall_time % 86400) // 3600, (wall_time % 3600) // 60, wall_time % 60) - def flush(self): + def flush_all(self): '''Flush out all cached state.''' flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.db_tx_count - height_diff = self.height + 1 - self.db_height + height_diff = self.height - self.db_height self.logger.info('starting flush {:,d} txs and {:,d} blocks' .format(tx_diff, height_diff)) @@ -386,9 +395,13 @@ class DB(object): self.flush_state(batch) self.logger.info('committing transaction...') + # The flush succeeded, so update our record of DB state + self.db_tx_count = self.tx_count + self.db_height = self.height + # Update and put the wall time again - otherwise we drop the - # time it takes leveldb to commit the batch - self.update_wall_time(self.db) + # time it took leveldb to commit the batch + self.flush_state(self.db) flush_time = int(self.last_flush - flush_start) self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} ' @@ -410,27 +423,11 @@ class DB(object): self.write_tx_hashes() os.sync() - def update_wall_time(self, dest): - '''Put the wall time to dest - a DB or batch.''' - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time))) - - def flush_state(self, batch): - self.db_tx_count = self.tx_count - self.db_height = self.height + 1 - batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height)) - batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count)) - batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count)) - batch.put(self.TIP_KEY, unhexlify(self.tip)) - self.update_wall_time(batch) - self.flush_count += 1 - def flush_history(self, batch): # Drop any None entry self.history.pop(None, None) + self.flush_count += 1 flush_id = struct.pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id @@ -442,9 +439,10 @@ class DB(object): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - def open_file(self, filename, truncate=False, create=False): + def open_file(self, filename, create=False): + '''Open the file name. Return its handle.''' try: - return open(filename, 'wb+' if truncate else 'rb+') + return open(filename, 'rb+') except FileNotFoundError: if create: return open(filename, 'wb+') @@ -459,14 +457,15 @@ class DB(object): headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN assert len(headers) % header_len == 0 - self.headers_file.seek(self.db_height * header_len) + self.headers_file.seek((self.db_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() self.headers = [] def write_tx_counts(self): - self.txcount_file.seek(self.db_height * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.db_height: self.height + 1]) + self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.db_height + 1: + self.height + 1]) self.txcount_file.flush() def write_tx_hashes(self): @@ -529,7 +528,7 @@ class DB(object): if now > self.next_cache_check: self.next_cache_check = now + 60 if self.cache_MB() > self.flush_MB: - self.flush() + self.flush_all() def process_tx(self, tx_hash, tx): cache = self.utxo_cache @@ -552,13 +551,13 @@ class DB(object): height = bisect_right(self.tx_counts, tx_num) # Is this on disk or unflushed? - if height >= self.db_height: - tx_hashes = self.tx_hashes[height - self.db_height] + if height > self.db_height: + tx_hashes = self.tx_hashes[height - (self.db_height + 1)] tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] else: file_pos = tx_num * 32 file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:05d}'.format(file_num) + filename = 'hashes{:04d}'.format(file_num) with self.open_file(filename) as f: f.seek(offset) tx_hash = f.read(32) diff --git a/server/server.py b/server/server.py index 91117f9..50a9e35 100644 --- a/server/server.py +++ b/server/server.py @@ -43,7 +43,7 @@ class BlockCache(object): # Cache target size is in MB. Has little effect on sync time. self.cache_limit = 10 self.daemon_height = 0 - self.fetched_height = db.db_height + self.fetched_height = db.height # Blocks stored in reverse order. Next block is at end of list. self.blocks = [] self.recent_sizes = [] From ba1662bbf7441d4e63e952e68c94ec5098940d12 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 11 Oct 2016 21:45:48 +0900 Subject: [PATCH 07/17] Add ETA stats. Show daemon height again. --- server/db.py | 59 +++++++++++++++++++++++------------------------- server/server.py | 4 ++-- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/server/db.py b/server/db.py index 6b71daf..bfe0b7e 100644 --- a/server/db.py +++ b/server/db.py @@ -26,11 +26,11 @@ UTXO_TX_HASH_LEN = 4 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") -def to_4_bytes(value): - return struct.pack(' self.next_cache_check: self.next_cache_check = now + 60 - if self.cache_MB() > self.flush_MB: - self.flush_all() + if self.cache_MB(daemon_height) > self.flush_MB: + self.flush_all(daemon_height) def process_tx(self, tx_hash, tx): cache = self.utxo_cache diff --git a/server/server.py b/server/server.py index 50a9e35..55ef6ab 100644 --- a/server/server.py +++ b/server/server.py @@ -64,11 +64,11 @@ class BlockCache(object): while not self.stop: await asyncio.sleep(1) while self.blocks: - self.db.process_block(self.blocks.pop()) + self.db.process_block(self.blocks.pop(), self.daemon_height) # Release asynchronous block fetching await asyncio.sleep(0) - self.db.flush() + self.db.flush_all(self.daemon_height) async def catch_up(self): self.logger.info('catching up, block cache limit {:d}MB...' From 5653bf75f514a8afa27d6bc06f14f9e59bd2f25d Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 12 Oct 2016 05:38:24 +0900 Subject: [PATCH 08/17] More robust init --- server/db.py | 21 +++++++++++++-------- server/server.py | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/server/db.py b/server/db.py index bfe0b7e..e99427b 100644 --- a/server/db.py +++ b/server/db.py @@ -323,21 +323,26 @@ class DB(object): .format(self.flush_MB)) def open_db(self, coin): - self.headers_file = self.open_file('headers', True) - self.txcount_file = self.open_file('txcount', True) - is_new = self.headers_file.seek(0, 2) == 0 - if is_new != (self.txcount_file.seek(0, 2) == 0): - raise self.Error('just one metadata file is zero-length') - db_name = '{}-{}'.format(coin.NAME, coin.NET) - db = plyvel.DB(db_name, create_if_missing=is_new, - error_if_exists=is_new, compression=None) + is_new = False + try: + db = plyvel.DB(db_name, create_if_missing=False, + error_if_exists=False, compression=None) + except: + db = plyvel.DB(db_name, create_if_missing=True, + error_if_exists=True, compression=None) + is_new = True + if is_new: self.logger.info('created new database {}'.format(db_name)) self.flush_state(db) else: self.logger.info('successfully opened database {}'.format(db_name)) self.read_state(db) + + self.headers_file = self.open_file('headers', is_new) + self.txcount_file = self.open_file('txcount', is_new) + return db def flush_state(self, batch): diff --git a/server/server.py b/server/server.py index 55ef6ab..3b81617 100644 --- a/server/server.py +++ b/server/server.py @@ -125,7 +125,7 @@ class BlockCache(object): return False # Convert hex string to bytes and put in memoryview - blocks = [memoryview(bytes.fromhex(block)) for block in blocks] + blocks = [bytes.fromhex(block) for block in blocks] # Reverse order and place at front of list self.blocks = list(reversed(blocks)) + self.blocks From 3fce298835269163d583492e15be99d0b25a4bc3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 12 Oct 2016 18:59:15 +0900 Subject: [PATCH 09/17] Better time est. A couple of fixes. --- lib/coins.py | 3 +++ server/db.py | 21 +++++++++++++-------- server/server.py | 1 + 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/lib/coins.py b/lib/coins.py index 699fa5f..7ebccbb 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -156,6 +156,9 @@ class Bitcoin(Coin): WIF_BYTE = 0x80 GENESIS_HASH=(b'000000000019d6689c085ae165831e93' b'4ff763ae46a2a6c172b3f1b60a8ce26f') + TX_COUNT = 109611638 + TX_COUNT_HEIGHT = 398055 + TX_PER_BLOCK = 2000 class BitcoinTestnet(Coin): NAME = "Bitcoin" diff --git a/server/db.py b/server/db.py index e99427b..58a22e4 100644 --- a/server/db.py +++ b/server/db.py @@ -379,7 +379,7 @@ class DB(object): last_flush = self.last_flush tx_diff = self.tx_count - self.db_tx_count height_diff = self.height - self.db_height - self.logger.info('starting flush {:,d} txs and {:,d} blocks' + self.logger.info('flushing cache: {:,d} transactions and {:,d} blocks' .format(tx_diff, height_diff)) # Write out the files to the FS before flushing to the DB. If @@ -406,16 +406,21 @@ class DB(object): self.logger.info('flush #{:,d} to height {:,d} took {:,d}s' .format(self.flush_count, self.height, flush_time)) - # Roughly 2500 tx/block at end + # Log handy stats txs_per_sec = int(self.tx_count / self.wall_time) - this_txs_per_sec = int(tx_diff / (self.last_flush - last_flush)) - eta = (daemon_height - self.height) * 2500 / (this_txs_per_sec + 0.01) + this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) + if self.height > self.coin.TX_COUNT_HEIGHT: + tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK + else: + tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) + * self.coin.TX_PER_BLOCK + self.coin.TX_COUNT) + self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, ' 'since last flush: {:,d}' .format(self.tx_count, txs_per_sec, this_txs_per_sec)) self.logger.info('sync time: {} ETA: {}' .format(formatted_time(self.wall_time), - formatted_time(eta))) + formatted_time(tx_est / this_txs_per_sec))) def flush_to_fs(self): '''Flush the things stored on the filesystem.''' @@ -479,7 +484,7 @@ class DB(object): while cursor < len(hashes): file_num, offset = divmod(file_pos, self.tx_hash_file_size) size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:05d}'.format(file_num) + filename = 'hashes{:04d}'.format(file_num) with self.open_file(filename, create=True) as f: f.seek(offset) f.write(hashes[cursor:cursor + size]) @@ -487,7 +492,7 @@ class DB(object): file_pos += size self.tx_hashes = [] - def cache_MB(self, daemon_height): + def cache_size(self, daemon_height): '''Returns the approximate size of the cache, in MB.''' # Good average estimates utxo_cache_size = len(self.utxo_cache.cache) * 187 @@ -529,7 +534,7 @@ class DB(object): now = time.time() if now > self.next_cache_check: self.next_cache_check = now + 60 - if self.cache_MB(daemon_height) > self.flush_MB: + if self.cache_size(daemon_height) > self.flush_MB: self.flush_all(daemon_height) def process_tx(self, tx_hash, tx): diff --git a/server/server.py b/server/server.py index 3b81617..01ee46d 100644 --- a/server/server.py +++ b/server/server.py @@ -80,6 +80,7 @@ class BlockCache(object): if not self.stop: self.logger.info('caught up to height {:d}' .format(self.daemon_height)) + self.db.flush_all(self.daemon_height) def cache_used(self): return sum(len(block) for block in self.blocks) From 4b99ae4e11787096e7d62891fea6a30b0bd210dd Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 13 Oct 2016 07:16:42 +0900 Subject: [PATCH 10/17] Fix ETA calc --- server/db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/db.py b/server/db.py index 58a22e4..d97fcb4 100644 --- a/server/db.py +++ b/server/db.py @@ -413,7 +413,8 @@ class DB(object): tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK else: tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) - * self.coin.TX_PER_BLOCK + self.coin.TX_COUNT) + * self.coin.TX_PER_BLOCK + + (self.coin.TX_COUNT - self.tx_count)) self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, ' 'since last flush: {:,d}' From 370cceab83ce312ff3240bc9717293ac054b8f62 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 13 Oct 2016 22:10:34 +0900 Subject: [PATCH 11/17] Clean up RPC handling Remove class Handle exceptions properly by cancelling tasks Log what is happening Generalise send interface --- server/server.py | 157 ++++++++++++++++++++--------------------------- server_main.py | 2 + 2 files changed, 67 insertions(+), 92 deletions(-) diff --git a/server/server.py b/server/server.py index 01ee46d..1e96080 100644 --- a/server/server.py +++ b/server/server.py @@ -19,27 +19,37 @@ class Server(object): def __init__(self, env): self.env = env self.db = DB(env) - self.rpc = RPC(env) - self.block_cache = BlockCache(env, self.db, self.rpc) - - def async_tasks(self): - return [ + self.block_cache = BlockCache(env, self.db) + self.tasks = [ asyncio.ensure_future(self.block_cache.catch_up()), asyncio.ensure_future(self.block_cache.process_cache()), ] + loop = asyncio.get_event_loop() + for signame in ('SIGINT', 'SIGTERM'): + loop.add_signal_handler(getattr(signal, signame), + partial(self.on_signal, signame)) + + def on_signal(self, signame): + logging.warning('received {} signal, preparing to shut down' + .format(signame)) + for task in self.tasks: + task.cancel() + + def async_tasks(self): + return self.tasks + class BlockCache(object): '''Requests blocks ahead of time from the daemon. Serves them to the blockchain processor.''' - def __init__(self, env, db, rpc): + def __init__(self, env, db): self.logger = logging.getLogger('BlockCache') self.logger.setLevel(logging.INFO) self.db = db - self.rpc = rpc - self.stop = False + self.rpc_url = env.rpc_url # Cache target size is in MB. Has little effect on sync time. self.cache_limit = 10 self.daemon_height = 0 @@ -49,37 +59,26 @@ class BlockCache(object): self.recent_sizes = [] self.ave_size = 0 - loop = asyncio.get_event_loop() - for signame in ('SIGINT', 'SIGTERM'): - loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) - - def on_signal(self, signame): - logging.warning('Received {} signal, preparing to shut down' - .format(signame)) - self.blocks = [] - self.stop = True + self.logger.info('using RPC URL {}'.format(self.rpc_url)) async def process_cache(self): - while not self.stop: + while True: await asyncio.sleep(1) while self.blocks: self.db.process_block(self.blocks.pop(), self.daemon_height) # Release asynchronous block fetching await asyncio.sleep(0) - self.db.flush_all(self.daemon_height) - async def catch_up(self): self.logger.info('catching up, block cache limit {:d}MB...' .format(self.cache_limit)) - while await self.maybe_prefill(): - await asyncio.sleep(1) - - if not self.stop: + try: + while await self.maybe_prefill(): + await asyncio.sleep(1) self.logger.info('caught up to height {:d}' .format(self.daemon_height)) + finally: self.db.flush_all(self.daemon_height) def cache_used(self): @@ -96,35 +95,26 @@ class BlockCache(object): processing.''' cache_limit = self.cache_limit * 1024 * 1024 while True: - if self.stop: - return False - cache_used = self.cache_used() if cache_used > cache_limit: return True # Keep going by getting a whole new cache_limit of blocks - self.daemon_height = await self.rpc.rpc_single('getblockcount') + self.daemon_height = await self.send_single('getblockcount') max_count = min(self.daemon_height - self.fetched_height, 4000) count = min(max_count, self.prefill_count(cache_limit)) - if not count or self.stop: + if not count: return False # Done catching up first = self.fetched_height + 1 param_lists = [[height] for height in range(first, first + count)] - hashes = await self.rpc.rpc_multi('getblockhash', param_lists) - - if self.stop: - return False + hashes = await self.send_vector('getblockhash', param_lists) # Hashes is an array of hex strings param_lists = [(h, False) for h in hashes] - blocks = await self.rpc.rpc_multi('getblock', param_lists) + blocks = await self.send_vector('getblock', param_lists) self.fetched_height += count - if self.stop: - return False - # Convert hex string to bytes and put in memoryview blocks = [bytes.fromhex(block) for block in blocks] # Reverse order and place at front of list @@ -138,64 +128,47 @@ class BlockCache(object): self.recent_sizes = self.recent_sizes[excess:] self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - -class RPC(object): - - def __init__(self, env): - self.logger = logging.getLogger('RPC') - self.logger.setLevel(logging.INFO) - self.rpc_url = env.rpc_url - self.logger.info('using RPC URL {}'.format(self.rpc_url)) - - async def rpc_multi(self, method, param_lists): - payload = [{'method': method, 'params': param_list} - for param_list in param_lists] - while True: - dresults = await self.daemon(payload) - errs = [dresult['error'] for dresult in dresults] - if not any(errs): - return [dresult['result'] for dresult in dresults] - for err in errs: - if err.get('code') == -28: - self.logger.warning('daemon still warming up...') - secs = 10 - break - else: - self.logger.error('daemon returned errors: {}'.format(errs)) - secs = 0 - self.logger.info('sleeping {:d} seconds and trying again...' - .format(secs)) - await asyncio.sleep(secs) - - - async def rpc_single(self, method, params=None): + async def send_single(self, method, params=None): payload = {'method': method} if params: payload['params'] = params - while True: - dresult = await self.daemon(payload) - err = dresult['error'] - if not err: - return dresult['result'] - if err.get('code') == -28: - self.logger.warning('daemon still warming up...') - secs = 10 - else: - self.logger.error('daemon returned error: {}'.format(err)) - secs = 0 - self.logger.info('sleeping {:d} seconds and trying again...' - .format(secs)) - await asyncio.sleep(secs) - - async def daemon(self, payload): + result, = await self.send((payload, )) + return result + + async def send_many(self, mp_pairs): + payload = [{'method': method, 'params': params} + for method, params in mp_pairs] + return await self.send(payload) + + async def send_vector(self, method, params_list): + payload = [{'method': method, 'params': params} + for params in params_list] + return await self.send(payload) + + async def send(self, payload): + assert isinstance(payload, (tuple, list)) + data = json.dumps(payload) while True: try: - async with aiohttp.ClientSession() as session: - async with session.post(self.rpc_url, - data=json.dumps(payload)) as resp: - return await resp.json() + async with aiohttp.request('POST', self.rpc_url, + data = data) as resp: + result = await resp.json() + except asyncio.CancelledError: + raise except Exception as e: - self.logger.error('aiohttp error: {}'.format(e)) + msg = 'aiohttp error: {}'.format(e) + secs = 3 + else: + errs = tuple(item['error'] for item in result) + if not any(errs): + return tuple(item['result'] for item in result) + if any(err.get('code') == -28 for err in errs): + msg = 'daemon still warming up...' + secs = 10 + else: + msg = 'daemon errors: {}'.format(errs) + secs = 1 - self.logger.info('sleeping 1 second and trying again...') - await asyncio.sleep(1) + self.logger.error('{}. Sleeping {:d}s and trying again...' + .format(msg, secs)) + await asyncio.sleep(secs) diff --git a/server_main.py b/server_main.py index 0ebd347..3166d24 100755 --- a/server_main.py +++ b/server_main.py @@ -28,6 +28,8 @@ def main_loop(): loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.gather(*tasks)) + except asyncio.CancelledError: + logging.warning('task cancelled; asyncio event loop closing') finally: loop.close() From 6b513a91d01d548425576cb5704ff783d24c6684 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 13 Oct 2016 22:17:32 +0900 Subject: [PATCH 12/17] CACHE_MB is now the environment var. --- lib/coins.py | 6 +++--- server/db.py | 20 ++++++++++++-------- server/env.py | 2 +- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/coins.py b/lib/coins.py index 7ebccbb..b04bec2 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -156,9 +156,9 @@ class Bitcoin(Coin): WIF_BYTE = 0x80 GENESIS_HASH=(b'000000000019d6689c085ae165831e93' b'4ff763ae46a2a6c172b3f1b60a8ce26f') - TX_COUNT = 109611638 - TX_COUNT_HEIGHT = 398055 - TX_PER_BLOCK = 2000 + TX_COUNT = 142791895 + TX_COUNT_HEIGHT = 420976 + TX_PER_BLOCK = 1600 class BitcoinTestnet(Coin): NAME = "Bitcoin" diff --git a/server/db.py b/server/db.py index d97fcb4..99c196e 100644 --- a/server/db.py +++ b/server/db.py @@ -285,7 +285,7 @@ class DB(object): # Meta self.tx_hash_file_size = 16 * 1024 * 1024 - self.flush_MB = env.flush_MB + self.cache_MB = env.cache_MB self.next_cache_check = 0 self.last_flush = time.time() self.coin = env.coin @@ -320,7 +320,7 @@ class DB(object): self.tx_count, self.flush_count, formatted_time(self.wall_time))) self.logger.info('flushing after cache reaches {:,d} MB' - .format(self.flush_MB)) + .format(self.cache_MB)) def open_db(self, coin): db_name = '{}-{}'.format(coin.NAME, coin.NET) @@ -379,7 +379,7 @@ class DB(object): last_flush = self.last_flush tx_diff = self.tx_count - self.db_tx_count height_diff = self.height - self.db_height - self.logger.info('flushing cache: {:,d} transactions and {:,d} blocks' + self.logger.info('starting flush of {:,d} transactions, {:,d} blocks' .format(tx_diff, height_diff)) # Write out the files to the FS before flushing to the DB. If @@ -440,7 +440,7 @@ class DB(object): key = b'H' + hash168 + flush_id batch.put(key, hist.tobytes()) - self.logger.info('flushed {:,d} history entries in {:,d} addrs...' + self.logger.info('{:,d} history entries in {:,d} addrs' .format(self.history_size, len(self.history))) self.history = defaultdict(partial(array.array, 'I')) @@ -495,12 +495,16 @@ class DB(object): def cache_size(self, daemon_height): '''Returns the approximate size of the cache, in MB.''' - # Good average estimates + # Good average estimates based on traversal of subobjects and + # requesting size from Python (see deep_getsizeof). For + # whatever reason Python O/S mem usage is typically +30% or + # more, so we scale our already bloated object sizes. + one_MB = int(1048576 / 1.3) utxo_cache_size = len(self.utxo_cache.cache) * 187 db_cache_size = len(self.utxo_cache.db_cache) * 105 hist_cache_size = len(self.history) * 180 + self.history_size * 4 - utxo_MB = (db_cache_size + utxo_cache_size) // 1048576 - hist_MB = hist_cache_size // 1048576 + utxo_MB = (db_cache_size + utxo_cache_size) // one_MB + hist_MB = hist_cache_size // one_MB cache_MB = utxo_MB + hist_MB self.logger.info('cache stats at height {:,d} daemon height: {:,d}' @@ -535,7 +539,7 @@ class DB(object): now = time.time() if now > self.next_cache_check: self.next_cache_check = now + 60 - if self.cache_size(daemon_height) > self.flush_MB: + if self.cache_size(daemon_height) > self.cache_MB: self.flush_all(daemon_height) def process_tx(self, tx_hash, tx): diff --git a/server/env.py b/server/env.py index ec0e15c..5e20586 100644 --- a/server/env.py +++ b/server/env.py @@ -20,7 +20,7 @@ class Env(object): network = self.default('NETWORK', 'mainnet') self.coin = Coin.lookup_coin_class(coin_name, network) self.db_dir = self.required('DB_DIRECTORY') - self.flush_MB = self.integer('FLUSH_MB', 1000) + self.cache_MB = self.integer('CACHE_MB', 1000) self.rpc_url = self.build_rpc_url() def default(self, envvar, default): From 5f77ff4ef06d1bf52e696a556238f92a0acb2fa7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 14 Oct 2016 20:09:47 +0900 Subject: [PATCH 13/17] Flush history separately. We do not yet remove excess history. --- server/db.py | 89 ++++++++++++++++++++++++++++++------------------ server/env.py | 1 + server/server.py | 2 +- 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/server/db.py b/server/db.py index 99c196e..cb7240f 100644 --- a/server/db.py +++ b/server/db.py @@ -286,14 +286,17 @@ class DB(object): # Meta self.tx_hash_file_size = 16 * 1024 * 1024 self.cache_MB = env.cache_MB + self.hist_MB = env.hist_MB self.next_cache_check = 0 self.last_flush = time.time() + self.last_flush_tx_count = 0 self.coin = env.coin # Chain state (initialize to genesis in case of new DB) self.height = -1 self.tx_count = 0 self.flush_count = 0 + self.utxo_flush_count = 0 self.wall_time = 0 self.tip = self.coin.GENESIS_HASH @@ -315,12 +318,16 @@ class DB(object): # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' - 'flush count: {:,d} sync time: {}' + 'flush count: {:,d} utxo flush count: {:,d} ' + 'sync time: {}' .format(self.coin.NAME, self.coin.NET, self.height, self.tx_count, self.flush_count, + self.utxo_flush_count, formatted_time(self.wall_time))) - self.logger.info('flushing after cache reaches {:,d} MB' + self.logger.info('flushing all after cache reaches {:,d} MB' .format(self.cache_MB)) + self.logger.info('flushing history cache at {:,d} MB' + .format(self.hist_MB)) def open_db(self, coin): db_name = '{}-{}'.format(coin.NAME, coin.NET) @@ -339,12 +346,28 @@ class DB(object): else: self.logger.info('successfully opened database {}'.format(db_name)) self.read_state(db) + assert self.flush_count == self.utxo_flush_count # FIXME self.headers_file = self.open_file('headers', is_new) self.txcount_file = self.open_file('txcount', is_new) return db + def read_state(self, db): + state = db.get(b'state') + state = ast.literal_eval(state.decode('ascii')) + if state['genesis'] != self.coin.GENESIS_HASH: + raise self.Error('DB genesis hash {} does not match coin {}' + .format(state['genesis_hash'], + self.coin.GENESIS_HASH)) + self.height = state['height'] + self.tx_count = state['tx_count'] + self.tip = state['tip'] + self.flush_count = state['flush_count'] + self.utxo_flush_count = state['utxo_flush_count'] + self.wall_time = state['wall_time'] + self.last_flush_tx_count = self.tx_count + def flush_state(self, batch): '''Flush chain state to the batch.''' now = time.time() @@ -356,47 +379,42 @@ class DB(object): 'tx_count': self.tx_count, 'tip': self.tip, 'flush_count': self.flush_count, + 'utxo_flush_count': self.utxo_flush_count, 'wall_time': self.wall_time, } batch.put(b'state', repr(state).encode('ascii')) - def read_state(self, db): - state = db.get(b'state') - state = ast.literal_eval(state.decode('ascii')) - if state['genesis'] != self.coin.GENESIS_HASH: - raise self.Error('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.height = state['height'] - self.tx_count = state['tx_count'] - self.tip = state['tip'] - self.flush_count = state['flush_count'] - self.wall_time = state['wall_time'] + def flush(self, daemon_height, flush_utxos=False): + '''Flush out cached state. - def flush_all(self, daemon_height): - '''Flush out all cached state.''' + History is always flushed. UTXOs are flushed if flush_utxos.''' flush_start = time.time() last_flush = self.last_flush - tx_diff = self.tx_count - self.db_tx_count - height_diff = self.height - self.db_height - self.logger.info('starting flush of {:,d} transactions, {:,d} blocks' - .format(tx_diff, height_diff)) - - # Write out the files to the FS before flushing to the DB. If - # the DB transaction fails, the files being too long doesn't - # matter. But if writing the files fails we do not want to - # have updated the DB. Flush state last as it reads the wall - # time. - self.flush_to_fs() + + if flush_utxos: + # Write out the files to the FS before flushing to the DB. + # If the DB transaction fails, the files being too long + # doesn't matter. But if writing the files fails we do + # not want to have updated the DB. Flush state last as it + # reads the wall time. + self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks' + .format(self.tx_count - self.db_tx_count, + self.height - self.db_height)) + + self.flush_to_fs() + with self.db.write_batch(transaction=True) as batch: - self.utxo_cache.flush(batch) + if flush_utxos: + self.utxo_cache.flush(batch) + self.utxo_flush_count = self.flush_count + 1 self.flush_history(batch) self.flush_state(batch) self.logger.info('committing transaction...') # The flush succeeded, so update our record of DB state - self.db_tx_count = self.tx_count - self.db_height = self.height + if flush_utxos: + self.db_tx_count = self.tx_count + self.db_height = self.height # Update and put the wall time again - otherwise we drop the # time it took leveldb to commit the batch @@ -407,6 +425,8 @@ class DB(object): .format(self.flush_count, self.height, flush_time)) # Log handy stats + tx_diff = self.tx_count - self.last_flush_tx_count + self.last_flush_tx_count = self.tx_count txs_per_sec = int(self.tx_count / self.wall_time) this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) if self.height > self.coin.TX_COUNT_HEIGHT: @@ -493,7 +513,7 @@ class DB(object): file_pos += size self.tx_hashes = [] - def cache_size(self, daemon_height): + def cache_sizes(self, daemon_height): '''Returns the approximate size of the cache, in MB.''' # Good average estimates based on traversal of subobjects and # requesting size from Python (see deep_getsizeof). For @@ -517,7 +537,7 @@ class DB(object): self.history_size)) self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)' .format(cache_MB, utxo_MB, hist_MB)) - return cache_MB + return cache_MB, hist_MB def process_block(self, block, daemon_height): self.headers.append(block[:self.coin.HEADER_LEN]) @@ -539,8 +559,9 @@ class DB(object): now = time.time() if now > self.next_cache_check: self.next_cache_check = now + 60 - if self.cache_size(daemon_height) > self.cache_MB: - self.flush_all(daemon_height) + cache_MB, hist_MB = self.cache_sizes(daemon_height) + if cache_MB >= self.cache_MB or hist_MB >= self.hist_MB: + self.flush(daemon_height, cache_MB >= self.cache_MB) def process_tx(self, tx_hash, tx): cache = self.utxo_cache diff --git a/server/env.py b/server/env.py index 5e20586..d17c0a5 100644 --- a/server/env.py +++ b/server/env.py @@ -21,6 +21,7 @@ class Env(object): self.coin = Coin.lookup_coin_class(coin_name, network) self.db_dir = self.required('DB_DIRECTORY') self.cache_MB = self.integer('CACHE_MB', 1000) + self.hist_MB = self.integer('HIST_MB', 250) self.rpc_url = self.build_rpc_url() def default(self, envvar, default): diff --git a/server/server.py b/server/server.py index 1e96080..5bd15ef 100644 --- a/server/server.py +++ b/server/server.py @@ -79,7 +79,7 @@ class BlockCache(object): self.logger.info('caught up to height {:d}' .format(self.daemon_height)) finally: - self.db.flush_all(self.daemon_height) + self.db.flush(self.daemon_height, True) def cache_used(self): return sum(len(block) for block in self.blocks) From 46b9519ee004a38f673898c471a5bf949c7f294f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 14 Oct 2016 20:17:09 +0900 Subject: [PATCH 14/17] Adjust sleeps. --- server/server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/server.py b/server/server.py index 5bd15ef..8efde5f 100644 --- a/server/server.py +++ b/server/server.py @@ -163,11 +163,11 @@ class BlockCache(object): if not any(errs): return tuple(item['result'] for item in result) if any(err.get('code') == -28 for err in errs): - msg = 'daemon still warming up...' - secs = 10 + msg = 'daemon still warming up.' + secs = 30 else: msg = 'daemon errors: {}'.format(errs) - secs = 1 + secs = 3 self.logger.error('{}. Sleeping {:d}s and trying again...' .format(msg, secs)) From cbe1ef60ca0be494b4199ebd38776e362363d7f0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 14 Oct 2016 20:26:10 +0900 Subject: [PATCH 15/17] Better logging --- server/db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/db.py b/server/db.py index cb7240f..3af0225 100644 --- a/server/db.py +++ b/server/db.py @@ -400,8 +400,9 @@ class DB(object): self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks' .format(self.tx_count - self.db_tx_count, self.height - self.db_height)) - self.flush_to_fs() + else: + self.logger.info('commencing history flush') with self.db.write_batch(transaction=True) as batch: if flush_utxos: From 34096a02e9576cf58c4198674dd9c06b6252693c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 15 Oct 2016 08:42:42 +0900 Subject: [PATCH 16/17] Recovery from excess history flushes --- server/db.py | 75 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 54 insertions(+), 21 deletions(-) diff --git a/server/db.py b/server/db.py index 3af0225..97d558c 100644 --- a/server/db.py +++ b/server/db.py @@ -119,9 +119,14 @@ class UTXOCache(object): hash168s.add(hash168) key = tx_hash + pack('H', key[-2:]) + if flush_id > self.utxo_flush_count: + keys.append(key) + + self.logger.info('deleting {:,d} history entries'.format(len(keys))) + with db.write_batch(transaction=True) as batch: + for key in keys: + db.delete(key) + self.utxo_flush_count = self.flush_count + self.flush_state(batch) + self.logger.info('deletion complete') def flush_state(self, batch): '''Flush chain state to the batch.''' @@ -375,8 +410,8 @@ class DB(object): self.last_flush = now state = { 'genesis': self.coin.GENESIS_HASH, - 'height': self.height, - 'tx_count': self.tx_count, + 'height': self.db_height, + 'tx_count': self.db_tx_count, 'tip': self.tip, 'flush_count': self.flush_count, 'utxo_flush_count': self.utxo_flush_count, @@ -405,18 +440,16 @@ class DB(object): self.logger.info('commencing history flush') with self.db.write_batch(transaction=True) as batch: + # History first - fast and frees memory + self.flush_history(batch) if flush_utxos: self.utxo_cache.flush(batch) - self.utxo_flush_count = self.flush_count + 1 - self.flush_history(batch) + self.utxo_flush_count = self.flush_count + self.db_tx_count = self.tx_count + self.db_height = self.height self.flush_state(batch) self.logger.info('committing transaction...') - # The flush succeeded, so update our record of DB state - if flush_utxos: - self.db_tx_count = self.tx_count - self.db_height = self.height - # Update and put the wall time again - otherwise we drop the # time it took leveldb to commit the batch self.flush_state(self.db) @@ -531,7 +564,7 @@ class DB(object): self.logger.info('cache stats at height {:,d} daemon height: {:,d}' .format(self.height, daemon_height)) self.logger.info(' entries: UTXO: {:,d} DB: {:,d} ' - 'hist count: {:,d} hist size: {:,d}' + 'hist addrs: {:,d} hist size: {:,d}' .format(len(self.utxo_cache.cache), len(self.utxo_cache.db_cache), len(self.history), From 4cf6a96707e02936e71623a62313d06235d9a2ca Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 15 Oct 2016 12:26:55 +0900 Subject: [PATCH 17/17] Update notes etc. --- HOWTO.rst | 130 ++++++++++++--------------------- README.rst | 35 ++++----- samples/scripts/NOTES | 27 ++++++- samples/scripts/env/CACHE_MB | 1 + samples/scripts/env/FLUSH_SIZE | 1 - samples/scripts/env/HIST_MB | 1 + 6 files changed, 88 insertions(+), 107 deletions(-) create mode 100644 samples/scripts/env/CACHE_MB delete mode 100644 samples/scripts/env/FLUSH_SIZE create mode 100644 samples/scripts/env/HIST_MB diff --git a/HOWTO.rst b/HOWTO.rst index e9d510a..75b930a 100644 --- a/HOWTO.rst +++ b/HOWTO.rst @@ -107,8 +107,8 @@ You can see its logs with:: tail -F /path/to/log/dir/current | tai64nlocal -Progress -======== +Sync Progress +============= Speed indexing the blockchain depends on your hardware of course. As Python is single-threaded most of the time only 1 core is kept busy. @@ -120,30 +120,20 @@ may even be beneficial to have the daemon on a separate machine so the machine doing the indexing is focussing on the one task and not the wider network. -The FLUSH_SIZE environment variable is an upper bound on how much -unflushed data is cached before writing to disk + leveldb. The -default is 4 million items, which is probably fine unless your -hardware is quite poor. If you've got a really fat machine with lots -of RAM, 10 million or even higher is likely good (I used 10 million on -Machine B below without issue so far). A higher number will have -fewer flushes and save your disk thrashing, but you don't want it so -high your machine is swapping. If your machine loses power all -synchronization since the previous flush is lost. - -When syncing, ElectrumX is CPU bound over 70% of the time, with the -rest being bursts of disk activity whilst flushing. Here is my -experience with the current codebase, to given heights and rough -wall-time:: +The HIST_MB and CACHE_MB environment variables control cache sizes +before they spill to disk; see the NOTES file under samples/scripts. + +Here is my experience with the current codebase, to given heights and +rough wall-time:: Machine A Machine B DB + Metadata - 100,000 2m 30s 0 (unflushed) - 150,000 35m 4m 30s 0.2 GiB - 180,000 1h 5m 9m 0.4 GiB - 245,800 3h 1h 30m 2.7 GiB - 290,000 13h 15m 3h 5m 3.3 GiB - 307,000 17h 16m 3h 50m 4.1 GiB - 343,000 6h 54m 6.0 GiB - 386,600 17h 07m 7.0 GiB + 180,000 7m 10s 0.4 GiB + 245,800 1h 00m 2.7 GiB + 290,000 1h 56m 3.3 GiB + 343,000 3h 56m 6.0 GiB + 386,000 7h 28m 7.0 GiB + 404,000 9h 41m + 434,369 14h 38m 17.1 GiB Machine A: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU, 8GB RAM and a DragonFlyBSD HAMMER fileystem on an SSD. It requests blocks @@ -157,10 +147,6 @@ quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on the same machine. FLUSH_SIZE of 10 million. First flush at height 195,146. -Transactions processed per second seems to gradually decrease over -time but this statistic is not currently logged and I've not looked -closely. - For chains other than bitcoin-mainnet sychronization should be much faster. @@ -184,19 +170,11 @@ with the intent that, to the extent this atomicity guarantee holds, the database should not get corrupted even if the ElectrumX process if forcibly killed or there is loss of power. The worst case is losing unflushed in-memory blockchain processing and having to restart from -the state as of the prior successfully completed flush. - -During development I have terminated ElectrumX processes in various -ways and at random times, and not once have I had any corruption as a -result of doing so. Mmy only DB corruption has been through buggy -code. If you do have any database corruption as a result of -terminating the process without modifying the code I would be very -interested in hearing details. +the state as of the prior successfully completed UTXO flush. -I have heard about corruption issues with electrum-server. I cannot -be sure but with a brief look at the code it does seem that if -interrupted at the wrong time the databases it uses could become -inconsistent. +If you do have any database corruption as a result of terminating the +process (without modifying the code) I would be interested in the +details. Once the process has terminated, you can start it up again with:: @@ -211,7 +189,6 @@ same service directory, such as a testnet or altcoin server. See the man pages of these various commands for more information. - Understanding the Logs ====================== @@ -221,49 +198,38 @@ You can see the logs usefully like so:: Here is typical log output on startup:: - 2016-10-08 14:46:48.088516500 Launching ElectrumX server... - 2016-10-08 14:46:49.145281500 INFO:root:ElectrumX server starting - 2016-10-08 14:46:49.147215500 INFO:root:switching current directory to /var/nohist/server-test - 2016-10-08 14:46:49.150765500 INFO:DB:using flush size of 1,000,000 entries - 2016-10-08 14:46:49.156489500 INFO:DB:created new database Bitcoin-mainnet - 2016-10-08 14:46:49.157531500 INFO:DB:flushing to levelDB 0 txs and 0 blocks to height -1 tx count: 0 - 2016-10-08 14:46:49.158640500 INFO:DB:flushed. Cache hits: 0/0 writes: 5 deletes: 0 elided: 0 sync: 0d 00h 00m 00s - 2016-10-08 14:46:49.159508500 INFO:RPC:using RPC URL http://user:pass@192.168.0.2:8332/ - 2016-10-08 14:46:49.167352500 INFO:BlockCache:catching up, block cache limit 10MB... - 2016-10-08 14:46:49.318374500 INFO:BlockCache:prefilled 10 blocks to height 10 daemon height: 433,401 block cache size: 2,150 - 2016-10-08 14:46:50.193962500 INFO:BlockCache:prefilled 4,000 blocks to height 4,010 daemon height: 433,401 block cache size: 900,043 - 2016-10-08 14:46:51.253644500 INFO:BlockCache:prefilled 4,000 blocks to height 8,010 daemon height: 433,401 block cache size: 1,600,613 - 2016-10-08 14:46:52.195633500 INFO:BlockCache:prefilled 4,000 blocks to height 12,010 daemon height: 433,401 block cache size: 2,329,325 - -Under normal operation these prefill messages repeat fairly regularly. -Occasionally (depending on how big your FLUSH_SIZE environment -variable was set, and your hardware, this could be anything from every -5 minutes to every hour) you will get a flush to disk that begins with: - - 2016-10-08 06:34:20.841563500 INFO:DB:flushing to levelDB 828,190 txs and 3,067 blocks to height 243,982 tx count: 20,119,669 - -During the flush, which can take many minutes, you may see logs like -this: - - 2016-10-08 12:20:08.558750500 INFO:DB:address 1dice7W2AicHosf5EL3GFDUVga7TgtPFn hist moving to idx 3000 - -These are just informational messages about addresses that have very -large histories that are generated as those histories are being -written out. After the flush has completed a few stats are printed -about cache hits, the number of writes and deletes, and the number of -writes that were elided by the cache:: - - 2016-10-08 06:37:41.035139500 INFO:DB:flushed. Cache hits: 3,185,958/192,336 writes: 781,526 deletes: 465,236 elided: 3,185,958 sync: 0d 06h 57m 03s + 2016-10-14 20:22:10.747808500 Launching ElectrumX server... + 2016-10-14 20:22:13.032415500 INFO:root:ElectrumX server starting + 2016-10-14 20:22:13.032633500 INFO:root:switching current directory to /Users/neil/server-btc + 2016-10-14 20:22:13.038495500 INFO:DB:created new database Bitcoin-mainnet + 2016-10-14 20:22:13.038892500 INFO:DB:Bitcoin/mainnet height: -1 tx count: 0 flush count: 0 utxo flush count: 0 sync time: 0d 00h 00m 00s + 2016-10-14 20:22:13.038935500 INFO:DB:flushing all after cache reaches 2,000 MB + 2016-10-14 20:22:13.038978500 INFO:DB:flushing history cache at 400 MB + 2016-10-14 20:22:13.039076500 INFO:BlockCache:using RPC URL http://user:password@192.168.0.2:8332/ + 2016-10-14 20:22:13.039796500 INFO:BlockCache:catching up, block cache limit 10MB... + 2016-10-14 20:22:14.092192500 INFO:DB:cache stats at height 0 daemon height: 434,293 + 2016-10-14 20:22:14.092243500 INFO:DB: entries: UTXO: 1 DB: 0 hist count: 1 hist size: 1 + 2016-10-14 20:22:14.092288500 INFO:DB: size: 0MB (UTXOs 0MB hist 0MB) + 2016-10-14 20:22:32.302394500 INFO:UTXO:duplicate tx hash d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599 + 2016-10-14 20:22:32.310441500 INFO:UTXO:duplicate tx hash e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468 + 2016-10-14 20:23:14.094855500 INFO:DB:cache stats at height 125,278 daemon height: 434,293 + 2016-10-14 20:23:14.095026500 INFO:DB: entries: UTXO: 191,155 DB: 0 hist count: 543,455 hist size: 1,394,187 + 2016-10-14 20:23:14.095028500 INFO:DB: size: 172MB (UTXOs 44MB hist 128MB) + +Under normal operation these cache stats repeat roughly every minute. +Flushes can take many minutes and look like this:: + + 2016-10-14 21:30:29.085479500 INFO:DB:flushing UTXOs: 22,910,848 txs and 254,753 blocks + 2016-10-14 21:32:05.383413500 INFO:UTXO:UTXO cache adds: 55,647,862 spends: 48,751,219 + 2016-10-14 21:32:05.383460500 INFO:UTXO:UTXO DB adds: 6,875,315 spends: 0. Collisions: hash168: 268 UTXO: 0 + 2016-10-14 21:32:07.056008500 INFO:DB:6,982,386 history entries in 1,708,991 addrs + 2016-10-14 21:32:08.169468500 INFO:DB:committing transaction... + 2016-10-14 21:33:17.644296500 INFO:DB:flush #11 to height 254,752 took 168s + 2016-10-14 21:33:17.644357500 INFO:DB:txs: 22,910,848 tx/sec since genesis: 5,372, since last flush: 3,447 + 2016-10-14 21:33:17.644536500 INFO:DB:sync time: 0d 01h 11m 04s ETA: 0d 11h 22m 42s After flush-to-disk you may see an aiohttp error; this is the daemon timing out the connection while the disk flush was in progress. This is harmless; I intend to fix this soon by yielding whilst flushing. -You may see one or two logs about UTXOs or hash160 key collisions:: - - 2016-10-08 07:24:34.068609500 INFO:DB:UTXO compressed key collision at height 252943 utxo 115cc1408e5321636675a8fcecd204661a6f27b4b7482b1b7c4402ca4b94b72f / 1 - -These are informational messages about an artefact of the compression -scheme ElectrumX uses and are harmless. However, if you see more than -a handful of these, particularly close together, something is very -wrong and your DB is probably corrupt. +The ETA is just a guide and can be quite volatile. diff --git a/README.rst b/README.rst index 0364edb..ce6955f 100644 --- a/README.rst +++ b/README.rst @@ -41,11 +41,12 @@ Implementation ElectrumX does not currently do any pruning. With luck it may never become necessary. So how does it achieve a much more compact database -than Electrum server, which throws away a lot of information? And -sync faster to boot? +than Electrum server, which prunes a lot of hisory, and also sync +faster? All of the following likely play a part: +- aggressive caching and batching of DB writes - more compact representation of UTXOs, the mp address index, and history. Electrum server stores full transaction hash and height for all UTXOs. In its pruned history it does the same. ElectrumX @@ -58,32 +59,22 @@ All of the following likely play a part: disk rather than in levelDB. It would be nice to do this for histories but I cannot think how they could be easily indexable on a filesystem. - avoiding unnecessary or redundant computations -- more efficient memory usage - through more compact data structures and - and judicious use of memoryviews -- big caches (controlled via FLUSH_SIZE) +- more efficient memory usage - asyncio and asynchronous prefetch of blocks. With luck ElectrumX will have no need of threads or locking primitives - because it prunes electrum-server needs to store undo information, ElectrumX should does not need to store undo information for blockchain reorganisations (note blockchain reorgs are not yet implemented in ElectrumX) -- finally electrum-server maintains a patricia tree of UTXOs. My - understanding is this is for future features and not currently - required. It's unclear precisely how this will be used or what - could replace or duplicate its functionality in ElectrumX. Since - ElectrumX stores all necessary blockchain metadata some solution - should exist. - - -Future/TODO -=========== - -- handling blockchain reorgs -- handling client connections (heh!) -- investigating leveldb space / speed tradeoffs -- seeking out further efficiencies. ElectrumX is CPU bound; it would not - surprise me if there is a way to cut CPU load by 10-20% more. To squeeze - even more out would probably require some things to move to C or C++. + + +Roadmap +======= + +- test a few more performance improvement ideas +- handle blockchain reorgs +- handle client connections +- potentially move some functionality to C or C++ Once I get round to writing the server part, I will add DoS protections if necessary to defend against requests for large diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES index 20c6284..2287788 100644 --- a/samples/scripts/NOTES +++ b/samples/scripts/NOTES @@ -11,5 +11,28 @@ connecting to the daemon, or you must specify RPC_HOST, RPC_USER, RPC_PASSWORD and optionally RPC_PORT (it defaults appropriately for the coin and network otherwise). -The other environment variables are all optional and will adopt sensible defaults if not -specified. +The other environment variables are all optional and will adopt +sensible defaults if not specified. + +Your performance might change by tweaking these cache settings. Cache +size is only checked roughly every minute, so the caches can grow +beyond the specified size. Also the Python process is often quite a +bit bigger than the combine cache size, because of Python overhead and +also because leveldb can consume quite a lot of memory during UTXO +flushing. So these are rough numbers only: + +HIST_MB - amount of history cache, in MB, to retain before flushing to + disk. Default is 250; probably no benefit being much larger + as history is append-only and not searched. + +CACHE_MB- amount of UTXO and history cache, in MB, to retain before + flushing to disk. Default is 1000. This may be too large + for small boxes or too small for machines with lots of RAM. + Larger caches generally perform better as there is + significant searching of the UTXO cache during indexing. + However, I don't see much benefit in my tests pushing this + beyond 2000, and in fact beyond there performance begins to + fall. My machine has 24GB RAM; the slow down is probably + because of leveldb caching and Python GC effects. However + this may be very dependent on hardware and you may have + different results. \ No newline at end of file diff --git a/samples/scripts/env/CACHE_MB b/samples/scripts/env/CACHE_MB new file mode 100644 index 0000000..83b33d2 --- /dev/null +++ b/samples/scripts/env/CACHE_MB @@ -0,0 +1 @@ +1000 diff --git a/samples/scripts/env/FLUSH_SIZE b/samples/scripts/env/FLUSH_SIZE deleted file mode 100644 index d508e66..0000000 --- a/samples/scripts/env/FLUSH_SIZE +++ /dev/null @@ -1 +0,0 @@ -4000000 diff --git a/samples/scripts/env/HIST_MB b/samples/scripts/env/HIST_MB new file mode 100644 index 0000000..cb1a40d --- /dev/null +++ b/samples/scripts/env/HIST_MB @@ -0,0 +1 @@ +250