From c0a112f8ea3a9d5bc790fdc149f90b91b62dcfe3 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 07:32:55 +0900 Subject: [PATCH 1/4] Split out part of block processor into db.py The part that doesn't actually do any block processing... --- server/block_processor.py | 124 ++------------------------------- server/cache.py | 8 +-- server/db.py | 142 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 123 deletions(-) create mode 100644 server/db.py diff --git a/server/block_processor.py b/server/block_processor.py index 6116924..1412cbc 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -22,6 +22,7 @@ from server.daemon import DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass +import server.db from server.storage import open_db @@ -33,9 +34,6 @@ def formatted_time(t): t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - - class ChainError(Exception): pass @@ -283,7 +281,7 @@ class MemPool(LoggedClass): return value -class BlockProcessor(LoggedClass): +class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. @@ -292,9 +290,8 @@ class BlockProcessor(LoggedClass): def __init__(self, env, daemon, on_update=None): '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated. - ''' - super().__init__() + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env.coin, env.db_engine) self.daemon = daemon self.on_update = on_update @@ -305,39 +302,16 @@ class BlockProcessor(LoggedClass): self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 - self.coin = env.coin self.reorg_limit = env.reorg_limit - # Open DB and metadata files. Record some of its state. 
- db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - self.db = open_db(db_name, env.db_engine) - if self.db.is_new: - self.logger.info('created new {} database {}' - .format(env.db_engine, db_name)) - else: - self.logger.info('successfully opened {} database {}' - .format(env.db_engine, db_name)) - - self.init_state() - self.tx_count = self.db_tx_count - self.height = self.db_height - self.tip = self.db_tip - - # Caches to be flushed later. Headers and tx_hashes have one - # entry per block + # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.utxo_cache = UTXOCache(self, self.db, self.coin) - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) self.prefetcher = Prefetcher(daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member funcs - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers - # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} ' @@ -451,30 +425,6 @@ class BlockProcessor(LoggedClass): return self.fs_cache.block_hashes(start, count) - def init_state(self): - if self.db.is_new: - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - else: - state = self.db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.flush_count = state['flush_count'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state.get('first_sync', True) - def clean_db(self): 
'''Clean out stale DB items. @@ -839,13 +789,6 @@ class BlockProcessor(LoggedClass): assert n == 0 self.tx_count -= len(txs) - @staticmethod - def resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit - def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. @@ -860,60 +803,3 @@ class BlockProcessor(LoggedClass): Can be positive or negative. ''' return self.mempool.value(hash168) - - def get_history(self, hash168, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - prefix = b'H' + hash168 - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.get_tx_hash(tx_num) - limit -= 1 - - def get_balance(self, hash168): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) - - def get_utxos(self, hash168, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - unpack = struct.unpack - prefix = b'u' + hash168 - for k, v in self.db.iterator(prefix=prefix): - (tx_pos,) = unpack('= 0 + return limit + + def get_history(self, hash168, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. 
+ Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + prefix = b'H' + hash168 + for key, hist in self.db.iterator(prefix=prefix): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.get_tx_hash(tx_num) + limit -= 1 + + def get_balance(self, hash168): + '''Returns the confirmed balance of an address.''' + return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + + def get_utxos(self, hash168, limit=1000): + '''Generator that yields all UTXOs for an address sorted in no + particular order. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + unpack = struct.unpack + prefix = b'u' + hash168 + for k, v in self.db.iterator(prefix=prefix): + (tx_pos,) = unpack(' Date: Tue, 8 Nov 2016 08:09:59 +0900 Subject: [PATCH 2/4] Create BlockServer Controller now an empty shell --- server/block_processor.py | 71 +++++++++++++++++++++++++++++++++++---- server/controller.py | 56 ++---------------------------- 2 files changed, 67 insertions(+), 60 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 1412cbc..c58284a 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -11,6 +11,7 @@ import array import ast import asyncio +import ssl import struct import time from bisect import bisect_left @@ -19,6 +20,7 @@ from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.daemon import DaemonError +from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass @@ -288,13 +290,14 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. 
''' - def __init__(self, env, daemon, on_update=None): + def __init__(self, env, daemon): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' super().__init__(env.coin, env.db_engine) + daemon.debug_set_height(self.height) + self.env = env self.daemon = daemon - self.on_update = on_update self.mempool = MemPool(self) self.touched = set() @@ -330,7 +333,6 @@ class BlockProcessor(server.db.DB): self.clean_db() def coros(self): - self.daemon.debug_set_height(self.height) return [self.start(), self.prefetcher.start()] async def start(self): @@ -359,6 +361,7 @@ class BlockProcessor(server.db.DB): await asyncio.sleep(0) # Yield if caught_up: await self.caught_up(mempool_hashes) + self.touched = set() except ChainReorg: await self.handle_chain_reorg() @@ -370,10 +373,7 @@ class BlockProcessor(server.db.DB): if self.first_sync: self.first_sync = False self.logger.info('synced to height {:,d}'.format(self.height)) - if self.on_update: - self.touched.update(await self.mempool.update(mempool_hashes)) - await self.on_update(self.height, self.touched) - self.touched = set() + self.touched.update(await self.mempool.update(mempool_hashes)) async def handle_chain_reorg(self): # First get all state on disk @@ -803,3 +803,60 @@ class BlockProcessor(server.db.DB): Can be positive or negative. 
''' return self.mempool.value(hash168) + + +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also starts servers when caught up.''' + + def __init__(self, env, daemon): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env, daemon) + self.servers = [] + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.servers: + await self.start_servers() + ElectrumX.notify(self.height, self.touched) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. + ''' + env = self.env + loop = asyncio.get_event_loop() + + JSONRPC.init(self, self.daemon, self.coin) + + protocol = LocalRPC + if env.rpc_port is not None: + host = 'localhost' + rpc_server = loop.create_server(protocol, host, env.rpc_port) + self.servers.append(await rpc_server) + self.logger.info('RPC server listening on {}:{:d}' + .format(host, env.rpc_port)) + + protocol = partial(ElectrumX, env) + if env.tcp_port is not None: + tcp_server = loop.create_server(protocol, env.host, env.tcp_port) + self.servers.append(await tcp_server) + self.logger.info('TCP server listening on {}:{:d}' + .format(env.host, env.tcp_port)) + + if env.ssl_port is not None: + # FIXME: update if we want to require Python >= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() diff --git a/server/controller.py b/server/controller.py index a30aeb4..dffae97 100644 --- a/server/controller.py +++ 
b/server/controller.py @@ -12,12 +12,9 @@ client-serving data such as histories. ''' import asyncio -import ssl -from functools import partial from server.daemon import Daemon -from server.block_processor import BlockProcessor -from server.protocol import ElectrumX, LocalRPC, JSONRPC +from server.block_processor import BlockServer from lib.util import LoggedClass @@ -33,10 +30,7 @@ class Controller(LoggedClass): self.env = env self.coin = env.coin self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockProcessor(env, self.daemon, - on_update=self.on_update) - JSONRPC.init(self.block_processor, self.daemon, self.coin) - self.servers = [] + self.block_processor = BlockServer(env, self.daemon) def start(self): '''Prime the event loop with asynchronous jobs.''' @@ -45,50 +39,6 @@ class Controller(LoggedClass): for coro in coros: asyncio.ensure_future(coro) - async def on_update(self, height, touched): - if not self.servers: - self.servers = await self.start_servers() - ElectrumX.notify(height, touched) - - async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. - - Does not start a server if the port wasn't specified. Does - nothing if servers are already running. 
- ''' - servers = [] - env = self.env - loop = self.loop - - protocol = LocalRPC - if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) - - protocol = partial(ElectrumX, env) - if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) - - if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - return servers - def stop(self): '''Close the listening servers.''' - for server in self.servers: - server.close() + self.block_processor.stop() From 93d53bdd8772721f1eb4d35c4e99f88466f922a7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 08:29:56 +0900 Subject: [PATCH 3/4] The controller is dead! --- docs/ARCHITECTURE.rst | 7 ------- electrumx_server.py | 27 ++++++++++-------------- server/block_processor.py | 33 ++++++++++++++++------------- server/controller.py | 44 --------------------------------------- server/db.py | 18 ++++++++++------ 5 files changed, 42 insertions(+), 87 deletions(-) delete mode 100644 server/controller.py diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3af6c99..3bd8f5e 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -90,10 +90,3 @@ IRC Not currently imlpemented; will handle IRC communication for the ElectrumX servers. 
- -Controller ----------- - -A historical artefact that currently coordinates some of the above -components. Not pictured as it is doesn't seem to have a logical -place and so is probably going away. diff --git a/electrumx_server.py b/electrumx_server.py index 9429883..94d65c8 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,11 +17,11 @@ import traceback from functools import partial from server.env import Env -from server.controller import Controller +from server.block_processor import BlockServer -def cancel_tasks(loop): - # Cancel and collect the remaining tasks +def close_loop(loop): + '''Close the loop down cleanly. Cancel and collect remaining tasks.''' tasks = asyncio.Task.all_tasks() for task in tasks: task.cancel() @@ -31,41 +31,36 @@ def cancel_tasks(loop): except asyncio.CancelledError: pass + loop.close() + def main_loop(): - '''Get tasks; loop until complete.''' + '''Start the server.''' if os.geteuid() == 0: raise Exception('DO NOT RUN AS ROOT! Create an unpriveleged user ' 'account and use that') - env = Env() - logging.info('switching current directory to {}'.format(env.db_dir)) - os.chdir(env.db_dir) - loop = asyncio.get_event_loop() #loop.set_debug(True) - controller = Controller(loop, env) - - # Signal handlers def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' logging.warning('received {} signal, preparing to shut down' .format(signame)) loop.stop() + # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - controller.start() + server = BlockServer(Env()) + server.start() try: loop.run_forever() finally: - controller.stop() - cancel_tasks(loop) - - loop.close() + server.stop() + close_loop(loop) def main(): diff --git a/server/block_processor.py b/server/block_processor.py index c58284a..0309491 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -19,7 +19,7 @@ from collections import 
defaultdict, namedtuple from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY -from server.daemon import DaemonError +from server.daemon import Daemon, DaemonError from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer @@ -78,7 +78,11 @@ class Prefetcher(LoggedClass): else: return blocks, None - async def start(self): + def start(self): + '''Start the prefetcher.''' + asyncio.ensure_future(self.main_loop()) + + async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') while True: @@ -290,14 +294,13 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' - super().__init__(env.coin, env.db_engine) - daemon.debug_set_height(self.height) + super().__init__(env) - self.env = env - self.daemon = daemon + self.daemon = Daemon(env.daemon_url, env.debug) + self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() @@ -310,7 +313,7 @@ class BlockProcessor(server.db.DB): # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.prefetcher = Prefetcher(daemon, self.height) + self.prefetcher = Prefetcher(self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count @@ -332,11 +335,13 @@ class BlockProcessor(server.db.DB): self.clean_db() - def coros(self): - return [self.start(), self.prefetcher.start()] + def start(self): + '''Start the block processor.''' + asyncio.ensure_future(self.main_loop()) + self.prefetcher.start() - async def start(self): - '''External entry point for block processing. 
+ async def main_loop(self): + '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' @@ -808,10 +813,10 @@ class BlockProcessor(server.db.DB): class BlockServer(BlockProcessor): '''Like BlockProcessor but also starts servers when caught up.''' - def __init__(self, env, daemon): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' - super().__init__(env, daemon) + super().__init__(env) self.servers = [] async def caught_up(self, mempool_hashes): diff --git a/server/controller.py b/server/controller.py deleted file mode 100644 index dffae97..0000000 --- a/server/controller.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (c) 2016, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -'''Server controller. - -Coordinates the parts of the server. Serves as a cache for -client-serving data such as histories. -''' - -import asyncio - -from server.daemon import Daemon -from server.block_processor import BlockServer -from lib.util import LoggedClass - - -class Controller(LoggedClass): - - def __init__(self, loop, env): - '''Create up the controller. - - Creates DB, Daemon and BlockProcessor instances. 
- ''' - super().__init__() - self.loop = loop - self.env = env - self.coin = env.coin - self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockServer(env, self.daemon) - - def start(self): - '''Prime the event loop with asynchronous jobs.''' - coros = self.block_processor.coros() - - for coro in coros: - asyncio.ensure_future(coro) - - def stop(self): - '''Close the listening servers.''' - self.block_processor.stop() diff --git a/server/db.py b/server/db.py index f253213..0256ff4 100644 --- a/server/db.py +++ b/server/db.py @@ -9,6 +9,7 @@ import array import ast +import os import struct from collections import namedtuple @@ -26,19 +27,24 @@ class DB(LoggedClass): it was shutdown uncleanly. ''' - def __init__(self, coin, db_engine): + def __init__(self, env): super().__init__() - self.coin = coin + self.env = env + self.coin = env.coin + + self.logger.info('switching current directory to {}' + .format(env.db_dir)) + os.chdir(env.db_dir) # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(coin.NAME, coin.NET) - self.db = open_db(db_name, db_engine) + db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) + self.db = open_db(db_name, env.db_engine) if self.db.is_new: self.logger.info('created new {} database {}' - .format(db_engine, db_name)) + .format(env.db_engine, db_name)) else: self.logger.info('successfully opened {} database {}' - .format(db_engine, db_name)) + .format(env.db_engine, db_name)) self.init_state_from_db() self.tx_count = self.db_tx_count From f020fcf9770b540c3a6c35e6388b02d37ad5b20b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 08:31:43 +0900 Subject: [PATCH 4/4] Update query.py --- query.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/query.py b/query.py index faea87a..8fe7c94 100755 --- a/query.py +++ b/query.py @@ -13,11 +13,10 @@ Not currently documented; might become easier to use in future. 
 '''
 
-import os
 import sys
 
 from server.env import Env
-from server.block_processor import BlockProcessor
+from server.db import DB
 from lib.hash import hash_to_str
 
 
@@ -40,9 +39,8 @@ def count_entries(db):
 
 def main():
     env = Env()
+    bp = DB(env)
     coin = env.coin
-    os.chdir(env.db_dir)
-    bp = BlockProcessor(env, None)
     if len(sys.argv) == 1:
         count_entries(bp.db)
         return