From c09182534f65b2a15a8685515a21594f742757ca Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sat, 29 Oct 2016 16:24:49 +0900
Subject: [PATCH 01/17] Update for release

---
 HOWTO.rst                                 | 27 ++++++++++++-----------
 README.rst                                |  1 +
 RELEASE-NOTES                             | 11 +++++++++
 samples/scripts/NOTES                     | 22 +++++++++++++-----
 samples/scripts/env/RPC_PORT              |  1 -
 samples/scripts/env/SSL_PORT              |  1 -
 samples/scripts/env/TCP_PORT              |  0
 samples/scripts/env/{CACHE_MB => UTXO_MB} |  0
 server/env.py                             |  5 +++--
 9 files changed, 45 insertions(+), 23 deletions(-)
 create mode 100644 RELEASE-NOTES
 delete mode 100644 samples/scripts/env/RPC_PORT
 delete mode 100644 samples/scripts/env/SSL_PORT
 delete mode 100644 samples/scripts/env/TCP_PORT
 rename samples/scripts/env/{CACHE_MB => UTXO_MB} (100%)

diff --git a/HOWTO.rst b/HOWTO.rst
index 5ccc4fe..2f7abd1 100644
--- a/HOWTO.rst
+++ b/HOWTO.rst
@@ -13,7 +13,7 @@ small - patches welcome.
 using 1.0.5.

 While not requirements for running ElectrumX, it is intended to be run
-with supervisor software such as Daniel Bernstein's daemontools,
+with supervisor software such as Daniel Bernstein's daemontools,
 Gerald Pape's runit package or systemd.  These make administration of
 secure unix servers very easy, and I strongly recommend you install
 one of these and familiarise yourself with them.  The instructions
 below and sample
@@ -120,7 +120,7 @@ setup ElectrumX with systemd.  Simply copy it to :code:`/etc/systemd/system`::

     cp samples/systemd-unit /etc/systemd/system/electrumx.service

-The sample unit file assumes that the repository is located at
+The sample unit file assumes that the repository is located at
 :code:`/home/electrumx/electrumx`.  If that differs on your system, you
 need to change the unit file accordingly.
@@ -140,7 +140,6 @@ Once configured, you may want to start ElectrumX at boot::

     systemctl enable electrumx

-
 Sync Progress
 =============
@@ -161,13 +160,14 @@ Here is my experience with the current codebase, to the given heights
 and rough wall-times::

             Machine A     Machine B    DB + Metadata
-  180,000   7m 10s                     0.4 GiB
-  245,800   1h 00m                     2.7 GiB
-  290,000   1h 56m                     3.3 GiB
-  343,000   3h 56m                     6.0 GiB
-  386,000   7h 28m                     7.0 GiB
-  404,000   9h 41m
-  434,369                 14h 38m      17.1 GiB
+  181,000   7m 09s                     0.4 GiB
+  255,000   1h 02m                     2.7 GiB
+  289,000   1h 46m                     3.3 GiB
+  317,000   2h 33m
+  351,000   3h 58m
+  377,000   6h 06m                     6.5 GiB
+  403,400   8h 51m
+  436,196                 14h 03m      17.3 GiB

 Machine A: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU, 8GB
 RAM and a DragonFlyBSD HAMMER filesystem on an SSD.  It requests blocks
 over the LAN from a bitcoind on machine B.

 Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz
 quad-core Intel i5 CPU with an HDD and 24GB RAM.  Running bitcoind on
-the same machine.  HIST_MB of 400, CACHE_MB of 2,000.
+the same machine.  HIST_MB of 350, UTXO_MB of 1,600.

 For chains other than bitcoin-mainnet synchronization should be much
 faster.
@@ -192,7 +192,7 @@ by bringing it down like so::

 If processing the blockchain the server will start the process of
 flushing to disk.  Once that is complete the server will exit.  Be
-patient as disk flushing can take a while.
+patient as disk flushing can take many minutes.

 ElectrumX flushes to leveldb using its transaction functionality.  The
 plyvel documentation claims this is atomic.  I have written ElectrumX
@@ -262,4 +262,5 @@ After flush-to-disk you may see an aiohttp error; this is the daemon
 timing out the connection while the disk flush was in progress.  This
 is harmless.

-The ETA is just a guide and can be quite volatile.
+The ETA is just a guide and can be quite volatile.  It is too optimistic
+initially.
diff --git a/README.rst b/README.rst
index b44b5e1..ea84b5e 100644
--- a/README.rst
+++ b/README.rst
@@ -97,6 +97,7 @@ As I've been researching where the time is going during block chain
 indexing and how various cache sizes and hardware choices affect it,
 I'd appreciate it if anyone trying to synchronize could tell me::

+  - the version of ElectrumX
   - their O/S and filesystem
   - their hardware (CPU name and speed, RAM, and disk kind)
   - whether their daemon was on the same host or not
diff --git a/RELEASE-NOTES b/RELEASE-NOTES
new file mode 100644
index 0000000..6352940
--- /dev/null
+++ b/RELEASE-NOTES
@@ -0,0 +1,11 @@
+Version 0.02
+------------
+
+- fix bug where tx counts were incorrectly saved
+- large clean-up and refactoring of code, breakout into new files
+- several efficiency improvements
+- initial implementation of chain reorg handling
+- work on RPC and TCP server functionality.  Code committed but not
+  functional, so currently disabled
+- note that some of the environment variables have been renamed,
+  see samples/scripts/NOTES for the list
\ No newline at end of file
diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index 2287788..d38a327 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -5,6 +5,11 @@ NETWORK - see lib/coins.py, must be a coin NET
 DB_DIRECTORY - path to the database directory (if relative, to run script)
 USERNAME - the username the server will run as
 SERVER_MAIN - path to the server_main.py script (if relative, to run script)
+DAEMON_URL - the URL used to connect to the daemon.  Should be of the form
+    http://username:password@hostname:port/
+    Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD,
+    DAEMON_HOST and DAEMON_PORT.  DAEMON_PORT is optional and
+    will default appropriately for COIN.

 In addition either RPC_URL must be given as the full RPC URL for
 connecting to the daemon, or you must specify RPC_HOST, RPC_USER,
@@ -14,6 +19,11 @@ the coin and network otherwise).

 The other environment variables are all optional and will adopt
 sensible defaults if not specified.

+REORG_LIMIT - maximum number of blocks to be able to handle in a chain
+              reorganisation.  ElectrumX retains some fairly compact
+              undo information for this many blocks in levelDB.
+              Default is 200.
+
 Your performance might change by tweaking these cache settings.  Cache
 size is only checked roughly every minute, so the caches can grow
 beyond the specified size.  Also the Python process is often quite a
@@ -25,14 +35,14 @@ HIST_MB - amount of history cache, in MB, to retain before flushing to
           disk.  Default is 250; probably no benefit being much larger
           as history is append-only and not searched.

-CACHE_MB- amount of UTXO and history cache, in MB, to retain before
+UTXO_MB - amount of UTXO and history cache, in MB, to retain before
           flushing to disk.  Default is 1000.  This may be too large
           for small boxes or too small for machines with lots of RAM.
           Larger caches generally perform better as there is
           significant searching of the UTXO cache during indexing.
           However, I don't see much benefit in my tests pushing this
-          beyond 2000, and in fact beyond there performance begins to
-          fall.  My machine has 24GB RAM; the slow down is probably
-          because of leveldb caching and Python GC effects.  However
-          this may be very dependent on hardware and you may have
-          different results.
+          too high, and in fact performance begins to fall.
My + machine has 24GB RAM; the slow down is probably because of + leveldb caching and Python GC effects. However this may be + very dependent on hardware and you may have different + results. \ No newline at end of file diff --git a/samples/scripts/env/RPC_PORT b/samples/scripts/env/RPC_PORT deleted file mode 100644 index e002b36..0000000 --- a/samples/scripts/env/RPC_PORT +++ /dev/null @@ -1 +0,0 @@ -8000 diff --git a/samples/scripts/env/SSL_PORT b/samples/scripts/env/SSL_PORT deleted file mode 100644 index d1c5b6b..0000000 --- a/samples/scripts/env/SSL_PORT +++ /dev/null @@ -1 +0,0 @@ -50002 diff --git a/samples/scripts/env/TCP_PORT b/samples/scripts/env/TCP_PORT deleted file mode 100644 index e69de29..0000000 diff --git a/samples/scripts/env/CACHE_MB b/samples/scripts/env/UTXO_MB similarity index 100% rename from samples/scripts/env/CACHE_MB rename to samples/scripts/env/UTXO_MB diff --git a/server/env.py b/server/env.py index 0bee0ad..0a447ed 100644 --- a/server/env.py +++ b/server/env.py @@ -22,11 +22,12 @@ class Env(LoggedClass): self.utxo_MB = self.integer('UTXO_MB', 1000) self.hist_MB = self.integer('HIST_MB', 250) self.host = self.default('HOST', 'localhost') + self.reorg_limit = self.integer('REORG_LIMIT', 200) + self.daemon_url = self.build_daemon_url() + # Server stuff self.tcp_port = self.integer('TCP_PORT', None) self.ssl_port = self.integer('SSL_PORT', None) self.rpc_port = self.integer('RPC_PORT', 8000) - self.reorg_limit = self.integer('REORG_LIMIT', 200) - self.daemon_url = self.build_daemon_url() self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) # The electrum client takes the empty string as unspecified From 58a5e69fcadb056574304aaddea2da4edbb3304b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 16:30:02 +0900 Subject: [PATCH 02/17] Fix logging issues on sync without the perf hit --- server/block_processor.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index e1eebad..aeaade9 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -69,7 +69,7 @@ class Prefetcher(LoggedClass): async def start(self): '''Loop forever polling for more blocks.''' - self.logger.info('prefetching blocks...') + self.logger.info('looping forever prefetching blocks...') while True: while self.queue_size < self.target_cache_size: try: @@ -89,10 +89,13 @@ class Prefetcher(LoggedClass): daemon_height = await self.daemon.height() max_count = min(daemon_height - self.fetched_height, 4000) count = min(max_count, self._prefill_count(self.target_cache_size)) + if not count: + return 0 first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) if not hex_hashes: - return + self.logger.error('requested {:,d} hashes, got none'.format(count)) + return 0 blocks = await self.daemon.raw_blocks(hex_hashes) sizes = [len(block) for block in blocks] @@ -106,6 +109,7 @@ class Prefetcher(LoggedClass): excess = len(self.recent_sizes) - 50 if excess > 0: self.recent_sizes = self.recent_sizes[excess:] + return count class BlockProcessor(LoggedClass): @@ -135,6 +139,7 @@ class BlockProcessor(LoggedClass): self.flush_count = 0 self.utxo_flush_count = 0 self.wall_time = 0 + self.first_sync = True # Open DB and metadata files. Record some of its state. 
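The env.py hunk above moves the daemon_url assignment earlier in the
constructor, but the body of build_daemon_url is not part of this patch
series.  A hedged sketch of what it plausibly does, inferred only from
the DAEMON_* variables documented in samples/scripts/NOTES (the real
implementation may differ)::

    # Hypothetical sketch only: build_daemon_url is not shown in these
    # patches; behaviour is inferred from the DAEMON_* variables
    # documented in samples/scripts/NOTES.
    from os import environ

    def build_daemon_url(default_rpc_port=8332):
        url = environ.get('DAEMON_URL')
        if url is None:
            username = environ['DAEMON_USERNAME']
            password = environ['DAEMON_PASSWORD']
            host = environ['DAEMON_HOST']
            # DAEMON_PORT is optional; defaults per COIN
            port = environ.get('DAEMON_PORT', default_rpc_port)
            url = 'http://{}:{}@{}:{}/'.format(username, password,
                                               host, port)
        return url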
self.db = self.open_db(self.coin) @@ -296,6 +301,7 @@ class BlockProcessor(LoggedClass): self.flush_count = state['flush_count'] self.utxo_flush_count = state['utxo_flush_count'] self.wall_time = state['wall_time'] + self.first_sync = state.get('first_sync', True) def clean_db(self): '''Clean out stale DB items. @@ -347,6 +353,8 @@ class BlockProcessor(LoggedClass): def flush_state(self, batch): '''Flush chain state to the batch.''' + if self.caught_up: + self.first_sync = False now = time.time() self.wall_time += now - self.last_flush self.last_flush = now @@ -359,6 +367,7 @@ class BlockProcessor(LoggedClass): 'flush_count': self.flush_count, 'utxo_flush_count': self.utxo_flush_count, 'wall_time': self.wall_time, + 'first_sync': self.first_sync, } batch.put(b'state', repr(state).encode()) @@ -392,6 +401,7 @@ class BlockProcessor(LoggedClass): flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count + show_stats = self.first_sync # Write out the files to the FS before flushing to the DB. If # the DB transaction fails, the files being too long doesn't @@ -422,7 +432,7 @@ class BlockProcessor(LoggedClass): flush_time)) # Catch-up stats - if not self.caught_up and tx_diff > 0: + if show_stats: daemon_height = self.daemon.cached_height() txs_per_sec = int(self.tx_count / self.wall_time) this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) From d2ebb80fac2005530ef09ea00cf205257072bb61 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 16:45:06 +0900 Subject: [PATCH 03/17] Extend copyright notice; improve comments --- electrumx_rpc.py | 11 +++++++-- lib/coins.py | 48 ++++++++++++++++++++++++++++++--------- lib/enum.py | 23 +++++++++++++------ lib/hash.py | 44 +++++++++++++++++++++++------------ lib/script.py | 25 ++++++++++++++------ lib/tx.py | 15 ++++++++++-- lib/util.py | 10 +++++++- query.py | 14 ++++++++++-- server/block_processor.py | 9 +++++++- server/cache.py | 13 ++++++++++- server/controller.py | 12 +++++++++- server/daemon.py | 8 +++++-- server/env.py | 9 +++++++- server/protocol.py | 9 +++++++- server_main.py | 11 +++++++-- 15 files changed, 205 insertions(+), 56 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 68fc74c..2ce4dee 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -1,8 +1,15 @@ #!/usr/bin/env python3 - -# See the file "LICENSE" for information about the copyright +# +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Script to send RPC commands to a running ElectrumX server.''' + + import argparse import asyncio import json diff --git a/lib/coins.py b/lib/coins.py index 30a0b12..6537e74 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -1,6 +1,15 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Module providing coin abstraction. + +Anything coin-specific should go in this file and be subclassed where +necessary for appropriate handling. 
+''' from decimal import Decimal import inspect @@ -12,7 +21,7 @@ from lib.tx import Deserializer class CoinError(Exception): - pass + '''Exception raised for coin-related errors.''' class Coin(object): @@ -24,17 +33,20 @@ class Coin(object): VALUE_PER_COIN = 100000000 @staticmethod - def coins(): + def coin_classes(): + '''Return a list of coin classes in declaration order.''' is_coin = lambda obj: (inspect.isclass(obj) and issubclass(obj, Coin) and obj != Coin) pairs = inspect.getmembers(sys.modules[__name__], is_coin) - # Returned in the order they appear in this file return [pair[1] for pair in pairs] @classmethod def lookup_coin_class(cls, name, net): - for coin in cls.coins(): + '''Return a coin class given name and network. + + Raise an exception if unrecognised.''' + for coin in cls.coin_classes(): if (coin.NAME.lower() == name.lower() and coin.NET.lower() == net.lower()): return coin @@ -43,13 +55,14 @@ class Coin(object): @staticmethod def lookup_xverbytes(verbytes): + '''Return a (is_xpub, coin_class) pair given xpub/xprv verbytes.''' # Order means BTC testnet will override NMC testnet - for coin in Coin.coins(): + for coin in Coin.coin_classes(): if verbytes == coin.XPUB_VERBYTES: return True, coin if verbytes == coin.XPRV_VERBYTES: return False, coin - raise CoinError("version bytes unrecognised") + raise CoinError('version bytes unrecognised') @classmethod def address_to_hash168(cls, addr): @@ -129,7 +142,7 @@ class Coin(object): @classmethod def prvkey_WIF(privkey_bytes, compressed): - "Return the private key encoded in Wallet Import Format." + '''Return the private key encoded in Wallet Import Format.''' payload = bytearray([cls.WIF_BYTE]) + privkey_bytes if compressed: payload.append(0x01) @@ -137,18 +150,22 @@ class Coin(object): @classmethod def header_hashes(cls, header): - '''Given a header return the previous block hash and the current block - hash.''' + '''Given a header return the previous and current block hashes.''' return header[4:36], double_sha256(header) @classmethod def read_block(cls, block): - '''Read a block and return (header, tx_hashes, txs)''' + '''Return a tuple (header, tx_hashes, txs) given a raw block.''' header, rest = block[:cls.HEADER_LEN], block[cls.HEADER_LEN:] return (header, ) + Deserializer(rest).read_block() @classmethod def decimal_value(cls, value): + '''Return the number of standard coin units as a Decimal given a + quantity of smallest units. + + For example 1 BTC is returned for 100 million satoshis. + ''' return Decimal(value) / cls.VALUE_PER_COIN @@ -167,6 +184,7 @@ class Bitcoin(Coin): TX_COUNT_HEIGHT = 420976 TX_PER_BLOCK = 1600 + class BitcoinTestnet(Coin): NAME = "Bitcoin" SHORTNAME = "XTN" @@ -177,6 +195,7 @@ class BitcoinTestnet(Coin): P2SH_VERBYTE = 0xc4 WIF_BYTE = 0xef + # Source: pycoin and others class Litecoin(Coin): NAME = "Litecoin" @@ -188,6 +207,7 @@ class Litecoin(Coin): P2SH_VERBYTE = 0x05 WIF_BYTE = 0xb0 + class LitecoinTestnet(Coin): NAME = "Litecoin" SHORTNAME = "XLT" @@ -198,6 +218,7 @@ class LitecoinTestnet(Coin): P2SH_VERBYTE = 0xc4 WIF_BYTE = 0xef + # Source: namecoin.org class Namecoin(Coin): NAME = "Namecoin" @@ -209,6 +230,7 @@ class Namecoin(Coin): P2SH_VERBYTE = 0x0d WIF_BYTE = 0xe4 + class NamecoinTestnet(Coin): NAME = "Namecoin" SHORTNAME = "XNM" @@ -219,6 +241,7 @@ class NamecoinTestnet(Coin): P2SH_VERBYTE = 0xc4 WIF_BYTE = 0xef + # For DOGE there is disagreement across sites like bip32.org and # pycoin. 
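The Coin classmethods above suggest the following usage; a sketch
assuming Bitcoin mainnet, standard satoshi accounting, and a
placeholder block (a real raw block would come from the daemon)::

    # Usage sketch for the Coin abstraction; values assume Bitcoin
    # mainnet and 100,000,000 smallest units per coin.
    from lib.coins import Coin

    coin = Coin.lookup_coin_class('Bitcoin', 'mainnet')
    assert coin.SHORTNAME == 'BTC'
    assert str(coin.decimal_value(100000000)) == '1'  # 1 whole coin

    header = bytes(80)  # placeholder; a real 80-byte block header
    prev_hash, cur_hash = coin.header_hashes(header)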
# Taken from bip32.org and bitmerchant on github
 class Dogecoin(Coin):
@@ -231,6 +254,7 @@ class Dogecoin(Coin):
     P2SH_VERBYTE = 0x16
     WIF_BYTE = 0x9e

+
 class DogecoinTestnet(Coin):
     NAME = "Dogecoin"
     SHORTNAME = "XDT"
@@ -241,6 +265,7 @@ class DogecoinTestnet(Coin):
     P2SH_VERBYTE = 0xc4
     WIF_BYTE = 0xf1

+
 # Source: pycoin
 class Dash(Coin):
     NAME = "Dash"
@@ -252,6 +277,7 @@ class Dash(Coin):
     P2SH_VERBYTE = 0x10
     WIF_BYTE = 0xcc

+
 class DashTestnet(Coin):
     NAME = "Dogecoin"
     SHORTNAME = "tDASH"
diff --git a/lib/enum.py b/lib/enum.py
index e137c7f..920e038 100644
--- a/lib/enum.py
+++ b/lib/enum.py
@@ -1,8 +1,17 @@
-# enum-like type
-# From the Python Cookbook from http://code.activestate.com/recipes/67107/
+# Copyright (c) 2016, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.

+'''An enum-like type with reverse lookup.

-class EnumException(Exception):
+Source: Python Cookbook, http://code.activestate.com/recipes/67107/
+'''
+
+
+class EnumError(Exception):
     pass

@@ -20,13 +29,13 @@ class Enumeration:
             if isinstance(x, tuple):
                 x, i = x
             if not isinstance(x, str):
-                raise EnumException("enum name {} not a string".format(x))
+                raise EnumError("enum name {} not a string".format(x))
             if not isinstance(i, int):
-                raise EnumException("enum value {} not an integer".format(i))
+                raise EnumError("enum value {} not an integer".format(i))
             if x in uniqueNames:
-                raise EnumException("enum name {} not unique".format(x))
+                raise EnumError("enum name {} not unique".format(x))
             if i in uniqueValues:
-                raise EnumException("enum value {} not unique".format(x))
+                raise EnumError("enum value {} not unique".format(x))
             uniqueNames.add(x)
             uniqueValues.add(i)
             lookup[x] = i
diff --git a/lib/hash.py b/lib/hash.py
index bf0aed6..1563adc 100644
--- a/lib/hash.py
+++ b/lib/hash.py
@@ -1,6 +1,13 @@
-# See the file "LICENSE" for information about the copyright
+# Copyright (c) 2016, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
 # and warranty status of this software.

+'''Cryptographic hash functions and related classes.'''
+
+
 import hashlib
 import hmac

@@ -8,11 +15,13 @@ from lib.util import bytes_to_int, int_to_bytes

 def sha256(x):
+    '''Simple wrapper of hashlib sha256.'''
     assert isinstance(x, (bytes, bytearray, memoryview))
     return hashlib.sha256(x).digest()

 def ripemd160(x):
+    '''Simple wrapper of hashlib ripemd160.'''
     assert isinstance(x, (bytes, bytearray, memoryview))
     h = hashlib.new('ripemd160')
     h.update(x)
@@ -20,36 +29,41 @@

 def double_sha256(x):
+    '''SHA-256 of SHA-256, as used extensively in bitcoin.'''
     return sha256(sha256(x))

 def hmac_sha512(key, msg):
+    '''Use SHA-512 to provide an HMAC.'''
     return hmac.new(key, msg, hashlib.sha512).digest()

 def hash160(x):
+    '''RIPEMD-160 of SHA-256.
+
+    Used to make bitcoin addresses from pubkeys.'''
     return ripemd160(sha256(x))

+
 def hash_to_str(x):
-    '''Converts a big-endian binary hash to a little-endian hex string, as
-    shown in block explorers, etc.
+    '''Convert a big-endian binary hash to displayed hex string.
+
+    Display form of a binary hash is reversed and converted to hex.
''' return bytes(reversed(x)).hex() + def hex_str_to_hash(x): - '''Converts a little-endian hex string as shown to a big-endian binary - hash.''' + '''Convert a displayed hex string to a binary hash.''' return bytes(reversed(bytes.fromhex(x))) -class InvalidBase58String(Exception): - pass - -class InvalidBase58CheckSum(Exception): - pass +class Base58Error(Exception): + '''Exception used for Base58 errors.''' class Base58(object): + '''Class providing base 58 functionality.''' chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' assert len(chars) == 58 @@ -59,17 +73,17 @@ class Base58(object): def char_value(c): val = Base58.cmap.get(c) if val is None: - raise InvalidBase58String + raise Base58Error('invalid base 58 character "{}"'.format(c)) return val @staticmethod def decode(txt): """Decodes txt into a big-endian bytearray.""" if not isinstance(txt, str): - raise InvalidBase58String("a string is required") + raise Base58Error('a string is required') if not txt: - raise InvalidBase58String("string cannot be empty") + raise Base58Error('string cannot be empty') value = 0 for c in txt: @@ -112,14 +126,14 @@ class Base58(object): be_bytes = Base58.decode(txt) result, check = be_bytes[:-4], be_bytes[-4:] if check != double_sha256(result)[:4]: - raise InvalidBase58CheckSum + raise Base58Error('invalid base 58 checksum for {}'.format(txt)) return result @staticmethod def encode_check(payload): """Encodes a payload bytearray (which includes the version byte(s)) into a Base58Check string.""" - assert isinstance(payload, (bytes, bytearray)) + assert isinstance(payload, (bytes, bytearray, memoryview)) be_bytes = payload + double_sha256(payload)[:4] return Base58.encode(be_bytes) diff --git a/lib/script.py b/lib/script.py index 0239a1d..3894892 100644 --- a/lib/script.py +++ b/lib/script.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Script-related classes and functions.''' + + from binascii import hexlify import struct @@ -10,7 +17,7 @@ from lib.util import cachedproperty class ScriptError(Exception): - pass + '''Exception used for script errors.''' OpCodes = Enumeration("Opcodes", [ @@ -52,7 +59,9 @@ assert OpCodes.OP_CHECKMULTISIG == 0xae class ScriptSig(object): - '''A script from a tx input, typically provides one or more signatures.''' + '''A script from a tx input. + + Typically provides one or more signatures.''' SIG_ADDRESS, SIG_MULTI, SIG_PUBKEY, SIG_UNKNOWN = range(4) @@ -73,8 +82,9 @@ class ScriptSig(object): @classmethod def from_script(cls, script, coin): - '''Returns an instance of this class. Uncrecognised scripts return - an object of kind SIG_UNKNOWN.''' + '''Return an instance of this class. + + Return an object with kind SIG_UNKNOWN for unrecognised scripts.''' try: return cls.parse_script(script, coin) except ScriptError: @@ -82,8 +92,9 @@ class ScriptSig(object): @classmethod def parse_script(cls, script, coin): - '''Returns an instance of this class. Raises on unrecognised - scripts.''' + '''Return an instance of this class. 
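The hash.py helpers above compose as shown in this usage sketch; the
payload is a made-up example, and the usual Base58Check handling of
leading zero bytes is assumed::

    # Round-trip sketch for the helpers above; the inputs are made up.
    from lib.hash import double_sha256, hash_to_str, hex_str_to_hash
    from lib.hash import Base58

    digest = double_sha256(b'electrumx')
    # Displayed form is byte-reversed hex; converting back recovers it.
    assert hex_str_to_hash(hash_to_str(digest)) == digest

    # Base58Check round trip: payload includes the version byte(s).
    payload = bytes([0x00]) + bytes(20)  # P2PKH version + 20-byte hash160
    addr = Base58.encode_check(payload)
    assert Base58.decode_check(addr) == payload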
+ + Raises on unrecognised scripts.''' ops, datas = Script.get_ops(script) # Address, PubKey and P2SH redeems only push data diff --git a/lib/tx.py b/lib/tx.py index c45425c..7ea35f4 100644 --- a/lib/tx.py +++ b/lib/tx.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Transaction-related classes and functions.''' + + from collections import namedtuple import struct @@ -9,6 +16,7 @@ from lib.hash import double_sha256, hash_to_str class Tx(namedtuple("Tx", "version inputs outputs locktime")): + '''Class representing a transaction.''' @cachedproperty def is_coinbase(self): @@ -17,6 +25,7 @@ class Tx(namedtuple("Tx", "version inputs outputs locktime")): # FIXME: add hash as a cached property? class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")): + '''Class representing a transaction input.''' ZERO = bytes(32) MINUS_1 = 4294967295 @@ -41,6 +50,7 @@ class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")): class TxOutput(namedtuple("TxOutput", "value pk_script")): + '''Class representing a transaction output.''' @cachedproperty def pay_to(self): @@ -48,9 +58,10 @@ class TxOutput(namedtuple("TxOutput", "value pk_script")): class Deserializer(object): + '''Deserializes blocks into transactions.''' def __init__(self, binary): - assert isinstance(binary, (bytes, memoryview)) + assert isinstance(binary, bytes) self.binary = binary self.cursor = 0 diff --git a/lib/util.py b/lib/util.py index 552a352..f59439b 100644 --- a/lib/util.py +++ b/lib/util.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Miscellaneous utility classes and functions.''' + + import array import logging import sys @@ -72,6 +79,7 @@ def deep_getsizeof(obj): def chunks(items, size): + '''Break up items, an iterable, into chunks of length size.''' for i in range(0, len(items), size): yield items[i: i + size] diff --git a/query.py b/query.py index 57941fa..faea87a 100755 --- a/query.py +++ b/query.py @@ -1,8 +1,18 @@ #!/usr/bin/env python3 - -# See the file "LICENSE" for information about the copyright +# +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Script to query the database for debugging purposes. + +Not currently documented; might become easier to use in future. +''' + + import os import sys diff --git a/server/block_processor.py b/server/block_processor.py index aeaade9..ba42a0e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Block prefetcher and chain processor.''' + + import array import ast import asyncio diff --git a/server/cache.py b/server/cache.py index 0f0cc71..3adfbe8 100644 --- a/server/cache.py +++ b/server/cache.py @@ -1,6 +1,17 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. 
+# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''UTXO and file cache. + +During initial sync these cache data and only flush occasionally. +Once synced flushes are performed after processing each block. +''' + + import array import itertools import os diff --git a/server/controller.py b/server/controller.py index 5812f04..9d1cf64 100644 --- a/server/controller.py +++ b/server/controller.py @@ -1,6 +1,16 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Server controller. + +Coordinates the parts of the server. Serves as a cache for +client-serving data such as histories. +''' + import asyncio import signal import traceback diff --git a/server/daemon.py b/server/daemon.py index 96a07bc..aaef6c1 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -1,7 +1,11 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. -'''Classes for handling asynchronous connections to a blockchain +'''Class for handling asynchronous connections to a blockchain daemon.''' import asyncio diff --git a/server/env.py b/server/env.py index 0a447ed..e27f570 100644 --- a/server/env.py +++ b/server/env.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Class for handling environment configuration and defaults.''' + + from os import environ from lib.coins import Coin diff --git a/server/protocol.py b/server/protocol.py index 04b4dcf..ebd8df7 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Classes for local RPC server and remote client TCP/SSL servers.''' + + import asyncio import codecs import json diff --git a/server_main.py b/server_main.py index 3a82c48..60948fa 100755 --- a/server_main.py +++ b/server_main.py @@ -1,8 +1,15 @@ #!/usr/bin/env python3 - -# See the file "LICENSE" for information about the copyright +# +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Script to kick off the server.''' + + import asyncio import logging import os From 3d11afbda22389c82c68119fac9ffb6921866f2c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 30 Oct 2016 12:31:53 +0900 Subject: [PATCH 04/17] Enable servers --- server/block_processor.py | 40 ++++++++++++--------- server/controller.py | 74 ++++++++++++++++++++++----------------- server/env.py | 3 ++ server/protocol.py | 38 ++++++++++++-------- 4 files changed, 92 insertions(+), 63 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index ba42a0e..6be4e7d 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -126,17 +126,18 @@ class BlockProcessor(LoggedClass): Coordinate backing up in case of chain reorganisations. 
'''

-    def __init__(self, env, daemon):
+    def __init__(self, env, daemon, on_catchup=None):
         super().__init__()

         self.daemon = daemon
+        self.on_catchup = on_catchup

         # Meta
         self.utxo_MB = env.utxo_MB
         self.hist_MB = env.hist_MB
         self.next_cache_check = 0
         self.coin = env.coin
-        self.caught_up = False
+        self.have_caught_up = False
         self.reorg_limit = env.reorg_limit

         # Chain state (initialize to genesis in case of new DB)
@@ -192,6 +193,17 @@
         else:
             return [self.start(), self.prefetcher.start()]

+    async def caught_up(self):
+        '''Call when we catch up to the daemon's height.'''
+        # Flush everything when in caught-up state as queries
+        # are performed on DB and not in-memory.
+        self.flush(True)
+        if not self.have_caught_up:
+            self.have_caught_up = True
+            self.logger.info('caught up to height {:,d}'.format(self.height))
+        if self.on_catchup:
+            await self.on_catchup()
+
     async def start(self):
         '''External entry point for block processing.

@@ -199,32 +211,26 @@ shutdown.
         '''
         try:
-            await self.advance_blocks()
+            # If we're caught up, start the servers immediately
+            if self.height == await self.daemon.height():
+                await self.caught_up()
+            await self.wait_for_blocks()
         finally:
             self.flush(True)

-    async def advance_blocks(self):
+    async def wait_for_blocks(self):
         '''Loop forever processing blocks in the forward direction.'''
         while True:
             blocks = await self.prefetcher.get_blocks()
             for block in blocks:
                 if not self.advance_block(block):
                     await self.handle_chain_reorg()
-                    self.caught_up = False
+                    self.have_caught_up = False
                     break
                 await asyncio.sleep(0)  # Yield

-            if self.height != self.daemon.cached_height():
-                continue
-
-            if not self.caught_up:
-                self.caught_up = True
-                self.logger.info('caught up to height {:,d}'
-                                 .format(self.height))
-
-            # Flush everything when in caught-up state as queries
-            # are performed on DB not in-memory
-            self.flush(True)
+            if self.height == self.daemon.cached_height():
+                await self.caught_up()

     async def force_chain_reorg(self, to_genesis):
         try:
@@ -360,7 +366,7 @@
     def flush_state(self, batch):
         '''Flush chain state to the batch.'''
-        if self.caught_up:
+        if self.have_caught_up:
             self.first_sync = False
         now = time.time()
         self.wall_time += now - self.last_flush
diff --git a/server/controller.py b/server/controller.py
index 9d1cf64..a7af664 100644
--- a/server/controller.py
+++ b/server/controller.py
@@ -13,6 +13,7 @@ client-serving data such as histories.
import asyncio import signal +import ssl import traceback from functools import partial @@ -35,51 +36,62 @@ class Controller(LoggedClass): self.loop = loop self.env = env self.daemon = Daemon(env.daemon_url) - self.block_processor = BlockProcessor(env, self.daemon) + self.block_processor = BlockProcessor(env, self.daemon, + on_catchup=self.start_servers) self.servers = [] self.sessions = set() self.addresses = {} - self.jobs = set() + self.jobs = asyncio.Queue() self.peers = {} def start(self): - '''Prime the event loop with asynchronous servers and jobs.''' - env = self.env - loop = self.loop - + '''Prime the event loop with asynchronous jobs.''' coros = self.block_processor.coros() - - if False: - self.start_servers() - coros.append(self.reap_jobs()) + coros.append(self.run_jobs()) for coro in coros: asyncio.ensure_future(coro) # Signal handlers for signame in ('SIGINT', 'SIGTERM'): - loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) + self.loop.add_signal_handler(getattr(signal, signame), + partial(self.on_signal, signame)) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. Does + nothing if servers are already running. + ''' + if self.servers: + return + + env = self.env + loop = self.loop - def start_servers(self): protocol = partial(LocalRPC, self) if env.rpc_port is not None: host = 'localhost' rpc_server = loop.create_server(protocol, host, env.rpc_port) - self.servers.append(loop.run_until_complete(rpc_server)) + self.servers.append(await rpc_server) self.logger.info('RPC server listening on {}:{:d}' .format(host, env.rpc_port)) protocol = partial(ElectrumX, self, self.daemon, env) if env.tcp_port is not None: tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - self.servers.append(loop.run_until_complete(tcp_server)) + self.servers.append(await tcp_server) self.logger.info('TCP server listening on {}:{:d}' .format(env.host, env.tcp_port)) if env.ssl_port is not None: - ssl_server = loop.create_server(protocol, env.host, env.ssl_port) - self.servers.append(loop.run_until_complete(ssl_server)) + # FIXME: update if we want to require Python >= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) self.logger.info('SSL server listening on {}:{:d}' .format(env.host, env.ssl_port)) @@ -96,30 +108,28 @@ class Controller(LoggedClass): task.cancel() def add_session(self, session): + '''Add a session representing one incoming connection.''' self.sessions.add(session) def remove_session(self, session): + '''Remove a session.''' self.sessions.remove(session) def add_job(self, coro): '''Queue a job for asynchronous processing.''' - self.jobs.add(asyncio.ensure_future(coro)) + self.jobs.put_nowait(coro) - async def reap_jobs(self): + async def run_jobs(self): + '''Asynchronously run through the job queue.''' while True: - jobs = set() - for job in self.jobs: - if job.done(): - try: - job.result() - except Exception as e: - traceback.print_exc() - else: - jobs.add(job) - self.logger.info('reaped {:d} jobs, {:d} jobs pending' - .format(len(self.jobs) - len(jobs), len(jobs))) - self.jobs = jobs - await asyncio.sleep(5) + job = await self.jobs.get() + try: + await job + except asyncio.CancelledError: + raise + except Exception: + # Getting here should 
probably be considered a bug and fixed
+                traceback.print_exc()

     def address_status(self, hash168):
         '''Returns status as 32 bytes.'''
diff --git a/server/env.py b/server/env.py
index e27f570..e3e8bf7 100644
--- a/server/env.py
+++ b/server/env.py
@@ -34,6 +34,9 @@ class Env(LoggedClass):
         # Server stuff
         self.tcp_port = self.integer('TCP_PORT', None)
         self.ssl_port = self.integer('SSL_PORT', None)
+        if self.ssl_port:
+            self.ssl_certfile = self.required('SSL_CERTFILE')
+            self.ssl_keyfile = self.required('SSL_KEYFILE')
         self.rpc_port = self.integer('RPC_PORT', 8000)
         self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
         self.banner_file = self.default('BANNER_FILE', None)
diff --git a/server/protocol.py b/server/protocol.py
index ebd8df7..2d11b7a 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -24,6 +24,14 @@ class Error(Exception):

 class JSONRPC(asyncio.Protocol, LoggedClass):
+    '''Base class that manages a JSONRPC connection.
+
+    When a request comes in for an RPC method M, a member function
+    handle_M is called with the request params array, except that
+    periods in M are replaced with underscores.  So an RPC call
+    for method 'blockchain.estimatefee' will be passed to
+    handle_blockchain_estimatefee.
+    '''

     def __init__(self, controller):
         super().__init__()
@@ -31,39 +39,41 @@
         self.parts = []

     def connection_made(self, transport):
+        '''Handle an incoming client connection.'''
         self.transport = transport
-        peername = transport.get_extra_info('peername')
-        self.logger.info('connection from {}'.format(peername))
+        self.peername = transport.get_extra_info('peername')
+        self.logger.info('connection from {}'.format(self.peername))
         self.controller.add_session(self)

     def connection_lost(self, exc):
-        self.logger.info('disconnected')
+        '''Handle client disconnection.'''
+        self.logger.info('disconnected: {}'.format(self.peername))
         self.controller.remove_session(self)

     def data_received(self, data):
+        '''Handle incoming data (synchronously).
+
+        Requests end in newline characters.  Pass complete requests to
+        decode_message for handling.
+        '''
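The framing loop that follows can be exercised in isolation; a
standalone sketch of the same newline-delimited idea (a simplified
re-implementation for illustration, not the class itself)::

    # Bytes arrive in arbitrary TCP chunks; a request is complete only
    # once a newline is seen.  Partial data is buffered in self.parts.
    import json

    class LineFramer:
        def __init__(self):
            self.parts = []

        def feed(self, data):
            '''Yield each complete newline-terminated message.'''
            while True:
                npos = data.find(b'\n')
                if npos == -1:
                    self.parts.append(data)
                    return
                tail, data = data[:npos], data[npos + 1:]
                parts, self.parts = self.parts, []
                parts.append(tail)
                yield b''.join(parts)

    framer = LineFramer()
    chunks = [b'{"method": "server.ba', b'nner", "params": []}\n']
    for chunk in chunks:
        for message in framer.feed(chunk):
            print(json.loads(message.decode()))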
         while True:
             npos = data.find(ord('\n'))
             if npos == -1:
+                self.parts.append(data)
                 break
             tail, data = data[:npos], data[npos + 1:]
-            parts = self.parts
-            self.parts = []
+            parts, self.parts = self.parts, []
             parts.append(tail)
             self.decode_message(b''.join(parts))

-        if data:
-            self.parts.append(data)
-
     def decode_message(self, message):
-        '''Message is a binary message.'''
+        '''Decode a binary message and queue it for asynchronous handling.'''
         try:
             message = json.loads(message.decode())
         except Exception as e:
-            self.logger.info('caught exception decoding message'.format(e))
-            return
-
-        job = self.request_handler(message)
-        self.controller.add_job(job)
+            self.logger.info('error decoding JSON message: {}'.format(e))
+        else:
+            self.controller.add_job(self.request_handler(message))

     async def request_handler(self, request):
         '''Called asynchronously.'''
From f9fcdf22321ae65492c41581f1e8311337634be6 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Mon, 31 Oct 2016 21:55:36 +0900
Subject: [PATCH 05/17] Merge branch 'master' into develop

---
 AUTHORS                   |   3 +-
 HOWTO.rst                 |  13 ++++
 samples/scripts/NOTES     |  33 +++++----
 server/block_processor.py |  41 +++++-----
 server/env.py             |   1 +
 server/storage.py         | 152 ++++++++++++++++++++++++++++++++++++++
 6 files changed, 209 insertions(+), 34 deletions(-)
 create mode 100644 server/storage.py

diff --git a/AUTHORS b/AUTHORS
index 10c4bb7..722c701 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -1 +1,2 @@
-Neil Booth: creator and maintainer
\ No newline at end of file
+Neil Booth: creator and maintainer
+Johann Bauer: backend DB abstraction
\ No newline at end of file
diff --git a/HOWTO.rst b/HOWTO.rst
index 2f7abd1..af5be2e 100644
--- a/HOWTO.rst
+++ b/HOWTO.rst
@@ -29,6 +29,19 @@ metadata comes to just over 17GB.  Leveldb needs a bit more for brief
 periods, and the block chain is only getting longer, so I would
 recommend having at least 30-40GB free space.

+Database Engine
+===============
+
+You can choose between either RocksDB, LevelDB or LMDB to store transaction
+information on disk.  Currently, the fastest seems to be RocksDB with LevelDB
+being about 10% slower.  LMDB seems to be the slowest but maybe that's because
+of bad implementation or configuration.
+
+You will need to install one of:
+
++ `plyvel `_ for LevelDB
++ `pyrocksdb `_ for RocksDB
++ `lmdb `_ for LMDB

 Running
 =======
diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index d38a327..54c9fca 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -31,18 +31,21 @@ bit bigger than the combined cache size, because of Python overhead and
 also because leveldb can consume quite a lot of memory during UTXO
 flushing.  So these are rough numbers only:

-HIST_MB - amount of history cache, in MB, to retain before flushing to
-          disk.  Default is 250; probably no benefit being much larger
-          as history is append-only and not searched.
-
-UTXO_MB - amount of UTXO and history cache, in MB, to retain before
-          flushing to disk.  Default is 1000.  This may be too large
-          for small boxes or too small for machines with lots of RAM.
-          Larger caches generally perform better as there is
-          significant searching of the UTXO cache during indexing.
-          However, I don't see much benefit in my tests pushing this
-          too high, and in fact performance begins to fall.  My
-          machine has 24GB RAM; the slow down is probably because of
-          leveldb caching and Python GC effects.  However this may be
-          very dependent on hardware and you may have different
-          results.
\ No newline at end of file
+HIST_MB - amount of history cache, in MB, to retain before flushing to
+          disk.  Default is 250; probably no benefit being much larger
+          as history is append-only and not searched.
+
+UTXO_MB - amount of UTXO and history cache, in MB, to retain before
+          flushing to disk.  Default is 1000.  This may be too large
+          for small boxes or too small for machines with lots of RAM.
+          Larger caches generally perform better as there is
+          significant searching of the UTXO cache during indexing.
+          However, I don't see much benefit in my tests pushing this
+          too high, and in fact performance begins to fall.  My
+          machine has 24GB RAM; the slow down is probably because of
+          leveldb caching and Python GC effects.  However this may be
+          very dependent on hardware and you may have different
+          results.
+
+DB_ENGINE - database engine for the transaction database, either rocksdb,
+            leveldb or lmdb
\ No newline at end of file
diff --git a/server/block_processor.py b/server/block_processor.py
index 6be4e7d..60cf37d 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -17,13 +17,12 @@ from bisect import bisect_left
 from collections import defaultdict, namedtuple
 from functools import partial

-import plyvel
-
 from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
 from server.daemon import DaemonError
 from lib.hash import hash_to_str
 from lib.script import ScriptPubKey
 from lib.util import chunks, LoggedClass
+from server.storage import LMDB, RocksDB, LevelDB, NoDatabaseException

 def formatted_time(t):
@@ -150,7 +149,7 @@
         self.first_sync = True

         # Open DB and metadata files.  Record some of its state.
-        self.db = self.open_db(self.coin)
+        self.db = self.open_db(self.coin, env.db_engine)
         self.tx_count = self.db_tx_count
         self.height = self.db_height
         self.tip = self.db_tip
@@ -227,7 +226,7 @@
                     await self.handle_chain_reorg()
                     self.have_caught_up = False
                     break
-                await asyncio.sleep(0) # Yield
+                await asyncio.sleep(0)  # Yield

             if self.height == self.daemon.cached_height():
                 await self.caught_up()
@@ -257,6 +256,7 @@
         '''Return the list of hashes to back up because of a reorg.
        The hashes are returned in order of increasing height.'''
+
         def match_pos(hashes1, hashes2):
             for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
                 if hash1 == hash2:
@@ -286,17 +286,22 @@

         return self.fs_cache.block_hashes(start, count)

-    def open_db(self, coin):
+    def open_db(self, coin, db_engine):
         db_name = '{}-{}'.format(coin.NAME, coin.NET)
+        db_engine_class = {
+            "leveldb": LevelDB,
+            "rocksdb": RocksDB,
+            "lmdb": LMDB
+        }[db_engine.lower()]
         try:
-            db = plyvel.DB(db_name, create_if_missing=False,
-                           error_if_exists=False, compression=None)
-        except:
-            db = plyvel.DB(db_name, create_if_missing=True,
-                           error_if_exists=True, compression=None)
-            self.logger.info('created new database {}'.format(db_name))
+            db = db_engine_class(db_name, create_if_missing=False,
+                                 error_if_exists=False, compression=None)
+        except NoDatabaseException:
+            db = db_engine_class(db_name, create_if_missing=True,
+                                 error_if_exists=True, compression=None)
+            self.logger.info('created new {} database {}'.format(db_engine, db_name))
         else:
-            self.logger.info('successfully opened database {}'.format(db_name))
+            self.logger.info('successfully opened {} database {}'.format(db_engine, db_name))
         self.read_state(db)

         return db
@@ -325,7 +330,7 @@
         '''
         if self.flush_count < self.utxo_flush_count:
             raise ChainError('DB corrupt: flush_count < utxo_flush_count')
-        with self.db.write_batch(transaction=True) as batch:
+        with self.db.write_batch() as batch:
             if self.flush_count > self.utxo_flush_count:
                 self.logger.info('DB shut down uncleanly.  Scanning for '
                                  'excess history flushes...')
@@ -423,7 +428,7 @@
         if self.height > self.db_height:
             self.fs_cache.flush(self.height, self.tx_count)

-        with self.db.write_batch(transaction=True) as batch:
+        with self.db.write_batch() as batch:
             # History first - fast and frees memory.  Flush state last
             # as it reads the wall time.
             if self.height > self.db_height:
@@ -669,7 +674,7 @@
             if not tx.is_coinbase:
                 for txin in reversed(tx.inputs):
                     n -= 33
-                    undo_item = undo_info[n:n+33]
+                    undo_item = undo_info[n:n + 33]
                     put_utxo(txin.prev_hash + pack('
Date: Mon, 31 Oct 2016 22:03:48 +0900
Subject: [PATCH 06/17] Add release blurb.

---
 RELEASE-NOTES         |  9 +++++++++
 samples/scripts/NOTES | 23 ++++++++++-------------
 server/version.py     |  2 +-
 3 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index 6352940..295f5d5 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,12 @@
+Version 0.03
+------------
+
+- merged bauerj's abstracted DB engine contribution to make it easy to
+  play with different backends.  In addition to LevelDB this adds
+  support for RocksDB and LMDB.  We're interested in your comparative
+  performance experiences.
+
+
 Version 0.02
 ------------

diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index 54c9fca..642ce1f 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -1,7 +1,5 @@
 The following environment variables are required:

-COIN - see lib/coins.py, must be a coin NAME
-NETWORK - see lib/coins.py, must be a coin NET
 DB_DIRECTORY - path to the database directory (if relative, to run script)
 USERNAME - the username the server will run as
 SERVER_MAIN - path to the server_main.py script (if relative, to run script)
@@ -11,18 +9,15 @@ DAEMON_URL - the URL used to connect to the daemon.  Should be of the form
     DAEMON_HOST and DAEMON_PORT.
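The new server/storage.py itself does not appear in these hunks;
judging only from how open_db and the flush code use it (constructor
flags, write_batch() as a context manager, NoDatabaseException), its
interface is roughly the following sketch, not the actual
implementation::

    # Inferred interface sketch; real backends wrap plyvel, pyrocksdb
    # or lmdb behind this shape.
    class NoDatabaseException(Exception):
        '''Raised when create_if_missing=False and no DB exists.'''

    class Storage:
        def __init__(self, name, create_if_missing=False,
                     error_if_exists=False, compression=None):
            raise NotImplementedError

        def get(self, key):
            raise NotImplementedError

        def put(self, key, value):
            raise NotImplementedError

        def write_batch(self):
            '''Return a context manager whose target supports put and
            delete, committing atomically on exit.'''
            raise NotImplementedError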
DAEMON_PORT is optional and will default appropriately for COIN. -In addition either RPC_URL must be given as the full RPC URL for -connecting to the daemon, or you must specify RPC_HOST, RPC_USER, -RPC_PASSWORD and optionally RPC_PORT (it defaults appropriately for -the coin and network otherwise). - The other environment variables are all optional and will adopt sensible defaults if not specified. -REORG_LIMIT - maximum number of blocks to be able to handle in a chain - reorganisation. ElectrumX retains some fairly compact - undo information for this many blocks in levelDB. - Default is 200. +COIN - see lib/coins.py, must be a coin NAME. Defaults to Bitcoin. +NETWORK - see lib/coins.py, must be a coin NET. Defaults to mainnet. +REORG_LIMIT - maximum number of blocks to be able to handle in a chain + reorganisation. ElectrumX retains some fairly compact + undo information for this many blocks in levelDB. + Default is 200. Your performance might change by tweaking these cache settings. Cache size is only checked roughly every minute, so the caches can grow @@ -47,5 +42,7 @@ UTXO_MB - amount of UTXO and history cache, in MB, to retain before very dependent on hardware and you may have different results. -DB_ENGINE - database engine for the transaction database, either rocksdb, - leveldb or lmdb \ No newline at end of file +DB_ENGINE - database engine for the transaction database. Default is + leveldb. Supported alternatives are rocksdb and lmdb, + which will require installation of the appropriate python + packages. \ No newline at end of file diff --git a/server/version.py b/server/version.py index abb6c1a..c83fa77 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.02" +VERSION = "ElectrumX 0.03" From 2dd5b7ef1f5bca70c3ed422694b998a32e10f7df Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 2 Nov 2016 06:17:15 +0900 Subject: [PATCH 07/17] Add ARCHITECTURE.rst --- server/ARCHITECTURE.rst | 99 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 server/ARCHITECTURE.rst diff --git a/server/ARCHITECTURE.rst b/server/ARCHITECTURE.rst new file mode 100644 index 0000000..3af6c99 --- /dev/null +++ b/server/ARCHITECTURE.rst @@ -0,0 +1,99 @@ +Components +========== + +The components of the server are roughly like this:: + + ------- + - Env - + ------- + + ------- + - IRC - + ------- + < + ------------- ------------ + - ElectrumX -<<<<<- LocalRPC - + ------------- ------------ + < > + ---------- ------------------- ---------- + - Daemon -<<<<<<<<- Block processor ->>>>- Caches - + ---------- ------------------- ---------- + < < > < + -------------- ----------- + - Prefetcher - - Storage - + -------------- ----------- + + +Env +--- + +Holds configuration taken from the environment. Handles defaults +appropriately. Generally passed to the constructor of other +components which take their settings from it. + + +LocalRPC +-------- + +Handles local JSON RPC connections querying ElectrumX server state. +Not started until the block processor has caught up with the daemon. + +ElectrumX +--------- + +Handles JSON Electrum client connections over TCP or SSL. One +instance per client session. Should be the only component concerned +with the details of the Electrum wire protocol. Responsible for +caching of client responses. Not started until the block processor +has caught up with the daemon. Logically, if not yet in practice, a +coin-specific class. 
+
Daemon
------

Used by the block processor, ElectrumX servers and prefetcher.
Encapsulates daemon RPC wire protocol.  Logically, if not yet in
practice, a coin-specific class.

Block Processor
---------------

Responsible for managing block chain state (UTXO set, history,
transaction and undo information) and processing towards the chain
tip.  Uses the caches for in-memory state caching.  Flushes state to
the storage layer.  Responsible for handling block chain
reorganisations.  Once caught up maintains a representation of daemon
mempool state.

Caches
------

The file system cache and the UTXO cache are implementation details of
the block processor, nothing else should interface with them.

Storage
-------

Backend database abstraction.  Along with the host filesystem, used by
the block processor (and therefore its caches) to store chain state.

Prefetcher
----------

Used by the block processor to asynchronously prefetch blocks from the
daemon.  Holds fetched block height.  Once it has caught up
additionally obtains daemon mempool tx hashes.  Serves blocks and
mempool hashes to the block processor via a queue.

IRC
---

Not currently implemented; will handle IRC communication for the
ElectrumX servers.

Controller
----------

A historical artefact that currently coordinates some of the above
components.  Not pictured as it doesn't seem to have a logical
place and so is probably going away.
From ceecdc54ac7a47bbddd67473f83558fa63e9786e Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Thu, 3 Nov 2016 17:26:50 +0900
Subject: [PATCH 08/17] Server work

Avoid touching the block processor hot loop for now
---
 lib/coins.py              |  19 +-
 server/block_processor.py |  12 +-
 server/cache.py           |  19 --
 server/controller.py      |  55 ++++--
 server/daemon.py          |  25 ++-
 server/protocol.py        | 378 ++++++++++++++++++++++++++++----------
 6 files changed, 365 insertions(+), 143 deletions(-)

diff --git a/lib/coins.py b/lib/coins.py
index 6537e74..ec84b96 100644
--- a/lib/coins.py
+++ b/lib/coins.py
@@ -13,9 +13,10 @@ necessary for appropriate handling.
from decimal import Decimal import inspect +import struct import sys -from lib.hash import Base58, hash160, double_sha256 +from lib.hash import Base58, hash160, double_sha256, hash_to_str from lib.script import ScriptPubKey from lib.tx import Deserializer @@ -31,6 +32,7 @@ class Coin(object): HEADER_LEN = 80 DEFAULT_RPC_PORT = 8332 VALUE_PER_COIN = 100000000 + CHUNK_SIZE=2016 @staticmethod def coin_classes(): @@ -168,6 +170,21 @@ class Coin(object): ''' return Decimal(value) / cls.VALUE_PER_COIN + @classmethod + def electrum_header(cls, header, height): + version, = struct.unpack(' self.height + len(self.headers): - raise Exception('no header information for height {:,d}' - .format(height)) - header = self.read_headers(self.height, 1) - unpack = struct.unpack - version, = unpack('= 0: + return param + raise RPCError('param should be a non-negative integer: {}' + .format(param)) + + @classmethod + def extract_hash168(cls, params): + if len(params) == 1: + return cls.hash168_from_param(params[0]) + raise RPCError('params should contain a single address: {}' + .format(params)) + + @classmethod + def extract_non_negative_integer(cls, params): + if len(params) == 1: + return cls.non_negative_integer_from_param(params[0]) + raise RPCError('params should contain a non-negative integer: {}' + .format(params)) + + @classmethod + def require_empty_params(cls, params): + if params: + raise RPCError('params should be empty: {}'.format(params)) + + @classmethod + def init(cls, block_processor, coin): + cls.BLOCK_PROCESSOR = block_processor + cls.COIN = coin + + @classmethod + def height(cls): + '''Return the current height.''' + return cls.BLOCK_PROCESSOR.height + + @classmethod + def electrum_header(cls, height=None): + '''Return the binary header at the given height.''' + if not 0 <= height <= cls.height(): + raise RPCError('height {:,d} out of range'.format(height)) + header = cls.BLOCK_PROCESSOR.read_headers(height, 1) + return cls.COIN.electrum_header(header, height) + + @classmethod + def current_electrum_header(cls): + '''Used as response to a headers subscription request.''' + return cls.electrum_header(cls.height()) + + @classmethod + def notify(cls, height, touched): + '''Notify electrum clients about height changes and touched + addresses.''' + headers_payload = json_notification( + 'blockchain.headers.subscribe', + (cls.electrum_header(height), ), + ) + height_payload = json_notification( + 'blockchain.numblocks.subscribe', + (height, ), + ) + for session in cls.SESSIONS: + if height != session.notified_height: + session.notified_height = height + if session.subscribe_headers: + session.json_send(headers_payload) + if session.subscribe_height: + session.json_send(height_payload) + + for hash168 in session.hash168s.intersection(touched): + payload = json_notification('blockchain.address.subscribe', + (Base58.encode_check(hash168), )) + session.json_send(payload) class ElectrumX(JSONRPC): @@ -122,60 +222,138 @@ class ElectrumX(JSONRPC): super().__init__(controller) self.daemon = daemon self.env = env - self.addresses = set() - self.subscribe_headers = False + self.hash168s = set() + rpcs = [( + 'blockchain', + 'address.get_balance address.get_history address.get_mempool ' + 'address.get_proof address.listunspent address.subscribe ' + 'block.get_header block.get_chunk estimatefee headers.subscribe ' + 'numblocks.subscribe relayfee transaction.broadcast ' + 'transaction.get transaction.get_merkle utxo.get_address'), + ( + 'server', + 'banner donation_address peers.subscribe version'), + ] + 
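electrum_header above deserializes the fixed 80-byte header into the
form Electrum clients expect; a sketch of that deserialization, with
field names assumed from the Electrum protocol convention rather than
taken verbatim from the patch::

    # Sketch only: the header bytes are zeros purely for shape; real
    # headers come from BlockProcessor.read_headers().  Field names are
    # an assumption based on the Electrum wire protocol.
    import struct
    from lib.hash import hash_to_str

    raw = bytes(80)  # placeholder 80-byte header
    version, = struct.unpack('<I', raw[:4])
    timestamp, bits, nonce = struct.unpack('<III', raw[68:80])
    header = {
        'block_height': 0,  # passed in; not part of the 80 bytes
        'version': version,
        'prev_block_hash': hash_to_str(raw[4:36]),
        'merkle_root': hash_to_str(raw[36:68]),
        'timestamp': timestamp,
        'bits': bits,
        'nonce': nonce,
    }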
self.handlers = {'.'.join([prefix, suffix]):
+                         getattr(self.__class__, suffix.replace('.', '_'))
+                         for prefix, suffixes in rpcs
+                         for suffix in suffixes.split()}
+
+    @classmethod
+    def watched_address_count(cls):
+        return sum(len(session.hash168s) for session in cls.SESSIONS
+                   if isinstance(session, cls))
+
+    # --- blockchain commands
+
+    async def address_get_balance(self, params):
+        hash168 = self.extract_hash168(params)
+        return self.controller.get_balance(hash168)
+
+    async def address_get_history(self, params):
+        hash168 = self.extract_hash168(params)
+        return self.controller.get_history(hash168)
 
-    def params_to_hash168(self, params):
-        if len(params) != 1:
-            raise Error(Error.BAD_REQUEST,
-                        'params should contain a single address')
-        address = params[0]
-        try:
-            return self.env.coin.address_to_hash168(address)
-        except:
-            raise Error(Error.BAD_REQUEST,
-                        'invalid address: {}'.format(address))
+    async def address_get_mempool(self, params):
+        hash168 = self.extract_hash168(params)
+        raise RPCError('get_mempool is not yet implemented')
 
-    async def handle_blockchain_address_get_history(self, params):
-        hash168 = self.params_to_hash168(params)
-        return self.controller.get_history(hash168)
+    async def address_get_proof(self, params):
+        hash168 = self.extract_hash168(params)
+        raise RPCError('get_proof is not yet implemented')
 
-    async def handle_blockchain_address_subscribe(self, params):
-        hash168 = self.params_to_hash168(params)
+    async def address_listunspent(self, params):
+        hash168 = self.extract_hash168(params)
+        return self.controller.list_unspent(hash168)
+
+    async def address_subscribe(self, params):
+        hash168 = self.extract_hash168(params)
+        self.hash168s.add(hash168)
         status = self.controller.address_status(hash168)
         return status.hex() if status else None
 
-    async def handle_blockchain_estimatefee(self, params):
-        result = await self.daemon.send_single('estimatefee', params)
-        return result
+    async def block_get_chunk(self, params):
+        index = self.extract_non_negative_integer(params)
+        return self.controller.get_chunk(index)
+
+    async def block_get_header(self, params):
+        height = self.extract_non_negative_integer(params)
+        return self.electrum_header(height)
+
+    async def estimatefee(self, params):
+        return await self.daemon.estimatefee(params)
 
-    async def handle_blockchain_headers_subscribe(self, params):
+    async def headers_subscribe(self, params):
+        self.require_empty_params(params)
         self.subscribe_headers = True
-        return self.controller.get_current_header()
+        return self.current_electrum_header()
 
-    async def handle_blockchain_relayfee(self, params):
+    async def numblocks_subscribe(self, params):
+        self.require_empty_params(params)
+        self.subscribe_height = True
+        return self.height()
+
+    async def relayfee(self, params):
         '''The minimum fee a low-priority tx must pay in order to be accepted
-        to this daemon's memory pool.
+        to the daemon's memory pool.'''
+        self.require_empty_params(params)
+        return await self.daemon.relayfee()
+
+    async def transaction_broadcast(self, params):
+        '''Pass through the parameters to the daemon.
+
+        An ugly API: current Electrum clients only pass the raw
+        transaction in hex and expect error messages to be returned in
+        the result field. And the server shouldn't be doing the client's
+        user interface job here.
''' - net_info = await self.daemon.send_single('getnetworkinfo') - return net_info['relayfee'] - - async def handle_blockchain_transaction_get(self, params): - if len(params) != 1: - raise Error(Error.BAD_REQUEST, - 'params should contain a transaction hash') - tx_hash = params[0] - return await self.daemon.send_single('getrawtransaction', (tx_hash, 0)) - - async def handle_blockchain_transaction_get_merkle(self, params): - if len(params) != 2: - raise Error(Error.BAD_REQUEST, - 'params should contain a transaction hash and height') - tx_hash, height = params - return await self.controller.get_merkle(tx_hash, height) - - async def handle_server_banner(self, params): + try: + tx_hash = await self.daemon.sendrawtransaction(params) + self.logger.info('sent tx: {}'.format(tx_hash)) + return tx_hash + except DaemonError as e: + errors = e.args[0] + error = errors[0] + message = error['message'] + self.logger.info('sendrawtransaction: {}'.format(message)) + if 'non-mandatory-script-verify-flag' in message: + return ( + 'Your client produced a transaction that is not accepted ' + 'by the network any more. Please upgrade to Electrum ' + '2.5.1 or newer.' + ) + + return ( + 'The transaction was rejected by network rules. ({})\n[{}]' + .format(message, params[0]) + ) + + async def transaction_get(self, params): + '''Return the serialized raw transaction.''' + # For some reason Electrum passes a height. Don't require it + # in anticipation it might be dropped in the future. + if 1 <= len(params) <= 2: + tx_hash = self.tx_hash_from_param(params[0]) + return await self.daemon.getrawtransaction(tx_hash) + + raise RPCError('params wrong length: {}'.format(params)) + + async def transaction_get_merkle(self, params): + if len(params) == 2: + tx_hash = self.tx_hash_from_param(params[0]) + height = self.non_negative_integer_from_param(params[1]) + return await self.controller.get_merkle(tx_hash, height) + + raise RPCError('params should contain a transaction hash and height') + + async def utxo_get_address(self, params): + pass # TODO + + # --- server commands + + async def banner(self, params): '''Return the server banner.''' + self.require_empty_params(params) banner = 'Welcome to Electrum!' if self.env.banner_file: try: @@ -186,23 +364,25 @@ class ElectrumX(JSONRPC): .format(self.env.banner_file, e)) return banner - async def handle_server_donation_address(self, params): + async def donation_address(self, params): '''Return the donation address as a string. If none is specified return the empty string. ''' + self.require_empty_params(params) return self.env.donation_address - async def handle_server_peers_subscribe(self, params): + async def peers_subscribe(self, params): '''Returns the peer (ip, host, ports) tuples. Despite the name electrum-server does not treat this as a subscription. 
''' + self.require_empty_params(params) peers = self.controller.get_peers() return tuple(peers.values()) - async def handle_server_version(self, params): + async def version(self, params): '''Return the server version as a string.''' return VERSION @@ -210,24 +390,28 @@ class ElectrumX(JSONRPC): class LocalRPC(JSONRPC): '''A local TCP RPC server for querying status.''' - async def handle_getinfo(self, params): + def __init__(self): + super().__init__() + cmds = 'getinfo sessions numsessions peers numpeers'.split() + self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} + + async def getinfo(self, params): return { - 'blocks': self.controller.height(), + 'blocks': self.height(), 'peers': len(self.controller.get_peers()), - 'sessions': len(self.controller.sessions), - 'watched': sum(len(s.addresses) for s in self.controller.sessions - if isinstance(s, ElectrumX)), + 'sessions': len(self.SESSIONS), + 'watched': ElectrumX.watched_address_count(), 'cached': 0, } - async def handle_sessions(self, params): + async def sessions(self, params): return [] - async def handle_numsessions(self, params): - return len(self.controller.sessions) + async def numsessions(self, params): + return len(self.SESSIONS) - async def handle_peers(self, params): + async def peers(self, params): return tuple(self.controller.get_peers().keys()) - async def handle_numpeers(self, params): + async def numpeers(self, params): return len(self.controller.get_peers()) From e717e719c1a56b1699dec244159611bf02cacb89 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 17:43:32 +0900 Subject: [PATCH 09/17] Prefetcher cleanup --- server/block_processor.py | 102 +++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 41 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 7397582..9319fe4 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -49,16 +49,12 @@ class Prefetcher(LoggedClass): self.semaphore = asyncio.Semaphore() self.queue = asyncio.Queue() self.queue_size = 0 + self.fetched_height = height + self.mempool = [] # Target cache size. Has little effect on sync time. self.target_cache_size = 10 * 1024 * 1024 - self.fetched_height = height - self.recent_sizes = [0] - - async def get_blocks(self): - '''Returns a list of prefetched blocks.''' - blocks, total_size = await self.queue.get() - self.queue_size -= total_size - return blocks + # First fetch to be 10 blocks + self.ave_size = self.target_cache_size // 10 async def clear(self, height): '''Clear prefetched blocks and restart from the given height. 
@@ -73,49 +69,73 @@ class Prefetcher(LoggedClass): self.queue_size = 0 self.fetched_height = height + async def get_blocks(self): + '''Returns a list of prefetched blocks and the mempool.''' + blocks, height, size = await self.queue.get() + self.queue_size -= size + if height == self.daemon.cached_height(): + return blocks, self.mempool + else: + return blocks, None + async def start(self): '''Loop forever polling for more blocks.''' - self.logger.info('looping forever prefetching blocks...') + self.logger.info('starting prefetch loop...') while True: - while self.queue_size < self.target_cache_size: - try: - with await self.semaphore: - await self._prefetch() - except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) - await asyncio.sleep(2) - - def _prefill_count(self, room): - ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - count = room // ave_size if ave_size else 0 - return max(count, 10) + try: + if await self._caught_up(): + await asyncio.sleep(5) + else: + await asyncio.sleep(0) + except DaemonError as e: + self.logger.info('ignoring daemon errors: {}'.format(e)) + + async def _caught_up(self): + '''Poll for new blocks and mempool state. + + Mempool is only queried if caught up with daemon.''' + with await self.semaphore: + blocks, size = await self._prefetch() + self.fetched_height += len(blocks) + caught_up = self.fetched_height == self.daemon.cached_height() + if caught_up: + self.mempool = await self.daemon.mempool_hashes() + + # Wake up block processor if we have something + if blocks or caught_up: + self.queue.put_nowait((blocks, self.fetched_height, size)) + self.queue_size += size + + return caught_up async def _prefetch(self): - '''Prefetch blocks if there are any to prefetch.''' + '''Prefetch blocks unless the prefetch queue is full.''' + if self.queue_size >= self.target_cache_size: + return [], 0 + daemon_height = await self.daemon.height() - max_count = min(daemon_height - self.fetched_height, 4000) - count = min(max_count, self._prefill_count(self.target_cache_size)) + cache_room = self.target_cache_size // self.ave_size + + # Try and catch up all blocks but limit to room in cache. 
+ # Constrain count to between 0 and 4000 regardless + count = min(daemon_height - self.fetched_height, cache_room) + count = min(4000, max(count, 0)) if not count: - return 0 + return [], 0 + first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) - if not hex_hashes: - self.logger.error('requested {:,d} hashes, got none'.format(count)) - return 0 - blocks = await self.daemon.raw_blocks(hex_hashes) - sizes = [len(block) for block in blocks] - total_size = sum(sizes) - self.queue.put_nowait((blocks, total_size)) - self.queue_size += total_size - self.fetched_height += len(blocks) - # Keep 50 most recent block sizes for fetch count estimation - self.recent_sizes.extend(sizes) - excess = len(self.recent_sizes) - 50 - if excess > 0: - self.recent_sizes = self.recent_sizes[excess:] - return count + size = sum(len(block) for block in blocks) + + # Update our recent average block size estimate + if count >= 10: + self.ave_size = size // count + else: + self.ave_size = (size + (10 - count) * self.ave_size) // 10 + + return blocks, size class BlockProcessor(LoggedClass): @@ -224,7 +244,7 @@ class BlockProcessor(LoggedClass): async def wait_for_blocks(self): '''Loop forever processing blocks in the forward direction.''' while True: - blocks = await self.prefetcher.get_blocks() + blocks, mempool = await self.prefetcher.get_blocks() for block in blocks: if not self.advance_block(block): await self.handle_chain_reorg() From 51accf7dfe56928f99215725572b34530f803276 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 19:03:29 +0900 Subject: [PATCH 10/17] Update BlockProcessor for server changes --- server/block_processor.py | 125 +++++++++++++++++++------------------- 1 file changed, 62 insertions(+), 63 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 9319fe4..0d1fc5f 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -149,7 +149,6 @@ class BlockProcessor(LoggedClass): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated. ''' - super().__init__() self.daemon = daemon @@ -160,7 +159,6 @@ class BlockProcessor(LoggedClass): self.hist_MB = env.hist_MB self.next_cache_check = 0 self.coin = env.coin - self.have_caught_up = False self.reorg_limit = env.reorg_limit # Chain state (initialize to genesis in case of new DB) @@ -182,7 +180,6 @@ class BlockProcessor(LoggedClass): # entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.backup_hash168s = set() self.utxo_cache = UTXOCache(self, self.db, self.coin) self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.prefetcher = Prefetcher(daemon, self.height) @@ -190,8 +187,9 @@ class BlockProcessor(LoggedClass): self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member func + # Redirected member funcs self.get_tx_hash = self.fs_cache.get_tx_hash + self.read_headers = self.fs_cache.read_headers # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' @@ -216,44 +214,45 @@ class BlockProcessor(LoggedClass): else: return [self.start(), self.prefetcher.start()] - async def caught_up(self): - '''Call when we catch up to the daemon's height.''' - # Flush everything when in caught-up state as queries - # are performed on DB and not in-memory. 
-        self.flush(True)
-        if not self.have_caught_up:
-            self.have_caught_up = True
-            self.logger.info('caught up to height {:,d}'.format(self.height))
-        if self.on_update:
-            await self.on_update(self.height, set())
-
     async def start(self):
         '''External entry point for block processing.
 
-        A simple wrapper that safely flushes the DB on clean
-        shutdown.
+        Safely flushes the DB on clean shutdown.
         '''
         try:
-            # If we're caught up so the start servers immediately
-            if self.height == await self.daemon.height():
-                await self.caught_up()
-            await self.wait_for_blocks()
+            while True:
+                await self._wait_for_update()
+                await asyncio.sleep(0)  # Yield
         finally:
             self.flush(True)
 
-    async def wait_for_blocks(self):
-        '''Loop forever processing blocks in the forward direction.'''
-        while True:
-            blocks, mempool = await self.prefetcher.get_blocks()
-            for block in blocks:
-                if not self.advance_block(block):
-                    await self.handle_chain_reorg()
-                    self.have_caught_up = False
-                    break
-                await asyncio.sleep(0)  # Yield
+    async def _wait_for_update(self):
+        '''Wait for the prefetcher to deliver blocks or a mempool update.
+
+        Blocks are only processed in the forward direction. The
+        prefetcher only provides a non-None mempool when caught up.
+        '''
+        all_touched = []
+        blocks, mempool = await self.prefetcher.get_blocks()
+        for block in blocks:
+            touched = self.advance_block(block)
+            if touched is None:
+                all_touched.append(await self.handle_chain_reorg())
+                mempool = None
+                break
+            all_touched.append(touched)
+            await asyncio.sleep(0)  # Yield
+
+        if mempool is not None:
+            # Caught up to daemon height. Flush everything as queries
+            # are performed on the DB and not in-memory.
+            self.flush(True)
+            if self.first_sync:
+                self.first_sync = False
+                self.logger.info('synced to height {:,d}'.format(self.height))
+            if self.on_update:
+                await self.on_update(self.height, set.union(set(), *all_touched))
 
-        if self.height == self.daemon.cached_height():
-            await self.caught_up()
 
     async def force_chain_reorg(self, to_genesis):
         try:
@@ -266,16 +265,21 @@
         self.logger.info('chain reorg detected')
         self.flush(True)
         self.logger.info('finding common height...')
+
+        touched = set()
         hashes = await self.reorg_hashes(to_genesis)
         # Reverse and convert to hex strings.
         hashes = [hash_to_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             blocks = await self.daemon.raw_blocks(hex_hashes)
-            self.backup_blocks(blocks)
+            touched.update(self.backup_blocks(blocks))
+
         self.logger.info('backed up to height {:,d}'.format(self.height))
         await self.prefetcher.clear(self.height)
         self.logger.info('prefetcher reset')
 
+        return touched
+
     async def reorg_hashes(self, to_genesis):
         '''Return the list of hashes to back up because of a reorg.
 
@@ -395,8 +399,6 @@
     def flush_state(self, batch):
         '''Flush chain state to the batch.'''
-        if self.have_caught_up:
-            self.first_sync = False
         now = time.time()
         self.wall_time += now - self.last_flush
         self.last_flush = now
@@ -429,14 +431,13 @@
         assert not self.history
         assert not self.utxo_cache.cache
         assert not self.utxo_cache.db_cache
-        assert not self.backup_hash168s
 
-    def flush(self, flush_utxos=False):
+    def flush(self, flush_utxos=False, flush_history=None):
         '''Flush out cached state. History is always flushed.
UTXOs are flushed if flush_utxos.''' if self.height == self.db_height: - self.logger.info('nothing to flush') + assert flush_history is None self.assert_flushed() return @@ -450,15 +451,14 @@ class BlockProcessor(LoggedClass): # matter. But if writing the files fails we do not want to # have updated the DB. if self.height > self.db_height: + assert flush_history is None + flush_history = self.flush_history self.fs_cache.flush(self.height, self.tx_count) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. - if self.height > self.db_height: - self.flush_history(batch) - else: - self.backup_history(batch) + flush_history(batch) if flush_utxos: self.flush_utxos(batch) self.flush_state(batch) @@ -494,7 +494,6 @@ class BlockProcessor(LoggedClass): def flush_history(self, batch): self.logger.info('flushing history') - assert not self.backup_hash168s self.flush_count += 1 flush_id = struct.pack('>H', self.flush_count) @@ -509,16 +508,16 @@ class BlockProcessor(LoggedClass): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - def backup_history(self, batch): + def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' .format(self.height, self.tx_count)) # Drop any NO_CACHE entry - self.backup_hash168s.discard(NO_CACHE_ENTRY) + hash168s.discard(NO_CACHE_ENTRY) assert not self.history nremoves = 0 - for hash168 in sorted(self.backup_hash168s): + for hash168 in sorted(hash168s): prefix = b'H' + hash168 deletes = [] puts = {} @@ -539,8 +538,7 @@ class BlockProcessor(LoggedClass): batch.put(key, value) self.logger.info('removed {:,d} history entries from {:,d} addresses' - .format(nremoves, len(self.backup_hash168s))) - self.backup_hash168s = set() + .format(nremoves, len(hash168s))) def cache_sizes(self): '''Returns the approximate size of the cache, in MB.''' @@ -587,11 +585,12 @@ class BlockProcessor(LoggedClass): self.fs_cache.advance_block(header, tx_hashes, txs) prev_hash, header_hash = self.coin.header_hashes(header) if prev_hash != self.tip: - return False + return None + touched = set() self.tip = header_hash self.height += 1 - undo_info = self.advance_txs(tx_hashes, txs) + undo_info = self.advance_txs(tx_hashes, txs, touched) if self.daemon.cached_height() - self.height <= self.reorg_limit: self.write_undo_info(self.height, b''.join(undo_info)) @@ -603,9 +602,9 @@ class BlockProcessor(LoggedClass): if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: self.flush(utxo_MB >= self.utxo_MB) - return True + return touched - def advance_txs(self, tx_hashes, txs): + def advance_txs(self, tx_hashes, txs, touched): put_utxo = self.utxo_cache.put spend_utxo = self.utxo_cache.spend undo_info = [] @@ -642,6 +641,7 @@ class BlockProcessor(LoggedClass): for hash168 in hash168s: history[hash168].append(tx_num) self.history_size += len(hash168s) + touched.update(hash168s) tx_num += 1 self.tx_count = tx_num @@ -657,6 +657,7 @@ class BlockProcessor(LoggedClass): self.logger.info('backing up {:,d} blocks'.format(len(blocks))) self.assert_flushed() + touched = set() for block in blocks: header, tx_hashes, txs = self.coin.read_block(block) prev_hash, header_hash = self.coin.header_hashes(header) @@ -665,15 +666,18 @@ class BlockProcessor(LoggedClass): .format(hash_to_str(header_hash), hash_to_str(self.tip), self.height)) - self.backup_txs(tx_hashes, txs) + self.backup_txs(tx_hashes, txs, touched) self.fs_cache.backup_block() self.tip = prev_hash self.height -= 1 
self.logger.info('backed up to height {:,d}'.format(self.height)) - self.flush(True) - def backup_txs(self, tx_hashes, txs): + flush_history = partial(self.backup_history, hash168s=touched) + self.flush(True, flush_history=flush_history) + return touched + + def backup_txs(self, tx_hashes, txs, touched): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order undo_info = self.read_undo_info(self.height) @@ -683,7 +687,6 @@ class BlockProcessor(LoggedClass): pack = struct.pack put_utxo = self.utxo_cache.put spend_utxo = self.utxo_cache.spend - hash168s = self.backup_hash168s rtxs = reversed(txs) rtx_hashes = reversed(tx_hashes) @@ -692,7 +695,7 @@ class BlockProcessor(LoggedClass): # Spend the outputs for idx, txout in enumerate(tx.outputs): cache_value = spend_utxo(tx_hash, idx) - hash168s.add(cache_value[:21]) + touched.add(cache_value[:21]) # Restore the inputs if not tx.is_coinbase: @@ -701,7 +704,7 @@ class BlockProcessor(LoggedClass): undo_item = undo_info[n:n + 33] put_utxo(txin.prev_hash + pack(' Date: Thu, 3 Nov 2016 10:23:21 +0900 Subject: [PATCH 11/17] Fix reorg assertion, I think --- server/block_processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 0d1fc5f..47aa0e1 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -294,7 +294,6 @@ class BlockProcessor(LoggedClass): start = self.height - 1 count = 1 while start > 0: - self.logger.info('start: {:,d} count: {:,d}'.format(start, count)) hashes = self.fs_cache.block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) @@ -582,12 +581,12 @@ class BlockProcessor(LoggedClass): # the UTXO cache uses the fs_cache via get_tx_hash() to # resolve compressed key collisions header, tx_hashes, txs = self.coin.read_block(block) - self.fs_cache.advance_block(header, tx_hashes, txs) prev_hash, header_hash = self.coin.header_hashes(header) if prev_hash != self.tip: return None touched = set() + self.fs_cache.advance_block(header, tx_hashes, txs) self.tip = header_hash self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) From be2475f61765e1e0202f446a4137ba859b1c7fe8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 10:46:09 +0900 Subject: [PATCH 12/17] Handle utxo.get_address --- lib/coins.py | 5 +++++ server/block_processor.py | 9 +++++++++ server/controller.py | 7 +++---- server/protocol.py | 17 +++++++++++++++-- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/lib/coins.py b/lib/coins.py index ec84b96..5c89832 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -77,6 +77,11 @@ class Coin(object): raise CoinError('invalid address: {}'.format(addr)) return result + @classmethod + def hash168_to_address(cls, hash168): + '''Return an address given a 21-byte hash.''' + return Base58.encode_check(hash168) + @classmethod def P2PKH_address_from_hash160(cls, hash_bytes): '''Return a P2PKH address given a public key.''' diff --git a/server/block_processor.py b/server/block_processor.py index 47aa0e1..908b245 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -762,3 +762,12 @@ class BlockProcessor(LoggedClass): '''Returns all the UTXOs for an address sorted by height and position in the block.''' return sorted(self.get_utxos(hash168, limit=None)) + + def get_utxo_hash168(self, tx_hash, index): + '''Returns the hash168 for a UTXO.''' + hash168 = None 
+ if 0 <= index <= 65535: + hash168 = self.utxo_cache(tx_hash, struct.pack(' Date: Thu, 3 Nov 2016 11:25:36 +0900 Subject: [PATCH 13/17] Fix some of the DB interface --- server/block_processor.py | 2 +- server/storage.py | 42 ++++++++++++++++----------------------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 908b245..6c9483f 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -520,7 +520,7 @@ class BlockProcessor(LoggedClass): prefix = b'H' + hash168 deletes = [] puts = {} - for key, hist in self.db.iterator(reverse=True, prefix=prefix): + for key, hist in self.db.iterator(prefix=prefix, reverse=True): a = array.array('I') a.frombytes(hist) # Remove all history entries >= self.tx_count diff --git a/server/storage.py b/server/storage.py index 13b1847..399ed69 100644 --- a/server/storage.py +++ b/server/storage.py @@ -1,4 +1,5 @@ import os +from functools import partial class Storage(object): @@ -19,7 +20,7 @@ class Storage(object): """ raise NotImplementedError() - def iterator(self, prefix=b""): + def iterator(self, prefix=b'', reverse=False): """ Returns an iterator that yields (key, value) pairs from the database sorted by key. If `prefix` is set, only keys starting with `prefix` will be included. @@ -37,18 +38,10 @@ class LevelDB(Storage): import plyvel self.db = plyvel.DB(name, create_if_missing=create_if_missing, error_if_exists=error_if_exists, compression=compression) - - def get(self, key): - return self.db.get(key) - - def write_batch(self): - return self.db.write_batch(transaction=True) - - def iterator(self, prefix=b""): - return self.db.iterator(prefix=prefix) - - def put(self, key, value): - self.db.put(key, value) + self.get = self.db.get + self.put = self.db.put + self.iterator = self.db.iterator + self.write_batch = partial(self.db.write_batch, transaction=True) class RocksDB(Storage): @@ -65,9 +58,8 @@ class RocksDB(Storage): compression=compression, target_file_size_base=33554432, max_open_files=1024)) - - def get(self, key): - return self.db.get(key) + self.get = self.db.get + self.put = self.db.put class WriteBatch(object): def __init__(self, db): @@ -85,8 +77,10 @@ class RocksDB(Storage): return RocksDB.WriteBatch(self.db) class Iterator(object): - def __init__(self, db, prefix): + def __init__(self, db, prefix, reverse): self.it = db.iteritems() + if reverse: + self.it = reversed(self.it) self.prefix = prefix def __iter__(self): @@ -100,11 +94,8 @@ class RocksDB(Storage): raise StopIteration return k, v - def iterator(self, prefix=b""): - return RocksDB.Iterator(self.db, prefix) - - def put(self, key, value): - return self.db.put(key, value) + def iterator(self, prefix=b'', reverse=False): + return RocksDB.Iterator(self.db, prefix, reverse) class LMDB(Storage): @@ -128,15 +119,16 @@ class LMDB(Storage): def write_batch(self): return self.env.begin(db=self.db, write=True) - def iterator(self, prefix=b""): - return LMDB.lmdb.Iterator(self.db, self.env, prefix) + def iterator(self, prefix=b'', reverse=False): + return LMDB.Iterator(self.db, self.env, prefix, reverse) class Iterator: - def __init__(self, db, env, prefix): + def __init__(self, db, env, prefix, reverse): self.transaction = env.begin(db=db) self.transaction.__enter__() self.db = db self.prefix = prefix + self.reverse = reverse # FIXME def __iter__(self): self.iterator = LMDB.lmdb.Cursor(self.db, self.transaction) From 892e9524e503be2c7cb39ca8095a9b4b7fee66f5 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: 
Thu, 3 Nov 2016 11:40:17 +0900 Subject: [PATCH 14/17] Fix bugs handling client UTXO requests Allow strings for ints - Electrum command line doesn't convert Don't blow away hash168s from the DB --- server/block_processor.py | 3 ++- server/cache.py | 10 ++++++---- server/protocol.py | 10 ++++++++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 6c9483f..3887204 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -767,7 +767,8 @@ class BlockProcessor(LoggedClass): '''Returns the hash168 for a UTXO.''' hash168 = None if 0 <= index <= 65535: - hash168 = self.utxo_cache(tx_hash, struct.pack('= 0: - return param + try: + param = int(param) + except ValueError: + pass + else: + if param >= 0: + return param + raise RPCError('param should be a non-negative integer: {}' .format(param)) From c0568daec37f286e15346f88abf8ff47338602ad Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 13:30:29 +0900 Subject: [PATCH 15/17] Rework the DB API a bit --- lib/coins.py | 12 +--- lib/util.py | 9 +++ server/block_processor.py | 77 ++++++++++------------- server/storage.py | 126 ++++++++++++++++++++++++++------------ 4 files changed, 130 insertions(+), 94 deletions(-) diff --git a/lib/coins.py b/lib/coins.py index 5c89832..a87b652 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -19,6 +19,7 @@ import sys from lib.hash import Base58, hash160, double_sha256, hash_to_str from lib.script import ScriptPubKey from lib.tx import Deserializer +from lib.util import subclasses class CoinError(Exception): @@ -34,21 +35,12 @@ class Coin(object): VALUE_PER_COIN = 100000000 CHUNK_SIZE=2016 - @staticmethod - def coin_classes(): - '''Return a list of coin classes in declaration order.''' - is_coin = lambda obj: (inspect.isclass(obj) - and issubclass(obj, Coin) - and obj != Coin) - pairs = inspect.getmembers(sys.modules[__name__], is_coin) - return [pair[1] for pair in pairs] - @classmethod def lookup_coin_class(cls, name, net): '''Return a coin class given name and network. 
Raise an exception if unrecognised.''' - for coin in cls.coin_classes(): + for coin in subclasses(Coin): if (coin.NAME.lower() == name.lower() and coin.NET.lower() == net.lower()): return coin diff --git a/lib/util.py b/lib/util.py index f59439b..dd8187e 100644 --- a/lib/util.py +++ b/lib/util.py @@ -9,6 +9,7 @@ import array +import inspect import logging import sys from collections import Container, Mapping @@ -77,6 +78,14 @@ def deep_getsizeof(obj): return size(obj) +def subclasses(base_class, strict=True): + '''Return a list of subclasses of base_class in its module.''' + def select(obj): + return (inspect.isclass(obj) and issubclass(obj, base_class) + and (not strict or obj != base_class)) + + pairs = inspect.getmembers(sys.modules[base_class.__module__], select) + return [pair[1] for pair in pairs] def chunks(items, size): '''Break up items, an iterable, into chunks of length size.''' diff --git a/server/block_processor.py b/server/block_processor.py index 3887204..0bc48e2 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -22,7 +22,7 @@ from server.daemon import DaemonError from lib.hash import hash_to_str from lib.script import ScriptPubKey from lib.util import chunks, LoggedClass -from server.storage import LMDB, RocksDB, LevelDB, NoDatabaseException +from server.storage import open_db def formatted_time(t): @@ -161,17 +161,17 @@ class BlockProcessor(LoggedClass): self.coin = env.coin self.reorg_limit = env.reorg_limit - # Chain state (initialize to genesis in case of new DB) - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - # Open DB and metadata files. Record some of its state. - self.db = self.open_db(self.coin, env.db_engine) + db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) + self.db = open_db(db_name, env.db_engine) + if self.db.is_new: + self.logger.info('created new {} database {}' + .format(env.db_engine, db_name)) + else: + self.logger.info('successfully opened {} database {}' + .format(env.db_engine, db_name)) + + self.init_state() self.tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip @@ -313,40 +313,29 @@ class BlockProcessor(LoggedClass): return self.fs_cache.block_hashes(start, count) - def open_db(self, coin, db_engine): - db_name = '{}-{}'.format(coin.NAME, coin.NET) - db_engine_class = { - "leveldb": LevelDB, - "rocksdb": RocksDB, - "lmdb": LMDB - }[db_engine.lower()] - try: - db = db_engine_class(db_name, create_if_missing=False, - error_if_exists=False, compression=None) - except NoDatabaseException: - db = db_engine_class(db_name, create_if_missing=True, - error_if_exists=True, compression=None) - self.logger.info('created new {} database {}'.format(db_engine, db_name)) + def init_state(self): + if self.db.is_new: + self.db_height = -1 + self.db_tx_count = 0 + self.db_tip = b'\0' * 32 + self.flush_count = 0 + self.utxo_flush_count = 0 + self.wall_time = 0 + self.first_sync = True else: - self.logger.info('successfully opened {} database {}'.format(db_engine, db_name)) - self.read_state(db) - - return db - - def read_state(self, db): - state = db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - 
self.flush_count = state['flush_count']
-        self.utxo_flush_count = state['utxo_flush_count']
-        self.wall_time = state['wall_time']
-        self.first_sync = state.get('first_sync', True)
+            state = self.db.get(b'state')
+            state = ast.literal_eval(state.decode())
+            if state['genesis'] != self.coin.GENESIS_HASH:
+                raise ChainError('DB genesis hash {} does not match coin {}'
+                                 .format(state['genesis'],
+                                         self.coin.GENESIS_HASH))
+            self.db_height = state['height']
+            self.db_tx_count = state['tx_count']
+            self.db_tip = state['tip']
+            self.flush_count = state['flush_count']
+            self.utxo_flush_count = state['utxo_flush_count']
+            self.wall_time = state['wall_time']
+            self.first_sync = state.get('first_sync', True)
 
     def clean_db(self):
         '''Clean out stale DB items.
 
diff --git a/server/storage.py b/server/storage.py
index 399ed69..d4557d4 100644
--- a/server/storage.py
+++ b/server/storage.py
@@ -1,43 +1,83 @@
+# Copyright (c) 2016, the ElectrumX authors
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.
+
+'''Backend database abstraction.
+
+The abstraction needs to be improved to not heavily penalise LMDB.
+'''
+
 import os
 from functools import partial
 
+from lib.util import subclasses
+
+
+def open_db(name, db_engine):
+    '''Returns a database handle.'''
+    for db_class in subclasses(Storage):
+        if db_class.__name__.lower() == db_engine.lower():
+            db_class.import_module()
+            return db_class(name)
+
+    raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))
+
 
 class Storage(object):
-    def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
-        if not create_if_missing and not os.path.exists(name):
-            raise NoDatabaseException
+    '''Abstract base class of the DB backend abstraction.'''
+
+    def __init__(self, name):
+        self.is_new = not os.path.exists(name)
+        self.open(name, create=self.is_new)
+
+    @classmethod
+    def import_module(cls):
+        '''Import the DB engine module.'''
+        raise NotImplementedError
+
+    def open(self, name, create):
+        '''Open an existing database or create a new one.'''
+        raise NotImplementedError
 
     def get(self, key):
-        raise NotImplementedError()
+        raise NotImplementedError
 
     def put(self, key, value):
-        raise NotImplementedError()
+        raise NotImplementedError
 
     def write_batch(self):
-        """
-        Returns a context manager that provides `put` and `delete`.
-        Changes should only be committed when the context manager closes without an exception.
-        """
-        raise NotImplementedError()
+        '''Return a context manager that provides `put` and `delete`.
 
-    def iterator(self, prefix=b'', reverse=False):
-        """
-        Returns an iterator that yields (key, value) pairs from the database sorted by key.
-        If `prefix` is set, only keys starting with `prefix` will be included.
-        """
-        raise NotImplementedError()
+        Changes should only be committed when the context manager
+        closes without an exception.
+        '''
+        raise NotImplementedError
 
+    def iterator(self, prefix=b'', reverse=False):
+        '''Return an iterator that yields (key, value) pairs from the
+        database sorted by key.
 
-class NoDatabaseException(Exception):
-    pass
+        If `prefix` is set, only keys starting with `prefix` will be
+        included. If `reverse` is True the items are returned in
+        reverse order.
+        '''
+        raise NotImplementedError
 
 
 class LevelDB(Storage):
-    def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
-        super().__init__(name, create_if_missing, error_if_exists, compression)
+    '''LevelDB database engine.'''
+
+    @classmethod
+    def import_module(cls):
         import plyvel
-        self.db = plyvel.DB(name, create_if_missing=create_if_missing,
-                            error_if_exists=error_if_exists, compression=compression)
+        cls.module = plyvel
+
+    def open(self, name, create):
+        self.db = self.module.DB(name, create_if_missing=create,
+                                 compression=None)
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
@@ -45,25 +85,28 @@
 
 class RocksDB(Storage):
-    rocksdb = None
+    '''RocksDB database engine.'''
 
-    def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
-        super().__init__(name, create_if_missing, error_if_exists, compression)
+    @classmethod
+    def import_module(cls):
         import rocksdb
-        RocksDB.rocksdb = rocksdb
-        if not compression:
-            compression = "no"
-        compression = getattr(rocksdb.CompressionType, compression + "_compression")
-        self.db = rocksdb.DB(name, rocksdb.Options(create_if_missing=create_if_missing,
-                                                   compression=compression,
-                                                   target_file_size_base=33554432,
-                                                   max_open_files=1024))
+        cls.module = rocksdb
+
+    def open(self, name, create):
+        compression = "no"
+        compression = getattr(self.module.CompressionType,
+                              compression + "_compression")
+        options = self.module.Options(create_if_missing=create,
+                                      compression=compression,
+                                      target_file_size_base=33554432,
+                                      max_open_files=1024)
+        self.db = self.module.DB(name, options)
         self.get = self.db.get
         self.put = self.db.put
 
     class WriteBatch(object):
         def __init__(self, db):
-            self.batch = RocksDB.rocksdb.WriteBatch()
+            self.batch = RocksDB.module.WriteBatch()
             self.db = db
 
         def __enter__(self):
@@ -99,14 +142,17 @@
 
 class LMDB(Storage):
-    lmdb = None
+    '''LMDB database engine.'''
 
-    def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None):
-        super().__init__(name, create_if_missing, error_if_exists, compression)
+    @classmethod
+    def import_module(cls):
         import lmdb
-        LMDB.lmdb = lmdb
-        self.env = lmdb.Environment(".", subdir=True, create=create_if_missing, max_dbs=32, map_size=5 * 10 ** 10)
-        self.db = self.env.open_db(create=create_if_missing)
+        cls.module = lmdb
+
+    def open(self, name, create):
+        self.env = self.module.Environment('.', subdir=True, create=create,
+                                           max_dbs=32, map_size=5 * 10 ** 10)
+        self.db = self.env.open_db(create=create)
 
     def get(self, key):
         with self.env.begin(db=self.db) as tx:

From fb43712869accdd044187775c7a67bdd950f3cb9 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Thu, 3 Nov 2016 14:03:19 +0900
Subject: [PATCH 16/17] Controller clean up

---
 server/controller.py |  80 +-------------------
 server/protocol.py   | 171 ++++++++++++++++++++++++++++++-------------
 2 files changed, 125 insertions(+), 126 deletions(-)

diff --git a/server/controller.py b/server/controller.py
index 7519a17..cbbfaf1 100644
--- a/server/controller.py
+++ b/server/controller.py
@@ -20,7 +20,6 @@
 from functools import partial
 
 from server.daemon import Daemon
 from server.block_processor import BlockProcessor
 from server.protocol import ElectrumX, LocalRPC, JSONRPC
-from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 from lib.util import LoggedClass
 
 
@@ -38,11 +37,10 @@ class Controller(LoggedClass):
         self.daemon = Daemon(env.daemon_url)
         self.block_processor = 
BlockProcessor(env, self.daemon, on_update=self.on_update) - JSONRPC.init(self.block_processor, self.coin) + JSONRPC.init(self.block_processor, self.daemon, self.coin, + self.add_job) self.servers = [] - self.addresses = {} self.jobs = asyncio.Queue() - self.peers = {} def start(self): '''Prime the event loop with asynchronous jobs.''' @@ -72,7 +70,7 @@ class Controller(LoggedClass): env = self.env loop = self.loop - protocol = partial(LocalRPC, self) + protocol = LocalRPC if env.rpc_port is not None: host = 'localhost' rpc_server = loop.create_server(protocol, host, env.rpc_port) @@ -80,7 +78,7 @@ class Controller(LoggedClass): self.logger.info('RPC server listening on {}:{:d}' .format(host, env.rpc_port)) - protocol = partial(ElectrumX, self, self.daemon, env) + protocol = partial(ElectrumX, env) if env.tcp_port is not None: tcp_server = loop.create_server(protocol, env.host, env.tcp_port) servers.append(await tcp_server) @@ -127,73 +125,3 @@ class Controller(LoggedClass): except Exception: # Getting here should probably be considered a bug and fixed traceback.print_exc() - - def address_status(self, hash168): - '''Returns status as 32 bytes.''' - status = self.addresses.get(hash168) - if status is None: - history = self.block_processor.get_history(hash168) - status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) - for tx_hash, height in history) - if status: - status = sha256(status.encode()) - self.addresses[hash168] = status - - return status - - async def get_merkle(self, tx_hash, height): - '''tx_hash is a hex string.''' - block_hash = await self.daemon.send_single('getblockhash', (height,)) - block = await self.daemon.send_single('getblock', (block_hash, True)) - tx_hashes = block['tx'] - # This will throw if the tx_hash is bad - pos = tx_hashes.index(tx_hash) - - idx = pos - hashes = [hex_str_to_hash(txh) for txh in tx_hashes] - merkle_branch = [] - while len(hashes) > 1: - if len(hashes) & 1: - hashes.append(hashes[-1]) - idx = idx - 1 if (idx & 1) else idx + 1 - merkle_branch.append(hash_to_str(hashes[idx])) - idx //= 2 - hashes = [double_sha256(hashes[n] + hashes[n + 1]) - for n in range(0, len(hashes), 2)] - - return {"block_height": height, "merkle": merkle_branch, "pos": pos} - - def get_peers(self): - '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one - per peer.''' - return self.peers - - def height(self): - return self.block_processor.height - - def get_history(self, hash168): - history = self.block_processor.get_history(hash168, limit=None) - return [ - {'tx_hash': hash_to_str(tx_hash), 'height': height} - for tx_hash, height in history - ] - - def get_chunk(self, index): - '''Return header chunk as hex. 
Index is a non-negative integer.'''
-        chunk_size = self.coin.CHUNK_SIZE
-        next_height = self.height() + 1
-        start_height = min(index * chunk_size, next_height)
-        count = min(next_height - start_height, chunk_size)
-        return self.block_processor.read_headers(start_height, count).hex()
-
-    def get_balance(self, hash168):
-        confirmed = self.block_processor.get_balance(hash168)
-        unconfirmed = -1  # FIXME
-        return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
-
-    def list_unspent(self, hash168):
-        utxos = self.block_processor.get_utxos_sorted(hash168)
-        return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
-                      'tx_pos': utxo.tx_pos, 'height': utxo.height,
-                      'value': utxo.value}
-                     for utxo in utxos)
diff --git a/server/protocol.py b/server/protocol.py
index 7eef090..23f8333 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -16,7 +16,7 @@
 import traceback
 from functools import partial
 
 from server.daemon import DaemonError
-from lib.hash import hex_str_to_hash
+from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 from lib.util import LoggedClass
 from server.version import VERSION
 
@@ -33,19 +33,13 @@
 class JSONRPC(asyncio.Protocol, LoggedClass):
     '''Base class that manages a JSONRPC connection.'''
     SESSIONS = set()
-    BLOCK_PROCESSOR = None
-    COIN = None
 
-    def __init__(self, controller):
+    def __init__(self):
         super().__init__()
-        self.controller = controller
         self.parts = []
         self.send_count = 0
         self.send_size = 0
         self.error_count = 0
-        self.subscribe_headers = False
-        self.subscribe_height = False
-        self.notified_height = None
 
     def connection_made(self, transport):
         '''Handle an incoming client connection.'''
@@ -85,7 +79,7 @@
         except Exception as e:
             self.logger.info('error decoding JSON message: {}'.format(e))
         else:
-            self.controller.add_job(self.request_handler(message))
+            self.ADD_JOB(self.request_handler(message))
 
     async def request_handler(self, request):
         '''Called asynchronously.'''
@@ -174,9 +168,11 @@
         raise RPCError('params should be empty: {}'.format(params))
 
     @classmethod
-    def init(cls, block_processor, coin):
+    def init(cls, block_processor, daemon, coin, add_job):
         cls.BLOCK_PROCESSOR = block_processor
+        cls.DAEMON = daemon
         cls.COIN = coin
+        cls.ADD_JOB = add_job
 
     @classmethod
     def height(cls):
@@ -196,6 +192,37 @@
         '''Used as response to a headers subscription request.'''
         return cls.electrum_header(cls.height())
 
+
+class ElectrumX(JSONRPC):
+    '''A TCP server that handles incoming Electrum connections.'''
+
+    def __init__(self, env):
+        super().__init__()
+        self.env = env
+        self.hash168s = set()
+        self.subscribe_headers = False
+        self.subscribe_height = False
+        self.notified_height = None
+        rpcs = [
+            ('blockchain',
+             'address.get_balance address.get_history address.get_mempool '
+             'address.get_proof address.listunspent address.subscribe '
+             'block.get_header block.get_chunk estimatefee headers.subscribe '
+             'numblocks.subscribe relayfee transaction.broadcast '
+             'transaction.get transaction.get_merkle utxo.get_address'),
+            ('server',
+             'banner donation_address peers.subscribe version'),
+        ]
+        self.handlers = {'.'.join([prefix, suffix]):
+                         getattr(self.__class__, suffix.replace('.', '_'))
+                         for prefix, suffixes in rpcs
+                         for suffix in suffixes.split()}
+
+    @classmethod
+    def watched_address_count(cls):
+        return sum(len(session.hash168s) for session in cls.SESSIONS
+                   if isinstance(session, cls))
@classmethod def notify(cls, height, touched): '''Notify electrum clients about height changes and touched @@ -220,49 +247,94 @@ class JSONRPC(asyncio.Protocol, LoggedClass): for hash168 in session.hash168s.intersection(touched): address = hash168_to_address(hash168) + status = cls.address_status(hash168) payload = json_notification('blockchain.address.subscribe', - (address, )) + (address, status)) session.json_send(payload) + @classmethod + def address_status(cls, hash168): + '''Returns status as 32 bytes.''' + history = cls.BLOCK_PROCESSOR.get_history(hash168) + status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) + for tx_hash, height in history) + if status: + return sha256(status.encode()).hex() + return None -class ElectrumX(JSONRPC): - '''A TCP server that handles incoming Electrum connections.''' + @classmethod + async def tx_merkle(cls, tx_hash, height): + '''tx_hash is a hex string.''' + block_hash = await cls.DAEMON.send_single('getblockhash', (height,)) + block = await cls.DAEMON.send_single('getblock', (block_hash, True)) + tx_hashes = block['tx'] + # This will throw if the tx_hash is bad + pos = tx_hashes.index(tx_hash) + + idx = pos + hashes = [hex_str_to_hash(txh) for txh in tx_hashes] + merkle_branch = [] + while len(hashes) > 1: + if len(hashes) & 1: + hashes.append(hashes[-1]) + idx = idx - 1 if (idx & 1) else idx + 1 + merkle_branch.append(hash_to_str(hashes[idx])) + idx //= 2 + hashes = [double_sha256(hashes[n] + hashes[n + 1]) + for n in range(0, len(hashes), 2)] + + return {"block_height": height, "merkle": merkle_branch, "pos": pos} - def __init__(self, controller, daemon, env): - super().__init__(controller) - self.daemon = daemon - self.env = env - self.hash168s = set() - rpcs = [( - 'blockchain', - 'address.get_balance address.get_history address.get_mempool ' - 'address.get_proof address.listunspent address.subscribe ' - 'block.get_header block.get_chunk estimatefee headers.subscribe ' - 'numblocks.subscribe relayfee transaction.broadcast ' - 'transaction.get transaction.get_merkle utxo.get_address'), - ( - 'server', - 'banner donation_address peers.subscribe version'), + @classmethod + def irc_peers(cls): + '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one + per peer.''' + return {} + + @classmethod + def height(cls): + return cls.BLOCK_PROCESSOR.height + + @classmethod + def get_history(cls, hash168): + history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) + return [ + {'tx_hash': hash_to_str(tx_hash), 'height': height} + for tx_hash, height in history ] - self.handlers = {'.'.join([prefix, suffix]): - getattr(self.__class__, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} @classmethod - def watched_address_count(cls): - return sum(len(session.hash168s) for session in self.SESSIONS - if isinstance(session, cls)) + def get_chunk(cls, index): + '''Return header chunk as hex. 
Index is a non-negative integer.''' + chunk_size = cls.COIN.CHUNK_SIZE + next_height = cls.height() + 1 + start_height = min(index * chunk_size, next_height) + count = min(next_height - start_height, chunk_size) + return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() + + @classmethod + def get_balance(cls, hash168): + confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) + unconfirmed = -1 # FIXME + return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + + @classmethod + def list_unspent(cls, hash168): + utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168) + return tuple({'tx_hash': hash_to_str(utxo.tx_hash), + 'tx_pos': utxo.tx_pos, 'height': utxo.height, + 'value': utxo.value} + for utxo in utxos) # --- blockchain commands async def address_get_balance(self, params): hash168 = self.extract_hash168(params) - return self.controller.get_balance(hash168) + return self.get_balance(hash168) async def address_get_history(self, params): hash168 = self.extract_hash168(params) - return self.controller.get_history(hash168) + return self.get_history(hash168) async def address_get_mempool(self, params): hash168 = self.extract_hash168(params) @@ -274,24 +346,23 @@ class ElectrumX(JSONRPC): async def address_listunspent(self, params): hash168 = self.extract_hash168(params) - return self.controller.list_unspent(hash168) + return self.list_unspent(hash168) async def address_subscribe(self, params): hash168 = self.extract_hash168(params) self.hash168s.add(hash168) - status = self.controller.address_status(hash168) - return status.hex() if status else None + return self.address_status(hash168) async def block_get_chunk(self, params): index = self.extract_non_negative_integer(params) - return self.controller.get_chunk(index) + return self.get_chunk(index) async def block_get_header(self, params): height = self.extract_non_negative_integer(params) return self.electrum_header(height) async def estimatefee(self, params): - return await self.daemon.estimatefee(params) + return await self.DAEMON.estimatefee(params) async def headers_subscribe(self, params): self.require_empty_params(params) @@ -307,7 +378,7 @@ class ElectrumX(JSONRPC): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' self.require_empty_params(params) - return await self.daemon.relayfee() + return await self.DAEMON.relayfee() async def transaction_broadcast(self, params): '''Pass through the parameters to the daemon. @@ -318,7 +389,7 @@ class ElectrumX(JSONRPC): user interface job here. ''' try: - tx_hash = await self.daemon.sendrawtransaction(params) + tx_hash = await self.DAEMON.sendrawtransaction(params) self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: @@ -344,7 +415,7 @@ class ElectrumX(JSONRPC): # in anticipation it might be dropped in the future. if 1 <= len(params) <= 2: tx_hash = self.tx_hash_from_param(params[0]) - return await self.daemon.getrawtransaction(tx_hash) + return await self.DAEMON.getrawtransaction(tx_hash) raise RPCError('params wrong length: {}'.format(params)) @@ -352,7 +423,7 @@ class ElectrumX(JSONRPC): if len(params) == 2: tx_hash = self.tx_hash_from_param(params[0]) height = self.non_negative_integer_from_param(params[1]) - return await self.controller.get_merkle(tx_hash, height) + return await self.tx_merkle(tx_hash, height) raise RPCError('params should contain a transaction hash and height') @@ -398,7 +469,7 @@ class ElectrumX(JSONRPC): subscription. 
''' self.require_empty_params(params) - peers = self.controller.get_peers() + peers = ElectrumX.irc_peers() return tuple(peers.values()) async def version(self, params): @@ -417,7 +488,7 @@ class LocalRPC(JSONRPC): async def getinfo(self, params): return { 'blocks': self.height(), - 'peers': len(self.controller.get_peers()), + 'peers': len(ElectrumX.irc_peers()), 'sessions': len(self.SESSIONS), 'watched': ElectrumX.watched_address_count(), 'cached': 0, @@ -430,7 +501,7 @@ class LocalRPC(JSONRPC): return len(self.SESSIONS) async def peers(self, params): - return tuple(self.controller.get_peers().keys()) + return tuple(ElectrumX.irc_peers().keys()) async def numpeers(self, params): - return len(self.controller.get_peers()) + return len(ElectrumX.irc_peers()) From 1711834fe88410923b8cf1f4bb2db9bac0158795 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 3 Nov 2016 14:46:27 +0900 Subject: [PATCH 17/17] Release preparation --- HOWTO.rst | 16 ++++++++-------- README.rst | 24 +++++++++++------------- RELEASE-NOTES | 20 ++++++++++++++++++++ samples/scripts/NOTES | 23 +++++++++++++++-------- server/version.py | 2 +- 5 files changed, 55 insertions(+), 30 deletions(-) diff --git a/HOWTO.rst b/HOWTO.rst index af5be2e..e2ae867 100644 --- a/HOWTO.rst +++ b/HOWTO.rst @@ -32,12 +32,12 @@ recommend having at least 30-40GB free space. Database Engine =============== -You can choose between either RocksDB, LevelDB or LMDB to store transaction -information on disk. Currently, the fastest seems to be RocksDB with LevelDB -being about 10% slower. LMDB seems to be the slowest but maybe that's because -of bad implementation or configuration. +You can choose between either RocksDB, LevelDB or LMDB to store +transaction information on disk. Currently, the fastest seems to be +RocksDB with LevelDB being about 10% slower. LMDB is slowest but that +is because it is not yet efficiently abstracted. -You will need to install either: +You will need to install one of: + `plyvel `_ for LevelDB + `pyrocksdb `_ for RocksDB @@ -188,7 +188,7 @@ over the LAN from a bitcoind on machine B. Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on -the same machine. HIST_MB of 350, UTXO_MB of 1,600. +the same machine. HIST_MB of 350, UTXO_MB of 1,600. LevelDB. For chains other than bitcoin-mainnet sychronization should be much faster. @@ -275,5 +275,5 @@ After flush-to-disk you may see an aiohttp error; this is the daemon timing out the connection while the disk flush was in progress. This is harmless. -The ETA is just a guide and can be quite volatile. It is too optimistic -initially. +The ETA is just a guide and can be quite volatile, particularly around +flushes. It is too optimistic initially. diff --git a/README.rst b/README.rst index ea84b5e..55165a6 100644 --- a/README.rst +++ b/README.rst @@ -68,26 +68,24 @@ Roadmap ======= - test a few more performance improvement ideas -- handle client connections (half-implemented but not functional) +- handle the mempool +- implement light caching of client responses +- yield during expensive requests and/or penalize the connection +- improve DB abstraction so LMDB is not penalized +- continue to clean up the code and remove layering violations +- store all UTXOs, not just those with addresses +- implement IRC connectivity - potentially move some functionality to C or C++ -Once I get round to writing the server part, I will add DoS -protections if necessary to defend against requests for large -histories. 
-histories.  However with asyncio it would not surprise me if ElectrumX
-could smoothly serve the whole history of the biggest Satoshi dice
-address with minimal negative impact on other connections; we shall
-have to see.  If the requestor is running Electrum client I am
-confident that it would collapse under the load far more quickly that
-the server would; it is very inefficient at handling large wallets
-and histories.
+The above are in no particular order.
 
 
 Database Format
 ===============
 
-The database and metadata formats of ElectrumX are very likely to
-change in the future which will render old DBs unusable.  For now I do
-not intend to provide converters as the rate of flux is high.
+The database and metadata formats of ElectrumX are certain to change
+in the future, which will render old DBs unusable.  For now I do not
+intend to provide converters as the rate of flux is high.
 
 
 Miscellany
diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index 295f5d5..6ffc948 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,23 @@
+Version 0.04
+------------
+
+- made the DB interface a little faster for LevelDB and RocksDB; this
+  fixes a small regression introduced in 0.03
+- fixed a bug that prevented block reorgs from working
+- implemented and enabled client connectivity.  This is not yet ready
+  for public use for several reasons.  Local RPC, and remote TCP and
+  SSL connections are all supported in the same way as Electrum-server.
+  ElectrumX does not begin listening for incoming connections until it
+  has caught up with the daemon's height.  The ports it is listening on
+  will appear in the logs when it starts listening.  The complete
+  Electrum wire protocol is implemented, so it is now possible to use
+  ElectrumX as a server for your own Electrum client.  Note that the
+  mempool is not yet handled, so unconfirmed transactions will not be
+  notified or appear until they get into a block.  Also, no responses
+  are cached, so performance would likely degrade if used by many
+  clients.  I welcome feedback on your experience using this.
+
+
 Version 0.03
 ------------
 
diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index 642ce1f..091ae6e 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -1,8 +1,8 @@
 The following environment variables are required:
 
-DB_DIRECTORY - path to the database directory (if relative, to run script)
-USERNAME     - the username the server will run as
-SERVER_MAIN  - path to the server_main.py script (if relative, to run script)
+DB_DIRECTORY - path to the database directory (if relative, to `run` script)
+USERNAME     - the username the server will run as if using the `run` script
+SERVER_MAIN  - path to the server_main.py script (if relative, to `run` script)
 DAEMON_URL   - the URL used to connect to the daemon.  Should be of the form
     http://username:password@hostname:port/
     Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD,
@@ -14,10 +14,22 @@ sensible defaults if not specified.
 COIN    - see lib/coins.py, must be a coin NAME.  Defaults to Bitcoin.
 NETWORK - see lib/coins.py, must be a coin NET.  Defaults to mainnet.
 
+DB_ENGINE    - database engine for the transaction database.  Default is
+               leveldb.  Supported alternatives are rocksdb and lmdb.
+               You will need to install the appropriate python packages.
+               Not case sensitive.
 REORG_LIMIT  - maximum number of blocks to be able to handle in a chain
                reorganisation.  ElectrumX retains some fairly compact
                undo information for this many blocks in levelDB.
               Default is 200.
+TCP_PORT     - if set, ElectrumX will serve Electrum clients on this port
+SSL_PORT     - if set, Electrum clients are served over SSL on this port.
+               If set, SSL_CERTFILE and SSL_KEYFILE must be filesystem paths
+RPC_PORT     - listen on this port for local RPC connections; defaults
+               to 8000
+BANNER_FILE  - a path to a banner file to serve to clients.  The banner
+               file is re-read for each new client
+DONATION_ADDRESS - server donation address.  Defaults to none
 
 Your performance might change by tweaking these cache settings.  Cache
 size is only checked roughly every minute, so the caches can grow
@@ -41,8 +53,3 @@ UTXO_MB - amount of UTXO and history cache, in MB, to retain before
                leveldb caching and Python GC effects.  However this
                may be very dependent on hardware and you may have
                different results.
-
-DB_ENGINE - database engine for the transaction database.  Default is
-            leveldb.  Supported alternatives are rocksdb and lmdb,
-            which will require installation of the appropriate python
-            packages.
\ No newline at end of file
diff --git a/server/version.py b/server/version.py
index c83fa77..285207d 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.03"
+VERSION = "ElectrumX 0.04"
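
The get_chunk handler in patch 16 above clamps the requested chunk to
the current chain tip.  A worked sketch of that arithmetic, assuming
Bitcoin's CHUNK_SIZE of 2016 headers; the chunk_bounds function and
the heights used are illustrative only, not code from the patches::

  def chunk_bounds(index, next_height, chunk_size=2016):
      # Mirrors the get_chunk arithmetic: clamp the start height to the
      # chain tip so a request past the tip yields an empty chunk
      # rather than an error.
      start_height = min(index * chunk_size, next_height)
      count = min(next_height - start_height, chunk_size)
      return start_height, count

  print(chunk_bounds(0, 436001))    # (0, 2016): the genesis chunk
  print(chunk_bounds(216, 436001))  # (435456, 545): partial chunk at the tip
  print(chunk_bounds(300, 436001))  # (436001, 0): past the tip, empty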
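The 0.04 release notes say the complete Electrum wire protocol is
implemented, i.e. newline-terminated JSON-RPC requests over TCP.  A
minimal client sketch, assuming a server listening locally with
TCP_PORT set to 50001; the port and the example address are
placeholders, not values from the patches::

  import asyncio
  import json

  async def request(host, port, method, params):
      # One JSON-RPC request per line; the reply is a single JSON line.
      reader, writer = await asyncio.open_connection(host, port)
      writer.write(json.dumps({'id': 0, 'method': method,
                               'params': params}).encode() + b'\n')
      await writer.drain()
      reply = json.loads((await reader.readline()).decode())
      writer.close()
      return reply

  loop = asyncio.get_event_loop()
  print(loop.run_until_complete(request(
      'localhost', 50001, 'server.version', [])))
  print(loop.run_until_complete(request(
      'localhost', 50001, 'blockchain.address.get_balance',
      ['1BoatSLRHtKNngkdXEeobR76b53LETtpyT'])))

Since the mempool is not yet handled in this release, expect the
unconfirmed balance to be the placeholder -1 from the get_balance
FIXME in patch 16.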
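The NOTES entries above document the defaults each variable adopts
when unset.  A condensed sketch of that documented behaviour; the
env_int helper is hypothetical and the real parsing lives in
server/env.py::

  import os

  def env_int(name, default=None):
      # Unset integer variables fall back to their documented defaults.
      value = os.environ.get(name)
      return int(value) if value is not None else default

  TCP_PORT = env_int('TCP_PORT')              # no TCP service if unset
  SSL_PORT = env_int('SSL_PORT')              # needs SSL_CERTFILE and SSL_KEYFILE
  RPC_PORT = env_int('RPC_PORT', 8000)        # local RPC
  REORG_LIMIT = env_int('REORG_LIMIT', 200)   # blocks of undo information kept
  DB_ENGINE = os.environ.get('DB_ENGINE', 'leveldb').lower()  # not case sensitive
  DONATION_ADDRESS = os.environ.get('DONATION_ADDRESS')       # defaults to none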