From 68dbf9fad22e04fa7935f35813f7f7c006c3d5fd Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 25 Jul 2018 21:33:12 +0800 Subject: [PATCH] Tweak task handling so all errors are logged --- electrumx/lib/server_base.py | 2 +- electrumx/lib/tasks.py | 15 +++++++++++++-- electrumx/server/controller.py | 4 ++-- electrumx/server/mempool.py | 3 ++- electrumx/server/peers.py | 6 ++++-- electrumx/server/session.py | 10 +++++----- 6 files changed, 27 insertions(+), 13 deletions(-) diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index a68952b..50c2bc4 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -103,7 +103,7 @@ class ServerBase(object): partial(self.on_signal, signame)) loop.set_exception_handler(self.on_exception) - self.tasks.create_task(self.start_servers()) + await self.start_servers() # Wait for shutdown to be signalled, and log it. # Derived classes may want to provide a shutdown() coroutine. diff --git a/electrumx/lib/tasks.py b/electrumx/lib/tasks.py index b122226..1775316 100644 --- a/electrumx/lib/tasks.py +++ b/electrumx/lib/tasks.py @@ -46,9 +46,20 @@ class Tasks(object): '''Run a function in a separate thread, and await its completion.''' return await self.loop.run_in_executor(None, func, *args) - def create_task(self, coro): + def create_task(self, coro, daemon=True): '''Schedule the coro to be run.''' - return self.tasks.create_task(coro) + task = self.tasks.create_task(coro) + if daemon: + task.add_done_callback(self._check_task_exception) + return task + + def _check_task_exception(self, task): + '''Check a task for exceptions.''' + try: + if not task.cancelled(): + task.result() + except Exception as e: + self.logger.exception(f'uncaught task exception: {e}') async def cancel_all(self, wait=True): '''Cancels all tasks and waits for them to complete.''' diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index d0d7df3..21b5777 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -111,10 +111,10 @@ class Controller(ServerBase): '''Start the RPC server and wait for the mempool to synchronize. Then start the peer manager and serving external clients. ''' - self.session_mgr.start_rpc_server() + await self.session_mgr.start_rpc_server() await self.bp.catch_up_to_daemon() await self.mempool.start_and_wait_for_sync() - self.session_mgr.start_serving() + await self.session_mgr.start_serving() # Peer discovery should start after we start serving because # we connect to ourself self.peer_mgr.start_peer_discovery() diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index 5153742..5e19a6d 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -165,7 +165,8 @@ class MemPool(object): # Process new transactions new_hashes = list(all_hashes.difference(txs)) jobs = [self.tasks.create_task(self._fetch_and_accept - (hashes, all_hashes, touched)) + (hashes, all_hashes, touched), + daemon=False) for hashes in chunks(new_hashes, 2000)] if jobs: await asyncio.gather(*jobs) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 6167ac9..c30d7d9 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -194,9 +194,11 @@ class PeerManager(object): # Retry a failed connection if enough time has passed return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count + tasks = [] for peer in self.peers: if should_retry(peer): - self.tasks.create_task(self._retry_peer(peer)) + tasks.append(self.tasks.create_task(self._retry_peer(peer))) + await asyncio.gather(*tasks) async def _retry_peer(self, peer): peer.try_count += 1 @@ -275,7 +277,7 @@ class PeerManager(object): peer.features['server_version'] = server_version ptuple = protocol_tuple(protocol_version) - jobs = [self.tasks.create_task(message) for message in ( + jobs = [self.tasks.create_task(message, daemon=False) for message in ( self._send_headers_subscribe(session, peer, timeout, ptuple), self._send_server_features(session, peer, timeout), self._send_peers_subscribe(session, peer, timeout) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index a0c8c7c..b646596 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -396,13 +396,13 @@ class SessionManager(object): # --- External Interface - def start_rpc_server(self): + async def start_rpc_server(self): '''Start the RPC server if enabled.''' if self.env.rpc_port is not None: - self.tasks.create_task(self._start_server( - 'RPC', self.env.cs_host(for_rpc=True), self.env.rpc_port)) + await self._start_server('RPC', self.env.cs_host(for_rpc=True), + self.env.rpc_port) - def start_serving(self): + async def start_serving(self): '''Start TCP and SSL servers.''' self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' @@ -418,7 +418,7 @@ class SessionManager(object): if self.env.drop_client is not None: self.logger.info('drop clients matching: {}' .format(self.env.drop_client.pattern)) - self.tasks.create_task(self._start_external_servers()) + await self._start_external_servers() self.tasks.create_task(self._housekeeping()) async def shutdown(self):