diff --git a/HOWTO.rst b/HOWTO.rst
index af5be2e..63c67cc 100644
--- a/HOWTO.rst
+++ b/HOWTO.rst
@@ -32,12 +32,12 @@ recommend having at least 30-40GB free space.
 Database Engine
 ===============
 
-You can choose between either RocksDB, LevelDB or LMDB to store transaction
-information on disk. Currently, the fastest seems to be RocksDB with LevelDB
-being about 10% slower. LMDB seems to be the slowest but maybe that's because
-of bad implementation or configuration.
+You can choose from RocksDB, LevelDB or LMDB to store transaction
+information on disk. Currently, the fastest seems to be RocksDB with
+LevelDB being about 10% slower. LMDB is slowest but that is because it
+is not yet efficiently abstracted.
 
-You will need to install either:
+You will need to install one of:
 
 + `plyvel `_ for LevelDB
 + `pyrocksdb `_ for RocksDB
@@ -188,7 +188,7 @@ over the LAN from a bitcoind on machine B.
 
 Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz
 quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on
-the same machine. HIST_MB of 350, UTXO_MB of 1,600.
+the same machine. HIST_MB of 350, UTXO_MB of 1,600. LevelDB.
 
 For chains other than bitcoin-mainnet synchronization should be much
 faster.
@@ -275,5 +275,5 @@ After flush-to-disk you may see an aiohttp error; this is the daemon
 timing out the connection while the disk flush was in progress. This
 is harmless.
 
-The ETA is just a guide and can be quite volatile. It is too optimistic
-initially.
+The ETA is just a guide and can be quite volatile, particularly around
+flushes. It is too optimistic initially.
diff --git a/README.rst b/README.rst
index ea84b5e..55165a6 100644
--- a/README.rst
+++ b/README.rst
@@ -68,26 +68,24 @@ Roadmap
 =======
 
 - test a few more performance improvement ideas
-- handle client connections (half-implemented but not functional)
+- handle the mempool
+- implement light caching of client responses
+- yield during expensive requests and/or penalize the connection
+- improve DB abstraction so LMDB is not penalized
+- continue to clean up the code and remove layering violations
+- store all UTXOs, not just those with addresses
+- implement IRC connectivity
 - potentially move some functionality to C or C++
 
-Once I get round to writing the server part, I will add DoS
-protections if necessary to defend against requests for large
-histories. However with asyncio it would not surprise me if ElectrumX
-could smoothly serve the whole history of the biggest Satoshi dice
-address with minimal negative impact on other connections; we shall
-have to see. If the requestor is running Electrum client I am
-confident that it would collapse under the load far more quickly that
-the server would; it is very inefficient at handling large wallets
-and histories.
+The above are in no particular order.
 
 Database Format
 ===============
 
-The database and metadata formats of ElectrumX are very likely to
-change in the future which will render old DBs unusable. For now I do
-not intend to provide converters as the rate of flux is high.
+The database and metadata formats of ElectrumX are certain to change in
+the future which will render old DBs unusable. For now I do not
+intend to provide converters as the rate of flux is high.
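The Database Engine section of HOWTO.rst above leaves the backend choice to
the server's DB engine setting, which the storage abstraction added later in
this diff (server/storage.py) resolves to a Storage subclass. A rough usage
sketch follows; the environment variable name, its default and the database
name are illustrative assumptions, not documented values:

    # Sketch: mapping a configured engine name to a storage backend.
    # open_db() is defined in server/storage.py later in this diff; the
    # 'DB_ENGINE' variable name, its default and 'Bitcoin-mainnet' are
    # assumptions made for this example only.
    import os
    from server.storage import open_db

    db_engine = os.environ.get('DB_ENGINE', 'leveldb')   # or 'rocksdb', 'lmdb'
    db = open_db('Bitcoin-mainnet', db_engine)

    # Whichever backend is chosen, callers use the same interface:
    # db.get(key), db.put(key, value), db.write_batch(), db.iterator().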
Miscellany diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 295f5d5..6ffc948 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,23 @@ +Version 0.04 +------------ + +- made the DB interface a little faster for LevelDB and RocksDB; this was + a small regression in 0.03 +- fixed a bug that prevented block reorgs from working +- implement and enable client connectivity. This is not yet ready for + public use for several reasons. Local RPC, and remote TCP and SSL + connections are all supported in the same way as Electrum-server. + ElectrumX does not begin listening for incoming connections until it + has caught up with the daemon's height. Which ports it is listening + on will appear in the logs when it starts listening. The complete + Electrum wire protocol is implemented, so it is possible to now use + as a server for your own Electrum client. Note that mempools are + not yet handled so unconfirmed transactions will not be notified or + appear; they will appear once they get in a block. Also no + responses are cached, so performance would likely degrade if used by + many clients. I welcome feedback on your experience using this. + + Version 0.03 ------------ diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 68fc74c..2ce4dee 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -1,8 +1,15 @@ #!/usr/bin/env python3 - -# See the file "LICENSE" for information about the copyright +# +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Script to send RPC commands to a running ElectrumX server.''' + + import argparse import asyncio import json diff --git a/lib/coins.py b/lib/coins.py index 30a0b12..a87b652 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -1,18 +1,29 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Module providing coin abstraction. + +Anything coin-specific should go in this file and be subclassed where +necessary for appropriate handling. +''' from decimal import Decimal import inspect +import struct import sys -from lib.hash import Base58, hash160, double_sha256 +from lib.hash import Base58, hash160, double_sha256, hash_to_str from lib.script import ScriptPubKey from lib.tx import Deserializer +from lib.util import subclasses class CoinError(Exception): - pass + '''Exception raised for coin-related errors.''' class Coin(object): @@ -22,19 +33,14 @@ class Coin(object): HEADER_LEN = 80 DEFAULT_RPC_PORT = 8332 VALUE_PER_COIN = 100000000 - - @staticmethod - def coins(): - is_coin = lambda obj: (inspect.isclass(obj) - and issubclass(obj, Coin) - and obj != Coin) - pairs = inspect.getmembers(sys.modules[__name__], is_coin) - # Returned in the order they appear in this file - return [pair[1] for pair in pairs] + CHUNK_SIZE=2016 @classmethod def lookup_coin_class(cls, name, net): - for coin in cls.coins(): + '''Return a coin class given name and network. 
+ + Raise an exception if unrecognised.''' + for coin in subclasses(Coin): if (coin.NAME.lower() == name.lower() and coin.NET.lower() == net.lower()): return coin @@ -43,13 +49,14 @@ class Coin(object): @staticmethod def lookup_xverbytes(verbytes): + '''Return a (is_xpub, coin_class) pair given xpub/xprv verbytes.''' # Order means BTC testnet will override NMC testnet - for coin in Coin.coins(): + for coin in Coin.coin_classes(): if verbytes == coin.XPUB_VERBYTES: return True, coin if verbytes == coin.XPRV_VERBYTES: return False, coin - raise CoinError("version bytes unrecognised") + raise CoinError('version bytes unrecognised') @classmethod def address_to_hash168(cls, addr): @@ -62,6 +69,11 @@ class Coin(object): raise CoinError('invalid address: {}'.format(addr)) return result + @classmethod + def hash168_to_address(cls, hash168): + '''Return an address given a 21-byte hash.''' + return Base58.encode_check(hash168) + @classmethod def P2PKH_address_from_hash160(cls, hash_bytes): '''Return a P2PKH address given a public key.''' @@ -129,7 +141,7 @@ class Coin(object): @classmethod def prvkey_WIF(privkey_bytes, compressed): - "Return the private key encoded in Wallet Import Format." + '''Return the private key encoded in Wallet Import Format.''' payload = bytearray([cls.WIF_BYTE]) + privkey_bytes if compressed: payload.append(0x01) @@ -137,20 +149,39 @@ class Coin(object): @classmethod def header_hashes(cls, header): - '''Given a header return the previous block hash and the current block - hash.''' + '''Given a header return the previous and current block hashes.''' return header[4:36], double_sha256(header) @classmethod def read_block(cls, block): - '''Read a block and return (header, tx_hashes, txs)''' + '''Return a tuple (header, tx_hashes, txs) given a raw block.''' header, rest = block[:cls.HEADER_LEN], block[cls.HEADER_LEN:] return (header, ) + Deserializer(rest).read_block() @classmethod def decimal_value(cls, value): + '''Return the number of standard coin units as a Decimal given a + quantity of smallest units. + + For example 1 BTC is returned for 100 million satoshis. + ''' return Decimal(value) / cls.VALUE_PER_COIN + @classmethod + def electrum_header(cls, header, height): + version, = struct.unpack(' + ---------- ------------------- ---------- + - Daemon -<<<<<<<<- Block processor ->>>>- Caches - + ---------- ------------------- ---------- + < < > < + -------------- ----------- + - Prefetcher - - Storage - + -------------- ----------- + + +Env +--- + +Holds configuration taken from the environment. Handles defaults +appropriately. Generally passed to the constructor of other +components which take their settings from it. + + +LocalRPC +-------- + +Handles local JSON RPC connections querying ElectrumX server state. +Not started until the block processor has caught up with the daemon. + +ElectrumX +--------- + +Handles JSON Electrum client connections over TCP or SSL. One +instance per client session. Should be the only component concerned +with the details of the Electrum wire protocol. Responsible for +caching of client responses. Not started until the block processor +has caught up with the daemon. Logically, if not yet in practice, a +coin-specific class. + +Daemon +------ + +Used by the block processor, ElectrumX servers and prefetcher. +Encapsulates daemon RPC wire protcol. Logically, if not yet in +practice, a coin-specific class. 
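As a rough illustration of the Daemon component just described, one JSON-RPC
round trip to the daemon over aiohttp might look like the sketch below. The
URL, credentials and error handling are placeholders; the real Daemon class
in server/daemon.py additionally caches the daemon height, retries on errors
and wraps failures in DaemonError.

    # Minimal sketch of a single daemon RPC call (assumes aiohttp; the URL
    # and the 'getblockcount' example are illustrative, not ElectrumX code).
    import json
    import aiohttp

    async def send_single(url, method, params=()):
        '''Send one JSON-RPC request to the daemon and return its result.'''
        payload = json.dumps({'method': method, 'params': list(params), 'id': 0})
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=payload) as resp:
                answer = await resp.json()
        if answer.get('error'):
            raise RuntimeError(answer['error'])
        return answer['result']

    # e.g. height = await send_single('http://user:pass@localhost:8332/',
    #                                 'getblockcount')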
+
+Block Processor
+---------------
+
+Responsible for managing block chain state (UTXO set, history,
+transaction and undo information) and processing towards the chain
+tip. Uses the caches for in-memory state caching. Flushes state to
+the storage layer. Responsible for handling block chain
+reorganisations. Once caught up maintains a representation of daemon
+mempool state.
+
+Caches
+------
+
+The file system cache and the UTXO cache are implementation details of
+the block processor, nothing else should interface with them.
+
+Storage
+-------
+
+Backend database abstraction. Along with the host filesystem, used by
+the block processor (and therefore its caches) to store chain state.
+
+Prefetcher
+----------
+
+Used by the block processor to asynchronously prefetch blocks from the
+daemon. Holds fetched block height. Once it has caught up
+additionally obtains daemon mempool tx hashes. Serves blocks and
+mempool hashes to the block processor via a queue.
+
+IRC
+---
+
+Not currently implemented; will handle IRC communication for the
+ElectrumX servers.
+
+Controller
+----------
+
+A historical artefact that currently coordinates some of the above
+components. Not pictured as it doesn't seem to have a logical
+place and so is probably going away.
diff --git a/server/block_processor.py b/server/block_processor.py
index 526065f..0bc48e2 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -1,6 +1,13 @@
-# See the file "LICENSE" for information about the copyright
+# Copyright (c) 2016, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
 # and warranty status of this software.
 
+'''Block prefetcher and chain processor.'''
+
+
 import array
 import ast
 import asyncio
@@ -15,7 +22,7 @@ from server.daemon import DaemonError
 from lib.hash import hash_to_str
 from lib.script import ScriptPubKey
 from lib.util import chunks, LoggedClass
-from server.storage import LMDB, RocksDB, LevelDB, NoDatabaseException
+from server.storage import open_db
 
 
 def formatted_time(t):
@@ -42,16 +49,12 @@ class Prefetcher(LoggedClass):
         self.semaphore = asyncio.Semaphore()
         self.queue = asyncio.Queue()
         self.queue_size = 0
+        self.fetched_height = height
+        self.mempool = []
         # Target cache size. Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
-        self.fetched_height = height
-        self.recent_sizes = [0]
-
-    async def get_blocks(self):
-        '''Returns a list of prefetched blocks.'''
-        blocks, total_size = await self.queue.get()
-        self.queue_size -= total_size
-        return blocks
+        # First fetch to be 10 blocks
+        self.ave_size = self.target_cache_size // 10
 
     async def clear(self, height):
         '''Clear prefetched blocks and restart from the given height.
@@ -66,49 +69,73 @@ class Prefetcher(LoggedClass): self.queue_size = 0 self.fetched_height = height + async def get_blocks(self): + '''Returns a list of prefetched blocks and the mempool.''' + blocks, height, size = await self.queue.get() + self.queue_size -= size + if height == self.daemon.cached_height(): + return blocks, self.mempool + else: + return blocks, None + async def start(self): '''Loop forever polling for more blocks.''' - self.logger.info('looping forever prefetching blocks...') + self.logger.info('starting prefetch loop...') while True: try: - with await self.semaphore: - count = await self._prefetch() - if not count: + if await self._caught_up(): await asyncio.sleep(5) + else: + await asyncio.sleep(0) except DaemonError as e: self.logger.info('ignoring daemon errors: {}'.format(e)) - def _prefill_count(self, room): - ave_size = sum(self.recent_sizes) // len(self.recent_sizes) - count = room // ave_size if ave_size else 0 - return max(count, 10) + async def _caught_up(self): + '''Poll for new blocks and mempool state. + + Mempool is only queried if caught up with daemon.''' + with await self.semaphore: + blocks, size = await self._prefetch() + self.fetched_height += len(blocks) + caught_up = self.fetched_height == self.daemon.cached_height() + if caught_up: + self.mempool = await self.daemon.mempool_hashes() + + # Wake up block processor if we have something + if blocks or caught_up: + self.queue.put_nowait((blocks, self.fetched_height, size)) + self.queue_size += size + + return caught_up async def _prefetch(self): - '''Prefetch blocks if there are any to prefetch.''' + '''Prefetch blocks unless the prefetch queue is full.''' + if self.queue_size >= self.target_cache_size: + return [], 0 + daemon_height = await self.daemon.height() - max_count = min(daemon_height - self.fetched_height, 4000) - count = min(max_count, self._prefill_count(self.target_cache_size)) + cache_room = self.target_cache_size // self.ave_size + + # Try and catch up all blocks but limit to room in cache. + # Constrain count to between 0 and 4000 regardless + count = min(daemon_height - self.fetched_height, cache_room) + count = min(4000, max(count, 0)) if not count: - return 0 + return [], 0 + first = self.fetched_height + 1 hex_hashes = await self.daemon.block_hex_hashes(first, count) - if not hex_hashes: - self.logger.error('requested {:,d} hashes, got none'.format(count)) - return 0 - blocks = await self.daemon.raw_blocks(hex_hashes) - sizes = [len(block) for block in blocks] - total_size = sum(sizes) - self.queue.put_nowait((blocks, total_size)) - self.queue_size += total_size - self.fetched_height += len(blocks) - # Keep 50 most recent block sizes for fetch count estimation - self.recent_sizes.extend(sizes) - excess = len(self.recent_sizes) - 50 - if excess > 0: - self.recent_sizes = self.recent_sizes[excess:] - return count + size = sum(len(block) for block in blocks) + + # Update our recent average block size estimate + if count >= 10: + self.ave_size = size // count + else: + self.ave_size = (size + (10 - count) * self.ave_size) // 10 + + return blocks, size class BlockProcessor(LoggedClass): @@ -118,30 +145,33 @@ class BlockProcessor(LoggedClass): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon): + def __init__(self, env, daemon, on_update=None): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated. 
+ ''' super().__init__() self.daemon = daemon + self.on_update = on_update # Meta self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 self.coin = env.coin - self.caught_up = False self.reorg_limit = env.reorg_limit - # Chain state (initialize to genesis in case of new DB) - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - # Open DB and metadata files. Record some of its state. - self.db = self.open_db(self.coin, env.db_engine) + db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) + self.db = open_db(db_name, env.db_engine) + if self.db.is_new: + self.logger.info('created new {} database {}' + .format(env.db_engine, db_name)) + else: + self.logger.info('successfully opened {} database {}' + .format(env.db_engine, db_name)) + + self.init_state() self.tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip @@ -150,7 +180,6 @@ class BlockProcessor(LoggedClass): # entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.backup_hash168s = set() self.utxo_cache = UTXOCache(self, self.db, self.coin) self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.prefetcher = Prefetcher(daemon, self.height) @@ -158,8 +187,9 @@ class BlockProcessor(LoggedClass): self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member func + # Redirected member funcs self.get_tx_hash = self.fs_cache.get_tx_hash + self.read_headers = self.fs_cache.read_headers # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' @@ -187,36 +217,42 @@ class BlockProcessor(LoggedClass): async def start(self): '''External entry point for block processing. - A simple wrapper that safely flushes the DB on clean - shutdown. + Safely flushes the DB on clean shutdown. ''' try: - await self.advance_blocks() + while True: + await self._wait_for_update() + await asyncio.sleep(0) # Yield finally: self.flush(True) - async def advance_blocks(self): - '''Loop forever processing blocks in the forward direction.''' - while True: - blocks = await self.prefetcher.get_blocks() - for block in blocks: - if not self.advance_block(block): - await self.handle_chain_reorg() - self.caught_up = False - break - await asyncio.sleep(0) # Yield - - if self.height != self.daemon.cached_height(): - continue + async def _wait_for_update(self): + '''Wait for the prefetcher to deliver blocks or a mempool update. - if not self.caught_up: - self.caught_up = True - self.logger.info('caught up to height {:,d}' - .format(self.height)) + Blocks are only processed in the forward direction. The + prefetcher only provides a non-None mempool when caught up. + ''' + all_touched = [] + blocks, mempool = await self.prefetcher.get_blocks() + for block in blocks: + touched = self.advance_block(block) + if touched is None: + all_touched.append(await self.handle_chain_reorg()) + mempool = None + break + all_touched.append(touched) + await asyncio.sleep(0) # Yield - # Flush everything when in caught-up state as queries - # are performed on DB not in-memory + if mempool is not None: + # Caught up to daemon height. Flush everything as queries + # are performed on the DB and not in-memory. 
self.flush(True) + if self.first_sync: + self.first_sync = False + self.logger.info('synced to height {:,d}'.format(self.height)) + if self.on_update: + await self.on_update(self.height, set.union(*all_touched)) + async def force_chain_reorg(self, to_genesis): try: @@ -229,16 +265,21 @@ class BlockProcessor(LoggedClass): self.logger.info('chain reorg detected') self.flush(True) self.logger.info('finding common height...') + + touched = set() hashes = await self.reorg_hashes(to_genesis) # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): blocks = await self.daemon.raw_blocks(hex_hashes) - self.backup_blocks(blocks) + touched.update(self.backup_blocks(blocks)) + self.logger.info('backed up to height {:,d}'.format(self.height)) await self.prefetcher.clear(self.height) self.logger.info('prefetcher reset') + return touched + async def reorg_hashes(self, to_genesis): '''Return the list of hashes to back up beacuse of a reorg. @@ -253,7 +294,6 @@ class BlockProcessor(LoggedClass): start = self.height - 1 count = 1 while start > 0: - self.logger.info('start: {:,d} count: {:,d}'.format(start, count)) hashes = self.fs_cache.block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) @@ -273,40 +313,29 @@ class BlockProcessor(LoggedClass): return self.fs_cache.block_hashes(start, count) - def open_db(self, coin, db_engine): - db_name = '{}-{}'.format(coin.NAME, coin.NET) - db_engine_class = { - "leveldb": LevelDB, - "rocksdb": RocksDB, - "lmdb": LMDB - }[db_engine.lower()] - try: - db = db_engine_class(db_name, create_if_missing=False, - error_if_exists=False, compression=None) - except NoDatabaseException: - db = db_engine_class(db_name, create_if_missing=True, - error_if_exists=True, compression=None) - self.logger.info('created new {} database {}'.format(db_engine, db_name)) + def init_state(self): + if self.db.is_new: + self.db_height = -1 + self.db_tx_count = 0 + self.db_tip = b'\0' * 32 + self.flush_count = 0 + self.utxo_flush_count = 0 + self.wall_time = 0 + self.first_sync = True else: - self.logger.info('successfully opened {} database {}'.format(db_engine, db_name)) - self.read_state(db) - - return db - - def read_state(self, db): - state = db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.flush_count = state['flush_count'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state.get('first_sync', True) + state = self.db.get(b'state') + state = ast.literal_eval(state.decode()) + if state['genesis'] != self.coin.GENESIS_HASH: + raise ChainError('DB genesis hash {} does not match coin {}' + .format(state['genesis_hash'], + self.coin.GENESIS_HASH)) + self.db_height = state['height'] + self.db_tx_count = state['tx_count'] + self.db_tip = state['tip'] + self.flush_count = state['flush_count'] + self.utxo_flush_count = state['utxo_flush_count'] + self.wall_time = state['wall_time'] + self.first_sync = state.get('first_sync', True) def clean_db(self): '''Clean out stale DB items. 
@@ -358,8 +387,6 @@ class BlockProcessor(LoggedClass): def flush_state(self, batch): '''Flush chain state to the batch.''' - if self.caught_up: - self.first_sync = False now = time.time() self.wall_time += now - self.last_flush self.last_flush = now @@ -392,14 +419,13 @@ class BlockProcessor(LoggedClass): assert not self.history assert not self.utxo_cache.cache assert not self.utxo_cache.db_cache - assert not self.backup_hash168s - def flush(self, flush_utxos=False): + def flush(self, flush_utxos=False, flush_history=None): '''Flush out cached state. History is always flushed. UTXOs are flushed if flush_utxos.''' if self.height == self.db_height: - self.logger.info('nothing to flush') + assert flush_history is None self.assert_flushed() return @@ -413,15 +439,14 @@ class BlockProcessor(LoggedClass): # matter. But if writing the files fails we do not want to # have updated the DB. if self.height > self.db_height: + assert flush_history is None + flush_history = self.flush_history self.fs_cache.flush(self.height, self.tx_count) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. - if self.height > self.db_height: - self.flush_history(batch) - else: - self.backup_history(batch) + flush_history(batch) if flush_utxos: self.flush_utxos(batch) self.flush_state(batch) @@ -457,7 +482,6 @@ class BlockProcessor(LoggedClass): def flush_history(self, batch): self.logger.info('flushing history') - assert not self.backup_hash168s self.flush_count += 1 flush_id = struct.pack('>H', self.flush_count) @@ -472,20 +496,20 @@ class BlockProcessor(LoggedClass): self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - def backup_history(self, batch): + def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' .format(self.height, self.tx_count)) # Drop any NO_CACHE entry - self.backup_hash168s.discard(NO_CACHE_ENTRY) + hash168s.discard(NO_CACHE_ENTRY) assert not self.history nremoves = 0 - for hash168 in sorted(self.backup_hash168s): + for hash168 in sorted(hash168s): prefix = b'H' + hash168 deletes = [] puts = {} - for key, hist in self.db.iterator(reverse=True, prefix=prefix): + for key, hist in self.db.iterator(prefix=prefix, reverse=True): a = array.array('I') a.frombytes(hist) # Remove all history entries >= self.tx_count @@ -502,8 +526,7 @@ class BlockProcessor(LoggedClass): batch.put(key, value) self.logger.info('removed {:,d} history entries from {:,d} addresses' - .format(nremoves, len(self.backup_hash168s))) - self.backup_hash168s = set() + .format(nremoves, len(hash168s))) def cache_sizes(self): '''Returns the approximate size of the cache, in MB.''' @@ -547,14 +570,15 @@ class BlockProcessor(LoggedClass): # the UTXO cache uses the fs_cache via get_tx_hash() to # resolve compressed key collisions header, tx_hashes, txs = self.coin.read_block(block) - self.fs_cache.advance_block(header, tx_hashes, txs) prev_hash, header_hash = self.coin.header_hashes(header) if prev_hash != self.tip: - return False + return None + touched = set() + self.fs_cache.advance_block(header, tx_hashes, txs) self.tip = header_hash self.height += 1 - undo_info = self.advance_txs(tx_hashes, txs) + undo_info = self.advance_txs(tx_hashes, txs, touched) if self.daemon.cached_height() - self.height <= self.reorg_limit: self.write_undo_info(self.height, b''.join(undo_info)) @@ -566,9 +590,9 @@ class BlockProcessor(LoggedClass): if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: 
self.flush(utxo_MB >= self.utxo_MB) - return True + return touched - def advance_txs(self, tx_hashes, txs): + def advance_txs(self, tx_hashes, txs, touched): put_utxo = self.utxo_cache.put spend_utxo = self.utxo_cache.spend undo_info = [] @@ -605,6 +629,7 @@ class BlockProcessor(LoggedClass): for hash168 in hash168s: history[hash168].append(tx_num) self.history_size += len(hash168s) + touched.update(hash168s) tx_num += 1 self.tx_count = tx_num @@ -620,6 +645,7 @@ class BlockProcessor(LoggedClass): self.logger.info('backing up {:,d} blocks'.format(len(blocks))) self.assert_flushed() + touched = set() for block in blocks: header, tx_hashes, txs = self.coin.read_block(block) prev_hash, header_hash = self.coin.header_hashes(header) @@ -628,15 +654,18 @@ class BlockProcessor(LoggedClass): .format(hash_to_str(header_hash), hash_to_str(self.tip), self.height)) - self.backup_txs(tx_hashes, txs) + self.backup_txs(tx_hashes, txs, touched) self.fs_cache.backup_block() self.tip = prev_hash self.height -= 1 self.logger.info('backed up to height {:,d}'.format(self.height)) - self.flush(True) - def backup_txs(self, tx_hashes, txs): + flush_history = partial(self.backup_history, hash168s=touched) + self.flush(True, flush_history=flush_history) + return touched + + def backup_txs(self, tx_hashes, txs, touched): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order undo_info = self.read_undo_info(self.height) @@ -646,7 +675,6 @@ class BlockProcessor(LoggedClass): pack = struct.pack put_utxo = self.utxo_cache.put spend_utxo = self.utxo_cache.spend - hash168s = self.backup_hash168s rtxs = reversed(txs) rtx_hashes = reversed(tx_hashes) @@ -655,7 +683,7 @@ class BlockProcessor(LoggedClass): # Spend the outputs for idx, txout in enumerate(tx.outputs): cache_value = spend_utxo(tx_hash, idx) - hash168s.add(cache_value[:21]) + touched.add(cache_value[:21]) # Restore the inputs if not tx.is_coinbase: @@ -664,7 +692,7 @@ class BlockProcessor(LoggedClass): undo_item = undo_info[n:n + 33] put_utxo(txin.prev_hash + pack(' self.height + len(self.headers): - raise Exception('no header information for height {:,d}' - .format(height)) - header = self.read_headers(self.height, 1) - unpack = struct.unpack - version, = unpack('= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + servers.append(await ssl_server) self.logger.info('SSL server listening on {}:{:d}' .format(env.host, env.ssl_port)) + return servers + def stop(self): '''Close the listening servers.''' for server in self.servers: @@ -85,81 +110,18 @@ class Controller(LoggedClass): for task in asyncio.Task.all_tasks(self.loop): task.cancel() - def add_session(self, session): - self.sessions.add(session) - - def remove_session(self, session): - self.sessions.remove(session) - def add_job(self, coro): '''Queue a job for asynchronous processing.''' - self.jobs.add(asyncio.ensure_future(coro)) + self.jobs.put_nowait(coro) - async def reap_jobs(self): + async def run_jobs(self): + '''Asynchronously run through the job queue.''' while True: - jobs = set() - for job in self.jobs: - if job.done(): - try: - job.result() - except Exception as e: - traceback.print_exc() - else: - jobs.add(job) - self.logger.info('reaped {:d} jobs, {:d} jobs pending' - .format(len(self.jobs) - len(jobs), len(jobs))) - self.jobs = jobs - await asyncio.sleep(5) - - def 
address_status(self, hash168): - '''Returns status as 32 bytes.''' - status = self.addresses.get(hash168) - if status is None: - history = self.block_processor.get_history(hash168) - status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) - for tx_hash, height in history) - if status: - status = sha256(status.encode()) - self.addresses[hash168] = status - - return status - - async def get_merkle(self, tx_hash, height): - '''tx_hash is a hex string.''' - block_hash = await self.daemon.send_single('getblockhash', (height,)) - block = await self.daemon.send_single('getblock', (block_hash, True)) - tx_hashes = block['tx'] - # This will throw if the tx_hash is bad - pos = tx_hashes.index(tx_hash) - - idx = pos - hashes = [hex_str_to_hash(txh) for txh in tx_hashes] - merkle_branch = [] - while len(hashes) > 1: - if len(hashes) & 1: - hashes.append(hashes[-1]) - idx = idx - 1 if (idx & 1) else idx + 1 - merkle_branch.append(hash_to_str(hashes[idx])) - idx //= 2 - hashes = [double_sha256(hashes[n] + hashes[n + 1]) - for n in range(0, len(hashes), 2)] - - return {"block_height": height, "merkle": merkle_branch, "pos": pos} - - def get_peers(self): - '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one - per peer.''' - return self.peers - - def height(self): - return self.block_processor.height - - def get_current_header(self): - return self.block_processor.get_current_header() - - def get_history(self, hash168): - history = self.block_processor.get_history(hash168, limit=None) - return [ - {'tx_hash': hash_to_str(tx_hash), 'height': height} - for tx_hash, height in history - ] + job = await self.jobs.get() + try: + await job + except asyncio.CancelledError: + raise + except Exception: + # Getting here should probably be considered a bug and fixed + traceback.print_exc() diff --git a/server/daemon.py b/server/daemon.py index 96a07bc..5fddde6 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -1,7 +1,11 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. -'''Classes for handling asynchronous connections to a blockchain +'''Class for handling asynchronous connections to a blockchain daemon.''' import asyncio @@ -67,8 +71,7 @@ class Daemon(LoggedClass): msg = 'daemon still warming up.' secs = 30 else: - msg = '{}'.format(errs) - raise DaemonError(msg) + raise DaemonError(errs) self.logger.error('{}. Sleeping {:d}s and trying again...' 
.format(msg, secs)) @@ -86,6 +89,28 @@ class Daemon(LoggedClass): # Convert hex string to bytes return [bytes.fromhex(block) for block in blocks] + async def mempool_hashes(self): + '''Return the hashes of the txs in the daemon's mempool.''' + return await self.send_single('getrawmempool') + + async def estimatefee(self, params): + '''Return the fee estimate for the given parameters.''' + return await self.send_single('estimatefee', params) + + async def relayfee(self): + '''The minimum fee a low-priority tx must pay in order to be accepted + to the daemon's memory pool.''' + net_info = await self.send_single('getnetworkinfo') + return net_info['relayfee'] + + async def getrawtransaction(self, hex_hash): + '''Return the serialized raw transaction with the given hash.''' + return await self.send_single('getrawtransaction', (hex_hash, 0)) + + async def sendrawtransaction(self, params): + '''Broadcast a transaction to the network.''' + return await self.send_single('sendrawtransaction', params) + async def height(self): '''Query the daemon for its current height.''' self._height = await self.send_single('getblockcount') diff --git a/server/env.py b/server/env.py index 9dd29eb..21fa8ae 100644 --- a/server/env.py +++ b/server/env.py @@ -1,6 +1,13 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Class for handling environment configuration and defaults.''' + + from os import environ from lib.coins import Coin @@ -27,6 +34,9 @@ class Env(LoggedClass): # Server stuff self.tcp_port = self.integer('TCP_PORT', None) self.ssl_port = self.integer('SSL_PORT', None) + if self.ssl_port: + self.ssl_certfile = self.required('SSL_CERTFILE') + self.ssl_keyfile = self.required('SSL_KEYFILE') self.rpc_port = self.integer('RPC_PORT', 8000) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) diff --git a/server/protocol.py b/server/protocol.py index 04b4dcf..23f8333 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -1,164 +1,449 @@ -# See the file "LICENSE" for information about the copyright +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. 
+'''Classes for local RPC server and remote client TCP/SSL servers.''' + + import asyncio import codecs import json +import struct import traceback from functools import partial +from server.daemon import DaemonError +from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.util import LoggedClass from server.version import VERSION -class Error(Exception): - BAD_REQUEST = 1 - INTERNAL_ERROR = 2 +class RPCError(Exception): + '''RPC handlers raise this error.''' + + +def json_notification(method, params): + '''Create a json notification.''' + return {'id': None, 'method': method, 'params': params} class JSONRPC(asyncio.Protocol, LoggedClass): + '''Base class that manages a JSONRPC connection.''' + SESSIONS = set() - def __init__(self, controller): + def __init__(self): super().__init__() - self.controller = controller self.parts = [] + self.send_count = 0 + self.send_size = 0 + self.error_count = 0 def connection_made(self, transport): + '''Handle an incoming client connection.''' self.transport = transport - peername = transport.get_extra_info('peername') - self.logger.info('connection from {}'.format(peername)) - self.controller.add_session(self) + self.peername = transport.get_extra_info('peername') + self.logger.info('connection from {}'.format(self.peername)) + self.SESSIONS.add(self) def connection_lost(self, exc): - self.logger.info('disconnected') - self.controller.remove_session(self) + '''Handle client disconnection.''' + self.logger.info('{} disconnected. ' + 'Sent {:,d} bytes in {:,d} messages {:,d} errors' + .format(self.peername, self.send_size, + self.send_count, self.error_count)) + self.SESSIONS.remove(self) def data_received(self, data): + '''Handle incoming data (synchronously). + + Requests end in newline characters. Pass complete requests to + decode_message for handling. 
+ ''' while True: npos = data.find(ord('\n')) if npos == -1: + self.parts.append(data) break tail, data = data[:npos], data[npos + 1:] - parts = self.parts - self.parts = [] + parts, self.parts = self.parts, [] parts.append(tail) self.decode_message(b''.join(parts)) - if data: - self.parts.append(data) - def decode_message(self, message): - '''Message is a binary message.''' + '''Decode a binary message and queue it for asynchronous handling.''' try: message = json.loads(message.decode()) except Exception as e: - self.logger.info('caught exception decoding message'.format(e)) - return - - job = self.request_handler(message) - self.controller.add_job(job) + self.logger.info('error decoding JSON message'.format(e)) + else: + self.ADD_JOB(self.request_handler(message)) async def request_handler(self, request): '''Called asynchronously.''' error = result = None try: - result = await self.json_handler(request) - except Error as e: - error = {'code': e.args[0], 'message': e.args[1]} - except asyncio.CancelledError: - raise - except Exception as e: - # This should be considered a bug and fixed - traceback.print_exc() - error = {'code': Error.INTERNAL_ERROR, 'message': str(e)} - + handler = self.rpc_handler(request.get('method'), + request.get('params', [])) + result = await handler() + except RPCError as e: + self.error_count += 1 + error = {'code': 1, 'message': e.args[0]} payload = {'id': request.get('id'), 'error': error, 'result': result} - try: - data = json.dumps(payload) + '\n' - except TypeError: - msg = 'cannot JSON encode response to request {}'.format(request) - self.logger.error(msg) - error = {'code': Error.INTERNAL_ERROR, 'message': msg} - payload = {'id': request.get('id'), 'error': error, 'result': None} - data = json.dumps(payload) + '\n' - self.transport.write(data.encode()) - - async def json_handler(self, request): - method = request.get('method') + self.json_send(payload) + + def json_send(self, payload): + data = (json.dumps(payload) + '\n').encode() + self.transport.write(data) + self.send_count += 1 + self.send_size += len(data) + + def rpc_handler(self, method, params): handler = None if isinstance(method, str): - handler_name = 'handle_{}'.format(method.replace('.', '_')) - handler = getattr(self, handler_name, None) + handler = self.handlers.get(method) if not handler: self.logger.info('unknown method: {}'.format(method)) - raise Error(Error.BAD_REQUEST, 'unknown method: {}'.format(method)) - params = request.get('params', []) + raise RPCError('unknown method: {}'.format(method)) + if not isinstance(params, list): - raise Error(Error.BAD_REQUEST, 'params should be an array') - return await handler(params) + raise RPCError('params should be an array') + + return partial(handler, self, params) + + @classmethod + def tx_hash_from_param(cls, param): + '''Raise an RPCError if the parameter is not a valid transaction + hash.''' + if isinstance(param, str) and len(param) == 64: + try: + bytes.fromhex(param) + return param + except ValueError: + pass + raise RPCError('parameter should be a transaction hash: {}' + .format(param)) + + @classmethod + def hash168_from_param(cls, param): + if isinstance(param, str): + try: + return cls.COIN.address_to_hash168(param) + except: + pass + raise RPCError('parameter should be a valid address: {}'.format(param)) + + @classmethod + def non_negative_integer_from_param(cls, param): + try: + param = int(param) + except ValueError: + pass + else: + if param >= 0: + return param + + raise RPCError('param should be a non-negative integer: {}' + 
.format(param)) + + @classmethod + def extract_hash168(cls, params): + if len(params) == 1: + return cls.hash168_from_param(params[0]) + raise RPCError('params should contain a single address: {}' + .format(params)) + + @classmethod + def extract_non_negative_integer(cls, params): + if len(params) == 1: + return cls.non_negative_integer_from_param(params[0]) + raise RPCError('params should contain a non-negative integer: {}' + .format(params)) + + @classmethod + def require_empty_params(cls, params): + if params: + raise RPCError('params should be empty: {}'.format(params)) + + @classmethod + def init(cls, block_processor, daemon, coin, add_job): + cls.BLOCK_PROCESSOR = block_processor + cls.DAEMON = daemon + cls.COIN = coin + cls.ADD_JOB = add_job + + @classmethod + def height(cls): + '''Return the current height.''' + return cls.BLOCK_PROCESSOR.height + + @classmethod + def electrum_header(cls, height=None): + '''Return the binary header at the given height.''' + if not 0 <= height <= cls.height(): + raise RPCError('height {:,d} out of range'.format(height)) + header = cls.BLOCK_PROCESSOR.read_headers(height, 1) + return cls.COIN.electrum_header(header, height) + + @classmethod + def current_electrum_header(cls): + '''Used as response to a headers subscription request.''' + return cls.electrum_header(cls.height()) class ElectrumX(JSONRPC): '''A TCP server that handles incoming Electrum connections.''' - def __init__(self, controller, daemon, env): - super().__init__(controller) - self.daemon = daemon + def __init__(self, env): + super().__init__() self.env = env - self.addresses = set() + self.hash168s = set() self.subscribe_headers = False + self.subscribe_height = False + self.notified_height = None + rpcs = [ + ('blockchain', + 'address.get_balance address.get_history address.get_mempool ' + 'address.get_proof address.listunspent address.subscribe ' + 'block.get_header block.get_chunk estimatefee headers.subscribe ' + 'numblocks.subscribe relayfee transaction.broadcast ' + 'transaction.get transaction.get_merkle utxo.get_address'), + ('server', + 'banner donation_address peers.subscribe version'), + ] + self.handlers = {'.'.join([prefix, suffix]): + getattr(self.__class__, suffix.replace('.', '_')) + for prefix, suffixes in rpcs + for suffix in suffixes.split()} + + @classmethod + def watched_address_count(cls): + return sum(len(session.hash168s) for session in self.SESSIONS + if isinstance(session, cls)) + + @classmethod + def notify(cls, height, touched): + '''Notify electrum clients about height changes and touched + addresses.''' + headers_payload = json_notification( + 'blockchain.headers.subscribe', + (cls.electrum_header(height), ), + ) + height_payload = json_notification( + 'blockchain.numblocks.subscribe', + (height, ), + ) + hash168_to_address = cls.COIN.hash168_to_address + + for session in cls.SESSIONS: + if height != session.notified_height: + session.notified_height = height + if session.subscribe_headers: + session.json_send(headers_payload) + if session.subscribe_height: + session.json_send(height_payload) + + for hash168 in session.hash168s.intersection(touched): + address = hash168_to_address(hash168) + status = cls.address_status(hash168) + payload = json_notification('blockchain.address.subscribe', + (address, status)) + session.json_send(payload) + + @classmethod + def address_status(cls, hash168): + '''Returns status as 32 bytes.''' + history = cls.BLOCK_PROCESSOR.get_history(hash168) + status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) + for 
tx_hash, height in history) + if status: + return sha256(status.encode()).hex() + return None + + @classmethod + async def tx_merkle(cls, tx_hash, height): + '''tx_hash is a hex string.''' + block_hash = await cls.DAEMON.send_single('getblockhash', (height,)) + block = await cls.DAEMON.send_single('getblock', (block_hash, True)) + tx_hashes = block['tx'] + # This will throw if the tx_hash is bad + pos = tx_hashes.index(tx_hash) + + idx = pos + hashes = [hex_str_to_hash(txh) for txh in tx_hashes] + merkle_branch = [] + while len(hashes) > 1: + if len(hashes) & 1: + hashes.append(hashes[-1]) + idx = idx - 1 if (idx & 1) else idx + 1 + merkle_branch.append(hash_to_str(hashes[idx])) + idx //= 2 + hashes = [double_sha256(hashes[n] + hashes[n + 1]) + for n in range(0, len(hashes), 2)] + + return {"block_height": height, "merkle": merkle_branch, "pos": pos} + + @classmethod + def irc_peers(cls): + '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one + per peer.''' + return {} + + @classmethod + def height(cls): + return cls.BLOCK_PROCESSOR.height + + @classmethod + def get_history(cls, hash168): + history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None) + return [ + {'tx_hash': hash_to_str(tx_hash), 'height': height} + for tx_hash, height in history + ] + + @classmethod + def get_chunk(cls, index): + '''Return header chunk as hex. Index is a non-negative integer.''' + chunk_size = cls.COIN.CHUNK_SIZE + next_height = cls.height() + 1 + start_height = min(index * chunk_size, next_height) + count = min(next_height - start_height, chunk_size) + return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex() + + @classmethod + def get_balance(cls, hash168): + confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168) + unconfirmed = -1 # FIXME + return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + + @classmethod + def list_unspent(cls, hash168): + utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168) + return tuple({'tx_hash': hash_to_str(utxo.tx_hash), + 'tx_pos': utxo.tx_pos, 'height': utxo.height, + 'value': utxo.value} + for utxo in utxos) + + # --- blockchain commands + + async def address_get_balance(self, params): + hash168 = self.extract_hash168(params) + return self.get_balance(hash168) + + async def address_get_history(self, params): + hash168 = self.extract_hash168(params) + return self.get_history(hash168) + + async def address_get_mempool(self, params): + hash168 = self.extract_hash168(params) + raise RPCError('get_mempool is not yet implemented') + + async def address_get_proof(self, params): + hash168 = self.extract_hash168(params) + raise RPCError('get_proof is not yet implemented') + + async def address_listunspent(self, params): + hash168 = self.extract_hash168(params) + return self.list_unspent(hash168) + + async def address_subscribe(self, params): + hash168 = self.extract_hash168(params) + self.hash168s.add(hash168) + return self.address_status(hash168) + + async def block_get_chunk(self, params): + index = self.extract_non_negative_integer(params) + return self.get_chunk(index) + + async def block_get_header(self, params): + height = self.extract_non_negative_integer(params) + return self.electrum_header(height) + + async def estimatefee(self, params): + return await self.DAEMON.estimatefee(params) + + async def headers_subscribe(self, params): + self.require_empty_params(params) + self.subscribe_headers = True + return self.current_electrum_header() - def params_to_hash168(self, params): - if len(params) != 1: - raise Error(Error.BAD_REQUEST, - 
'params should contain a single address') - address = params[0] - try: - return self.env.coin.address_to_hash168(address) - except: - raise Error(Error.BAD_REQUEST, - 'invalid address: {}'.format(address)) - - async def handle_blockchain_address_get_history(self, params): - hash168 = self.params_to_hash168(params) - return self.controller.get_history(hash168) - - async def handle_blockchain_address_subscribe(self, params): - hash168 = self.params_to_hash168(params) - status = self.controller.address_status(hash168) - return status.hex() if status else None + async def numblocks_subscribe(self, params): + self.require_empty_params(params) + self.subscribe_height = True + return self.height() - async def handle_blockchain_estimatefee(self, params): - result = await self.daemon.send_single('estimatefee', params) - return result + async def relayfee(self, params): + '''The minimum fee a low-priority tx must pay in order to be accepted + to the daemon's memory pool.''' + self.require_empty_params(params) + return await self.DAEMON.relayfee() - async def handle_blockchain_headers_subscribe(self, params): - self.subscribe_headers = True - return self.controller.get_current_header() + async def transaction_broadcast(self, params): + '''Pass through the parameters to the daemon. - async def handle_blockchain_relayfee(self, params): - '''The minimum fee a low-priority tx must pay in order to be accepted - to this daemon's memory pool. + An ugly API: current Electrum clients only pass the raw + transaction in hex and expect error messages to be returned in + the result field. And the server shouldn't be doing the client's + user interface job here. ''' - net_info = await self.daemon.send_single('getnetworkinfo') - return net_info['relayfee'] - - async def handle_blockchain_transaction_get(self, params): - if len(params) != 1: - raise Error(Error.BAD_REQUEST, - 'params should contain a transaction hash') - tx_hash = params[0] - return await self.daemon.send_single('getrawtransaction', (tx_hash, 0)) - - async def handle_blockchain_transaction_get_merkle(self, params): - if len(params) != 2: - raise Error(Error.BAD_REQUEST, - 'params should contain a transaction hash and height') - tx_hash, height = params - return await self.controller.get_merkle(tx_hash, height) - - async def handle_server_banner(self, params): + try: + tx_hash = await self.DAEMON.sendrawtransaction(params) + self.logger.info('sent tx: {}'.format(tx_hash)) + return tx_hash + except DaemonError as e: + errors = e.args[0] + error = errors[0] + message = error['message'] + self.logger.info('sendrawtransaction: {}'.format(message)) + if 'non-mandatory-script-verify-flag' in message: + return ( + 'Your client produced a transaction that is not accepted ' + 'by the network any more. Please upgrade to Electrum ' + '2.5.1 or newer.' + ) + + return ( + 'The transaction was rejected by network rules. ({})\n[{}]' + .format(message, params[0]) + ) + + async def transaction_get(self, params): + '''Return the serialized raw transaction.''' + # For some reason Electrum passes a height. Don't require it + # in anticipation it might be dropped in the future. 
+ if 1 <= len(params) <= 2: + tx_hash = self.tx_hash_from_param(params[0]) + return await self.DAEMON.getrawtransaction(tx_hash) + + raise RPCError('params wrong length: {}'.format(params)) + + async def transaction_get_merkle(self, params): + if len(params) == 2: + tx_hash = self.tx_hash_from_param(params[0]) + height = self.non_negative_integer_from_param(params[1]) + return await self.tx_merkle(tx_hash, height) + + raise RPCError('params should contain a transaction hash and height') + + async def utxo_get_address(self, params): + if len(params) == 2: + tx_hash = self.tx_hash_from_param(params[0]) + index = self.non_negative_integer_from_param(params[1]) + tx_hash = hex_str_to_hash(tx_hash) + hash168 = self.BLOCK_PROCESSOR.get_utxo_hash168(tx_hash, index) + if hash168: + return self.COIN.hash168_to_address(hash168) + return None + + raise RPCError('params should contain a transaction hash and index') + + # --- server commands + + async def banner(self, params): '''Return the server banner.''' + self.require_empty_params(params) banner = 'Welcome to Electrum!' if self.env.banner_file: try: @@ -169,23 +454,25 @@ class ElectrumX(JSONRPC): .format(self.env.banner_file, e)) return banner - async def handle_server_donation_address(self, params): + async def donation_address(self, params): '''Return the donation address as a string. If none is specified return the empty string. ''' + self.require_empty_params(params) return self.env.donation_address - async def handle_server_peers_subscribe(self, params): + async def peers_subscribe(self, params): '''Returns the peer (ip, host, ports) tuples. Despite the name electrum-server does not treat this as a subscription. ''' - peers = self.controller.get_peers() + self.require_empty_params(params) + peers = ElectrumX.irc_peers() return tuple(peers.values()) - async def handle_server_version(self, params): + async def version(self, params): '''Return the server version as a string.''' return VERSION @@ -193,24 +480,28 @@ class ElectrumX(JSONRPC): class LocalRPC(JSONRPC): '''A local TCP RPC server for querying status.''' - async def handle_getinfo(self, params): + def __init__(self): + super().__init__() + cmds = 'getinfo sessions numsessions peers numpeers'.split() + self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} + + async def getinfo(self, params): return { - 'blocks': self.controller.height(), - 'peers': len(self.controller.get_peers()), - 'sessions': len(self.controller.sessions), - 'watched': sum(len(s.addresses) for s in self.controller.sessions - if isinstance(s, ElectrumX)), + 'blocks': self.height(), + 'peers': len(ElectrumX.irc_peers()), + 'sessions': len(self.SESSIONS), + 'watched': ElectrumX.watched_address_count(), 'cached': 0, } - async def handle_sessions(self, params): + async def sessions(self, params): return [] - async def handle_numsessions(self, params): - return len(self.controller.sessions) + async def numsessions(self, params): + return len(self.SESSIONS) - async def handle_peers(self, params): - return tuple(self.controller.get_peers().keys()) + async def peers(self, params): + return tuple(ElectrumX.irc_peers().keys()) - async def handle_numpeers(self, params): - return len(self.controller.get_peers()) + async def numpeers(self, params): + return len(ElectrumX.irc_peers()) diff --git a/server/storage.py b/server/storage.py index 13b1847..d4557d4 100644 --- a/server/storage.py +++ b/server/storage.py @@ -1,77 +1,112 @@ +# Copyright (c) 2016, the ElectrumX authors +# +# All rights reserved. 
+# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Backend database abstraction. + +The abstraction needs to be improved to not heavily penalise LMDB. +''' + import os +from functools import partial + +from lib.util import subclasses + + +def open_db(name, db_engine): + '''Returns a database handle.''' + for db_class in subclasses(Storage): + if db_class.__name__.lower() == db_engine.lower(): + db_class.import_module() + return db_class(name) + + raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine)) class Storage(object): - def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None): - if not create_if_missing and not os.path.exists(name): - raise NoDatabaseException + '''Abstract base class of the DB backend abstraction.''' + + def __init__(self, name): + self.is_new = not os.path.exists(name) + self.open(name, create=self.is_new) + + @classmethod + def import_module(cls): + '''Import the DB engine module.''' + raise NotImplementedError + + def open(self, name, create): + '''Open an existing database or create a new one.''' + raise NotImplementedError def get(self, key): - raise NotImplementedError() + raise NotImplementedError def put(self, key, value): - raise NotImplementedError() + raise NotImplementedError def write_batch(self): - """ - Returns a context manager that provides `put` and `delete`. - Changes should only be committed when the context manager closes without an exception. - """ - raise NotImplementedError() + '''Return a context manager that provides `put` and `delete`. - def iterator(self, prefix=b""): - """ - Returns an iterator that yields (key, value) pairs from the database sorted by key. - If `prefix` is set, only keys starting with `prefix` will be included. - """ - raise NotImplementedError() + Changes should only be committed when the context manager + closes without an exception. + ''' + raise NotImplementedError + def iterator(self, prefix=b'', reverse=False): + '''Return an iterator that yields (key, value) pairs from the + database sorted by key. -class NoDatabaseException(Exception): - pass + If `prefix` is set, only keys starting with `prefix` will be + included. If `reverse` is True the items are returned in + reverse order. 
+ ''' + raise NotImplementedError class LevelDB(Storage): - def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None): - super().__init__(name, create_if_missing, error_if_exists, compression) - import plyvel - self.db = plyvel.DB(name, create_if_missing=create_if_missing, - error_if_exists=error_if_exists, compression=compression) + '''LevelDB database engine.''' - def get(self, key): - return self.db.get(key) - - def write_batch(self): - return self.db.write_batch(transaction=True) - - def iterator(self, prefix=b""): - return self.db.iterator(prefix=prefix) + @classmethod + def import_module(cls): + import plyvel + cls.module = plyvel - def put(self, key, value): - self.db.put(key, value) + def open(self, name, create): + self.db = self.module.DB(name, create_if_missing=create, + compression=None) + self.get = self.db.get + self.put = self.db.put + self.iterator = self.db.iterator + self.write_batch = partial(self.db.write_batch, transaction=True) class RocksDB(Storage): - rocksdb = None + '''RocksDB database engine.''' - def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None): - super().__init__(name, create_if_missing, error_if_exists, compression) + @classmethod + def import_module(cls): import rocksdb - RocksDB.rocksdb = rocksdb - if not compression: - compression = "no" - compression = getattr(rocksdb.CompressionType, compression + "_compression") - self.db = rocksdb.DB(name, rocksdb.Options(create_if_missing=create_if_missing, - compression=compression, - target_file_size_base=33554432, - max_open_files=1024)) - - def get(self, key): - return self.db.get(key) + cls.module = rocksdb + + def open(self, name, create): + compression = "no" + compression = getattr(self.module.CompressionType, + compression + "_compression") + options = self.module.Options(create_if_missing=create, + compression=compression, + target_file_size_base=33554432, + max_open_files=1024) + self.db = self.module.DB(name, options) + self.get = self.db.get + self.put = self.db.put class WriteBatch(object): def __init__(self, db): - self.batch = RocksDB.rocksdb.WriteBatch() + self.batch = RocksDB.module.WriteBatch() self.db = db def __enter__(self): @@ -85,8 +120,10 @@ class RocksDB(Storage): return RocksDB.WriteBatch(self.db) class Iterator(object): - def __init__(self, db, prefix): + def __init__(self, db, prefix, reverse): self.it = db.iteritems() + if reverse: + self.it = reversed(self.it) self.prefix = prefix def __iter__(self): @@ -100,22 +137,22 @@ class RocksDB(Storage): raise StopIteration return k, v - def iterator(self, prefix=b""): - return RocksDB.Iterator(self.db, prefix) - - def put(self, key, value): - return self.db.put(key, value) + def iterator(self, prefix=b'', reverse=False): + return RocksDB.Iterator(self.db, prefix, reverse) class LMDB(Storage): - lmdb = None + '''RocksDB database engine.''' - def __init__(self, name, create_if_missing=False, error_if_exists=False, compression=None): - super().__init__(name, create_if_missing, error_if_exists, compression) + @classmethod + def import_module(cls): import lmdb - LMDB.lmdb = lmdb - self.env = lmdb.Environment(".", subdir=True, create=create_if_missing, max_dbs=32, map_size=5 * 10 ** 10) - self.db = self.env.open_db(create=create_if_missing) + cls.module = lmdb + + def open(self, name, create): + self.env = cls.module.Environment('.', subdir=True, create=create, + max_dbs=32, map_size=5 * 10 ** 10) + self.db = self.env.open_db(create=create) def get(self, key): with 
self.env.begin(db=self.db) as tx: @@ -128,15 +165,16 @@ class LMDB(Storage): def write_batch(self): return self.env.begin(db=self.db, write=True) - def iterator(self, prefix=b""): - return LMDB.lmdb.Iterator(self.db, self.env, prefix) + def iterator(self, prefix=b'', reverse=False): + return LMDB.Iterator(self.db, self.env, prefix, reverse) class Iterator: - def __init__(self, db, env, prefix): + def __init__(self, db, env, prefix, reverse): self.transaction = env.begin(db=db) self.transaction.__enter__() self.db = db self.prefix = prefix + self.reverse = reverse # FIXME def __iter__(self): self.iterator = LMDB.lmdb.Cursor(self.db, self.transaction) diff --git a/server/version.py b/server/version.py index c83fa77..285207d 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.03" +VERSION = "ElectrumX 0.04" diff --git a/server_main.py b/server_main.py index 3a82c48..60948fa 100755 --- a/server_main.py +++ b/server_main.py @@ -1,8 +1,15 @@ #!/usr/bin/env python3 - -# See the file "LICENSE" for information about the copyright +# +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright # and warranty status of this software. +'''Script to kick off the server.''' + + import asyncio import logging import os