Browse Source

Cleaner shutdown

Use aiorpcX task functionality

Shut down peer sessions cleanly
patch-2
Neil Booth 7 years ago
parent
commit
4eebf420e8
  1. 8
      docs/changelog.rst
  2. 2
      server/block_processor.py
  3. 64
      server/controller.py
  4. 36
      server/peers.py
  5. 2
      setup.py

8
docs/changelog.rst

@ -1,6 +1,14 @@
ChangeLog
=========
Version 1.4.1
-------------
* minor bugfixes - cleaner shutdown; group handling
* set PROTOCOL_MIN to 1.0; this will prevent 2.9.x clients from connecting
and encourage upgrades to more recent clients without the security hole
* requires aiorpcx 0.5.4
Version 1.4
-----------

2
server/block_processor.py

@ -198,7 +198,7 @@ class BlockProcessor(server.db.DB):
async def main_loop(self):
'''Main loop for block processing.'''
self.controller.ensure_future(self.prefetcher.main_loop())
self.controller.create_task(self.prefetcher.main_loop())
await self.prefetcher.reset_height()
while True:

64
server/controller.py

@ -19,7 +19,7 @@ from functools import partial
import pylru
from aiorpcx import RPCError
from aiorpcx import RPCError, TaskSet
from lib.hash import double_sha256, hash_to_str, hex_str_to_hash
from lib.peer import Peer
from lib.server_base import ServerBase
@ -48,7 +48,7 @@ class Controller(ServerBase):
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
PROTOCOL_MIN = '1.0'
PROTOCOL_MAX = '1.2'
VERSION = 'ElectrumX 1.4'
VERSION = 'ElectrumX 1.4.1'
def __init__(self, env):
'''Initialize everything that doesn't require the event loop.'''
@ -60,6 +60,7 @@ class Controller(ServerBase):
self.coin = env.coin
self.servers = {}
self.tasks = TaskSet()
self.sessions = set()
self.cur_group = SessionGroup(0)
self.txs_sent = 0
@ -68,7 +69,6 @@ class Controller(ServerBase):
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
self.futures = {}
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0
@ -127,25 +127,23 @@ class Controller(ServerBase):
await self.start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
self.ensure_future(self.bp.main_loop())
self.ensure_future(self.wait_for_bp_catchup())
self.create_task(self.bp.main_loop())
self.create_task(self.wait_for_bp_catchup())
async def shutdown(self):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN
# Close servers and sessions
# Close servers and sessions, and cancel all tasks
self.close_servers(list(self.servers.keys()))
for session in self.sessions:
self.close_session(session)
self.tasks.cancel_all()
# Cancel pending futures
for future in self.futures:
future.cancel()
# Wait for all futures to finish
while not all(future.done() for future in self.futures):
await asyncio.sleep(0.1)
# Wait for the above to take effect
await self.tasks.wait()
for session in list(self.sessions):
await session.wait_closed()
# Finally shut down the block processor and executor
self.bp.shutdown(self.executor)
@ -175,27 +173,21 @@ class Controller(ServerBase):
def schedule_executor(self, func, *args):
'''Schedule running func in the executor, return a task.'''
return self.ensure_future(self.run_in_executor(func, *args))
return self.create_task(self.run_in_executor(func, *args))
def ensure_future(self, coro, callback=None):
def create_task(self, coro, callback=None):
'''Schedule the coro to be run.'''
future = asyncio.ensure_future(coro)
future.add_done_callback(self.on_future_done)
self.futures[future] = callback
return future
def on_future_done(self, future):
'''Collect the result of a future after removing it from our set.'''
callback = self.futures.pop(future)
task = self.tasks.create_task(coro)
task.add_done_callback(callback or self.check_task_exception)
return task
def check_task_exception(self, task):
'''Check a task for exceptions.'''
try:
if callback:
callback(future)
else:
future.result()
except asyncio.CancelledError:
pass
except Exception:
self.logger.error(traceback.format_exc())
if not task.cancelled():
task.result()
except Exception as e:
self.logger.exception(f'uncaught task exception: {e}')
async def housekeeping(self):
'''Regular housekeeping checks.'''
@ -226,11 +218,11 @@ class Controller(ServerBase):
synchronize, then kick off server background processes.'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
self.ensure_future(self.mempool.main_loop())
self.create_task(self.mempool.main_loop())
await self.mempool.synchronized_event.wait()
self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.log_start_external_servers())
self.ensure_future(self.housekeeping())
self.create_task(self.peer_mgr.main_loop())
self.create_task(self.log_start_external_servers())
self.create_task(self.housekeeping())
def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).'''
@ -309,7 +301,7 @@ class Controller(ServerBase):
continue
session_touched = session.notify(height, touched)
if session_touched is not None:
self.ensure_future(session.notify_async(session_touched))
self.create_task(session.notify_async(session_touched))
def notify_peers(self, updates):
'''Notify of peer updates.'''

36
server/peers.py

@ -30,6 +30,8 @@ WAKEUP_SECS = 300
class PeerSession(aiorpcx.ClientSession):
'''An outgoing session to a peer.'''
sessions = set()
def __init__(self, peer, peer_mgr, kind, host, port, **kwargs):
super().__init__(host, port, **kwargs)
self.peer = peer
@ -42,6 +44,7 @@ class PeerSession(aiorpcx.ClientSession):
def connection_made(self, transport):
'''Handle an incoming client connection.'''
super().connection_made(transport)
self.sessions.add(self)
# Update IP address if not Tor
if not self.peer.is_tor:
@ -54,6 +57,11 @@ class PeerSession(aiorpcx.ClientSession):
self.send_request('server.version', controller.server_version_args(),
self.on_version, timeout=self.timeout)
def connection_lost(self, exc):
'''Handle an incoming client connection.'''
super().connection_lost(exc)
self.sessions.remove(self)
def _header_notification(self, header):
pass
@ -427,10 +435,6 @@ class PeerManager(object):
for real_name in coin_peers]
self.add_peers(peers, limit=None)
def ensure_future(self, coro, callback=None):
'''Schedule the coro to be run.'''
return self.controller.ensure_future(coro, callback=callback)
async def maybe_detect_proxy(self):
'''Detect a proxy if we don't have one and some time has passed since
the last attempt.
@ -477,12 +481,17 @@ class PeerManager(object):
self.import_peers()
await self.maybe_detect_proxy()
try:
while True:
timeout = self.loop.call_later(WAKEUP_SECS, self.retry_event.set)
timeout = self.loop.call_later(WAKEUP_SECS,
self.retry_event.set)
await self.retry_event.wait()
self.retry_event.clear()
timeout.cancel()
await self.retry_peers()
finally:
for session in list(PeerSession.sessions):
await session.wait_closed()
def is_coin_onion_peer(self, peer):
'''Return true if this peer is a hard-coded onion peer.'''
@ -541,22 +550,19 @@ class PeerManager(object):
kwargs['local_addr'] = (host, None)
session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
callback = partial(self.on_connected, session, peer, port_pairs)
self.ensure_future(session.create_connection(), callback)
callback = partial(self.on_connected, peer, port_pairs)
self.controller.create_task(session.create_connection(), callback)
def on_connected(self, session, peer, port_pairs, future):
def on_connected(self, peer, port_pairs, task):
'''Called when a connection attempt succeeds or fails.
If failed, close the session, log it and try remaining port pairs.
'''
exception = future.exception()
if exception:
session.close()
if not task.cancelled() and task.exception():
kind, port = port_pairs.pop(0)
self.logger.info('failed connecting to {} at {} port {:d} '
'in {:.1f}s: {}'
.format(peer, kind, port,
time.time() - peer.last_try, exception))
elapsed = time.time() - peer.last_try
self.logger.info(f'failed connecting to {peer} at {kind} port '
f'{port} in {elapsed:.1f}s: {task.exception()}')
if port_pairs:
self.retry_peer(peer, port_pairs)
else:

2
setup.py

@ -11,7 +11,7 @@ setuptools.setup(
# "x11_hash" package (1.4) is required to sync DASH network.
# "tribus_hash" package is required to sync Denarius network.
# "blake256" package is required to sync Decred network.
install_requires=['aiorpcX >= 0.5.3', 'plyvel', 'pylru', 'aiohttp >= 1'],
install_requires=['aiorpcX >= 0.5.4', 'plyvel', 'pylru', 'aiohttp >= 1'],
packages=setuptools.find_packages(exclude=['tests']),
description='ElectrumX Server',
author='Neil Booth',

Loading…
Cancel
Save