From 4eebf420e82973e7ac29185ceccffe51991b7a94 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 9 Apr 2018 09:07:15 +0900 Subject: [PATCH] Cleaner shutdown Use aiorpcX task functionality Shut down peer sessions cleanly --- docs/changelog.rst | 8 +++++ server/block_processor.py | 2 +- server/controller.py | 64 +++++++++++++++++---------------------- server/peers.py | 46 ++++++++++++++++------------ setup.py | 2 +- 5 files changed, 64 insertions(+), 58 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index fcadcbb..1f8ad78 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,14 @@ ChangeLog ========= +Version 1.4.1 +------------- + +* minor bugfixes - cleaner shutdown; group handling +* set PROTOCOL_MIN to 1.0; this will prevent 2.9.x clients from connecting + and encourage upgrades to more recent clients without the security hole +* requires aiorpcx 0.5.4 + Version 1.4 ----------- diff --git a/server/block_processor.py b/server/block_processor.py index 4b3431f..df4606f 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -198,7 +198,7 @@ class BlockProcessor(server.db.DB): async def main_loop(self): '''Main loop for block processing.''' - self.controller.ensure_future(self.prefetcher.main_loop()) + self.controller.create_task(self.prefetcher.main_loop()) await self.prefetcher.reset_height() while True: diff --git a/server/controller.py b/server/controller.py index a488eae..1445305 100644 --- a/server/controller.py +++ b/server/controller.py @@ -19,7 +19,7 @@ from functools import partial import pylru -from aiorpcx import RPCError +from aiorpcx import RPCError, TaskSet from lib.hash import double_sha256, hash_to_str, hex_str_to_hash from lib.peer import Peer from lib.server_base import ServerBase @@ -48,7 +48,7 @@ class Controller(ServerBase): CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) PROTOCOL_MIN = '1.0' PROTOCOL_MAX = '1.2' - VERSION = 'ElectrumX 1.4' + VERSION = 'ElectrumX 1.4.1' def __init__(self, env): '''Initialize everything that doesn't require the event loop.''' @@ -60,6 +60,7 @@ class Controller(ServerBase): self.coin = env.coin self.servers = {} + self.tasks = TaskSet() self.sessions = set() self.cur_group = SessionGroup(0) self.txs_sent = 0 @@ -68,7 +69,6 @@ class Controller(ServerBase): self.max_sessions = env.max_sessions self.low_watermark = self.max_sessions * 19 // 20 self.max_subs = env.max_subs - self.futures = {} # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 self.next_stale_check = 0 @@ -127,25 +127,23 @@ class Controller(ServerBase): await self.start_server('RPC', self.env.cs_host(for_rpc=True), self.env.rpc_port) - self.ensure_future(self.bp.main_loop()) - self.ensure_future(self.wait_for_bp_catchup()) + self.create_task(self.bp.main_loop()) + self.create_task(self.wait_for_bp_catchup()) async def shutdown(self): '''Perform the shutdown sequence.''' self.state = self.SHUTTING_DOWN - # Close servers and sessions + # Close servers and sessions, and cancel all tasks self.close_servers(list(self.servers.keys())) for session in self.sessions: self.close_session(session) + self.tasks.cancel_all() - # Cancel pending futures - for future in self.futures: - future.cancel() - - # Wait for all futures to finish - while not all(future.done() for future in self.futures): - await asyncio.sleep(0.1) + # Wait for the above to take effect + await self.tasks.wait() + for session in list(self.sessions): + await session.wait_closed() # Finally shut down the block processor and executor self.bp.shutdown(self.executor) @@ -175,27 +173,21 @@ class Controller(ServerBase): def schedule_executor(self, func, *args): '''Schedule running func in the executor, return a task.''' - return self.ensure_future(self.run_in_executor(func, *args)) + return self.create_task(self.run_in_executor(func, *args)) - def ensure_future(self, coro, callback=None): + def create_task(self, coro, callback=None): '''Schedule the coro to be run.''' - future = asyncio.ensure_future(coro) - future.add_done_callback(self.on_future_done) - self.futures[future] = callback - return future - - def on_future_done(self, future): - '''Collect the result of a future after removing it from our set.''' - callback = self.futures.pop(future) + task = self.tasks.create_task(coro) + task.add_done_callback(callback or self.check_task_exception) + return task + + def check_task_exception(self, task): + '''Check a task for exceptions.''' try: - if callback: - callback(future) - else: - future.result() - except asyncio.CancelledError: - pass - except Exception: - self.logger.error(traceback.format_exc()) + if not task.cancelled(): + task.result() + except Exception as e: + self.logger.exception(f'uncaught task exception: {e}') async def housekeeping(self): '''Regular housekeeping checks.''' @@ -226,11 +218,11 @@ class Controller(ServerBase): synchronize, then kick off server background processes.''' await self.bp.caught_up_event.wait() self.logger.info('block processor has caught up') - self.ensure_future(self.mempool.main_loop()) + self.create_task(self.mempool.main_loop()) await self.mempool.synchronized_event.wait() - self.ensure_future(self.peer_mgr.main_loop()) - self.ensure_future(self.log_start_external_servers()) - self.ensure_future(self.housekeeping()) + self.create_task(self.peer_mgr.main_loop()) + self.create_task(self.log_start_external_servers()) + self.create_task(self.housekeeping()) def close_servers(self, kinds): '''Close the servers of the given kinds (TCP etc.).''' @@ -309,7 +301,7 @@ class Controller(ServerBase): continue session_touched = session.notify(height, touched) if session_touched is not None: - self.ensure_future(session.notify_async(session_touched)) + self.create_task(session.notify_async(session_touched)) def notify_peers(self, updates): '''Notify of peer updates.''' diff --git a/server/peers.py b/server/peers.py index e57ace2..c7d1a89 100644 --- a/server/peers.py +++ b/server/peers.py @@ -30,6 +30,8 @@ WAKEUP_SECS = 300 class PeerSession(aiorpcx.ClientSession): '''An outgoing session to a peer.''' + sessions = set() + def __init__(self, peer, peer_mgr, kind, host, port, **kwargs): super().__init__(host, port, **kwargs) self.peer = peer @@ -42,6 +44,7 @@ class PeerSession(aiorpcx.ClientSession): def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) + self.sessions.add(self) # Update IP address if not Tor if not self.peer.is_tor: @@ -54,6 +57,11 @@ class PeerSession(aiorpcx.ClientSession): self.send_request('server.version', controller.server_version_args(), self.on_version, timeout=self.timeout) + def connection_lost(self, exc): + '''Handle an incoming client connection.''' + super().connection_lost(exc) + self.sessions.remove(self) + def _header_notification(self, header): pass @@ -427,10 +435,6 @@ class PeerManager(object): for real_name in coin_peers] self.add_peers(peers, limit=None) - def ensure_future(self, coro, callback=None): - '''Schedule the coro to be run.''' - return self.controller.ensure_future(coro, callback=callback) - async def maybe_detect_proxy(self): '''Detect a proxy if we don't have one and some time has passed since the last attempt. @@ -477,12 +481,17 @@ class PeerManager(object): self.import_peers() await self.maybe_detect_proxy() - while True: - timeout = self.loop.call_later(WAKEUP_SECS, self.retry_event.set) - await self.retry_event.wait() - self.retry_event.clear() - timeout.cancel() - await self.retry_peers() + try: + while True: + timeout = self.loop.call_later(WAKEUP_SECS, + self.retry_event.set) + await self.retry_event.wait() + self.retry_event.clear() + timeout.cancel() + await self.retry_peers() + finally: + for session in list(PeerSession.sessions): + await session.wait_closed() def is_coin_onion_peer(self, peer): '''Return true if this peer is a hard-coded onion peer.''' @@ -541,22 +550,19 @@ class PeerManager(object): kwargs['local_addr'] = (host, None) session = PeerSession(peer, self, kind, peer.host, port, **kwargs) - callback = partial(self.on_connected, session, peer, port_pairs) - self.ensure_future(session.create_connection(), callback) + callback = partial(self.on_connected, peer, port_pairs) + self.controller.create_task(session.create_connection(), callback) - def on_connected(self, session, peer, port_pairs, future): + def on_connected(self, peer, port_pairs, task): '''Called when a connection attempt succeeds or fails. If failed, close the session, log it and try remaining port pairs. ''' - exception = future.exception() - if exception: - session.close() + if not task.cancelled() and task.exception(): kind, port = port_pairs.pop(0) - self.logger.info('failed connecting to {} at {} port {:d} ' - 'in {:.1f}s: {}' - .format(peer, kind, port, - time.time() - peer.last_try, exception)) + elapsed = time.time() - peer.last_try + self.logger.info(f'failed connecting to {peer} at {kind} port ' + f'{port} in {elapsed:.1f}s: {task.exception()}') if port_pairs: self.retry_peer(peer, port_pairs) else: diff --git a/setup.py b/setup.py index 0bcf2b8..3d0941a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setuptools.setup( # "x11_hash" package (1.4) is required to sync DASH network. # "tribus_hash" package is required to sync Denarius network. # "blake256" package is required to sync Decred network. - install_requires=['aiorpcX >= 0.5.3', 'plyvel', 'pylru', 'aiohttp >= 1'], + install_requires=['aiorpcX >= 0.5.4', 'plyvel', 'pylru', 'aiohttp >= 1'], packages=setuptools.find_packages(exclude=['tests']), description='ElectrumX Server', author='Neil Booth',