diff --git a/HOWTO.rst b/HOWTO.rst
index 5a80d0f..2f7abd1 100644
--- a/HOWTO.rst
+++ b/HOWTO.rst
@@ -6,14 +6,15 @@
 successfully on MacOSX and DragonFlyBSD.  It won't run out-of-the-box
 on Windows, but the changes required to make it do so should be
 small - patches welcome.
 
-+ Python3:  ElectrumX makes heavy use of asyncio so version >=3.5 is required
++ Python3:  ElectrumX uses asyncio.  Python version >=3.5 is required.
 + plyvel:   Python interface to LevelDB.  I am using plyvel-0.9.
 + aiohttp:  Python library for asynchronous HTTP.  ElectrumX uses it for
-  communication with the daemon.  I am using aiohttp-0.21.
+  communication with the daemon.  Version >= 1.0 required; I am
+  using 1.0.5.
 
 While not requirements for running ElectrumX, it is intended to be run
-with supervisor software such as Daniel Bernstein's daemontools, or
-Gerald Pape's runit package.  These make administration of secure
+with supervisor software such as Daniel Bernstein's daemontools,
+Gerald Pape's runit package or systemd.  These make administration of secure
 unix servers very easy, and I strongly recommend you install one of
 these and familiarise yourself with them.  The instructions below and
 sample run scripts assume daemontools; adapting to runit should be
 trivial.
@@ -55,6 +56,10 @@ on an SSD::
 
   mkdir /path/to/db_directory
   chown electrumx /path/to/db_directory
 
+
+Using daemontools
+-----------------
+
 Next create a daemontools service directory; this only holds symlinks
 (see daemontools documentation).  The 'svscan' program will ensure
 the servers in the directory are running by launching a 'supervise'
@@ -107,6 +112,34 @@ You can see its logs with::
 
   tail -F /path/to/log/dir/current | tai64nlocal
 
+Using systemd
+-------------
+
+This repository contains a sample systemd unit file that you can use to
+set up ElectrumX with systemd.  Simply copy it to :code:`/etc/systemd/system`::
+
+  cp samples/systemd-unit /etc/systemd/system/electrumx.service
+
+The sample unit file assumes that the repository is located at
+:code:`/home/electrumx/electrumx`.  If that differs on your system, you
+need to change the unit file accordingly.
+
+You need to set a few configuration variables in :code:`/etc/electrumx.conf`;
+see :code:`samples/scripts/NOTES` for the list of required variables.
+
+Now you can start ElectrumX using :code:`systemctl`::
+
+  systemctl start electrumx
+
+You can use :code:`journalctl` to check the log output::
+
+  journalctl -u electrumx -f
+
+Once configured, you may want to start ElectrumX at boot::
+
+  systemctl enable electrumx
+
+
 Sync Progress
 =============
@@ -127,13 +160,14 @@ Here is my experience with the current codebase, to given heights and
 rough wall-time::
 
              Machine A     Machine B     DB + Metadata
-  180,000    7m 10s        0.4 GiB
-  245,800    1h 00m        2.7 GiB
-  290,000    1h 56m        3.3 GiB
-  343,000    3h 56m        6.0 GiB
-  386,000    7h 28m        7.0 GiB
-  404,000    9h 41m
-  434,369    14h 38m       17.1 GiB
+  181,000    7m 09s        0.4 GiB
+  255,000    1h 02m        2.7 GiB
+  289,000    1h 46m        3.3 GiB
+  317,000    2h 33m
+  351,000    3h 58m
+  377,000    6h 06m        6.5 GiB
+  403,400    8h 51m
+  436,196    14h 03m       17.3 GiB
 
 Machine A: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU, 8GB
 RAM and a DragonFlyBSD HAMMER filesystem on an SSD.  It requests blocks
@@ -141,7 +175,7 @@ over the LAN from a bitcoind on machine B.
 
 Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz
 quad-core Intel i5 CPU with an HDD and 24GB RAM.  Running bitcoind on
-the same machine.  HIST_MB of 400, CACHE_MB of 2,000.
+the same machine.  HIST_MB of 350, UTXO_MB of 1,600.
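+
+For reference, these cache settings are plain environment variables.
+With the daemontools layout above they would be one value per file in
+the service's env directory; an illustrative sketch only, the service
+path is an example::
+
+  echo 350  > /path/to/service/env/HIST_MB
+  echo 1600 > /path/to/service/env/UTXO_MB
+
+Under systemd the equivalent would be the lines HIST_MB=350 and
+UTXO_MB=1600 in :code:`/etc/electrumx.conf`.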
 
 For chains other than bitcoin-mainnet synchronization should be much
 faster.
 
@@ -158,7 +192,7 @@ by bringing it down like so::
 
 If processing the blockchain, the server will start the process of
 flushing to disk.  Once that is complete the server will exit.  Be
-patient as disk flushing can take a while.
+patient as disk flushing can take many minutes.
 
 ElectrumX flushes to leveldb using its transaction functionality.  The
 plyvel documentation claims this is atomic.  I have written ElectrumX
@@ -228,4 +262,5 @@ After flush-to-disk you may see an aiohttp error; this is the daemon
 timing out the connection while the disk flush was in progress.  This
 is harmless.
 
-The ETA is just a guide and can be quite volatile.
+The ETA is just a guide and can be quite volatile.  It is too optimistic
+initially.
diff --git a/README.rst b/README.rst
index ce6955f..ea84b5e 100644
--- a/README.rst
+++ b/README.rst
@@ -47,33 +47,28 @@ faster?  All of the following likely play a part:
 
 - aggressive caching and batching of DB writes
-- more compact representation of UTXOs, the mp address index, and
+- more compact representation of UTXOs, the address index, and
   history.  Electrum server stores full transaction hash and height
   for all UTXOs.  In its pruned history it does the same.  ElectrumX
   just stores the transaction number in the linear history of
-  transactions, and it looks like that for at least 5 years that will
-  fit in a 4-byte integer.  ElectrumX calculates the height from a
-  simple lookup in a linear array which is stored on disk.  ElectrumX
-  also stores transaction hashes in a linear array on disk.
+  transactions.  For at least another 5 years the transaction number
+  will fit in a 4-byte integer.  ElectrumX calculates the height from
+  a simple lookup in a linear array which is stored on disk.
+  ElectrumX also stores transaction hashes in a linear array on disk.
 - storing static append-only metadata which is indexed by position on
   disk rather than in levelDB.  It would be nice to do this for
   histories but I cannot think how they could be easily indexable on a
   filesystem.
 - avoiding unnecessary or redundant computations
 - more efficient memory usage
-- asyncio and asynchronous prefetch of blocks.  With luck ElectrumX
-  will have no need of threads or locking primitives
-- because it prunes electrum-server needs to store undo information,
-  ElectrumX should does not need to store undo information for
-  blockchain reorganisations (note blockchain reorgs are not yet
-  implemented in ElectrumX)
+- asyncio and asynchronous prefetch of blocks.  ElectrumX should not
+  have any need of threads.
 
 Roadmap
 =======
 
 - test a few more performance improvement ideas
-- handle blockchain reorgs
-- handle client connections
+- handle client connections (half-implemented but not functional)
 - potentially move some functionality to C or C++
 
 Once I get round to writing the server part, I will add DoS
@@ -102,12 +97,13 @@ As I've been researching where the time is going during block chain
 indexing and how various cache sizes and hardware choices affect it,
 I'd appreciate it if anyone trying to synchronize could tell me::
 
+  - the version of ElectrumX
   - their O/S and filesystem
   - their hardware (CPU name and speed, RAM, and disk kind)
   - whether their daemon was on the same host or not
   - whatever stats about sync height vs time they can provide (the
     logs give it all in wall time)
-  - the network they synced
+  - the network (e.g. bitcoin mainnet) they synced
 
 Neil Booth
diff --git a/RELEASE-NOTES b/RELEASE-NOTES
new file mode 100644
index 0000000..6352940
--- /dev/null
+++ b/RELEASE-NOTES
@@ -0,0 +1,11 @@
+Version 0.02
+------------
+
+- fix bug where tx counts were incorrectly saved
+- large clean-up and refactoring of code, breakout into new files
+- several efficiency improvements
+- initial implementation of chain reorg handling
+- work on RPC and TCP server functionality.  Code committed but not
+  functional, so currently disabled
+- note that some of the environment variables have been renamed,
+  see samples/scripts/NOTES for the list
\ No newline at end of file
diff --git a/electrumx_rpc.py b/electrumx_rpc.py
new file mode 100755
index 0000000..68fc74c
--- /dev/null
+++ b/electrumx_rpc.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+
+# See the file "LICENSE" for information about the copyright
+# and warranty status of this software.
+
+import argparse
+import asyncio
+import json
+from functools import partial
+from os import environ
+
+
+class RPCClient(asyncio.Protocol):
+
+    def __init__(self, loop):
+        self.loop = loop
+
+    def connection_made(self, transport):
+        self.transport = transport
+
+    def connection_lost(self, exc):
+        self.loop.stop()
+
+    def send(self, payload):
+        data = json.dumps(payload) + '\n'
+        self.transport.write(data.encode())
+
+    def data_received(self, data):
+        payload = json.loads(data.decode())
+        self.transport.close()
+        print(json.dumps(payload, indent=4, sort_keys=True))
+
+
+def main():
+    '''Send the RPC command to the server and print the result.'''
+    parser = argparse.ArgumentParser('Send electrumx an RPC command')
+    parser.add_argument('-p', '--port', metavar='port_num', type=int,
+                        help='RPC port number')
+    parser.add_argument('command', nargs='*', default=[],
+                        help='command to send')
+    args = parser.parse_args()
+
+    if args.port is None:
+        args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
+
+    payload = {'method': args.command[0], 'params': args.command[1:]}
+
+    loop = asyncio.get_event_loop()
+    proto_factory = partial(RPCClient, loop)
+    coro = loop.create_connection(proto_factory, 'localhost', args.port)
+    try:
+        transport, protocol = loop.run_until_complete(coro)
+        protocol.send(payload)
+        loop.run_forever()
+    except OSError:
+        print('error connecting - is ElectrumX running?')
+    finally:
+        loop.close()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/lib/coins.py b/lib/coins.py
index b04bec2..30a0b12 100644
--- a/lib/coins.py
+++ b/lib/coins.py
@@ -6,7 +6,7 @@ from decimal import Decimal
 import inspect
 import sys
 
-from lib.hash import Base58, hash160
+from lib.hash import Base58, hash160, double_sha256
 from lib.script import ScriptPubKey
 from lib.tx import Deserializer
 
@@ -135,10 +135,17 @@ class Coin(object):
             payload.append(0x01)
         return Base58.encode_check(payload)
 
+    @classmethod
+    def header_hashes(cls, header):
+        '''Given a header return the previous block hash and the current block
+        hash.'''
+        return header[4:36], double_sha256(header)
+
     @classmethod
     def read_block(cls, block):
-        d = Deserializer(block[cls.HEADER_LEN:])
-        return d.read_block()
+        '''Read a block and return (header, tx_hashes, txs)'''
+        header, rest = block[:cls.HEADER_LEN], block[cls.HEADER_LEN:]
+        return (header, ) + Deserializer(rest).read_block()
 
     @classmethod
     def decimal_value(cls, value):
diff --git a/lib/hash.py b/lib/hash.py
index 28fb399..bf0aed6 100644
--- a/lib/hash.py
+++ b/lib/hash.py
@@ -30,6 +30,16 @@ def hmac_sha512(key, msg):
 
 def hash160(x):
     return ripemd160(sha256(x))
 
+def hash_to_str(x):
+    '''Converts a big-endian binary hash to a little-endian hex string, as
+    shown in block explorers, etc.
+    '''
+    return bytes(reversed(x)).hex()
+
+def hex_str_to_hash(x):
+    '''Converts a little-endian hex string (as shown in block explorers)
+    to a big-endian binary hash.'''
+    return bytes(reversed(bytes.fromhex(x)))
 
 class InvalidBase58String(Exception):
     pass
diff --git a/lib/tx.py b/lib/tx.py
index dfa417c..c45425c 100644
--- a/lib/tx.py
+++ b/lib/tx.py
@@ -2,11 +2,10 @@
 # and warranty status of this software.
 
 from collections import namedtuple
-import binascii
 import struct
 
 from lib.util import cachedproperty
-from lib.hash import double_sha256
+from lib.hash import double_sha256, hash_to_str
 
 
 class Tx(namedtuple("Tx", "version inputs outputs locktime")):
@@ -15,17 +14,17 @@ class Tx(namedtuple("Tx", "version inputs outputs locktime")):
     def is_coinbase(self):
         return self.inputs[0].is_coinbase
 
-OutPoint = namedtuple("OutPoint", "hash n")
+    # FIXME: add hash as a cached property?
 
-# prevout is an OutPoint object
-class TxInput(namedtuple("TxInput", "prevout script sequence")):
+class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")):
     ZERO = bytes(32)
     MINUS_1 = 4294967295
 
     @cachedproperty
     def is_coinbase(self):
-        return self.prevout == (TxInput.ZERO, TxInput.MINUS_1)
+        return (self.prev_hash == TxInput.ZERO
+                and self.prev_idx == TxInput.MINUS_1)
 
     @cachedproperty
     def script_sig_info(self):
@@ -34,11 +33,11 @@ class TxInput(namedtuple("TxInput", "prevout script sequence")):
             return None
         return Script.parse_script_sig(self.script)
 
-    def __repr__(self):
-        script = binascii.hexlify(self.script).decode("ascii")
-        prev_hash = binascii.hexlify(self.prevout.hash).decode("ascii")
-        return ("Input(prevout=({}, {:d}), script={}, sequence={:d})"
-                .format(prev_hash, self.prevout.n, script, self.sequence))
+    def __str__(self):
+        script = self.script.hex()
+        prev_hash = hash_to_str(self.prev_hash)
+        return ("Input({}, {:d}, script={}, sequence={:d})"
+                .format(prev_hash, self.prev_idx, script, self.sequence))
 
 
 class TxOutput(namedtuple("TxOutput", "value pk_script")):
@@ -56,11 +55,12 @@ class Deserializer(object):
         self.cursor = 0
 
     def read_tx(self):
-        version = self.read_le_int32()
-        inputs = self.read_inputs()
-        outputs = self.read_outputs()
-        locktime = self.read_le_uint32()
-        return Tx(version, inputs, outputs, locktime)
+        return Tx(
+            self.read_le_int32(),   # version
+            self.read_inputs(),     # inputs
+            self.read_outputs(),    # outputs
+            self.read_le_uint32()   # locktime
+        )
 
     def read_block(self):
         tx_hashes = []
@@ -81,15 +81,12 @@ class Deserializer(object):
         return [self.read_input() for i in range(n)]
 
     def read_input(self):
-        prevout = self.read_outpoint()
-        script = self.read_varbytes()
-        sequence = self.read_le_uint32()
-        return TxInput(prevout, script, sequence)
-
-    def read_outpoint(self):
-        hash = self.read_nbytes(32)
-        n = self.read_le_uint32()
-        return OutPoint(hash, n)
+        return TxInput(
+            self.read_nbytes(32),   # prev_hash
+            self.read_le_uint32(),  # prev_idx
+            self.read_varbytes(),   # script
+            self.read_le_uint32()   # sequence
+        )
 
     def read_outputs(self):
         n = self.read_varint()
diff --git a/lib/util.py b/lib/util.py
index 13be38a..552a352 100644
--- a/lib/util.py
+++ b/lib/util.py
@@ -2,10 +2,18 @@
 # and warranty status of this software.
 
 import array
+import logging
 import sys
 from collections import Container, Mapping
 
 
+class LoggedClass(object):
+
+    def __init__(self):
+        self.logger = logging.getLogger(self.__class__.__name__)
+        self.logger.setLevel(logging.INFO)
+
+
 # Method decorator.  To be used for calculations that will always
 # deliver the same result.  The method cannot take any arguments
 # and should be accessed as an attribute.
diff --git a/query.py b/query.py
old mode 100644
new mode 100755
index 1d8b462..57941fa
--- a/query.py
+++ b/query.py
@@ -7,14 +7,35 @@ import os
 import sys
 
 from server.env import Env
-from server.db import DB
+from server.block_processor import BlockProcessor
+from lib.hash import hash_to_str
+
+
+def count_entries(db):
+    utxos = 0
+    for key in db.iterator(prefix=b'u', include_value=False):
+        utxos += 1
+    print("UTXO count:", utxos)
+
+    hash168 = 0
+    for key in db.iterator(prefix=b'h', include_value=False):
+        hash168 += 1
+    print("Hash168 count:", hash168)
+
+    hist = 0
+    for key in db.iterator(prefix=b'H', include_value=False):
+        hist += 1
+    print("History addresses:", hist)
 
 
 def main():
     env = Env()
+    coin = env.coin
     os.chdir(env.db_dir)
-    db = DB(env)
-    coin = db.coin
+    bp = BlockProcessor(env, None)
+    if len(sys.argv) == 1:
+        count_entries(bp.db)
+        return
     argc = 1
     try:
         limit = int(sys.argv[argc])
@@ -25,19 +46,19 @@ def main():
         print('Address: ', addr)
         hash168 = coin.address_to_hash168(addr)
         n = None
-        for n, (tx_hash, height) in enumerate(db.get_history(hash168, limit)):
+        for n, (tx_hash, height) in enumerate(bp.get_history(hash168, limit)):
             print('History #{:d}: hash: {} height: {:d}'
-                  .format(n + 1, bytes(reversed(tx_hash)).hex(), height))
+                  .format(n + 1, hash_to_str(tx_hash), height))
         if n is None:
             print('No history')
         n = None
-        for n, utxo in enumerate(db.get_utxos(hash168, limit)):
+        for n, utxo in enumerate(bp.get_utxos(hash168, limit)):
             print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}'
-                  .format(n + 1, bytes(reversed(utxo.tx_hash)).hex(),
+                  .format(n + 1, hash_to_str(utxo.tx_hash),
                           utxo.tx_pos, utxo.height, utxo.value))
         if n is None:
             print('No UTXOs')
-        balance = db.get_balance(hash168)
+        balance = bp.get_balance(hash168)
         print('Balance: {} {}'.format(coin.decimal_value(balance),
                                       coin.SHORTNAME))
diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index 2287788..d38a327 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -5,6 +5,11 @@ NETWORK - see lib/coins.py, must be a coin NET
 DB_DIRECTORY - path to the database directory (if relative, to run script)
 USERNAME - the username the server will run as
 SERVER_MAIN - path to the server_main.py script (if relative, to run script)
+DAEMON_URL - the URL used to connect to the daemon.  Should be of the form
+    http://username:password@hostname:port/
+    Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD,
+    DAEMON_HOST and DAEMON_PORT.  DAEMON_PORT is optional and
+    will default appropriately for COIN.
 
 In addition either RPC_URL must be given as the full RPC URL for
 connecting to the daemon, or you must specify RPC_HOST, RPC_USER,
@@ -14,6 +19,11 @@ the coin and network otherwise).
 
 The other environment variables are all optional and will adopt
 sensible defaults if not specified.
 
+REORG_LIMIT - maximum number of blocks to be able to handle in a chain
+              reorganisation.  ElectrumX retains some fairly compact
+              undo information for this many blocks in levelDB.
+              Default is 200.
+
 Your performance might change by tweaking these cache settings.  Cache
 size is only checked roughly every minute, so the caches can grow
 beyond the specified size.  Also the Python process is often quite a
@@ -25,14 +35,14 @@ HIST_MB - amount of history cache, in MB, to retain before
           flushing to disk.  Default is 250; probably no benefit in
           being much larger as history is append-only and not searched.
 
-CACHE_MB- amount of UTXO and history cache, in MB, to retain before
+UTXO_MB - amount of UTXO and history cache, in MB, to retain before
           flushing to disk.  Default is 1000.  This may be too large
           for small boxes or too small for machines with lots of RAM.
           Larger caches generally perform better as there is
           significant searching of the UTXO cache during indexing.
           However, I don't see much benefit in my tests pushing this
-          beyond 2000, and in fact beyond there performance begins to
-          fall.  My machine has 24GB RAM; the slow down is probably
-          because of leveldb caching and Python GC effects.  However
-          this may be very dependent on hardware and you may have
-          different results.
\ No newline at end of file
+          too high, and in fact performance begins to fall.  My
+          machine has 24GB RAM; the slow down is probably because of
+          leveldb caching and Python GC effects.  However this may be
+          very dependent on hardware and you may have different
+          results.
\ No newline at end of file
diff --git a/samples/scripts/env/RPC_HOST b/samples/scripts/env/DAEMON_HOST
similarity index 100%
rename from samples/scripts/env/RPC_HOST
rename to samples/scripts/env/DAEMON_HOST
diff --git a/samples/scripts/env/RPC_PASSWORD b/samples/scripts/env/DAEMON_PASSWORD
similarity index 100%
rename from samples/scripts/env/RPC_PASSWORD
rename to samples/scripts/env/DAEMON_PASSWORD
diff --git a/samples/scripts/env/RPC_PORT b/samples/scripts/env/DAEMON_PORT
similarity index 100%
rename from samples/scripts/env/RPC_PORT
rename to samples/scripts/env/DAEMON_PORT
diff --git a/samples/scripts/env/RPC_USERNAME b/samples/scripts/env/DAEMON_USERNAME
similarity index 100%
rename from samples/scripts/env/RPC_USERNAME
rename to samples/scripts/env/DAEMON_USERNAME
diff --git a/samples/scripts/env/CACHE_MB b/samples/scripts/env/UTXO_MB
similarity index 100%
rename from samples/scripts/env/CACHE_MB
rename to samples/scripts/env/UTXO_MB
diff --git a/samples/systemd-unit b/samples/systemd-unit
new file mode 100644
index 0000000..94b7d47
--- /dev/null
+++ b/samples/systemd-unit
@@ -0,0 +1,11 @@
+[Unit]
+Description=Electrumx
+After=network.target
+
+[Service]
+EnvironmentFile=/etc/electrumx.conf
+ExecStart=/home/electrumx/electrumx/server_main.py
+User=electrumx
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/server/block_processor.py b/server/block_processor.py
new file mode 100644
index 0000000..e1eebad
--- /dev/null
+++ b/server/block_processor.py
@@ -0,0 +1,714 @@
+# See the file "LICENSE" for information about the copyright
+# and warranty status of this software.
+ +import array +import ast +import asyncio +import struct +import time +from bisect import bisect_left +from collections import defaultdict, namedtuple +from functools import partial + +import plyvel + +from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY +from server.daemon import DaemonError +from lib.hash import hash_to_str +from lib.script import ScriptPubKey +from lib.util import chunks, LoggedClass + + +def formatted_time(t): + '''Return a number of seconds as a string in days, hours, mins and + secs.''' + t = int(t) + return '{:d}d {:02d}h {:02d}m {:02d}s'.format( + t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) + + +UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") + + +class ChainError(Exception): + pass + + +class Prefetcher(LoggedClass): + '''Prefetches blocks (in the forward direction only).''' + + def __init__(self, daemon, height): + super().__init__() + self.daemon = daemon + self.semaphore = asyncio.Semaphore() + self.queue = asyncio.Queue() + self.queue_size = 0 + # Target cache size. Has little effect on sync time. + self.target_cache_size = 10 * 1024 * 1024 + self.fetched_height = height + self.recent_sizes = [0] + + async def get_blocks(self): + '''Returns a list of prefetched blocks.''' + blocks, total_size = await self.queue.get() + self.queue_size -= total_size + return blocks + + async def clear(self, height): + '''Clear prefetched blocks and restart from the given height. + + Used in blockchain reorganisations. This coroutine can be + called asynchronously to the _prefetch coroutine so we must + synchronize. + ''' + with await self.semaphore: + while not self.queue.empty(): + self.queue.get_nowait() + self.queue_size = 0 + self.fetched_height = height + + async def start(self): + '''Loop forever polling for more blocks.''' + self.logger.info('prefetching blocks...') + while True: + while self.queue_size < self.target_cache_size: + try: + with await self.semaphore: + await self._prefetch() + except DaemonError as e: + self.logger.info('ignoring daemon errors: {}'.format(e)) + await asyncio.sleep(2) + + def _prefill_count(self, room): + ave_size = sum(self.recent_sizes) // len(self.recent_sizes) + count = room // ave_size if ave_size else 0 + return max(count, 10) + + async def _prefetch(self): + '''Prefetch blocks if there are any to prefetch.''' + daemon_height = await self.daemon.height() + max_count = min(daemon_height - self.fetched_height, 4000) + count = min(max_count, self._prefill_count(self.target_cache_size)) + first = self.fetched_height + 1 + hex_hashes = await self.daemon.block_hex_hashes(first, count) + if not hex_hashes: + return + + blocks = await self.daemon.raw_blocks(hex_hashes) + sizes = [len(block) for block in blocks] + total_size = sum(sizes) + self.queue.put_nowait((blocks, total_size)) + self.queue_size += total_size + self.fetched_height += len(blocks) + + # Keep 50 most recent block sizes for fetch count estimation + self.recent_sizes.extend(sizes) + excess = len(self.recent_sizes) - 50 + if excess > 0: + self.recent_sizes = self.recent_sizes[excess:] + + +class BlockProcessor(LoggedClass): + '''Process blocks and update the DB state to match. + + Employ a prefetcher to prefetch blocks in batches for processing. + Coordinate backing up in case of chain reorganisations. 
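+
+    A block that does not extend the current chain tip signals a
+    reorganisation: the common ancestor is located and recent blocks
+    are backed out before advancing resumes.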
+ ''' + + def __init__(self, env, daemon): + super().__init__() + + self.daemon = daemon + + # Meta + self.utxo_MB = env.utxo_MB + self.hist_MB = env.hist_MB + self.next_cache_check = 0 + self.coin = env.coin + self.caught_up = False + self.reorg_limit = env.reorg_limit + + # Chain state (initialize to genesis in case of new DB) + self.db_height = -1 + self.db_tx_count = 0 + self.db_tip = b'\0' * 32 + self.flush_count = 0 + self.utxo_flush_count = 0 + self.wall_time = 0 + + # Open DB and metadata files. Record some of its state. + self.db = self.open_db(self.coin) + self.tx_count = self.db_tx_count + self.height = self.db_height + self.tip = self.db_tip + + # Caches to be flushed later. Headers and tx_hashes have one + # entry per block + self.history = defaultdict(partial(array.array, 'I')) + self.history_size = 0 + self.backup_hash168s = set() + self.utxo_cache = UTXOCache(self, self.db, self.coin) + self.fs_cache = FSCache(self.coin, self.height, self.tx_count) + self.prefetcher = Prefetcher(daemon, self.height) + + self.last_flush = time.time() + self.last_flush_tx_count = self.tx_count + + # Redirected member func + self.get_tx_hash = self.fs_cache.get_tx_hash + + # Log state + self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' + 'flush count: {:,d} utxo flush count: {:,d} ' + 'sync time: {}' + .format(self.coin.NAME, self.coin.NET, self.height, + self.tx_count, self.flush_count, + self.utxo_flush_count, + formatted_time(self.wall_time))) + self.logger.info('reorg limit of {:,d} blocks' + .format(self.reorg_limit)) + self.logger.info('flushing UTXO cache at {:,d} MB' + .format(self.utxo_MB)) + self.logger.info('flushing history cache at {:,d} MB' + .format(self.hist_MB)) + + self.clean_db() + + def coros(self, force_backup=False): + if force_backup: + return [self.force_chain_reorg(True), self.prefetcher.start()] + else: + return [self.start(), self.prefetcher.start()] + + async def start(self): + '''External entry point for block processing. + + A simple wrapper that safely flushes the DB on clean + shutdown. + ''' + try: + await self.advance_blocks() + finally: + self.flush(True) + + async def advance_blocks(self): + '''Loop forever processing blocks in the forward direction.''' + while True: + blocks = await self.prefetcher.get_blocks() + for block in blocks: + if not self.advance_block(block): + await self.handle_chain_reorg() + self.caught_up = False + break + await asyncio.sleep(0) # Yield + + if self.height != self.daemon.cached_height(): + continue + + if not self.caught_up: + self.caught_up = True + self.logger.info('caught up to height {:,d}' + .format(self.height)) + + # Flush everything when in caught-up state as queries + # are performed on DB not in-memory + self.flush(True) + + async def force_chain_reorg(self, to_genesis): + try: + await self.handle_chain_reorg(to_genesis) + finally: + self.flush(True) + + async def handle_chain_reorg(self, to_genesis=False): + # First get all state on disk + self.logger.info('chain reorg detected') + self.flush(True) + self.logger.info('finding common height...') + hashes = await self.reorg_hashes(to_genesis) + # Reverse and convert to hex strings. 
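+        # reorg_hashes() returns hashes oldest-first; blocks are backed
+        # out newest-first, so reverse before fetching from the daemon.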
+        hashes = [hash_to_str(hash) for hash in reversed(hashes)]
+        for hex_hashes in chunks(hashes, 50):
+            blocks = await self.daemon.raw_blocks(hex_hashes)
+            self.backup_blocks(blocks)
+        self.logger.info('backed up to height {:,d}'.format(self.height))
+        await self.prefetcher.clear(self.height)
+        self.logger.info('prefetcher reset')
+
+    async def reorg_hashes(self, to_genesis):
+        '''Return the list of hashes to back up because of a reorg.
+
+        The hashes are returned in order of increasing height.'''
+        def match_pos(hashes1, hashes2):
+            for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
+                if hash1 == hash2:
+                    return n
+            return -1
+
+        start = self.height - 1
+        count = 1
+        while start > 0:
+            self.logger.info('start: {:,d} count: {:,d}'.format(start, count))
+            hashes = self.fs_cache.block_hashes(start, count)
+            hex_hashes = [hash_to_str(hash) for hash in hashes]
+            d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
+            n = match_pos(hex_hashes, d_hex_hashes)
+            if n >= 0 and not to_genesis:
+                start += n + 1
+                break
+            count = min(count * 2, start)
+            start -= count
+
+        # Hashes differ from height 'start'
+        count = (self.height - start) + 1
+
+        self.logger.info('chain was reorganised for {:,d} blocks from '
+                         'height {:,d} to height {:,d}'
+                         .format(count, start, start + count - 1))
+
+        return self.fs_cache.block_hashes(start, count)
+
+    def open_db(self, coin):
+        db_name = '{}-{}'.format(coin.NAME, coin.NET)
+        try:
+            db = plyvel.DB(db_name, create_if_missing=False,
+                           error_if_exists=False, compression=None)
+        except:
+            db = plyvel.DB(db_name, create_if_missing=True,
+                           error_if_exists=True, compression=None)
+            self.logger.info('created new database {}'.format(db_name))
+        else:
+            self.logger.info('successfully opened database {}'.format(db_name))
+            self.read_state(db)
+
+        return db
+
+    def read_state(self, db):
+        state = db.get(b'state')
+        state = ast.literal_eval(state.decode())
+        if state['genesis'] != self.coin.GENESIS_HASH:
+            raise ChainError('DB genesis hash {} does not match coin {}'
+                             .format(state['genesis'],
+                                     self.coin.GENESIS_HASH))
+        self.db_height = state['height']
+        self.db_tx_count = state['tx_count']
+        self.db_tip = state['tip']
+        self.flush_count = state['flush_count']
+        self.utxo_flush_count = state['utxo_flush_count']
+        self.wall_time = state['wall_time']
+
+    def clean_db(self):
+        '''Clean out stale DB items.
+
+        Stale DB items are excess history flushed since the most
+        recent UTXO flush (only happens on unclean shutdown), and aged
+        undo information.
+        '''
+        if self.flush_count < self.utxo_flush_count:
+            raise ChainError('DB corrupt: flush_count < utxo_flush_count')
+        with self.db.write_batch(transaction=True) as batch:
+            if self.flush_count > self.utxo_flush_count:
+                self.logger.info('DB shut down uncleanly. 
Scanning for ' + 'excess history flushes...') + self.remove_excess_history(batch) + self.utxo_flush_count = self.flush_count + self.remove_stale_undo_items(batch) + self.flush_state(batch) + + def remove_excess_history(self, batch): + prefix = b'H' + unpack = struct.unpack + keys = [] + for key, hist in self.db.iterator(prefix=prefix): + flush_id, = unpack('>H', key[-2:]) + if flush_id > self.utxo_flush_count: + keys.append(key) + + self.logger.info('deleting {:,d} history entries' + .format(len(keys))) + for key in keys: + batch.delete(key) + + def remove_stale_undo_items(self, batch): + prefix = b'U' + unpack = struct.unpack + cutoff = self.db_height - self.reorg_limit + keys = [] + for key, hist in self.db.iterator(prefix=prefix): + height, = unpack('>I', key[-4:]) + if height > cutoff: + break + keys.append(key) + + self.logger.info('deleting {:,d} stale undo entries' + .format(len(keys))) + for key in keys: + batch.delete(key) + + def flush_state(self, batch): + '''Flush chain state to the batch.''' + now = time.time() + self.wall_time += now - self.last_flush + self.last_flush = now + self.last_flush_tx_count = self.tx_count + state = { + 'genesis': self.coin.GENESIS_HASH, + 'height': self.db_height, + 'tx_count': self.db_tx_count, + 'tip': self.db_tip, + 'flush_count': self.flush_count, + 'utxo_flush_count': self.utxo_flush_count, + 'wall_time': self.wall_time, + } + batch.put(b'state', repr(state).encode()) + + def flush_utxos(self, batch): + self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks' + .format(self.tx_count - self.db_tx_count, + self.height - self.db_height)) + self.utxo_cache.flush(batch) + self.utxo_flush_count = self.flush_count + self.db_tx_count = self.tx_count + self.db_height = self.height + self.db_tip = self.tip + + def assert_flushed(self): + '''Asserts state is fully flushed.''' + assert self.tx_count == self.db_tx_count + assert not self.history + assert not self.utxo_cache.cache + assert not self.utxo_cache.db_cache + assert not self.backup_hash168s + + def flush(self, flush_utxos=False): + '''Flush out cached state. + + History is always flushed. UTXOs are flushed if flush_utxos.''' + if self.height == self.db_height: + self.logger.info('nothing to flush') + self.assert_flushed() + return + + flush_start = time.time() + last_flush = self.last_flush + tx_diff = self.tx_count - self.last_flush_tx_count + + # Write out the files to the FS before flushing to the DB. If + # the DB transaction fails, the files being too long doesn't + # matter. But if writing the files fails we do not want to + # have updated the DB. + if self.height > self.db_height: + self.fs_cache.flush(self.height, self.tx_count) + + with self.db.write_batch(transaction=True) as batch: + # History first - fast and frees memory. Flush state last + # as it reads the wall time. 
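+            # Advancing appends new address history (flush_history);
+            # backing up after a reorg truncates it instead (backup_history).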
+ if self.height > self.db_height: + self.flush_history(batch) + else: + self.backup_history(batch) + if flush_utxos: + self.flush_utxos(batch) + self.flush_state(batch) + self.logger.info('committing transaction...') + + # Update and put the wall time again - otherwise we drop the + # time it took to commit the batch + self.flush_state(self.db) + + flush_time = int(self.last_flush - flush_start) + self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' + .format(self.flush_count, self.height, self.tx_count, + flush_time)) + + # Catch-up stats + if not self.caught_up and tx_diff > 0: + daemon_height = self.daemon.cached_height() + txs_per_sec = int(self.tx_count / self.wall_time) + this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) + if self.height > self.coin.TX_COUNT_HEIGHT: + tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK + else: + tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) + * self.coin.TX_PER_BLOCK + + (self.coin.TX_COUNT - self.tx_count)) + + self.logger.info('tx/sec since genesis: {:,d}, ' + 'since last flush: {:,d}' + .format(txs_per_sec, this_txs_per_sec)) + self.logger.info('sync time: {} ETA: {}' + .format(formatted_time(self.wall_time), + formatted_time(tx_est / this_txs_per_sec))) + + def flush_history(self, batch): + self.logger.info('flushing history') + assert not self.backup_hash168s + + self.flush_count += 1 + flush_id = struct.pack('>H', self.flush_count) + + for hash168, hist in self.history.items(): + key = b'H' + hash168 + flush_id + batch.put(key, hist.tobytes()) + + self.logger.info('{:,d} history entries in {:,d} addrs' + .format(self.history_size, len(self.history))) + + self.history = defaultdict(partial(array.array, 'I')) + self.history_size = 0 + + def backup_history(self, batch): + self.logger.info('backing up history to height {:,d} tx_count {:,d}' + .format(self.height, self.tx_count)) + + # Drop any NO_CACHE entry + self.backup_hash168s.discard(NO_CACHE_ENTRY) + assert not self.history + + nremoves = 0 + for hash168 in sorted(self.backup_hash168s): + prefix = b'H' + hash168 + deletes = [] + puts = {} + for key, hist in self.db.iterator(reverse=True, prefix=prefix): + a = array.array('I') + a.frombytes(hist) + # Remove all history entries >= self.tx_count + idx = bisect_left(a, self.tx_count) + nremoves += len(a) - idx + if idx > 0: + puts[key] = a[:idx].tobytes() + break + deletes.append(key) + + for key in deletes: + batch.delete(key) + for key, value in puts.items(): + batch.put(key, value) + + self.logger.info('removed {:,d} history entries from {:,d} addresses' + .format(nremoves, len(self.backup_hash168s))) + self.backup_hash168s = set() + + def cache_sizes(self): + '''Returns the approximate size of the cache, in MB.''' + # Good average estimates based on traversal of subobjects and + # requesting size from Python (see deep_getsizeof). For + # whatever reason Python O/S mem usage is typically +30% or + # more, so we scale our already bloated object sizes. 
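+        # Dividing by a shrunken 'MB' inflates the reported figures by
+        # ~30% to better match observed process memory usage.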
+ one_MB = int(1048576 / 1.3) + utxo_cache_size = len(self.utxo_cache.cache) * 187 + db_cache_size = len(self.utxo_cache.db_cache) * 105 + hist_cache_size = len(self.history) * 180 + self.history_size * 4 + utxo_MB = (db_cache_size + utxo_cache_size) // one_MB + hist_MB = hist_cache_size // one_MB + + self.logger.info('cache stats at height {:,d} daemon height: {:,d}' + .format(self.height, self.daemon.cached_height())) + self.logger.info(' entries: UTXO: {:,d} DB: {:,d} ' + 'hist addrs: {:,d} hist size {:,d}' + .format(len(self.utxo_cache.cache), + len(self.utxo_cache.db_cache), + len(self.history), + self.history_size)) + self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)' + .format(utxo_MB + hist_MB, utxo_MB, hist_MB)) + return utxo_MB, hist_MB + + def undo_key(self, height): + '''DB key for undo information at the given height.''' + return b'U' + struct.pack('>I', height) + + def write_undo_info(self, height, undo_info): + '''Write out undo information for the current height.''' + self.db.put(self.undo_key(height), undo_info) + + def read_undo_info(self, height): + '''Read undo information from a file for the current height.''' + return self.db.get(self.undo_key(height)) + + def advance_block(self, block): + # We must update the fs_cache before calling advance_txs() as + # the UTXO cache uses the fs_cache via get_tx_hash() to + # resolve compressed key collisions + header, tx_hashes, txs = self.coin.read_block(block) + self.fs_cache.advance_block(header, tx_hashes, txs) + prev_hash, header_hash = self.coin.header_hashes(header) + if prev_hash != self.tip: + return False + + self.tip = header_hash + self.height += 1 + undo_info = self.advance_txs(tx_hashes, txs) + if self.daemon.cached_height() - self.height <= self.reorg_limit: + self.write_undo_info(self.height, b''.join(undo_info)) + + # Check if we're getting full and time to flush? + now = time.time() + if now > self.next_cache_check: + self.next_cache_check = now + 60 + utxo_MB, hist_MB = self.cache_sizes() + if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: + self.flush(utxo_MB >= self.utxo_MB) + + return True + + def advance_txs(self, tx_hashes, txs): + put_utxo = self.utxo_cache.put + spend_utxo = self.utxo_cache.spend + undo_info = [] + + # Use local vars for speed in the loops + history = self.history + tx_num = self.tx_count + coin = self.coin + parse_script = ScriptPubKey.from_script + pack = struct.pack + + for tx, tx_hash in zip(txs, tx_hashes): + hash168s = set() + tx_numb = pack('= 0 + return limit + + def get_history(self, hash168, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self.resolve_limit(limit) + prefix = b'H' + hash168 + for key, hist in self.db.iterator(prefix=prefix): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.get_tx_hash(tx_num) + limit -= 1 + + def get_balance(self, hash168): + '''Returns the confirmed balance of an address.''' + return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + + def get_utxos(self, hash168, limit=1000): + '''Generator that yields all UTXOs for an address sorted in no + particular order. By default yields at most 1000 entries. + Set limit to None to get them all. 
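+        All UTXOs for an address are found with a single scan of the
+        b'u' + hash168 key prefix.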
+ ''' + limit = self.resolve_limit(limit) + unpack = struct.unpack + prefix = b'u' + hash168 + utxos = [] + for k, v in self.db.iterator(prefix=prefix): + (tx_pos, ) = unpack('= 0 + # Just update in-memory. It doesn't matter if disk files are + # too long, they will be overwritten when advancing. + self.height -= 1 + self.tx_counts.pop() + + def flush(self, new_height, new_tx_count): + '''Flush the things stored on the filesystem. + The arguments are passed for sanity check assertions only.''' + self.logger.info('flushing to file system') + + blocks_done = len(self.headers) + prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0 + cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + txs_done = cur_tx_count - prior_tx_count + + assert self.height + blocks_done == new_height + assert len(self.tx_hashes) == blocks_done + assert len(self.tx_counts) == new_height + 1 + assert cur_tx_count == new_tx_count, \ + 'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count) + + # First the headers + headers = b''.join(self.headers) + header_len = self.coin.HEADER_LEN + self.headers_file.seek((self.height + 1) * header_len) + self.headers_file.write(headers) + self.headers_file.flush() + + # Then the tx counts + self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.height + 1:]) + self.txcount_file.flush() + + # Finally the hashes + hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) + assert len(hashes) % 32 == 0 + assert len(hashes) // 32 == txs_done + cursor = 0 + file_pos = prior_tx_count * 32 + while cursor < len(hashes): + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename, create=True) as f: + f.seek(offset) + f.write(hashes[cursor:cursor + size]) + cursor += size + file_pos += size + + os.sync() + + self.tx_hashes = [] + self.headers = [] + self.height += blocks_done + + def read_headers(self, height, count): + read_count = min(count, self.height + 1 - height) + + assert height >= 0 and read_count >= 0 + assert count <= read_count + len(self.headers) + + result = b'' + if read_count > 0: + header_len = self.coin.HEADER_LEN + self.headers_file.seek(height * header_len) + result = self.headers_file.read(read_count * header_len) + + count -= read_count + if count: + start = (height + read_count) - (self.height + 1) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + height = bisect_right(self.tx_counts, tx_num) + + # Is this on disk or unflushed? 
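+        # Heights up to self.height have been flushed: their hashes are
+        # read from the fixed-width records of the hashes{:04d} files.
+        # Anything higher is still in the in-memory tx_hashes lists.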
+ if height > self.height: + tx_hashes = self.tx_hashes[height - (self.height + 1)] + tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] + else: + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + tx_hash = f.read(32) + + return tx_hash, height + + def block_hashes(self, height, count): + headers = self.read_headers(height, count) + hlen = self.coin.HEADER_LEN + return [double_sha256(header) for header in chunks(headers, hlen)] + + def encode_header(self, height): + if height < 0 or height > self.height + len(self.headers): + raise Exception('no header information for height {:,d}' + .format(height)) + header = self.read_headers(self.height, 1) + unpack = struct.unpack + version, = unpack(' 1: + if len(hashes) & 1: + hashes.append(hashes[-1]) + idx = idx - 1 if (idx & 1) else idx + 1 + merkle_branch.append(hash_to_str(hashes[idx])) + idx //= 2 + hashes = [double_sha256(hashes[n] + hashes[n + 1]) + for n in range(0, len(hashes), 2)] + + return {"block_height": height, "merkle": merkle_branch, "pos": pos} + + def get_peers(self): + '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one + per peer.''' + return self.peers + + def height(self): + return self.block_processor.height + + def get_current_header(self): + return self.block_processor.get_current_header() + + def get_history(self, hash168): + history = self.block_processor.get_history(hash168, limit=None) + return [ + {'tx_hash': hash_to_str(tx_hash), 'height': height} + for tx_hash, height in history + ] diff --git a/server/daemon.py b/server/daemon.py new file mode 100644 index 0000000..96a07bc --- /dev/null +++ b/server/daemon.py @@ -0,0 +1,98 @@ +# See the file "LICENSE" for information about the copyright +# and warranty status of this software. + +'''Classes for handling asynchronous connections to a blockchain +daemon.''' + +import asyncio +import json + +import aiohttp + +from lib.util import LoggedClass + + +class DaemonError(Exception): + '''Raised when the daemon returns an error in its results that + cannot be remedied by retrying.''' + + +class Daemon(LoggedClass): + '''Handles connections to a daemon at the given URL.''' + + def __init__(self, url): + super().__init__() + self.url = url + self._height = None + self.logger.info('connecting to daemon at URL {}'.format(url)) + + async def send_single(self, method, params=None): + payload = {'method': method} + if params: + payload['params'] = params + result, = await self.send((payload, )) + return result + + async def send_many(self, mp_pairs): + if mp_pairs: + payload = [{'method': method, 'params': params} + for method, params in mp_pairs] + return await self.send(payload) + return [] + + async def send_vector(self, method, params_list): + if params_list: + payload = [{'method': method, 'params': params} + for params in params_list] + return await self.send(payload) + return [] + + async def send(self, payload): + assert isinstance(payload, (tuple, list)) + data = json.dumps(payload) + while True: + try: + async with aiohttp.post(self.url, data=data) as resp: + result = await resp.json() + except asyncio.CancelledError: + raise + except Exception as e: + msg = 'aiohttp error: {}'.format(e) + secs = 3 + else: + errs = tuple(item['error'] for item in result) + if not any(errs): + return tuple(item['result'] for item in result) + if any(err.get('code') == -28 for err in errs): + msg = 'daemon still warming up.' 
+ secs = 30 + else: + msg = '{}'.format(errs) + raise DaemonError(msg) + + self.logger.error('{}. Sleeping {:d}s and trying again...' + .format(msg, secs)) + await asyncio.sleep(secs) + + async def block_hex_hashes(self, first, count): + '''Return the hex hashes of count block starting at height first.''' + param_lists = [[height] for height in range(first, first + count)] + return await self.send_vector('getblockhash', param_lists) + + async def raw_blocks(self, hex_hashes): + '''Return the raw binary blocks with the given hex hashes.''' + param_lists = [(h, False) for h in hex_hashes] + blocks = await self.send_vector('getblock', param_lists) + # Convert hex string to bytes + return [bytes.fromhex(block) for block in blocks] + + async def height(self): + '''Query the daemon for its current height.''' + self._height = await self.send_single('getblockcount') + return self._height + + def cached_height(self): + '''Return the cached daemon height. + + If the daemon has not been queried yet this returns None.''' + return self._height diff --git a/server/db.py b/server/db.py deleted file mode 100644 index 85e2231..0000000 --- a/server/db.py +++ /dev/null @@ -1,679 +0,0 @@ -# See the file "LICENSE" for information about the copyright -# and warranty status of this software. - -import array -import ast -import itertools -import os -import struct -import time -from binascii import hexlify, unhexlify -from bisect import bisect_right -from collections import defaultdict, namedtuple -from functools import partial -import logging - -import plyvel - -from lib.coins import Bitcoin -from lib.script import ScriptPubKey - -# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries -HIST_ENTRIES_PER_KEY = 1024 -HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 -ADDR_TX_HASH_LEN = 4 -UTXO_TX_HASH_LEN = 4 -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - - -def formatted_time(t): - t = int(t) - return '{:d}d {:02d}h {:02d}m {:02d}s'.format( - t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) - - -class UTXOCache(object): - '''An in-memory UTXO cache, representing all changes to UTXO state - since the last DB flush. - - We want to store millions, perhaps 10s of millions of these in - memory for optimal performance during initial sync, because then - it is possible to spend UTXOs without ever going to the database - (other than as an entry in the address history, and there is only - one such entry per TX not per UTXO). So store them in a Python - dictionary with binary keys and values. - - Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes) - Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes) - - That's 67 bytes of raw data. Python dictionary overhead means - each entry actually uses about 187 bytes of memory. So almost - 11.5 million UTXOs can fit in 2GB of RAM. There are approximately - 42 million UTXOs on bitcoin mainnet at height 433,000. - - Semantics: - - add: Add it to the cache dictionary. - spend: Remove it if in the cache dictionary. - Otherwise it's been flushed to the DB. Each UTXO - is responsible for two entries in the DB stored using - compressed keys. Mark both for deletion in the next - flush of the in-memory UTXO cache. - - A UTXO is stored in the DB in 2 "tables": - - 1. The output value and tx number. Must be keyed with a - hash168 prefix so the unspent outputs and balance of an - arbitrary address can be looked up with a simple key - traversal. - Key: b'u' + hash168 + compressed_tx_hash + tx_idx - Value: a (tx_num, value) pair - - 2. 
Given a prevout, we need to be able to look up the UTXO key - to remove it. As is keyed by hash168 and that is not part - of the prevout, we need a hash168 lookup. - Key: b'h' + compressed tx_hash + tx_idx - Value: (hash168, tx_num) pair - - The compressed TX hash is just the first few bytes of the hash of - the TX the UTXO is in (and needn't be the same number of bytes in - each table). As this is not unique there will be collisions; - tx_num is stored to resolve them. The collision rate is around - 0.02% for the hash168 table, and almost zero for the UTXO table - (there are around 100 collisions in the whole bitcoin blockchain). - - ''' - - def __init__(self, parent, db, coin): - self.logger = logging.getLogger('UTXO') - self.logger.setLevel(logging.INFO) - self.parent = parent - self.coin = coin - self.cache = {} - self.db = db - self.db_cache = {} - # Statistics - self.adds = 0 - self.cache_hits = 0 - self.db_deletes = 0 - - def add_many(self, tx_hash, tx_num, txouts): - '''Add a sequence of UTXOs to the cache, return the set of hash168s - seen. - - Pass the hash of the TX it appears in, its TX number, and the - TX outputs. - ''' - parse_script = ScriptPubKey.from_script - pack = struct.pack - tx_numb = pack('H', key[-2:]) - if flush_id > self.utxo_flush_count: - keys.append(key) - - self.logger.info('deleting {:,d} history entries'.format(len(keys))) - with db.write_batch(transaction=True) as batch: - for key in keys: - db.delete(key) - self.utxo_flush_count = self.flush_count - self.flush_state(batch) - self.logger.info('deletion complete') - - def flush_to_fs(self): - '''Flush the things stored on the filesystem.''' - # First the headers - headers = b''.join(self.headers) - header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.fs_height + 1) * header_len) - self.headers_file.write(headers) - self.headers_file.flush() - self.headers = [] - - # Then the tx counts - self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.fs_height + 1: - self.height + 1]) - self.txcount_file.flush() - - # Finally the hashes - hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) - assert len(hashes) % 32 == 0 - assert self.tx_hash_file_size % 32 == 0 - cursor = 0 - file_pos = self.fs_tx_count * 32 - while cursor < len(hashes): - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename, create=True) as f: - f.seek(offset) - f.write(hashes[cursor:cursor + size]) - cursor += size - file_pos += size - self.tx_hashes = [] - - self.fs_height = self.height - self.fs_tx_count = self.tx_count - os.sync() - - def flush_state(self, batch): - '''Flush chain state to the batch.''' - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - state = { - 'genesis': self.coin.GENESIS_HASH, - 'height': self.db_height, - 'tx_count': self.db_tx_count, - 'tip': self.tip, - 'flush_count': self.flush_count, - 'utxo_flush_count': self.utxo_flush_count, - 'wall_time': self.wall_time, - } - batch.put(b'state', repr(state).encode('ascii')) - - def flush_utxos(self, batch): - self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks' - .format(self.tx_count - self.db_tx_count, - self.height - self.db_height)) - self.utxo_cache.flush(batch) - self.utxo_flush_count = self.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - - def flush(self, 
daemon_height, flush_utxos=False): - '''Flush out cached state. - - History is always flushed. UTXOs are flushed if flush_utxos.''' - flush_start = time.time() - last_flush = self.last_flush - tx_diff = self.tx_count - self.fs_tx_count - - # Write out the files to the FS before flushing to the DB. If - # the DB transaction fails, the files being too long doesn't - # matter. But if writing the files fails we do not want to - # have updated the DB. - self.logger.info('commencing history flush') - self.flush_to_fs() - - with self.db.write_batch(transaction=True) as batch: - # History first - fast and frees memory. Flush state last - # as it reads the wall time. - self.flush_history(batch) - if flush_utxos: - self.flush_utxos(batch) - self.flush_state(batch) - self.logger.info('committing transaction...') - - # Update and put the wall time again - otherwise we drop the - # time it took leveldb to commit the batch - self.flush_state(self.db) - - flush_time = int(self.last_flush - flush_start) - self.logger.info('flush #{:,d} to height {:,d} took {:,d}s' - .format(self.flush_count, self.height, flush_time)) - - # Log handy stats - txs_per_sec = int(self.tx_count / self.wall_time) - this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) - if self.height > self.coin.TX_COUNT_HEIGHT: - tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK - else: - tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT) - * self.coin.TX_PER_BLOCK - + (self.coin.TX_COUNT - self.tx_count)) - - self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, ' - 'since last flush: {:,d}' - .format(self.tx_count, txs_per_sec, this_txs_per_sec)) - self.logger.info('sync time: {} ETA: {}' - .format(formatted_time(self.wall_time), - formatted_time(tx_est / this_txs_per_sec))) - - def flush_history(self, batch): - # Drop any None entry - self.history.pop(None, None) - - self.flush_count += 1 - flush_id = struct.pack('>H', self.flush_count) - for hash168, hist in self.history.items(): - key = b'H' + hash168 + flush_id - batch.put(key, hist.tobytes()) - - self.logger.info('{:,d} history entries in {:,d} addrs' - .format(self.history_size, len(self.history))) - - self.history = defaultdict(partial(array.array, 'I')) - self.history_size = 0 - - def open_file(self, filename, create=False): - '''Open the file name. Return its handle.''' - try: - return open(filename, 'rb+') - except FileNotFoundError: - if create: - return open(filename, 'wb+') - raise - - def read_headers(self, height, count): - header_len = self.coin.HEADER_LEN - self.headers_file.seek(height * header_len) - return self.headers_file.read(count * header_len) - - def cache_sizes(self, daemon_height): - '''Returns the approximate size of the cache, in MB.''' - # Good average estimates based on traversal of subobjects and - # requesting size from Python (see deep_getsizeof). For - # whatever reason Python O/S mem usage is typically +30% or - # more, so we scale our already bloated object sizes. 
- one_MB = int(1048576 / 1.3) - utxo_cache_size = len(self.utxo_cache.cache) * 187 - db_cache_size = len(self.utxo_cache.db_cache) * 105 - hist_cache_size = len(self.history) * 180 + self.history_size * 4 - utxo_MB = (db_cache_size + utxo_cache_size) // one_MB - hist_MB = hist_cache_size // one_MB - - self.logger.info('cache stats at height {:,d} daemon height: {:,d}' - .format(self.height, daemon_height)) - self.logger.info(' entries: UTXO: {:,d} DB: {:,d} ' - 'hist addrs: {:,d} hist size: {:,d}' - .format(len(self.utxo_cache.cache), - len(self.utxo_cache.db_cache), - len(self.history), - self.history_size)) - self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)' - .format(utxo_MB + hist_MB, utxo_MB, hist_MB)) - return utxo_MB, hist_MB - - def process_block(self, block, daemon_height): - self.headers.append(block[:self.coin.HEADER_LEN]) - - tx_hashes, txs = self.coin.read_block(block) - self.height += 1 - - assert len(self.tx_counts) == self.height - - # These both need to be updated before calling process_tx(). - # It uses them for tx hash lookup - self.tx_hashes.append(tx_hashes) - self.tx_counts.append(self.tx_count + len(txs)) - - for tx_hash, tx in zip(tx_hashes, txs): - self.process_tx(tx_hash, tx) - - # Check if we're getting full and time to flush? - now = time.time() - if now > self.next_cache_check: - self.next_cache_check = now + 60 - utxo_MB, hist_MB = self.cache_sizes(daemon_height) - if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: - self.flush(daemon_height, utxo_MB >= self.utxo_MB) - - def process_tx(self, tx_hash, tx): - cache = self.utxo_cache - tx_num = self.tx_count - - # Add the outputs as new UTXOs; spend the inputs - hash168s = cache.add_many(tx_hash, tx_num, tx.outputs) - if not tx.is_coinbase: - for txin in tx.inputs: - hash168s.add(cache.spend(txin.prevout)) - - for hash168 in hash168s: - self.history[hash168].append(tx_num) - self.history_size += len(hash168s) - - self.tx_count += 1 - - def get_tx_hash(self, tx_num): - '''Returns the tx_hash and height of a tx number.''' - height = bisect_right(self.tx_counts, tx_num) - - # Is this on disk or unflushed? - if height > self.fs_height: - tx_hashes = self.tx_hashes[height - (self.fs_height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) - - return tx_hash, height - - @staticmethod - def resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit - - def get_history(self, hash168, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - prefix = b'H' + hash168 - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.get_tx_hash(tx_num) - limit -= 1 - - def get_balance(self, hash168): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) - - def get_utxos(self, hash168, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. 
By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - unpack = struct.unpack - prefix = b'u' + hash168 - utxos = [] - for k, v in self.db.iterator(prefix=prefix): - (tx_pos, ) = unpack(' cache_limit: - return True - - # Keep going by getting a whole new cache_limit of blocks - self.daemon_height = await self.send_single('getblockcount') - max_count = min(self.daemon_height - self.fetched_height, 4000) - count = min(max_count, self.prefill_count(cache_limit)) - if not count: - return False # Done catching up - - first = self.fetched_height + 1 - param_lists = [[height] for height in range(first, first + count)] - hashes = await self.send_vector('getblockhash', param_lists) - - # Hashes is an array of hex strings - param_lists = [(h, False) for h in hashes] - blocks = await self.send_vector('getblock', param_lists) - self.fetched_height += count - - # Convert hex string to bytes and put in memoryview - blocks = [bytes.fromhex(block) for block in blocks] - # Reverse order and place at front of list - self.blocks = list(reversed(blocks)) + self.blocks - - # Keep 50 most recent block sizes for fetch count estimation - sizes = [len(block) for block in blocks] - self.recent_sizes.extend(sizes) - excess = len(self.recent_sizes) - 50 - if excess > 0: - self.recent_sizes = self.recent_sizes[excess:] - self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - - async def send_single(self, method, params=None): - payload = {'method': method} - if params: - payload['params'] = params - result, = await self.send((payload, )) - return result - - async def send_many(self, mp_pairs): - payload = [{'method': method, 'params': params} - for method, params in mp_pairs] - return await self.send(payload) - - async def send_vector(self, method, params_list): - payload = [{'method': method, 'params': params} - for params in params_list] - return await self.send(payload) - - async def send(self, payload): - assert isinstance(payload, (tuple, list)) - data = json.dumps(payload) - while True: - try: - async with aiohttp.request('POST', self.rpc_url, - data = data) as resp: - result = await resp.json() - except asyncio.CancelledError: - raise - except Exception as e: - msg = 'aiohttp error: {}'.format(e) - secs = 3 - else: - errs = tuple(item['error'] for item in result) - if not any(errs): - return tuple(item['result'] for item in result) - if any(err.get('code') == -28 for err in errs): - msg = 'daemon still warming up.' - secs = 30 - else: - msg = 'daemon errors: {}'.format(errs) - secs = 3 - - self.logger.error('{}. Sleeping {:d}s and trying again...' 
- .format(msg, secs)) - await asyncio.sleep(secs) diff --git a/server/version.py b/server/version.py new file mode 100644 index 0000000..abb6c1a --- /dev/null +++ b/server/version.py @@ -0,0 +1 @@ +VERSION = "ElectrumX 0.02" diff --git a/server_main.py b/server_main.py index 3166d24..3a82c48 100755 --- a/server_main.py +++ b/server_main.py @@ -9,7 +9,7 @@ import os import traceback from server.env import Env -from server.server import Server +from server.controller import Controller def main_loop(): @@ -22,15 +22,19 @@ def main_loop(): logging.info('switching current directory to {}'.format(env.db_dir)) os.chdir(env.db_dir) - server = Server(env) - tasks = server.async_tasks() - loop = asyncio.get_event_loop() + #loop.set_debug(True) + + controller = Controller(loop, env) + controller.start() + + tasks = asyncio.Task.all_tasks(loop) try: loop.run_until_complete(asyncio.gather(*tasks)) except asyncio.CancelledError: logging.warning('task cancelled; asyncio event loop closing') finally: + controller.stop() loop.close()