diff --git a/docs/ARCHITECTURE.rst b/docs/ARCHITECTURE.rst index 3af6c99..98c43c4 100644 --- a/docs/ARCHITECTURE.rst +++ b/docs/ARCHITECTURE.rst @@ -15,13 +15,13 @@ The components of the server are roughly like this:: - ElectrumX -<<<<<- LocalRPC - ------------- ------------ < > - ---------- ------------------- ---------- - - Daemon -<<<<<<<<- Block processor ->>>>- Caches - - ---------- ------------------- ---------- + ---------- ------------------- -------------- + - Daemon -<<<<<<<<- Block processor ->>>>- UTXO Cache - + ---------- ------------------- -------------- < < > < - -------------- ----------- - - Prefetcher - - Storage - - -------------- ----------- + -------------- ---------------- + - Prefetcher - - FS + Storage - + -------------- ---------------- Env @@ -90,10 +90,3 @@ IRC Not currently imlpemented; will handle IRC communication for the ElectrumX servers. - -Controller ----------- - -A historical artefact that currently coordinates some of the above -components. Not pictured as it is doesn't seem to have a logical -place and so is probably going away. diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index bf1214c..cf34aed 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,14 @@ +Version 0.2.2 +------------- + +- mostly refactoring: controller.py is gone; cache.py is half-gone. + Split BlockProcessor into 3: DB, BlockProcessor and BlockServer. DB + handles stored DB and FS state; BlockProcessor handles pushing the + chain forward and caching of updates, and BlockServer will + additionally serve clients on catchup. More to come. +- mempool: better logging; also yields during initial seeding +- issues fixed: #10 + Version 0.2.1 ------------- diff --git a/electrumx_server.py b/electrumx_server.py index 60948fa..939bd2e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -9,39 +9,45 @@ '''Script to kick off the server.''' - import asyncio import logging import os +import signal import traceback +from functools import partial from server.env import Env -from server.controller import Controller +from server.protocol import BlockServer def main_loop(): - '''Get tasks; loop until complete.''' + '''Start the server.''' if os.geteuid() == 0: raise Exception('DO NOT RUN AS ROOT! 
Create an unpriveleged user ' 'account and use that') - env = Env() - logging.info('switching current directory to {}'.format(env.db_dir)) - os.chdir(env.db_dir) - loop = asyncio.get_event_loop() #loop.set_debug(True) - controller = Controller(loop, env) - controller.start() + def on_signal(signame): + '''Call on receipt of a signal to cleanly shutdown.''' + logging.warning('received {} signal, shutting down'.format(signame)) + for task in asyncio.Task.all_tasks(): + task.cancel() + + # Install signal handlers + for signame in ('SIGINT', 'SIGTERM'): + loop.add_signal_handler(getattr(signal, signame), + partial(on_signal, signame)) - tasks = asyncio.Task.all_tasks(loop) + server = BlockServer(Env()) + future = server.start() try: - loop.run_until_complete(asyncio.gather(*tasks)) + loop.run_until_complete(future) except asyncio.CancelledError: - logging.warning('task cancelled; asyncio event loop closing') + pass finally: - controller.stop() + server.stop() loop.close() diff --git a/lib/script.py b/lib/script.py index 503d38b..98c8fef 100644 --- a/lib/script.py +++ b/lib/script.py @@ -12,8 +12,6 @@ import struct from collections import namedtuple from lib.enum import Enumeration -from lib.hash import hash160 -from lib.util import cachedproperty class ScriptError(Exception): @@ -58,80 +56,6 @@ assert OpCodes.OP_CHECKSIG == 0xac assert OpCodes.OP_CHECKMULTISIG == 0xae -class ScriptSig(object): - '''A script from a tx input. - - Typically provides one or more signatures.''' - - SIG_ADDRESS, SIG_MULTI, SIG_PUBKEY, SIG_UNKNOWN = range(4) - - def __init__(self, script, coin, kind, sigs, pubkeys): - self.script = script - self.coin = coin - self.kind = kind - self.sigs = sigs - self.pubkeys = pubkeys - - @cachedproperty - def address(self): - if self.kind == SIG_ADDRESS: - return self.coin.address_from_pubkey(self.pubkeys[0]) - if self.kind == SIG_MULTI: - return self.coin.multsig_address(self.pubkeys) - return 'Unknown' - - @classmethod - def from_script(cls, script, coin): - '''Return an instance of this class. - - Return an object with kind SIG_UNKNOWN for unrecognised scripts.''' - try: - return cls.parse_script(script, coin) - except ScriptError: - return cls(script, coin, SIG_UNKNOWN, [], []) - - @classmethod - def parse_script(cls, script, coin): - '''Return an instance of this class. - - Raises on unrecognised scripts.''' - ops, datas = Script.get_ops(script) - - # Address, PubKey and P2SH redeems only push data - if not ops or not Script.match_ops(ops, [-1] * len(ops)): - raise ScriptError('unknown scriptsig pattern') - - # Assume double data pushes are address redeems, single data - # pushes are pubkey redeems - if len(ops) == 2: # Signature, pubkey - return cls(script, coin, SIG_ADDRESS, [datas[0]], [datas[1]]) - - if len(ops) == 1: # Pubkey - return cls(script, coin, SIG_PUBKEY, [datas[0]], []) - - # Presumably it is P2SH (though conceivably the above could be - # too; cannot be sure without the send-to script). We only - # handle CHECKMULTISIG P2SH, which because of a bitcoin core - # bug always start with an unused OP_0. 
- if ops[0] != OpCodes.OP_0: - raise ScriptError('unknown scriptsig pattern; expected OP_0') - - # OP_0, Sig1, ..., SigM, pk_script - m = len(ops) - 2 - pk_script = datas[-1] - pk_ops, pk_datas = Script.get_ops(script) - - # OP_2 pubkey1 pubkey2 pubkey3 OP_3 OP_CHECKMULTISIG - n = len(pk_ops) - 3 - pattern = ([OpCodes.OP_1 + m - 1] + [-1] * n - + [OpCodes.OP_1 + n - 1, OpCodes.OP_CHECKMULTISIG]) - - if m <= n and Script.match_ops(pk_ops, pattern): - return cls(script, coin, SIG_MULTI, datas[1:-1], pk_datas[1:-2]) - - raise ScriptError('unknown multisig P2SH pattern') - - class ScriptPubKey(object): '''A class for handling a tx output script that gives conditions necessary for spending. diff --git a/lib/util.py b/lib/util.py index eb34f20..f41d4f2 100644 --- a/lib/util.py +++ b/lib/util.py @@ -82,6 +82,7 @@ def subclasses(base_class, strict=True): pairs = inspect.getmembers(sys.modules[base_class.__module__], select) return [pair[1] for pair in pairs] + def chunks(items, size): '''Break up items, an iterable, into chunks of length size.''' for i in range(0, len(items), size): @@ -90,20 +91,12 @@ def chunks(items, size): def bytes_to_int(be_bytes): '''Interprets a big-endian sequence of bytes as an integer''' - assert isinstance(be_bytes, (bytes, bytearray)) - value = 0 - for byte in be_bytes: - value = value * 256 + byte - return value + return int.from_bytes(be_bytes, 'big') def int_to_bytes(value): '''Converts an integer to a big-endian sequence of bytes''' - mods = [] - while value: - value, mod = divmod(value, 256) - mods.append(mod) - return bytes(reversed(mods)) + return value.to_bytes((value.bit_length() + 7) // 8, 'big') def increment_byte_string(bs): diff --git a/query.py b/query.py index faea87a..8fe7c94 100755 --- a/query.py +++ b/query.py @@ -13,11 +13,10 @@ Not currently documented; might become easier to use in future. 
''' -import os import sys from server.env import Env -from server.block_processor import BlockProcessor +from server.DB import DB from lib.hash import hash_to_str @@ -40,9 +39,8 @@ def count_entries(db): def main(): env = Env() + bp = DB(env) coin = env.coin - os.chdir(env.db_dir) - bp = BlockProcessor(env, None) if len(sys.argv) == 1: count_entries(bp.db) return diff --git a/server/block_processor.py b/server/block_processor.py index 6116924..feb4eee 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -9,19 +9,21 @@ import array -import ast import asyncio +import itertools +import os import struct import time from bisect import bisect_left -from collections import defaultdict, namedtuple +from collections import defaultdict from functools import partial -from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY -from server.daemon import DaemonError +from server.cache import UTXOCache, NO_CACHE_ENTRY +from server.daemon import Daemon, DaemonError from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass +import server.db from server.storage import open_db @@ -33,9 +35,6 @@ def formatted_time(t): t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - - class ChainError(Exception): pass @@ -78,7 +77,7 @@ class Prefetcher(LoggedClass): else: return blocks, None - async def start(self): + async def main_loop(self): '''Loop forever polling for more blocks.''' self.logger.info('starting daemon poll loop...') while True: @@ -176,9 +175,11 @@ class MemPool(LoggedClass): ''' hex_hashes = set(hex_hashes) touched = set() + missing_utxos = 0 - if self.count < 0: - self.logger.info('initial fetch of {:,d} daemon mempool txs' + initial = self.count < 0 + if initial: + self.logger.info('beginning import of {:,d} mempool txs' .format(len(hex_hashes))) # Remove gone items @@ -198,6 +199,8 @@ class MemPool(LoggedClass): # them into a dictionary of hex hash to deserialized tx. hex_hashes.difference_update(self.txs) raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) + if initial: + self.logger.info('all fetched, now analysing...') new_txs = {hex_hash: Deserializer(raw_tx).read_tx() for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} del raw_txs, hex_hashes @@ -210,7 +213,10 @@ class MemPool(LoggedClass): def txout_pair(txout): return (script_hash168(txout.pk_script), txout.value) - for hex_hash, tx in new_txs.items(): + for n, (hex_hash, tx) in enumerate(new_txs.items()): + # Yield to process e.g. signals + if n % 500 == 0: + await asyncio.sleep(0) txout_pairs = [txout_pair(txout) for txout in tx.outputs] self.txs[hex_hash] = (None, txout_pairs, None) @@ -221,22 +227,38 @@ class MemPool(LoggedClass): return mempool_entry[1][txin.prev_idx], True entry = utxo_lookup(txin.prev_hash, txin.prev_idx) if entry == NO_CACHE_ENTRY: - # Not possible unless daemon is lying or we're corrupted? 
- self.logger.warning('no UTXO found for {} / {}' - .format(hash_to_str(txin.prev_hash), - txin.prev_idx)) + # This happens when the daemon is a block ahead of us + # and has mempool txs spending new txs in that block raise MissingUTXOError value, = struct.unpack(' next_log: + next_log = time.time() + 10 + self.logger.info('{:,d} done ({:d}%)' + .format(n, int(n / len(new_txs) * 100))) + txout_pairs = self.txs[hex_hash][1] try: infos = (txin_info(txin) for txin in tx.inputs) txin_pairs, unconfs = zip(*infos) except MissingUTXOError: - # If we were missing a UTXO for some reason drop this tx + # Drop this TX. If other mempool txs depend on it + # it's harmless - next time the mempool is refreshed + # they'll either be cleaned up or the UTXOs will no + # longer be missing. + missing_utxos += 1 del self.txs[hex_hash] continue self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs)) @@ -249,6 +271,11 @@ class MemPool(LoggedClass): self.hash168s[hash168].add(hex_hash) touched.add(hash168) + if missing_utxos: + self.logger.info('{:,d} txs had missing UTXOs; probably the ' + 'daemon is a block or two ahead of us' + .format(missing_utxos)) + self.count += 1 if self.count % 25 == 0 or gone: self.count = 0 @@ -283,21 +310,25 @@ class MemPool(LoggedClass): return value -class BlockProcessor(LoggedClass): +class BlockProcessor(server.db.DB): '''Process blocks and update the DB state to match. Employ a prefetcher to prefetch blocks in batches for processing. Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon, on_update=None): + def __init__(self, env): '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated. - ''' - super().__init__() + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env) - self.daemon = daemon - self.on_update = on_update + # These are our state as we move ahead of DB state + self.height = self.db_height + self.tip = self.db_tip + self.tx_count = self.db_tx_count + + self.daemon = Daemon(env.daemon_url, env.debug) + self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() @@ -305,38 +336,20 @@ class BlockProcessor(LoggedClass): self.utxo_MB = env.utxo_MB self.hist_MB = env.hist_MB self.next_cache_check = 0 - self.coin = env.coin self.reorg_limit = env.reorg_limit - # Open DB and metadata files. Record some of its state. - db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) - self.db = open_db(db_name, env.db_engine) - if self.db.is_new: - self.logger.info('created new {} database {}' - .format(env.db_engine, db_name)) - else: - self.logger.info('successfully opened {} database {}' - .format(env.db_engine, db_name)) - - self.init_state() - self.tx_count = self.db_tx_count - self.height = self.db_height - self.tip = self.db_tip - - # Caches to be flushed later. 
Headers and tx_hashes have one - # entry per block + # Headers and tx_hashes have one entry per block self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 - self.utxo_cache = UTXOCache(self, self.db, self.coin) - self.fs_cache = FSCache(self.coin, self.height, self.tx_count) - self.prefetcher = Prefetcher(daemon, self.height) + self.prefetcher = Prefetcher(self.daemon, self.height) self.last_flush = time.time() self.last_flush_tx_count = self.tx_count - # Redirected member funcs - self.get_tx_hash = self.fs_cache.get_tx_hash - self.read_headers = self.fs_cache.read_headers + # Caches of unflushed items + self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin) + self.headers = [] + self.tx_hashes = [] # Log state self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' @@ -355,12 +368,13 @@ class BlockProcessor(LoggedClass): self.clean_db() - def coros(self): - self.daemon.debug_set_height(self.height) - return [self.start(), self.prefetcher.start()] + def start(self): + '''Returns a future that starts the block processor when awaited.''' + return asyncio.gather(self.main_loop(), + self.prefetcher.main_loop()) - async def start(self): - '''External entry point for block processing. + async def main_loop(self): + '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' @@ -385,6 +399,7 @@ class BlockProcessor(LoggedClass): await asyncio.sleep(0) # Yield if caught_up: await self.caught_up(mempool_hashes) + self.touched = set() except ChainReorg: await self.handle_chain_reorg() @@ -396,10 +411,7 @@ class BlockProcessor(LoggedClass): if self.first_sync: self.first_sync = False self.logger.info('synced to height {:,d}'.format(self.height)) - if self.on_update: - self.touched.update(await self.mempool.update(mempool_hashes)) - await self.on_update(self.height, self.touched) - self.touched = set() + self.touched.update(await self.mempool.update(mempool_hashes)) async def handle_chain_reorg(self): # First get all state on disk @@ -432,7 +444,7 @@ class BlockProcessor(LoggedClass): start = self.height - 1 count = 1 while start > 0: - hashes = self.fs_cache.block_hashes(start, count) + hashes = self.fs_block_hashes(start, count) hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) @@ -449,31 +461,7 @@ class BlockProcessor(LoggedClass): 'height {:,d} to height {:,d}' .format(count, start, start + count - 1)) - return self.fs_cache.block_hashes(start, count) - - def init_state(self): - if self.db.is_new: - self.db_height = -1 - self.db_tx_count = 0 - self.db_tip = b'\0' * 32 - self.flush_count = 0 - self.utxo_flush_count = 0 - self.wall_time = 0 - self.first_sync = True - else: - state = self.db.get(b'state') - state = ast.literal_eval(state.decode()) - if state['genesis'] != self.coin.GENESIS_HASH: - raise ChainError('DB genesis hash {} does not match coin {}' - .format(state['genesis_hash'], - self.coin.GENESIS_HASH)) - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.flush_count = state['flush_count'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state.get('first_sync', True) + return self.fs_block_hashes(start, count) def clean_db(self): '''Clean out stale DB items. 
@@ -547,9 +535,6 @@ class BlockProcessor(LoggedClass): self.height - self.db_height)) self.utxo_cache.flush(batch) self.utxo_flush_count = self.flush_count - self.db_tx_count = self.tx_count - self.db_height = self.height - self.db_tip = self.tip def assert_flushed(self): '''Asserts state is fully flushed.''' @@ -567,37 +552,40 @@ class BlockProcessor(LoggedClass): self.assert_flushed() return + self.flush_count += 1 flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count show_stats = self.first_sync - # Write out the files to the FS before flushing to the DB. If - # the DB transaction fails, the files being too long doesn't - # matter. But if writing the files fails we do not want to - # have updated the DB. if self.height > self.db_height: assert flush_history is None flush_history = self.flush_history - self.fs_cache.flush(self.height, self.tx_count) with self.db.write_batch() as batch: # History first - fast and frees memory. Flush state last # as it reads the wall time. flush_history(batch) if flush_utxos: + self.fs_flush() self.flush_utxos(batch) self.flush_state(batch) self.logger.info('committing transaction...') + # Update our in-memory state after successful flush + self.db_tx_count = self.tx_count + self.db_height = self.height + self.db_tip = self.tip + self.tx_hashes = [] + self.headers = [] + # Update and put the wall time again - otherwise we drop the # time it took to commit the batch self.flush_state(self.db) - flush_time = int(self.last_flush - flush_start) self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' .format(self.flush_count, self.height, self.tx_count, - flush_time)) + int(self.last_flush - flush_start))) # Catch-up stats if show_stats: @@ -623,21 +611,68 @@ class BlockProcessor(LoggedClass): formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): - self.logger.info('flushing history') - - self.flush_count += 1 + flush_start = time.time() flush_id = struct.pack('>H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id batch.put(key, hist.tobytes()) - self.logger.info('{:,d} history entries in {:,d} addrs' - .format(self.history_size, len(self.history))) - + self.logger.info('flushed {:,d} history entries for {:,d} addrs ' + 'in {:,d}s' + .format(self.history_size, len(self.history), + int(time.time() - flush_start))) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 + def fs_flush(self): + '''Flush the things stored on the filesystem.''' + flush_start = time.time() + blocks_done = len(self.headers) + prior_tx_count = (self.tx_counts[self.db_height] + if self.db_height >= 0 else 0) + cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + txs_done = cur_tx_count - prior_tx_count + + assert self.db_height + blocks_done == self.height + assert len(self.tx_hashes) == blocks_done + assert len(self.tx_counts) == self.height + 1 + assert cur_tx_count == self.tx_count, \ + 'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count) + + # First the headers + headers = b''.join(self.headers) + header_len = self.coin.HEADER_LEN + self.headers_file.seek((self.db_height + 1) * header_len) + self.headers_file.write(headers) + self.headers_file.flush() + + # Then the tx counts + self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.db_height + 1:]) + self.txcount_file.flush() + + # Finally the hashes + hashes = 
memoryview(b''.join(itertools.chain(*self.tx_hashes))) + assert len(hashes) % 32 == 0 + assert len(hashes) // 32 == txs_done + cursor = 0 + file_pos = prior_tx_count * 32 + while cursor < len(hashes): + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename, create=True) as f: + f.seek(offset) + f.write(hashes[cursor:cursor + size]) + cursor += size + file_pos += size + + os.sync() + + self.logger.info('FS flush took {:.1f} seconds' + .format(time.time() - flush_start)) + def backup_history(self, batch, hash168s): self.logger.info('backing up history to height {:,d} tx_count {:,d}' .format(self.height, self.tx_count)) @@ -707,9 +742,18 @@ class BlockProcessor(LoggedClass): '''Read undo information from a file for the current height.''' return self.db.get(self.undo_key(height)) + def fs_advance_block(self, header, tx_hashes, txs): + '''Update unflushed FS state for a new block.''' + prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + + # Cache the new header, tx hashes and cumulative tx count + self.headers.append(header) + self.tx_hashes.append(tx_hashes) + self.tx_counts.append(prior_tx_count + len(txs)) + def advance_block(self, block, update_touched): - # We must update the fs_cache before calling advance_txs() as - # the UTXO cache uses the fs_cache via get_tx_hash() to + # We must update the FS cache before calling advance_txs() as + # the UTXO cache uses the FS cache via get_tx_hash() to # resolve compressed key collisions header, tx_hashes, txs = self.coin.read_block(block) prev_hash, header_hash = self.coin.header_hashes(header) @@ -717,7 +761,7 @@ class BlockProcessor(LoggedClass): raise ChainReorg touched = set() - self.fs_cache.advance_block(header, tx_hashes, txs) + self.fs_advance_block(header, tx_hashes, txs) self.tip = header_hash self.height += 1 undo_info = self.advance_txs(tx_hashes, txs, touched) @@ -797,10 +841,13 @@ class BlockProcessor(LoggedClass): hash_to_str(self.tip), self.height)) self.backup_txs(tx_hashes, txs, touched) - self.fs_cache.backup_block() self.tip = prev_hash + assert self.height >= 0 self.height -= 1 + assert not self.headers + assert not self.tx_hashes + self.logger.info('backed up to height {:,d}'.format(self.height)) self.touched.update(touched) @@ -839,12 +886,33 @@ class BlockProcessor(LoggedClass): assert n == 0 self.tx_count -= len(txs) - @staticmethod - def resolve_limit(limit): - if limit is None: - return -1 - assert isinstance(limit, int) and limit >= 0 - return limit + def read_headers(self, start, count): + # Read some from disk + disk_count = min(count, self.db_height + 1 - start) + result = self.fs_read_headers(start, disk_count) + count -= disk_count + start += disk_count + + # The rest from memory + if count: + start -= self.db_height + 1 + if not (count >= 0 and start + count <= len(self.headers)): + raise ChainError('{:,d} headers starting at {:,d} not on disk' + .format(count, start)) + result += b''.join(self.headers[start: start + count]) + + return result + + def get_tx_hash(self, tx_num): + '''Returns the tx_hash and height of a tx number.''' + tx_hash, tx_height = self.fs_tx_hash(tx_num) + + # Is this unflushed? 
+ if tx_hash is None: + tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)] + tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] + + return tx_hash, tx_height def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -860,60 +928,3 @@ class BlockProcessor(LoggedClass): Can be positive or negative. ''' return self.mempool.value(hash168) - - def get_history(self, hash168, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - prefix = b'H' + hash168 - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.get_tx_hash(tx_num) - limit -= 1 - - def get_balance(self, hash168): - '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) - - def get_utxos(self, hash168, limit=1000): - '''Generator that yields all UTXOs for an address sorted in no - particular order. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self.resolve_limit(limit) - unpack = struct.unpack - prefix = b'u' + hash168 - for k, v in self.db.iterator(prefix=prefix): - (tx_pos,) = unpack('= 0 - # Just update in-memory. It doesn't matter if disk files are - # too long, they will be overwritten when advancing. - self.height -= 1 - self.tx_counts.pop() - - def flush(self, new_height, new_tx_count): - '''Flush the things stored on the filesystem. 
- The arguments are passed for sanity check assertions only.''' - self.logger.info('flushing to file system') - - blocks_done = len(self.headers) - prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0 - cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - txs_done = cur_tx_count - prior_tx_count - - assert self.height + blocks_done == new_height - assert len(self.tx_hashes) == blocks_done - assert len(self.tx_counts) == new_height + 1 - assert cur_tx_count == new_tx_count, \ - 'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count) - - # First the headers - headers = b''.join(self.headers) - header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.height + 1) * header_len) - self.headers_file.write(headers) - self.headers_file.flush() - - # Then the tx counts - self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.height + 1:]) - self.txcount_file.flush() - - # Finally the hashes - hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == txs_done - cursor = 0 - file_pos = prior_tx_count * 32 - while cursor < len(hashes): - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename, create=True) as f: - f.seek(offset) - f.write(hashes[cursor:cursor + size]) - cursor += size - file_pos += size - - os.sync() - - self.tx_hashes = [] - self.headers = [] - self.height += blocks_done - - def read_headers(self, start, count): - result = b'' - - # Read some from disk - disk_count = min(count, self.height + 1 - start) - if disk_count > 0: - header_len = self.coin.HEADER_LEN - assert start >= 0 - self.headers_file.seek(start * header_len) - result = self.headers_file.read(disk_count * header_len) - count -= disk_count - start += disk_count - - # The rest from memory - start -= self.height + 1 - assert count >= 0 and start + count <= len(self.headers) - result += b''.join(self.headers[start: start + count]) - - return result - - def get_tx_hash(self, tx_num): - '''Returns the tx_hash and height of a tx number.''' - height = bisect_right(self.tx_counts, tx_num) - - # Is this on disk or unflushed? - if height > self.height: - tx_hashes = self.tx_hashes[height - (self.height + 1)] - tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] - else: - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - tx_hash = f.read(32) - - return tx_hash, height - - def block_hashes(self, height, count): - headers = self.read_headers(height, count) - hlen = self.coin.HEADER_LEN - return [double_sha256(header) for header in chunks(headers, hlen)] diff --git a/server/controller.py b/server/controller.py deleted file mode 100644 index 6cfd030..0000000 --- a/server/controller.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright (c) 2016, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -'''Server controller. - -Coordinates the parts of the server. Serves as a cache for -client-serving data such as histories. 
-''' - -import asyncio -import signal -import ssl -from functools import partial - -from server.daemon import Daemon -from server.block_processor import BlockProcessor -from server.protocol import ElectrumX, LocalRPC, JSONRPC -from lib.util import LoggedClass - - -class Controller(LoggedClass): - - def __init__(self, loop, env): - '''Create up the controller. - - Creates DB, Daemon and BlockProcessor instances. - ''' - super().__init__() - self.loop = loop - self.env = env - self.coin = env.coin - self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockProcessor(env, self.daemon, - on_update=self.on_update) - JSONRPC.init(self.block_processor, self.daemon, self.coin) - self.servers = [] - - def start(self): - '''Prime the event loop with asynchronous jobs.''' - coros = self.block_processor.coros() - - for coro in coros: - asyncio.ensure_future(coro) - - # Signal handlers - for signame in ('SIGINT', 'SIGTERM'): - self.loop.add_signal_handler(getattr(signal, signame), - partial(self.on_signal, signame)) - - async def on_update(self, height, touched): - if not self.servers: - self.servers = await self.start_servers() - ElectrumX.notify(height, touched) - - async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. - - Does not start a server if the port wasn't specified. Does - nothing if servers are already running. - ''' - servers = [] - env = self.env - loop = self.loop - - protocol = LocalRPC - if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) - - protocol = partial(ElectrumX, env) - if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) - - if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - return servers - - def stop(self): - '''Close the listening servers.''' - for server in self.servers: - server.close() - - def on_signal(self, signame): - '''Call on receipt of a signal to cleanly shutdown.''' - self.logger.warning('received {} signal, preparing to shut down' - .format(signame)) - for task in asyncio.Task.all_tasks(self.loop): - task.cancel() diff --git a/server/daemon.py b/server/daemon.py index f785fc2..e94af41 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -86,11 +86,11 @@ class Daemon(util.LoggedClass): except aiohttp.ClientHttpProcessingError: msg = 'HTTP error' except aiohttp.ServerDisconnectedError: - msg = 'daemon disconnected' + msg = 'disconnected' except aiohttp.ClientConnectionError: msg = 'connection problem - is your daemon running?' except DaemonWarmingUpError: - msg = 'daemon is still warming up' + msg = 'still starting up checking blocks...' if msg != prior_msg or count == 10: self.logger.error('{}. Retrying between sleeps...' 
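For orientation before the new server/db.py that follows: the release notes above describe splitting BlockProcessor into DB, BlockProcessor and BlockServer. Below is a minimal sketch of that layering - DB owns the stored DB and filesystem state, BlockProcessor drives the chain forward on top of it, and BlockServer will additionally serve clients on catch-up. Only the class names and the start()/main_loop() entry points come from this diff; the bodies are simplified placeholders and the Env argument is stubbed out.

import asyncio


class DB:
    '''Owns stored DB and filesystem state (headers, tx counts, tx hashes).'''
    def __init__(self, env):
        self.env = env
        self.db_height = -1            # recovered from disk in the real class


class BlockProcessor(DB):
    '''Pushes the chain forward; caches updates before flushing to the DB.'''
    def __init__(self, env):
        super().__init__(env)
        self.height = self.db_height   # in-memory state can run ahead of the DB

    async def main_loop(self):
        pass                           # the real loop processes prefetched blocks

    def start(self):
        '''Return a future that runs the processor when awaited.'''
        return asyncio.gather(self.main_loop())


class BlockServer(BlockProcessor):
    '''Will additionally serve clients on catch-up (see server/protocol.py).'''


loop = asyncio.get_event_loop()
loop.run_until_complete(BlockServer(env=None).start())
loop.close()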
diff --git a/server/db.py b/server/db.py
new file mode 100644
index 0000000..414eb75
--- /dev/null
+++ b/server/db.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2016, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.
+
+'''Interface to the blockchain database.'''
+
+
+import array
+import ast
+import os
+import struct
+from bisect import bisect_right
+from collections import namedtuple
+
+from lib.util import chunks, LoggedClass
+from lib.hash import double_sha256
+from server.storage import open_db
+
+UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
+
+class DB(LoggedClass):
+    '''Simple wrapper of the backend database for querying.
+
+    Performs no DB update, though the DB will be cleaned on opening if
+    it was shut down uncleanly.
+    '''
+
+    class DBError(Exception):
+        pass
+
+    def __init__(self, env):
+        super().__init__()
+        self.env = env
+        self.coin = env.coin
+
+        self.logger.info('switching current directory to {}'
+                         .format(env.db_dir))
+        os.chdir(env.db_dir)
+
+        # Open DB and metadata files. Record some of its state.
+        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
+        self.db = open_db(db_name, env.db_engine)
+        if self.db.is_new:
+            self.logger.info('created new {} database {}'
+                             .format(env.db_engine, db_name))
+        else:
+            self.logger.info('successfully opened {} database {}'
+                             .format(env.db_engine, db_name))
+        self.init_state_from_db()
+
+        create = self.db_height == -1
+        self.headers_file = self.open_file('headers', create)
+        self.txcount_file = self.open_file('txcount', create)
+        self.tx_hash_file_size = 16 * 1024 * 1024
+
+        # tx_counts[N] has the cumulative number of txs at the end of
+        # height N. So tx_counts[0] is 1 - the genesis coinbase
+        self.tx_counts = array.array('I')
+        self.txcount_file.seek(0)
+        self.tx_counts.fromfile(self.txcount_file, self.db_height + 1)
+        if self.tx_counts:
+            assert self.db_tx_count == self.tx_counts[-1]
+        else:
+            assert self.db_tx_count == 0
+
+    def init_state_from_db(self):
+        if self.db.is_new:
+            self.db_height = -1
+            self.db_tx_count = 0
+            self.db_tip = b'\0' * 32
+            self.flush_count = 0
+            self.utxo_flush_count = 0
+            self.wall_time = 0
+            self.first_sync = True
+        else:
+            state = self.db.get(b'state')
+            state = ast.literal_eval(state.decode())
+            if state['genesis'] != self.coin.GENESIS_HASH:
+                raise self.DBError('DB genesis hash {} does not match coin {}'
+                                   .format(state['genesis'],
+                                           self.coin.GENESIS_HASH))
+            self.db_height = state['height']
+            self.db_tx_count = state['tx_count']
+            self.db_tip = state['tip']
+            self.flush_count = state['flush_count']
+            self.utxo_flush_count = state['utxo_flush_count']
+            self.wall_time = state['wall_time']
+            self.first_sync = state.get('first_sync', True)
+
+    def open_file(self, filename, create=False):
+        '''Open the file name. Return its handle.'''
+        try:
+            return open(filename, 'rb+')
+        except FileNotFoundError:
+            if create:
+                return open(filename, 'wb+')
+            raise
+
+    def fs_read_headers(self, start, count):
+        # Read some from disk
+        disk_count = min(count, self.db_height + 1 - start)
+        if start < 0 or count < 0 or disk_count != count:
+            raise self.DBError('{:,d} headers starting at {:,d} not on disk'
+                               .format(count, start))
+        if disk_count:
+            header_len = self.coin.HEADER_LEN
+            self.headers_file.seek(start * header_len)
+            return self.headers_file.read(disk_count * header_len)
+        return b''
+
+    def fs_tx_hash(self, tx_num):
+        '''Return a pair (tx_hash, tx_height) for the given tx number.
+ + If the tx_height is not on disk, returns (None, tx_height).''' + tx_height = bisect_right(self.tx_counts, tx_num) + + if tx_height > self.db_height: + return None, tx_height + raise self.DBError('tx_num {:,d} is not on disk') + + file_pos = tx_num * 32 + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename) as f: + f.seek(offset) + return f.read(32), tx_height + + def fs_block_hashes(self, height, count): + headers = self.fs_read_headers(height, count) + # FIXME: move to coins.py + hlen = self.coin.HEADER_LEN + return [double_sha256(header) for header in chunks(headers, hlen)] + + @staticmethod + def _resolve_limit(limit): + if limit is None: + return -1 + assert isinstance(limit, int) and limit >= 0 + return limit + + def get_history(self, hash168, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + prefix = b'H' + hash168 + for key, hist in self.db.iterator(prefix=prefix): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.fs_tx_hash(tx_num) + limit -= 1 + + def get_balance(self, hash168): + '''Returns the confirmed balance of an address.''' + return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + + def get_utxos(self, hash168, limit=1000): + '''Generator that yields all UTXOs for an address sorted in no + particular order. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + unpack = struct.unpack + prefix = b'u' + hash168 + for k, v in self.db.iterator(prefix=prefix): + (tx_pos,) = unpack('= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() + + AsyncTask = namedtuple('AsyncTask', 'session job') class SessionManager(LoggedClass): diff --git a/server/version.py b/server/version.py index 7dd5120..79efc29 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.2.1" +VERSION = "ElectrumX 0.2.2"
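A closing note on the tx_counts array introduced in server/db.py above: tx_counts[N] is the cumulative transaction count at the end of height N, which is what lets fs_tx_hash() (and get_tx_hash() in the block processor) turn a transaction number into a (tx_hash, height) pair with bisect_right and then address the hash at a fixed 32-byte offset across the 16 MB 'hashesNNNN' files. A small self-contained sketch of that arithmetic, with invented sample numbers:

from bisect import bisect_right

# Invented example data: cumulative tx counts for heights 0..3
# (the genesis block contributes a single coinbase tx).
tx_counts = [1, 3, 7, 12]
TX_HASH_FILE_SIZE = 16 * 1024 * 1024   # matches self.tx_hash_file_size

def locate(tx_num):
    '''Return (height, hash file name, byte offset) for a tx number.'''
    height = bisect_right(tx_counts, tx_num)   # first height whose cumulative
                                               # count exceeds tx_num
    file_pos = tx_num * 32                     # each tx hash is 32 bytes
    file_num, offset = divmod(file_pos, TX_HASH_FILE_SIZE)
    return height, 'hashes{:04d}'.format(file_num), offset

print(locate(0))   # (0, 'hashes0000', 0)   - the genesis coinbase
print(locate(5))   # (2, 'hashes0000', 160) - third tx of the block at height 2

In the patched code this arithmetic is split between DB.fs_tx_hash, which handles the on-disk case, and BlockProcessor.get_tx_hash, which falls back to the unflushed in-memory self.tx_hashes lists when the height is beyond db_height.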