From 41e734caeb56f663dc8aaff271a8faedf9552ebc Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 20 Jul 2018 12:40:57 +0800 Subject: [PATCH] Clean up controller interface with other parts --- electrumx/server/controller.py | 7 ++-- electrumx/server/peers.py | 17 ++++----- electrumx/server/session.py | 64 ++++++++++++++++++---------------- 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 43346df..8ed3abe 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -49,11 +49,10 @@ class Controller(ServerBase): '''Start the RPC server and wait for the mempool to synchronize. Then start the peer manager and serving external clients. ''' - await self.session_mgr.start_rpc_server() + self.session_mgr.start_rpc_server() await self.chain_state.wait_for_mempool() - self.tasks.create_task(self.peer_mgr.main_loop()) - self.tasks.create_task(self.session_mgr.start_serving()) - self.tasks.create_task(self.session_mgr.housekeeping()) + self.peer_mgr.start_peer_discovery() + self.session_mgr.start_serving() async def shutdown(self): '''Perform the shutdown sequence.''' diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index d4b9235..2d2384f 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -483,20 +483,21 @@ class PeerManager(object): None.''' return self.proxy.peername if self.proxy else None - async def main_loop(self): + def start_peer_discovery(self): + if self.env.peer_discovery == self.env.PD_ON: + self.logger.info(f'beginning peer discovery. Force use of ' + f'proxy: {self.env.force_proxy}') + self.tasks.create_task(self.peer_discovery_loop()) + else: + self.logger.info('peer discovery is disabled') + + async def peer_discovery_loop(self): '''Main loop performing peer maintenance. This includes 1) Forgetting unreachable peers. 2) Verifying connectivity of new peers. 3) Retrying old peers at regular intervals. ''' - if self.env.peer_discovery != self.env.PD_ON: - self.logger.info('peer discovery is disabled') - return - - self.logger.info('beginning peer discovery. Force use of proxy: {}' - .format(self.env.force_proxy)) - self.import_peers() await self.maybe_detect_proxy() diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 9bcea6d..ea6ce49 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -175,6 +175,30 @@ class SessionManager(object): if server: server.close() + async def _housekeeping(self): + '''Regular housekeeping checks.''' + n = 0 + while True: + n += 1 + await asyncio.sleep(15) + if n % 10 == 0: + self._clear_stale_sessions() + + # Start listening for incoming connections if paused and + # session count has fallen + if (self.state == self.PAUSED and + len(self.sessions) <= self.low_watermark): + await self._start_external_servers() + + # Periodically log sessions + if self.env.log_sessions and time.time() > self.next_log_sessions: + if self.next_log_sessions: + data = self._session_data(for_log=True) + for line in text.sessions_lines(data): + self.logger.info(line) + self.logger.info(json.dumps(self._get_info())) + self.next_log_sessions = time.time() + self.env.log_sessions + def _group_map(self): group_map = defaultdict(list) for session in self.sessions: @@ -368,7 +392,13 @@ class SessionManager(object): # --- External Interface - async def start_serving(self): + def start_rpc_server(self): + '''Start the RPC server if enabled.''' + if self.env.rpc_port is not None: + self.tasks.create_task(self._start_server( + 'RPC', self.env.cs_host(for_rpc=True), self.env.rpc_port)) + + def start_serving(self): '''Start TCP and SSL servers.''' self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' @@ -384,12 +414,8 @@ class SessionManager(object): if self.env.drop_client is not None: self.logger.info('drop clients matching: {}' .format(self.env.drop_client.pattern)) - await self._start_external_servers() - - async def start_rpc_server(self): - if self.env.rpc_port is not None: - await self._start_server('RPC', self.env.cs_host(for_rpc=True), - self.env.rpc_port) + self.tasks.create_task(self._start_external_servers()) + self.tasks.create_task(self._housekeeping()) async def shutdown(self): '''Close servers and sessions.''' @@ -417,30 +443,6 @@ class SessionManager(object): if session_touched is not None: self.tasks.create_task(session.notify_async(session_touched)) - async def housekeeping(self): - '''Regular housekeeping checks.''' - n = 0 - while True: - n += 1 - await asyncio.sleep(15) - if n % 10 == 0: - self._clear_stale_sessions() - - # Start listening for incoming connections if paused and - # session count has fallen - if (self.state == self.PAUSED and - len(self.sessions) <= self.low_watermark): - await self._start_external_servers() - - # Periodically log sessions - if self.env.log_sessions and time.time() > self.next_log_sessions: - if self.next_log_sessions: - data = self._session_data(for_log=True) - for line in text.sessions_lines(data): - self.logger.info(line) - self.logger.info(json.dumps(self._get_info())) - self.next_log_sessions = time.time() + self.env.log_sessions - def add_session(self, session): self.sessions.add(session) if (len(self.sessions) >= self.max_sessions