diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 7d67cbe..f62be6a 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,17 @@ +version 0.7.5 +------------- + +- refactoring of server manager and event handling. One side effect + is to fix a bug in 0.7.4 where after a reorg ElectrumX might create + a second mempool and/or kick off more servers. Your testing would + be appreciated. This is part of the refactoring necessary to + process incoming blocks asynchronously so client connections are not + left waiting for several seconds +- close connections on first bad JSON encoding. Do not process buffered + requests of a closing connection +- make IRC params a function of the coin (TheLazier) and supply them for + Dash + version 0.7.4 ------------- diff --git a/electrumx_server.py b/electrumx_server.py index d1ef476..940828e 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,7 +17,7 @@ import traceback from functools import partial from server.env import Env -from server.protocol import BlockServer +from server.protocol import ServerManager SUPPRESS_MESSAGES = [ 'Fatal read error on socket transport', @@ -45,7 +45,7 @@ def main_loop(): 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) - server = BlockServer(Env()) + server = ServerManager(Env()) future = asyncio.ensure_future(server.main_loop()) # Install signal handlers diff --git a/lib/coins.py b/lib/coins.py index 4e56894..71129b6 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -39,6 +39,11 @@ class Coin(object): VALUE_PER_COIN = 100000000 CHUNK_SIZE=2016 STRANGE_VERBYTE = 0xff + # IRC Defaults + IRC_PREFIX = "E_" + IRC_CHANNEL = "#electrum" + IRC_SERVER = "irc.freenode.net" + IRC_PORT = 6667 @classmethod def lookup_coin_class(cls, name, net): @@ -359,6 +364,8 @@ class Dash(Coin): TX_COUNT = 2157510 TX_PER_BLOCK = 4 DEFAULT_RPC_PORT = 9998 + IRC_PREFIX = "D_" + IRC_CHANNEL = "#electrum-dash" @classmethod def header_hashes(cls, header): @@ -381,3 +388,4 @@ class DashTestnet(Dash): TX_COUNT = 132681 TX_PER_BLOCK = 1 DEFAULT_RPC_PORT = 19998 + IRC_PREFIX = "d_" diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index eff942d..e2ddda2 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -97,6 +97,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass): decode_message for handling. ''' self.recv_size += len(data) + if self.transport.is_closing(): + return + while True: npos = data.find(ord('\n')) if npos == -1: @@ -118,6 +121,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except UnicodeDecodeError as e: msg = 'cannot decode binary bytes: {}'.format(e) self.send_json_error(msg, self.PARSE_ERROR) + self.transport.close() return try: @@ -125,6 +129,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except json.JSONDecodeError as e: msg = 'cannot decode JSON: {}'.format(e) self.send_json_error(msg, self.PARSE_ERROR) + self.transport.close() return self.on_json_request(message) diff --git a/server/block_processor.py b/server/block_processor.py index f911e51..2456956 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -133,9 +133,11 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env): + def __init__(self, client, env): super().__init__(env) + self.client = client + # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count @@ -145,8 +147,8 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up = False + self.event = asyncio.Event() self.touched = set() - self.futures = [] # Meta self.utxo_MB = env.utxo_MB @@ -181,37 +183,24 @@ class BlockProcessor(server.db.DB): self.clean_db() async def main_loop(self): - '''Main loop for block processing. - - Safely flushes the DB on clean shutdown. - ''' - self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop())) - - # Simulate a reorg if requested - if self.env.force_reorg > 0: - self.logger.info('DEBUG: simulating chain reorg of {:,d} blocks' - .format(self.env.force_reorg)) - await self.handle_chain_reorg(self.env.force_reorg) - + '''Main loop for block processing.''' try: + # Simulate a reorg if requested + if self.env.force_reorg > 0: + self.logger.info('DEBUG: simulating reorg of {:,d} blocks' + .format(self.env.force_reorg)) + await self.handle_chain_reorg(self.env.force_reorg) + while True: await self._wait_for_update() except asyncio.CancelledError: - self.on_cancel() - await self.wait_shutdown() - - def on_cancel(self): - '''Called when the main loop is cancelled. + pass - Intended to be overridden in derived classes.''' - for future in self.futures: - future.cancel() + async def shutdown(self): + '''Shut down the DB cleanly.''' + self.logger.info('flushing state to DB for clean shutdown...') self.flush(True) - async def wait_shutdown(self): - '''Wait for shutdown to complete cleanly, and return.''' - await asyncio.sleep(0) - async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks. @@ -219,7 +208,7 @@ class BlockProcessor(server.db.DB): ''' blocks = await self.prefetcher.get_blocks() if not blocks: - await self.first_caught_up() + self.first_caught_up() return '''Strip the unspendable genesis coinbase.''' @@ -237,26 +226,21 @@ class BlockProcessor(server.db.DB): # Flush everything as queries are performed on the DB and # not in-memory. self.flush(True) - self.notify(self.touched) + self.client.notify(self.touched) elif time.time() > self.next_cache_check: self.check_cache_size() self.next_cache_check = time.time() + 60 self.touched = set() - async def first_caught_up(self): - '''Called after each deamon poll if caught up.''' + def first_caught_up(self): + '''Called when first caught up after start, or after a reorg.''' self.caught_up = True if self.first_sync: self.first_sync = False self.logger.info('{} synced to height {:,d}. DB version:' .format(VERSION, self.height, self.db_version)) self.flush(True) - - def notify(self, touched): - '''Called with list of touched addresses by new blocks. - - Only called for blocks found after first_caught_up is called. - Intended to be overridden in derived classes.''' + self.event.set() async def handle_chain_reorg(self, count): '''Handle a chain reorganisation. diff --git a/server/irc.py b/server/irc.py index 3a938cd..14ba8da 100644 --- a/server/irc.py +++ b/server/irc.py @@ -30,7 +30,6 @@ def port_text(letter, port, default): class IRC(LoggedClass): - PEER_REGEXP = re.compile('(E_[^!]*)!') Peer = namedtuple('Peer', 'ip_addr host ports') class DisconnectedError(Exception): @@ -45,14 +44,26 @@ class IRC(LoggedClass): version = '1.0' self.real_name = '{} v{} {} {}'.format(env.report_host, version, tcp_text, ssl_text) - self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else + self.prefix = env.coin.IRC_PREFIX + self.nick = '{}{}'.format(self.prefix, + env.irc_nick if env.irc_nick else double_sha256(env.report_host.encode()) [:5].hex()) + self.channel = env.coin.IRC_CHANNEL + self.irc_server = env.coin.IRC_SERVER + self.irc_port = env.coin.IRC_PORT + self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peers = {} + self.disabled = env.irc is None - async def start(self): + async def start(self, caught_up): + '''Start IRC connections once caught up if enabled in environment.''' + await caught_up.wait() try: - await self.join() + if self.disabled: + self.logger.info('IRC is disabled') + else: + await self.join() except asyncio.CancelledError: pass except Exception as e: @@ -72,7 +83,7 @@ class IRC(LoggedClass): while True: try: connection = reactor.server() - connection.connect('irc.freenode.net', 6667, + connection.connect(self.irc_server, self.irc_port, self.nick, ircname=self.real_name) connection.set_keepalive(60) while True: @@ -89,8 +100,8 @@ class IRC(LoggedClass): .format(event.type, event.source, event.arguments)) def on_welcome(self, connection, event): - '''Called when we connect to freenode.''' - connection.join('#electrum') + '''Called when we connect to irc server.''' + connection.join(self.channel) def on_disconnect(self, connection, event): '''Called if we are disconnected.''' @@ -99,20 +110,20 @@ class IRC(LoggedClass): def on_join(self, connection, event): '''Called when someone new connects to our channel, including us.''' - match = self.PEER_REGEXP.match(event.source) + match = self.peer_regexp.match(event.source) if match: connection.who(match.group(1)) def on_quit(self, connection, event): '''Called when someone leaves our channel.''' - match = self.PEER_REGEXP.match(event.source) + match = self.peer_regexp.match(event.source) if match: self.peers.pop(match.group(1), None) def on_kick(self, connection, event): '''Called when someone is kicked from our channel.''' self.log_event(event) - match = self.PEER_REGEXP.match(event.arguments[0]) + match = self.peer_regexp.match(event.arguments[0]) if match: self.peers.pop(match.group(1), None) @@ -123,7 +134,7 @@ class IRC(LoggedClass): The users are space-separated in the 2nd argument. ''' for peer in event.arguments[2].split(): - if peer.startswith("E_"): + if peer.startswith(self.prefix): connection.who(peer) def on_whoreply(self, connection, event): diff --git a/server/protocol.py b/server/protocol.py index 94261df..c14e745 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -20,63 +20,14 @@ from functools import partial from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC, json_notification_payload from lib.tx import Deserializer -from lib.util import LoggedClass +import lib.util as util from server.block_processor import BlockProcessor from server.daemon import DaemonError from server.irc import IRC from server.version import VERSION -class BlockServer(BlockProcessor): - '''Like BlockProcessor but also has a mempool and a server manager. - - Servers are started immediately the block processor first catches - up with the daemon. - ''' - - def __init__(self, env): - super().__init__(env) - self.server_mgr = ServerManager(self, env) - self.mempool = MemPool(self) - - async def first_caught_up(self): - # Call the base class to flush and log first - await super().first_caught_up() - await self.server_mgr.start_servers() - self.futures.append(self.mempool.start()) - - def notify(self, touched): - '''Called when addresses are touched by new blocks or mempool - updates.''' - self.server_mgr.notify(self.height, touched) - - def on_cancel(self): - '''Called when the main loop is cancelled.''' - self.server_mgr.stop() - super().on_cancel() - - async def wait_shutdown(self): - '''Wait for shutdown to complete cleanly, and return.''' - await self.server_mgr.wait_shutdown() - await super().wait_shutdown() - - def mempool_transactions(self, hash168): - '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. - - unconfirmed is True if any txin is unconfirmed. - ''' - return self.mempool.transactions(hash168) - - def mempool_value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. - - Can be positive or negative. - ''' - return self.mempool.value(hash168) - - -class MemPool(LoggedClass): +class MemPool(util.LoggedClass): '''Representation of the daemon's mempool. Updated regularly in caught-up state. Goal is to enable efficient @@ -91,19 +42,21 @@ class MemPool(LoggedClass): tx's txins are unconfirmed. tx hashes are hex strings. ''' - def __init__(self, bp): + def __init__(self, daemon, coin, db, manager): super().__init__() + self.daemon = daemon + self.coin = coin + self.db = db + self.manager = manager self.txs = {} self.hash168s = defaultdict(set) # None can be a key - self.bp = bp self.count = -1 - def start(self): - '''Starts the mempool synchronization mainloop. Return a future.''' - return asyncio.ensure_future(self.main_loop()) + async def main_loop(self, caught_up): + '''Asynchronously maintain mempool status with daemon. - async def main_loop(self): - '''Asynchronously maintain mempool status with daemon.''' + Waits until the caught up event is signalled.''' + await caught_up.wait() self.logger.info('maintaining state with daemon...') while True: try: @@ -120,7 +73,7 @@ class MemPool(LoggedClass): Remove transactions that are no longer in our mempool. Request new transactions we don't have then add to our mempool. ''' - hex_hashes = set(await self.bp.daemon.mempool_hashes()) + hex_hashes = set(await self.daemon.mempool_hashes()) touched = set() missing_utxos = [] @@ -145,7 +98,7 @@ class MemPool(LoggedClass): # ones the daemon no longer has (it will return None). Put # them into a dictionary of hex hash to deserialized tx. hex_hashes.difference_update(self.txs) - raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) + raw_txs = await self.daemon.getrawtransactions(hex_hashes) if initial: self.logger.info('analysing {:,d} mempool txs' .format(len(raw_txs))) @@ -155,8 +108,8 @@ class MemPool(LoggedClass): # The mempool is unordered, so process all outputs first so # that looking for inputs has full info. - script_hash168 = self.bp.coin.hash168_from_script() - db_utxo_lookup = self.bp.db_utxo_lookup + script_hash168 = self.coin.hash168_from_script() + db_utxo_lookup = self.db.db_utxo_lookup def txout_pair(txout): return (script_hash168(txout.pk_script), txout.value) @@ -195,7 +148,7 @@ class MemPool(LoggedClass): try: infos = (txin_info(txin) for txin in tx.inputs) txin_pairs, unconfs = zip(*infos) - except self.bp.MissingUTXOError: + except self.db.MissingUTXOError: # Drop this TX. If other mempool txs depend on it # it's harmless - next time the mempool is refreshed # they'll either be cleaned up or the UTXOs will no @@ -227,7 +180,7 @@ class MemPool(LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(self.txs), len(self.hash168s))) - self.bp.notify(touched) + self.manager.notify(touched) def transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -254,26 +207,64 @@ class MemPool(LoggedClass): return value -class ServerManager(LoggedClass): - '''Manages the servers.''' +class ServerManager(util.LoggedClass): + '''Manages the client servers, a mempool, and a block processor. + + Servers are started immediately the block processor first catches + up with the daemon. + ''' MgrTask = namedtuple('MgrTask', 'session task') - def __init__(self, bp, env): + def __init__(self, env): super().__init__() - self.bp = bp + self.bp = BlockProcessor(self, env) + self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) + self.irc = IRC(env) self.env = env self.servers = [] - self.irc = IRC(env) self.sessions = {} self.max_subs = env.max_subs self.subscription_count = 0 - self.irc_future = None + self.futures = [] self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) self.logger.info('max subscriptions per session: {:,d}' .format(env.max_session_subs)) + def mempool_transactions(self, hash168): + '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool + entries for the hash168. + + unconfirmed is True if any txin is unconfirmed. + ''' + return self.mempool.transactions(hash168) + + def mempool_value(self, hash168): + '''Return the unconfirmed amount in the mempool for hash168. + + Can be positive or negative. + ''' + return self.mempool.value(hash168) + + async def main_loop(self): + '''Server manager main loop.''' + def add_future(coro): + self.futures.append(asyncio.ensure_future(coro)) + + add_future(self.bp.main_loop()) + add_future(self.bp.prefetcher.main_loop()) + add_future(self.mempool.main_loop(self.bp.event)) + add_future(self.irc.start(self.bp.event)) + add_future(self.start_servers(self.bp.event)) + + for future in asyncio.as_completed(self.futures): + try: + await future # Note: future is not one of self.futures + except asyncio.CancelledError: + break + await self.shutdown() + async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() protocol_class = LocalRPC if kind == 'RPC' else ElectrumX @@ -290,12 +281,14 @@ class ServerManager(LoggedClass): self.logger.info('{} server listening on {}:{:d}' .format(kind, host, port)) - async def start_servers(self): + async def start_servers(self, caught_up): '''Connect to IRC and start listening for incoming connections. Only connect to IRC if enabled. Start listening on RCP, TCP - and SSL ports only if the port wasn pecified. + and SSL ports only if the port wasn't pecified. Waits for the + caught_up event to be signalled. ''' + await caught_up.wait() env = self.env if env.rpc_port is not None: @@ -310,40 +303,34 @@ class ServerManager(LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - if env.irc: - self.irc_future = asyncio.ensure_future(self.irc.start()) - else: - self.logger.info('IRC disabled') - - def notify(self, height, touched): + def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' cache = {} for session in self.sessions: if isinstance(session, ElectrumX): # Use a tuple to distinguish from JSON - session.jobs.put_nowait((height, touched, cache)) + session.jobs.put_nowait((self.bp.height, touched, cache)) - def stop(self): - '''Close listening servers.''' + async def shutdown(self): + '''Call to shutdown the servers. Returns when done.''' + for future in self.futures: + future.cancel() for server in self.servers: server.close() - if self.irc_future: - self.irc_future.cancel() + await server.wait_closed() + self.servers = [] # So add_session closes new sessions + while not all(future.done() for future in self.futures): + await asyncio.sleep(0) if self.sessions: - self.logger.info('cleanly closing client sessions, please wait...') + await self.close_sessions() + await self.bp.shutdown() + + async def close_sessions(self, secs=60): + self.logger.info('cleanly closing client sessions, please wait...') for session in self.sessions: self.close_session(session) - - async def wait_shutdown(self): - # Wait for servers to close - for server in self.servers: - await server.wait_closed() - self.servers = [] - - secs = 60 self.logger.info('server listening sockets closed, waiting ' '{:d} seconds for socket cleanup'.format(secs)) - limit = time.time() + secs while self.sessions and time.time() < limit: await asyncio.sleep(4) @@ -628,7 +615,7 @@ class ElectrumX(Session): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 history = await self.async_get_history(hash168) - mempool = self.bp.mempool_transactions(hash168) + mempool = self.manager.mempool_transactions(hash168) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) for tx_hash, height in history) @@ -666,7 +653,7 @@ class ElectrumX(Session): def unconfirmed_history(self, hash168): # Note unconfirmed history is unordered in electrum-server # Height is -1 if unconfirmed txins, otherwise 0 - mempool = self.bp.mempool_transactions(hash168) + mempool = self.manager.mempool_transactions(hash168) return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} for tx_hash, fee, unconfirmed in mempool] @@ -707,7 +694,7 @@ class ElectrumX(Session): async def get_balance(self, hash168): utxos = await self.get_utxos(hash168) confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = self.bp.mempool_value(hash168) + unconfirmed = self.manager.mempool_value(hash168) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} async def list_unspent(self, hash168): diff --git a/server/version.py b/server/version.py index aa6eb22..c7103aa 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.7.4" +VERSION = "ElectrumX 0.7.5"