From 9b4276c68d73a3b549caa587c39e801f72f426f7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:28:46 +0900 Subject: [PATCH] Have peers.py use its own task group --- electrumx/server/peers.py | 25 +++++++++++++++---------- electrumx/server/session.py | 4 ++-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 64e5fd7..3bb208b 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -16,8 +16,8 @@ from collections import defaultdict, Counter from aiorpcx import (ClientSession, SOCKSProxy, Notification, handler_invocation, - SOCKSError, RPCError, TaskTimeout, - TaskGroup, run_in_thread, ignore_after, timeout_after) + SOCKSError, RPCError, TaskTimeout, TaskGroup, Event, + sleep, run_in_thread, ignore_after, timeout_after) from electrumx.lib.peer import Peer from electrumx.lib.util import class_logger, protocol_tuple @@ -74,7 +74,7 @@ class PeerManager(object): self.peers = set() self.permit_onion_peer_time = time.time() self.proxy = None - self.task_group = None + self.group = TaskGroup() def _my_clearnet_peer(self): '''Returns the clearnet peer representing this server, if any.''' @@ -150,7 +150,7 @@ class PeerManager(object): self.logger.info(f'detected {proxy}') return self.logger.info('no proxy detected, will try later') - await asyncio.sleep(900) + await sleep(900) async def _note_peers(self, peers, limit=2, check_ports=False, source=None): @@ -178,9 +178,9 @@ class PeerManager(object): use_peers = new_peers for peer in use_peers: self.logger.info(f'accepted new peer {peer} from {source}') - peer.retry_event = asyncio.Event() + peer.retry_event = Event() self.peers.add(peer) - await self.task_group.spawn(self._monitor_peer(peer)) + await self.group.spawn(self._monitor_peer(peer)) async def _monitor_peer(self, peer): # Stop monitoring if we were dropped (a duplicate peer) @@ -372,7 +372,7 @@ class PeerManager(object): # # External interface # - async def discover_peers(self, task_group): + async def discover_peers(self): '''Perform peer maintenance. This includes 1) Forgetting unreachable peers. @@ -385,9 +385,14 @@ class PeerManager(object): self.logger.info(f'beginning peer discovery. Force use of ' f'proxy: {self.env.force_proxy}') - self.task_group = task_group - await task_group.spawn(self._detect_proxy()) - await task_group.spawn(self._import_peers()) + forever = Event() + async with self.group as group: + await group.spawn(forever.wait()) + await group.spawn(self._detect_proxy()) + await group.spawn(self._import_peers()) + # Consume tasks as they complete + async for task in group: + task.result() def info(self): '''The number of peers.''' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 83aa867..9f63027 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -430,8 +430,8 @@ class SessionManager(object): await self._start_external_servers() # Peer discovery should start after the external servers # because we connect to ourself - async with TaskGroup(wait=object) as group: - await group.spawn(self.peer_mgr.discover_peers(group)) + async with TaskGroup() as group: + await group.spawn(self.peer_mgr.discover_peers()) await group.spawn(self._clear_stale_sessions()) await group.spawn(self._log_sessions()) await group.spawn(self._restart_if_paused())