diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3af6c99..3bd8f5e 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -90,10 +90,3 @@ IRC Not currently imlpemented; will handle IRC communication for the ElectrumX servers. - -Controller ----------- - -A historical artefact that currently coordinates some of the above -components. Not pictured as it is doesn't seem to have a logical -place and so is probably going away. diff --git a/electrumx_server.py b/electrumx_server.py index 9429883..94d65c8 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,11 +17,11 @@ import traceback from functools import partial from server.env import Env -from server.controller import Controller +from server.block_processor import BlockServer -def cancel_tasks(loop): - # Cancel and collect the remaining tasks +def close_loop(loop): + '''Close the loop down cleanly. Cancel and collect remaining tasks.''' tasks = asyncio.Task.all_tasks() for task in tasks: task.cancel() @@ -31,41 +31,36 @@ def cancel_tasks(loop): except asyncio.CancelledError: pass + loop.close() + def main_loop(): - '''Get tasks; loop until complete.''' + '''Start the server.''' if os.geteuid() == 0: raise Exception('DO NOT RUN AS ROOT! Create an unpriveleged user ' 'account and use that') - env = Env() - logging.info('switching current directory to {}'.format(env.db_dir)) - os.chdir(env.db_dir) - loop = asyncio.get_event_loop() #loop.set_debug(True) - controller = Controller(loop, env) - - # Signal handlers def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' logging.warning('received {} signal, preparing to shut down' .format(signame)) loop.stop() + # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - controller.start() + server = BlockServer(Env()) + server.start() try: loop.run_forever() finally: - controller.stop() - cancel_tasks(loop) - - loop.close() + server.stop() + close_loop(loop) def main(): diff --git a/query.py b/query.py index faea87a..8fe7c94 100755 --- a/query.py +++ b/query.py @@ -13,11 +13,10 @@ Not currently documented; might become easier to use in future. ''' -import os import sys from server.env import Env -from server.block_processor import BlockProcessor +from server.DB import DB from lib.hash import hash_to_str @@ -40,9 +39,8 @@ def count_entries(db): def main(): env = Env() + bp = DB(env) coin = env.coin - os.chdir(env.db_dir) - bp = BlockProcessor(env, None) if len(sys.argv) == 1: count_entries(bp.db) return diff --git a/server/block_processor.py b/server/block_processor.py index 6116924..0309491 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -11,6 +11,7 @@ import array import ast import asyncio +import ssl import struct import time from bisect import bisect_left @@ -18,10 +19,12 @@ from collections import defaultdict, namedtuple from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY -from server.daemon import DaemonError +from server.daemon import Daemon, DaemonError +from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass +import server.db from server.storage import open_db @@ -33,9 +36,6 @@ def formatted_time(t): t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - - class ChainError(Exception): pass @@ -78,7 +78,11 @@ class Prefetcher(LoggedClass): else: return blocks, None - async def start(self): + def start(self): + '''Start the prefetcher.''' + asyncio.ensure_future(self.main_loop()) + + async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') while True: @@ -283,21 +287,20 @@ class MemPool(LoggedClass): return value -class BlockProcessor(LoggedClass): +class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon, on_update=None): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated. - ''' - super().__init__() + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env) - self.daemon = daemon - self.on_update = on_update + self.daemon = Daemon(env.daemon_url, env.debug) + self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() @@ -305,39 +308,16 @@ class BlockProcessor(LoggedClass): self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 - self.coin = env.coin self.reorg_limit = env.reorg_limit - # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - self.db = open_db(db_name, env.db_engine) - if self.db.is_new: - self.logger.info('created new {} database {}' - .format(env.db_engine, db_name)) - else: - self.logger.info('successfully opened {} database {}' - .format(env.db_engine, db_name)) - - self.init_state() - self.tx_count = self.db_tx_count - self.height = self.db_height - self.tip = self.db_tip - - # Caches to be flushed later. Headers and tx_hashes have one - # entry per block + # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.utxo_cache = UTXOCache(self, self.db, self.coin) - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) - self.prefetcher = Prefetcher(daemon, self.height) + self.prefetcher = Prefetcher(self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member funcs - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers - # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' @@ -355,12 +335,13 @@ class BlockProcessor(LoggedClass): self.clean_db() - def coros(self): - self.daemon.debug_set_height(self.height) - return [self.start(), self.prefetcher.start()] + def start(self): + '''Start the block processor.''' + asyncio.ensure_future(self.main_loop()) + self.prefetcher.start() - async def start(self): - '''External entry point for block processing. + async def main_loop(self): + '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' @@ -385,6 +366,7 @@ class BlockProcessor(LoggedClass): await asyncio.sleep(0) # Yield if caught_up: await self.caught_up(mempool_hashes) + self.touched = set() except ChainReorg: await self.handle_chain_reorg() @@ -396,10 +378,7 @@ class BlockProcessor(LoggedClass): if self.first_sync: self.first_sync = False self.logger.info('synced to height {:,d}'.format(self.height)) - if self.on_update: - self.touched.update(await self.mempool.update(mempool_hashes)) - await self.on_update(self.height, self.touched) - self.touched = set() + self.touched.update(await self.mempool.update(mempool_hashes)) async def handle_chain_reorg(self): # First get all state on disk @@ -451,30 +430,6 @@ class BlockProcessor(LoggedClass): return self.fs_cache.block_hashes(start, count) - def init_state(self): - if self.db.is_new: - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - else: - state = self.db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.flush_count = state['flush_count'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state.get('first_sync', True) - def clean_db(self): '''Clean out stale DB items. @@ -839,13 +794,6 @@ class BlockProcessor(LoggedClass): assert n == 0 self.tx_count -= len(txs) - @staticmethod - def resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit - def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. @@ -861,59 +809,59 @@ class BlockProcessor(LoggedClass): ''' return self.mempool.value(hash168) - def get_history(self, hash168, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - prefix = b'H' + hash168 - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.get_tx_hash(tx_num) - limit -= 1 - - def get_balance(self, hash168): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) - - def get_utxos(self, hash168, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. By default yields at most 1000 entries. - Set limit to None to get them all. + +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also starts servers when caught up.''' + + def __init__(self, env): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env) + self.servers = [] + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.servers: + await self.start_servers() + ElectrumX.notify(self.height, self.touched) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. ''' - limit = self.resolve_limit(limit) - unpack = struct.unpack - prefix = b'u' + hash168 - for k, v in self.db.iterator(prefix=prefix): - (tx_pos,) = unpack('= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() diff --git a/server/cache.py b/server/cache.py index 5f27662..f06eb49 100644 --- a/server/cache.py +++ b/server/cache.py @@ -83,9 +83,9 @@ class UTXOCache(LoggedClass): ''' - def __init__(self, parent, db, coin): + def __init__(self, get_tx_hash, db, coin): super().__init__() - self.parent = parent + self.get_tx_hash = get_tx_hash self.coin = coin self.cache = {} self.put = self.cache.__setitem__ @@ -137,7 +137,7 @@ class UTXOCache(LoggedClass): assert len(data) % 12 == 0 for n in range(0, len(data), 12): (tx_num, ) = struct.unpack('= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - return servers - - def stop(self): - '''Close the listening servers.''' - for server in self.servers: - server.close() diff --git a/server/db.py b/server/db.py new file mode 100644 index 0000000..0256ff4 --- /dev/null +++ b/server/db.py @@ -0,0 +1,148 @@ +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Interface to the blockchain database.''' + +import array +import ast +import os +import struct +from collections import namedtuple + +from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY +from lib.util import LoggedClass +from server.storage import open_db + +UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") + + +class DB(LoggedClass): + '''Simple wrapper of the backend database for querying. + + Performs no DB update, though the DB will be cleaned on opening if + it was shutdown uncleanly. + ''' + + def __init__(self, env): + super().__init__() + self.env = env + self.coin = env.coin + + self.logger.info('switching current directory to {}' + .format(env.db_dir)) + os.chdir(env.db_dir) + + # Open DB and metadata files. Record some of its state. + db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) + self.db = open_db(db_name, env.db_engine) + if self.db.is_new: + self.logger.info('created new {} database {}' + .format(env.db_engine, db_name)) + else: + self.logger.info('successfully opened {} database {}' + .format(env.db_engine, db_name)) + + self.init_state_from_db() + self.tx_count = self.db_tx_count + self.height = self.db_height + self.tip = self.db_tip + + # Cache wrapping the filesystem and redirected functions + self.fs_cache = FSCache(self.coin, self.height, self.tx_count) + self.get_tx_hash = self.fs_cache.get_tx_hash + self.read_headers = self.fs_cache.read_headers + + # UTXO cache + self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + + def init_state_from_db(self): + if self.db.is_new: + self.db_height = -1 + self.db_tx_count = 0 + self.db_tip = b'\0' * 32 + self.flush_count = 0 + self.utxo_flush_count = 0 + self.wall_time = 0 + self.first_sync = True + else: + state = self.db.get(b'state') + state = ast.literal_eval(state.decode()) + if state['genesis'] != self.coin.GENESIS_HASH: + raise ChainError('DB genesis hash {} does not match coin {}' + .format(state['genesis_hash'], + self.coin.GENESIS_HASH)) + self.db_height = state['height'] + self.db_tx_count = state['tx_count'] + self.db_tip = state['tip'] + self.flush_count = state['flush_count'] + self.utxo_flush_count = state['utxo_flush_count'] + self.wall_time = state['wall_time'] + self.first_sync = state.get('first_sync', True) + + @staticmethod + def _resolve_limit(limit): + if limit is None: + return -1 + assert isinstance(limit, int) and limit >= 0 + return limit + + def get_history(self, hash168, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + prefix = b'H' + hash168 + for key, hist in self.db.iterator(prefix=prefix): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.get_tx_hash(tx_num) + limit -= 1 + + def get_balance(self, hash168): + '''Returns the confirmed balance of an address.''' + return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + + def get_utxos(self, hash168, limit=1000): + '''Generator that yields all UTXOs for an address sorted in no + particular order. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + unpack = struct.unpack + prefix = b'u' + hash168 + for k, v in self.db.iterator(prefix=prefix): + (tx_pos,) = unpack('