Browse Source

Clean up controller interface with other parts

patch-2
Neil Booth 7 years ago
parent
commit
41e734caeb
  1. 7
      electrumx/server/controller.py
  2. 17
      electrumx/server/peers.py
  3. 64
      electrumx/server/session.py

7
electrumx/server/controller.py

@ -49,11 +49,10 @@ class Controller(ServerBase):
'''Start the RPC server and wait for the mempool to synchronize. Then
start the peer manager and serving external clients.
'''
await self.session_mgr.start_rpc_server()
self.session_mgr.start_rpc_server()
await self.chain_state.wait_for_mempool()
self.tasks.create_task(self.peer_mgr.main_loop())
self.tasks.create_task(self.session_mgr.start_serving())
self.tasks.create_task(self.session_mgr.housekeeping())
self.peer_mgr.start_peer_discovery()
self.session_mgr.start_serving()
async def shutdown(self):
'''Perform the shutdown sequence.'''

17
electrumx/server/peers.py

@ -483,20 +483,21 @@ class PeerManager(object):
None.'''
return self.proxy.peername if self.proxy else None
async def main_loop(self):
def start_peer_discovery(self):
if self.env.peer_discovery == self.env.PD_ON:
self.logger.info(f'beginning peer discovery. Force use of '
f'proxy: {self.env.force_proxy}')
self.tasks.create_task(self.peer_discovery_loop())
else:
self.logger.info('peer discovery is disabled')
async def peer_discovery_loop(self):
'''Main loop performing peer maintenance. This includes
1) Forgetting unreachable peers.
2) Verifying connectivity of new peers.
3) Retrying old peers at regular intervals.
'''
if self.env.peer_discovery != self.env.PD_ON:
self.logger.info('peer discovery is disabled')
return
self.logger.info('beginning peer discovery. Force use of proxy: {}'
.format(self.env.force_proxy))
self.import_peers()
await self.maybe_detect_proxy()

64
electrumx/server/session.py

@ -175,6 +175,30 @@ class SessionManager(object):
if server:
server.close()
async def _housekeeping(self):
'''Regular housekeeping checks.'''
n = 0
while True:
n += 1
await asyncio.sleep(15)
if n % 10 == 0:
self._clear_stale_sessions()
# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self._start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self._session_data(for_log=True)
for line in text.sessions_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self._get_info()))
self.next_log_sessions = time.time() + self.env.log_sessions
def _group_map(self):
group_map = defaultdict(list)
for session in self.sessions:
@ -368,7 +392,13 @@ class SessionManager(object):
# --- External Interface
async def start_serving(self):
def start_rpc_server(self):
'''Start the RPC server if enabled.'''
if self.env.rpc_port is not None:
self.tasks.create_task(self._start_server(
'RPC', self.env.cs_host(for_rpc=True), self.env.rpc_port))
def start_serving(self):
'''Start TCP and SSL servers.'''
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds'
@ -384,12 +414,8 @@ class SessionManager(object):
if self.env.drop_client is not None:
self.logger.info('drop clients matching: {}'
.format(self.env.drop_client.pattern))
await self._start_external_servers()
async def start_rpc_server(self):
if self.env.rpc_port is not None:
await self._start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
self.tasks.create_task(self._start_external_servers())
self.tasks.create_task(self._housekeeping())
async def shutdown(self):
'''Close servers and sessions.'''
@ -417,30 +443,6 @@ class SessionManager(object):
if session_touched is not None:
self.tasks.create_task(session.notify_async(session_touched))
async def housekeeping(self):
'''Regular housekeeping checks.'''
n = 0
while True:
n += 1
await asyncio.sleep(15)
if n % 10 == 0:
self._clear_stale_sessions()
# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self._start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self._session_data(for_log=True)
for line in text.sessions_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self._get_info()))
self.next_log_sessions = time.time() + self.env.log_sessions
def add_session(self, session):
self.sessions.add(session)
if (len(self.sessions) >= self.max_sessions

Loading…
Cancel
Save