diff --git a/README.rst b/README.rst
index a903304..05edc14 100644
--- a/README.rst
+++ b/README.rst
@@ -50,52 +50,75 @@ testnets, of course.
 
 Implementation
 ==============
 
-ElectrumX does not currently do any pruning. With luck it may never
-become necessary. So how does it achieve a much more compact database
-than Electrum server, which prunes a lot of hisory, and also sync
-faster?
+ElectrumX does not do any pruning or throwing away of history. It
+will retain this property for as long as feasible, and I believe it is
+efficiently achievable for the foreseeable future with plain Python.
 
-All of the following likely play a part:
+So how does it achieve a much more compact database than Electrum
+server, which is forced to prune history for busy addresses, and yet
+sync roughly 2 orders of magnitude faster?
+
+I believe all of the following play a part:
 
 - aggressive caching and batching of DB writes
-- more compact representation of UTXOs, the address index, and
-  history. Electrum server stores full transaction hash and height
-  for all UTXOs. In its pruned history it does the same. ElectrumX
-  just stores the transaction number in the linear history of
-  transactions. For at least another 5 years the transaction number
-  will fit in a 4-byte integer. ElectrumX calculates the height from
-  a simple lookup in a linear array which is stored on disk.
-  ElectrumX also stores transaction hashes in a linear array on disk.
-- storing static append-only metadata which is indexed by position on
-  disk rather than in levelDB. It would be nice to do this for histories
-  but I cannot think how they could be easily indexable on a filesystem.
-- avoiding unnecessary or redundant computations
-- more efficient memory usage
-- asyncio and asynchronous prefetch of blocks.
-
-ElectrumX should not have any need of threads.
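The height-from-transaction-number lookup described here can be sketched with a binary search over cumulative per-block transaction counts. This is an illustrative model only, not ElectrumX's actual code; the `tx_counts` array and `height_of_tx` helper are assumptions for the sketch.

```python
from bisect import bisect_right

# Hypothetical cumulative tx counts: tx_counts[h] is the total number
# of transactions in blocks 0..h inclusive, kept as a flat on-disk array.
tx_counts = [1, 3, 6, 10, 15]

def height_of_tx(tx_num):
    # The first height whose cumulative count exceeds tx_num is the
    # block containing transaction number tx_num.
    return bisect_right(tx_counts, tx_num)
```

Storing only a 4-byte tx number per UTXO and recovering the height this way is what lets the index stay compact compared with storing a full hash and height per entry.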
-
-
-Roadmap
-=======
-
-- come up with UTXO root logic and implement it
-- test a few more performance improvement ideas
-- implement light caching of client responses
-- yield during expensive requests and/or penalize the connection
+- more compact and efficient representation of UTXOs, address index,
+  and history. Electrum-Server stores full transaction hash and
+  height for each UTXO, and does the same in its pruned history. In
+  contrast ElectrumX just stores the transaction number in the linear
+  history of transactions. For at least another 5 years this
+  transaction number will fit in a 4-byte integer, and when necessary
+  expanding to 5 or 6 bytes is trivial. ElectrumX can determine block
+  height from a simple binary search of tx counts stored on disk.
+  ElectrumX stores historical transaction hashes in a linear array on
+  disk.
+- placing static append-only metadata indexable by position on disk
+  rather than in levelDB. It would be nice to do this for histories
+  but I cannot think of a way.
+- avoiding unnecessary or redundant computations, such as converting
+  address hashes to human-readable ASCII strings with expensive bignum
+  arithmetic, and then back again.
+- better choice of Python data structures giving lower memory usage as
+  well as faster traversal
+- leveraging asyncio for asynchronous prefetch of blocks to mostly
+  eliminate CPU idling. As a Python program ElectrumX is unavoidably
+  single-threaded in its essence; we must keep that CPU core busy.
+
+Python's asyncio means ElectrumX has no (direct) use for threads and
+associated complications. I cannot foresee any case where they might
+be necessary.
+
+
+Roadmap Pre-1.0
+===============
+
+- minor code cleanups
+- minor additions of missing functionality
+- logging improvements, mostly post-sync. Pre-sync logs seem decent.
+- at most 1 more DB format change; I will make a weak attempt to
+  retain the 0.6 release's DB format if possible
+- provision of configurable ways to limit client connections so as to
+  mitigate intentional or unintentional degradation of server response
+  time to other clients. Based on IRC discussion this will likely be a
+  combination of address subscription and bandwidth limits.
+
+
+Roadmap Post-1.0
+================
+
+- UTXO root logic and implementation
 - improve DB abstraction so LMDB is not penalized
+- investigate effects of cache defaults and DB configuration defaults
+  on sync time and simplify / optimize the default config accordingly
 - potentially move some functionality to C or C++
 
-The above are in no particular order.
-
 Database Format
 ===============
 
-The database and metadata formats of ElectrumX are certain to change
-in the future. Such a change will render old DBs unusable. For now I
-do not intend to provide converters as this is still non-production
-software. Moreover from-genesis sync time is quite bearable.
+The database and metadata formats of ElectrumX are likely to change.
+Such changes will render old DBs unusable. At least until 1.0 I do
+not intend to provide converters; moreover from-genesis sync time to
+create a pristine database is quite tolerable.
 
 
 Miscellany
diff --git a/samples/scripts/NOTES b/docs/ENV-NOTES
similarity index 98%
rename from samples/scripts/NOTES
rename to docs/ENV-NOTES
index 8e336d4..14ab73a 100644
--- a/samples/scripts/NOTES
+++ b/docs/ENV-NOTES
@@ -54,7 +54,7 @@ you set the sum of these to nothing over half your available physical RAM:
 
 HIST_MB    - amount of history cache, in MB, to retain before flushing to
-             disk. Default is 250; probably no benefit being much larger
+             disk. Default is 300; probably no benefit being much larger
              as history is append-only and not searched.
 
 UTXO_MB    - amount of UTXO and history cache, in MB, to retain before
diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst
index ad468c1..3f3fa5a 100644
--- a/docs/HOWTO.rst
+++ b/docs/HOWTO.rst
@@ -102,17 +102,17 @@ Then copy the all sample scripts from the ElectrumX source tree there::
 
   cp -R /path/to/repo/electrumx/samples/scripts ~/scripts/electrumx
 
-This copies 4 things: the top level server run script, a log/ directory
-with the logger run script, an env/ directory, and a NOTES file.
+This copies 3 things: the top level server run script, a log/ directory
+with the logger run script, and an env/ directory.
 
 You need to configure the environment variables under env/ to your
-setup, as explained in NOTES. ElectrumX server currently takes no
-command line arguments; all of its configuration is taken from its
-environment which is set up according to env/ directory (see 'envdir'
-man page). Finally you need to change the log/run script to use the
-directory where you want the logs to be written by multilog. The
-directory need not exist as multilog will create it, but its parent
-directory must exist.
+setup, as explained in docs/ENV-NOTES. ElectrumX server currently
+takes no command line arguments; all of its configuration is taken
+from its environment, set up according to the env/ directory (see the
+'envdir' man page). Finally you need to change the log/run script to
+use the directory where you want the logs to be written by multilog.
+The directory need not exist as multilog will create it, but its
+parent directory must exist.
 
 Now start the 'svscan' process. This will not do much as the service
 directory is still empty::
@@ -143,7 +143,7 @@ The sample unit file assumes that the repository is located at
 change the unit file accordingly.
 
 You need to set a few configuration variables in :code:`/etc/electrumx.conf`,
-see `samples/NOTES` for the list of required variables.
+see `docs/ENV-NOTES` for the list of required variables.
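Concretely, an env/ directory holds one file per environment variable, the file's contents being the value (see the 'envdir' man page). A minimal sketch of such a setup follows; the paths and values are illustrative assumptions, not the documented defaults for every variable.

```shell
# Create an envdir-style configuration: one file per variable.
mkdir -p ~/scripts/electrumx/env
echo 300 > ~/scripts/electrumx/env/HIST_MB
echo 1000 > ~/scripts/electrumx/env/UTXO_MB

# envdir then exports each file as an environment variable for the
# child process, e.g.:
#   envdir ~/scripts/electrumx/env python3 electrumx_server.py
```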
 
 Now you can start ElectrumX using :code:`systemctl`::
@@ -172,7 +172,7 @@ machine doing the indexing is focussing on the one task and not the
 wider network.
 
 The HIST_MB and CACHE_MB environment variables control cache sizes
-before they spill to disk; see the NOTES file under samples/scripts.
+before they spill to disk; see the ENV-NOTES file under docs/.
 
 Here is my experience with the current codebase, to given heights and
 rough wall-time::
diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES
index a599d6c..214c459 100644
--- a/docs/RELEASE-NOTES
+++ b/docs/RELEASE-NOTES
@@ -1,3 +1,24 @@
+version 0.6.0
+-------------
+
+- DB format has changed again. This doesn't give a performance gain
+  or reduction that I could measure, but is cleaner in that each table
+  entry is now a singleton and not an array, which I much prefer. It
+  may enable other goodness in the future.
+- Logging is much less noisy when serving clients. In fact anything
+  in your logs that isn't just a status update is probably a bug that
+  I would like to know about. Unfortunately clean shutdown whilst
+  serving clients leads to massive log spew. This is harmless and, I
+  believe, down to my noob status with asyncio. I intend to fix this
+  in a nearby release.
+- Expensive client requests are intended to yield to other requests
+  sufficiently frequently that there should be no noticeable delays or
+  pauses under normal load from hog clients.
+- Notifications to hog clients are now queued in sequence with their
+  request responses. They used to be sent immediately regardless of
+  pending requests, which seemed less than ideal.
+- Some trivial improvements and fixes to local RPC query output
+
 version 0.5.1
 -------------
 
diff --git a/electrumx_rpc.py b/electrumx_rpc.py
index 06aa56f..5f02bbf 100755
--- a/electrumx_rpc.py
+++ b/electrumx_rpc.py
@@ -43,7 +43,7 @@ class RPCClient(asyncio.Protocol):
             return ('{:3d}:{:02d}:{:02d}'
                     .format(t // 3600, (t % 3600) // 60, t % 60))
 
-        fmt = ('{:<4} {:>23} {:>15} {:>5} '
+        fmt = ('{:<4} {:>23} {:>15} {:>7} '
                '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
         print(fmt.format('Type', 'Peer', 'Client', 'Subs',
                          'Recv #', 'Recv KB', 'Sent #', 'Sent KB',
diff --git a/electrumx_server.py b/electrumx_server.py
index d851aa6..d1ef476 100755
--- a/electrumx_server.py
+++ b/electrumx_server.py
@@ -19,6 +19,9 @@ from functools import partial
 from server.env import Env
 from server.protocol import BlockServer
 
+SUPPRESS_MESSAGES = [
+    'Fatal read error on socket transport',
+]
 
 def main_loop():
     '''Start the server.'''
@@ -34,6 +37,14 @@ def main_loop():
         logging.warning('received {} signal, shutting down'.format(signame))
         future.cancel()
 
+    def on_exception(loop, context):
+        '''Suppress spurious messages it appears we cannot control.'''
+        message = context.get('message')
+        if message not in SUPPRESS_MESSAGES:
+            if not ('task' in context and
+                    'accept_connection2()' in repr(context.get('task'))):
+                loop.default_exception_handler(context)
+
     server = BlockServer(Env())
     future = asyncio.ensure_future(server.main_loop())
 
@@ -42,6 +53,8 @@ def main_loop():
         loop.add_signal_handler(getattr(signal, signame),
                                 partial(on_signal, signame))
 
+    # Install exception handler
+    loop.set_exception_handler(on_exception)
     loop.run_until_complete(future)
     loop.close()
diff --git a/samples/scripts/env/HIST_MB b/samples/scripts/env/HIST_MB
index cb1a40d..697cb3a 100644
--- a/samples/scripts/env/HIST_MB
+++ b/samples/scripts/env/HIST_MB
@@ -1 +1 @@
-250
+300
diff --git a/server/block_processor.py b/server/block_processor.py
index 63a3c88..3761609 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -353,9 +353,8 @@ class BlockProcessor(server.db.DB):
 
         # UTXO cache
         self.utxo_cache = {}
-        self.db_cache = {}
         self.utxo_cache_spends = 0
-        self.db_deletes = 0
+        self.db_deletes = []
 
         # Log state
         self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
@@ -540,7 +539,7 @@ class BlockProcessor(server.db.DB):
         assert self.height == self.fs_height == self.db_height
         assert not self.history
         assert not self.utxo_cache
-        assert not self.db_cache
+        assert not self.db_deletes
 
     def flush(self, flush_utxos=False, flush_history=None):
         '''Flush out cached state.
@@ -708,15 +707,16 @@ class BlockProcessor(server.db.DB):
         # more, so we scale our already bloated object sizes.
         one_MB = int(1048576 / 1.3)
         utxo_cache_size = len(self.utxo_cache) * 187
-        db_cache_size = len(self.db_cache) * 105
+        db_deletes_size = len(self.db_deletes) * 61
         hist_cache_size = len(self.history) * 180 + self.history_size * 4
         tx_hash_size = (self.tx_count - self.fs_tx_count) * 74
-        utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
+        utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB
         hist_MB = (hist_cache_size + tx_hash_size) // one_MB
 
         self.logger.info('UTXOs: {:,d} deletes: {:,d} '
                          'UTXOs {:,d}MB hist {:,d}MB'
-                         .format(len(self.utxo_cache), self.db_deletes,
+                         .format(len(self.utxo_cache),
+                                 len(self.db_deletes) // 2,
                                  utxo_MB, hist_MB))
         self.logger.info('our height: {:,d} daemon height: {:,d}'
                          .format(self.height, self.daemon.cached_height()))
@@ -915,17 +915,18 @@ class BlockProcessor(server.db.DB):
 
     To this end we maintain two "tables", one for each point above:
 
-      1. Key: b'u' + address_hash168 + tx_num + tx_idx
+      1. Key: b'u' + address_hash168 + tx_idx + tx_num
          Value: the UTXO value as a 64-bit unsigned integer
 
-      2. Key: b'h' + compressed_tx_hash + tx_idx
-         Value: [address_hash168 + tx_num]
+      2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num
+         Value: hash168
 
    The compressed tx hash is just the first few bytes of the hash of
    the tx in which the UTXO was created. As this is not unique there
-   will are potential collisions when saving and looking up UTXOs;
-   hence why the second table has a list as its value. The collision
-   can be resolved with the tx_num. The collision rate is low (<0.1%).
+   will be potential collisions, so tx_num is also in the key. When
+   looking up a UTXO the prefix space of the compressed hash needs to
+   be searched and resolved if necessary with the tx_num. The
+   collision rate is low (<0.1%).
    '''

    def spend_utxo(self, tx_hash, tx_idx):
@@ -942,55 +943,36 @@
             self.utxo_cache_spends += 1
             return cache_value
 
-        # Spend it from the DB. Read the UTXO through the cache
-        # because compressed keys can collide.
-        # The 4 is the COMPRESSED_TX_HASH_LEN
-        db_key = b'h' + tx_hash[:4] + idx_packed
-        db_value = self.db_cache_get(db_key)
-        if db_value:
-            # FIXME: this matches what we did previously but until we store
-            # all UTXOs isn't safe
-            if len(db_value) == 25:
-                udb_key = b'u' + db_value + idx_packed
-                utxo_value_packed = self.db.get(udb_key)
-                if utxo_value_packed:
-                    # Remove the UTXO from both tables
-                    self.db_deletes += 1
-                    self.db_cache[db_key] = None
-                    self.db_cache[udb_key] = None
-                    return db_value + utxo_value_packed
-                # Fall through to below loop for error
-
-            assert len(db_value) % 25 == 0
-
-            # Find which entry, if any, the TX_HASH matches.
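The two-"table" key layout documented in the docstring above can be sketched in isolation. The byte layout follows the docstring; the helper name, the little-endian packing, and the 2-byte width for tx_idx are assumptions for illustration, not necessarily ElectrumX's exact choices.

```python
from struct import pack

COMP_TX_HASH_LEN = 4  # compressed tx hash length (assumption: 4 bytes)

def utxo_keys(hash168, tx_hash, tx_idx, tx_num):
    '''Build the two DB keys for one UTXO per the documented layout.

    Table 1 (b'u' prefix) maps to the UTXO value; table 2 (b'h' prefix)
    maps a compressed tx hash back to the owning hash168. Because the
    compressed hash is not unique, tx_num is part of both keys, so a
    lookup scans the b'h' prefix space and disambiguates via tx_num.
    '''
    idx_packed = pack('<H', tx_idx)   # output index within the tx
    num_packed = pack('<I', tx_num)   # tx number in the linear history
    u_key = b'u' + hash168 + idx_packed + num_packed
    h_key = b'h' + tx_hash[:COMP_TX_HASH_LEN] + idx_packed + num_packed
    return u_key, h_key
```

Note that both keys share the same (tx_idx, tx_num) suffix, which is what lets a collision found in the b'h' table be resolved and then deleted from both tables together.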
-        for n in range(0, len(db_value), 25):
-            tx_num, = unpack(' 1:
+            tx_num, = unpack(' 1:
-                self.logger.warning('slow request for {} took {:.1f}s: {}'
-                                    .format(self.peername(), secs,
-                                            request))
+                if isinstance(job, tuple):  # Height / mempool notification
+                    await self.notify(*job)
+                else:
+                    await self.handle_json_request(job)
             except asyncio.CancelledError:
                 break
             except Exception:
                 # Getting here should probably be considered a bug and fixed
-                self.logger.error('error handling request {}'.format(request))
+                self.logger.error('error handling request {}'.format(job))
                 traceback.print_exc()
 
     def peername(self, *, for_log=True):
@@ -261,8 +260,8 @@ class Session(JSONRPC):
                 return param
         except ValueError:
             pass
-        raise RPCError('parameter should be a transaction hash: {}'
-                       .format(param))
+        raise self.RPCError('parameter should be a transaction hash: {}'
+                            .format(param))
 
     def hash168_from_param(self, param):
         if isinstance(param, str):
@@ -270,7 +269,8 @@ class Session(JSONRPC):
                 return self.coin.address_to_hash168(param)
             except:
                 pass
-        raise RPCError('parameter should be a valid address: {}'.format(param))
+        raise self.RPCError('parameter should be a valid address: {}'
+                            .format(param))
 
     def non_negative_integer_from_param(self, param):
         try:
@@ -281,24 +281,24 @@ class Session(JSONRPC):
         if param >= 0:
             return param
 
-        raise RPCError('param should be a non-negative integer: {}'
-                       .format(param))
+        raise self.RPCError('param should be a non-negative integer: {}'
+                            .format(param))
 
     def extract_hash168(self, params):
         if len(params) == 1:
             return self.hash168_from_param(params[0])
-        raise RPCError('params should contain a single address: {}'
-                       .format(params))
+        raise self.RPCError('params should contain a single address: {}'
+                            .format(params))
 
     def extract_non_negative_integer(self, params):
         if len(params) == 1:
             return self.non_negative_integer_from_param(params[0])
-        raise RPCError('params should contain a non-negative integer: {}'
-                       .format(params))
+        raise self.RPCError('params should contain a non-negative integer: {}'
+                            .format(params))
 
     def require_empty_params(self, params):
         if params:
-            raise RPCError('params should be empty: {}'.format(params))
+            raise self.RPCError('params should be empty: {}'.format(params))
 
 
 class ElectrumX(Session):
@@ -324,36 +324,41 @@ class ElectrumX(Session):
                 for prefix, suffixes in rpcs
                 for suffix in suffixes.split()}
 
-    @classmethod
-    def notify(cls, sessions, height, touched):
-        headers_payload = height_payload = None
+    async def notify(self, height, touched, cache):
+        '''Notify the client about changes in height and touched addresses.
 
-        for session in sessions:
-            if height != session.notified_height:
-                session.notified_height = height
-                if session.subscribe_headers:
-                    if headers_payload is None:
-                        headers_payload = json_notification_payload(
-                            'blockchain.headers.subscribe',
-                            (session.electrum_header(height), ),
-                        )
-                    session.send_json(headers_payload)
-
-                if session.subscribe_height:
-                    if height_payload is None:
-                        height_payload = json_notification_payload(
-                            'blockchain.numblocks.subscribe',
-                            (height, ),
-                        )
-                    session.send_json(height_payload)
-
-            hash168_to_address = session.coin.hash168_to_address
-            for hash168 in session.hash168s.intersection(touched):
-                address = hash168_to_address(hash168)
-                status = session.address_status(hash168)
+        Cache is a shared cache for this update.
+        '''
+        if height != self.notified_height:
+            self.notified_height = height
+            if self.subscribe_headers:
+                key = 'headers_payload'
+                if key not in cache:
+                    cache[key] = json_notification_payload(
+                        'blockchain.headers.subscribe',
+                        (self.electrum_header(height), ),
+                    )
+                self.send_json(cache[key])
+
+            if self.subscribe_height:
                 payload = json_notification_payload(
-                    'blockchain.address.subscribe', (address, status))
-                session.send_json(payload)
+                    'blockchain.numblocks.subscribe',
+                    (height, ),
+                )
+                self.send_json(payload)
+
+        hash168_to_address = self.coin.hash168_to_address
+        matches = self.hash168s.intersection(touched)
+        for hash168 in matches:
+            address = hash168_to_address(hash168)
+            status = await self.address_status(hash168)
+            payload = json_notification_payload(
+                'blockchain.address.subscribe', (address, status))
+            self.send_json(payload)
+
+        if matches:
+            self.logger.info('notified {} of {} addresses'
+                             .format(self.peername(), len(matches)))
 
     def height(self):
         '''Return the block processor's current height.'''
@@ -366,15 +371,15 @@ class ElectrumX(Session):
 
     def electrum_header(self, height):
         '''Return the binary header at the given height.'''
         if not 0 <= height <= self.height():
-            raise RPCError('height {:,d} out of range'.format(height))
+            raise self.RPCError('height {:,d} out of range'.format(height))
         header = self.bp.read_headers(height, 1)
         return self.coin.electrum_header(header, height)
 
-    def address_status(self, hash168):
+    async def address_status(self, hash168):
        '''Returns status as 32 bytes.'''
        # Note history is ordered and mempool unordered in electrum-server
        # For mempool, height is -1 if unconfirmed txins, otherwise 0
-        history = self.bp.get_history(hash168)
+        history = await self.async_get_history(hash168)
        mempool = self.bp.mempool_transactions(hash168)
 
        status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
@@ -407,10 +412,10 @@ class ElectrumX(Session):
 
        return {"block_height": height, "merkle": merkle_branch, "pos": pos}
 
-    def get_history(self, hash168):
+    async def get_history(self, hash168):
        # Note history is ordered and mempool unordered in electrum-server
        # For mempool, height is -1 if unconfirmed txins, otherwise 0
-        history = self.bp.get_history(hash168, limit=None)
+        history = await self.async_get_history(hash168)
        mempool = self.bp.mempool_transactions(hash168)
 
        conf = tuple({'tx_hash': hash_to_str(tx_hash), 'height': height}
@@ -427,44 +432,61 @@ class ElectrumX(Session):
            count = min(next_height - start_height, chunk_size)
        return self.bp.read_headers(start_height, count).hex()
 
-    def get_balance(self, hash168):
-        confirmed = self.bp.get_balance(hash168)
+    async def async_get_history(self, hash168):
+        # Python 3.6: use async generators; update callers
+        history = []
+        for item in self.bp.get_history(hash168, limit=None):
+            history.append(item)
+            if len(history) % 100 == 0:
+                await asyncio.sleep(0)
+        return history
+
+    async def get_utxos(self, hash168):
+        # Python 3.6: use async generators; update callers
+        utxos = []
+        for utxo in self.bp.get_utxos(hash168, limit=None):
+            utxos.append(utxo)
+            if len(utxos) % 25 == 0:
+                await asyncio.sleep(0)
+        return utxos
+
+    async def get_balance(self, hash168):
+        utxos = await self.get_utxos(hash168)
+        confirmed = sum(utxo.value for utxo in utxos)
        unconfirmed = self.bp.mempool_value(hash168)
        return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
 
-    def list_unspent(self, hash168):
-        utxos = self.bp.get_utxos_sorted(hash168)
-        return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
-                      'tx_pos': utxo.tx_pos, 'height': utxo.height,
-                      'value': utxo.value}
-                     for utxo in utxos)
+    async def list_unspent(self, hash168):
+        return [{'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos,
+                 'height': utxo.height, 'value': utxo.value}
+                for utxo in sorted(await self.get_utxos(hash168))]
 
    # --- blockchain commands
 
    async def address_get_balance(self, params):
        hash168 = self.extract_hash168(params)
-        return self.get_balance(hash168)
+        return await self.get_balance(hash168)
 
    async def address_get_history(self, params):
        hash168 = self.extract_hash168(params)
-        return self.get_history(hash168)
+        return await self.get_history(hash168)
 
    async def address_get_mempool(self, params):
        hash168 = self.extract_hash168(params)
-        raise RPCError('get_mempool is not yet implemented')
+        raise self.RPCError('get_mempool is not yet implemented')
 
    async def address_get_proof(self, params):
        hash168 = self.extract_hash168(params)
-        raise RPCError('get_proof is not yet implemented')
+        raise self.RPCError('get_proof is not yet implemented')
 
    async def address_listunspent(self, params):
        hash168 = self.extract_hash168(params)
-        return self.list_unspent(hash168)
+        return await self.list_unspent(hash168)
 
    async def address_subscribe(self, params):
        hash168 = self.extract_hash168(params)
        self.hash168s.add(hash168)
-        return self.address_status(hash168)
+        return await self.address_status(hash168)
 
    async def block_get_chunk(self, params):
        index = self.extract_non_negative_integer(params)
@@ -529,7 +551,7 @@ class ElectrumX(Session):
            tx_hash = self.tx_hash_from_param(params[0])
            return await self.daemon.getrawtransaction(tx_hash)
 
-        raise RPCError('params wrong length: {}'.format(params))
+        raise self.RPCError('params wrong length: {}'.format(params))
 
    async def transaction_get_merkle(self, params):
        if len(params) == 2:
@@ -537,7 +559,8 @@ class ElectrumX(Session):
            height = self.non_negative_integer_from_param(params[1])
            return await self.tx_merkle(tx_hash, height)
 
-        raise RPCError('params should contain a transaction hash and height')
+        raise self.RPCError('params should contain a transaction hash '
+                            'and height')
 
    async def utxo_get_address(self, params):
        if len(params) == 2:
@@ -549,7 +572,8 @@ class ElectrumX(Session):
                return self.coin.hash168_to_address(hash168)
            return None
 
-        raise RPCError('params should contain a transaction hash and index')
+        raise self.RPCError('params should contain a transaction hash '
+                            'and index')
 
    # --- server commands
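The yielding pattern that `async_get_history` and `get_utxos` use above — batching a synchronous iterator and awaiting `asyncio.sleep(0)` so other requests can run — can be shown in isolation. This is a generic sketch; the helper name is an assumption, and the batch size of 100 simply mirrors the history case above.

```python
import asyncio

async def drain_cooperatively(sync_iterable, batch=100):
    '''Collect items from a synchronous iterable, yielding control to
    the event loop every `batch` items so other coroutines can run.'''
    items = []
    for item in sync_iterable:
        items.append(item)
        if len(items) % batch == 0:
            await asyncio.sleep(0)  # let pending tasks make progress
    return items

# Usage: drain a long iterator without starving other sessions.
result = asyncio.run(drain_cooperatively(range(1000)))
```

`asyncio.sleep(0)` suspends the coroutine for exactly one event-loop iteration, which is what keeps a single expensive request from introducing noticeable pauses for other clients.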
diff --git a/server/version.py b/server/version.py
index 77fe367..d0bffae 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.5.1"
+VERSION = "ElectrumX 0.6.0"