From 9e220820aaa59bc93767a35dc4ab064b7b0a64a8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 23 Nov 2016 16:55:23 +0900 Subject: [PATCH] Rework futures and event handling --- server/block_processor.py | 31 +++++++++---------- server/irc.py | 10 ++++-- server/protocol.py | 65 ++++++++++++++++++++++----------------- 3 files changed, 58 insertions(+), 48 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index d529659..2456956 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -147,6 +147,7 @@ class BlockProcessor(server.db.DB): self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url)) self.caught_up = False + self.event = asyncio.Event() self.touched = set() # Meta @@ -182,27 +183,23 @@ class BlockProcessor(server.db.DB): self.clean_db() async def main_loop(self): - '''Main loop for block processing. - - Safely flushes the DB on clean shutdown. - ''' - prefetcher_loop = asyncio.ensure_future(self.prefetcher.main_loop()) - - # Simulate a reorg if requested - if self.env.force_reorg > 0: - self.logger.info('DEBUG: simulating chain reorg of {:,d} blocks' - .format(self.env.force_reorg)) - await self.handle_chain_reorg(self.env.force_reorg) - + '''Main loop for block processing.''' try: + # Simulate a reorg if requested + if self.env.force_reorg > 0: + self.logger.info('DEBUG: simulating reorg of {:,d} blocks' + .format(self.env.force_reorg)) + await self.handle_chain_reorg(self.env.force_reorg) + while True: await self._wait_for_update() except asyncio.CancelledError: pass - prefetcher_loop.cancel() + async def shutdown(self): + '''Shut down the DB cleanly.''' + self.logger.info('flushing state to DB for clean shutdown...') self.flush(True) - await self.client.shutdown() async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks. @@ -211,7 +208,7 @@ class BlockProcessor(server.db.DB): ''' blocks = await self.prefetcher.get_blocks() if not blocks: - await self.first_caught_up() + self.first_caught_up() return '''Strip the unspendable genesis coinbase.''' @@ -235,7 +232,7 @@ class BlockProcessor(server.db.DB): self.next_cache_check = time.time() + 60 self.touched = set() - async def first_caught_up(self): + def first_caught_up(self): '''Called when first caught up after start, or after a reorg.''' self.caught_up = True if self.first_sync: @@ -243,7 +240,7 @@ class BlockProcessor(server.db.DB): self.logger.info('{} synced to height {:,d}. DB version:' .format(VERSION, self.height, self.db_version)) self.flush(True) - await self.client.first_caught_up() + self.event.set() async def handle_chain_reorg(self, count): '''Handle a chain reorganisation. diff --git a/server/irc.py b/server/irc.py index 5ee50ea..14ba8da 100644 --- a/server/irc.py +++ b/server/irc.py @@ -54,10 +54,16 @@ class IRC(LoggedClass): self.irc_port = env.coin.IRC_PORT self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peers = {} + self.disabled = env.irc is None - async def start(self): + async def start(self, caught_up): + '''Start IRC connections once caught up if enabled in environment.''' + await caught_up.wait() try: - await self.join() + if self.disabled: + self.logger.info('IRC is disabled') + else: + await self.join() except asyncio.CancelledError: pass except Exception as e: diff --git a/server/protocol.py b/server/protocol.py index dc452e9..c14e745 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -52,12 +52,11 @@ class MemPool(util.LoggedClass): self.hash168s = defaultdict(set) # None can be a key self.count = -1 - def start(self): - '''Starts the mempool synchronization mainloop. Return a future.''' - return asyncio.ensure_future(self.main_loop()) + async def main_loop(self, caught_up): + '''Asynchronously maintain mempool status with daemon. - async def main_loop(self): - '''Asynchronously maintain mempool status with daemon.''' + Waits until the caught up event is signalled.''' + await caught_up.wait() self.logger.info('maintaining state with daemon...') while True: try: @@ -181,7 +180,7 @@ class MemPool(util.LoggedClass): self.logger.info('{:,d} txs touching {:,d} addresses' .format(len(self.txs), len(self.hash168s))) - self.maanger.notify(touched) + self.manager.notify(touched) def transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -220,15 +219,14 @@ class ServerManager(util.LoggedClass): def __init__(self, env): super().__init__() self.bp = BlockProcessor(self, env) - self.mempool = MemPool(self.db.daemon, env.coin, self.bp, self) + self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) + self.irc = IRC(env) self.env = env self.servers = [] - self.irc = IRC(env) self.sessions = {} self.max_subs = env.max_subs self.subscription_count = 0 - self.irc_future = None - self.mempool_future = None + self.futures = [] self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) self.logger.info('max subscriptions per session: {:,d}' @@ -249,6 +247,24 @@ class ServerManager(util.LoggedClass): ''' return self.mempool.value(hash168) + async def main_loop(self): + '''Server manager main loop.''' + def add_future(coro): + self.futures.append(asyncio.ensure_future(coro)) + + add_future(self.bp.main_loop()) + add_future(self.bp.prefetcher.main_loop()) + add_future(self.mempool.main_loop(self.bp.event)) + add_future(self.irc.start(self.bp.event)) + add_future(self.start_servers(self.bp.event)) + + for future in asyncio.as_completed(self.futures): + try: + await future # Note: future is not one of self.futures + except asyncio.CancelledError: + break + await self.shutdown() + async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() protocol_class = LocalRPC if kind == 'RPC' else ElectrumX @@ -265,12 +281,14 @@ class ServerManager(util.LoggedClass): self.logger.info('{} server listening on {}:{:d}' .format(kind, host, port)) - async def start_servers(self): + async def start_servers(self, caught_up): '''Connect to IRC and start listening for incoming connections. Only connect to IRC if enabled. Start listening on RCP, TCP - and SSL ports only if the port wasn pecified. + and SSL ports only if the port wasn't pecified. Waits for the + caught_up event to be signalled. ''' + await caught_up.wait() env = self.env if env.rpc_port is not None: @@ -285,16 +303,6 @@ class ServerManager(util.LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - if env.irc: - self.irc_future = asyncio.ensure_future(self.irc.start()) - else: - self.logger.info('IRC disabled') - - async def first_caught_up(self): - if not self.mempool_future: - self.mempool_future = self.mempool.start() - await self.server_mgr.start_servers() - def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' cache = {} @@ -305,18 +313,17 @@ class ServerManager(util.LoggedClass): async def shutdown(self): '''Call to shutdown the servers. Returns when done.''' + for future in self.futures: + future.cancel() for server in self.servers: server.close() - for server in self.servers: await server.wait_closed() - self.servers = [] - - if self.irc_future: - self.irc_future.cancel() - if self.mempool_future: - self.mempool_future.cancel() + self.servers = [] # So add_session closes new sessions + while not all(future.done() for future in self.futures): + await asyncio.sleep(0) if self.sessions: await self.close_sessions() + await self.bp.shutdown() async def close_sessions(self, secs=60): self.logger.info('cleanly closing client sessions, please wait...')