Browse Source

Split out lib/server_base.py

patch-1
Neil Booth 7 years ago
parent
commit
366c556c5f
  1. 128
      lib/server_base.py
  2. 161
      server/controller.py

128
lib/server_base.py

@ -0,0 +1,128 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
import asyncio
import os
import signal
import sys
import time
from functools import partial
import lib.util as util
class ServerBase(util.LoggedClass):
'''Base class server implementation.
Derived classes are expected to:
- set PYTHON_MIN_VERSION and SUPPRESS_MESSAGES as appropriate
- implement the start_servers() coroutine, called from the run() method.
Upon return the event loop runs until the shutdown signal is received.
- implement the shutdown() coroutine
'''
SUPPRESS_MESSAGES = [
'Fatal read error on socket transport',
'Fatal write error on socket transport',
]
PYTHON_MIN_VERSION = (3, 6)
def __init__(self, env):
'''Save the environment, perform basic sanity checks, and set the
event loop policy.
'''
super().__init__()
self.env = env
# Sanity checks
if sys.version_info < self.PYTHON_MIN_VERSION:
mvs = '.'.join(str(part) for part in self.PYTHON_MIN_VERSION)
raise RuntimeError('Python version >= {} is required'.format(mvs))
if os.geteuid() == 0 and not env.allow_root:
raise RuntimeError('RUNNING AS ROOT IS STRONGLY DISCOURAGED!\n'
'You shoud create an unprivileged user account '
'and use that.\n'
'To continue as root anyway, restart with '
'environment variable ALLOW_ROOT non-empty')
# First asyncio operation must be to set the event loop policy
# as this replaces the event loop
self.logger.info('event loop policy: {}'.format(self.env.loop_policy))
asyncio.set_event_loop_policy(self.env.loop_policy)
# Trigger this event to cleanly shutdown
self.shutdown_event = asyncio.Event()
async def start_servers(self):
'''Override to perform initialization that requires the event loop,
and start servers.'''
pass
async def shutdown(self):
'''Override to perform the shutdown sequence, if any.'''
pass
async def _wait_for_shutdown_event(self):
'''Wait for shutdown to be signalled, and log it.
Derived classes may want to provide a shutdown() coroutine.'''
# Shut down cleanly after waiting for shutdown to be signalled
await self.shutdown_event.wait()
self.logger.info('shutting down')
# Wait for the shutdown sequence
await self.shutdown()
# Finally, work around an apparent asyncio bug that causes log
# spew on shutdown for partially opened SSL sockets
try:
del asyncio.sslproto._SSLProtocolTransport.__del__
except Exception:
pass
self.logger.info('shutdown complete')
def on_signal(self, signame):
'''Call on receipt of a signal to cleanly shutdown.'''
self.logger.warning('received {} signal, initiating shutdown'
.format(signame))
self.shutdown_event.set()
def on_exception(self, loop, context):
'''Suppress spurious messages it appears we cannot control.'''
message = context.get('message')
if message in self.SUPPRESS_MESSAGES:
return
if 'accept_connection2()' in repr(context.get('task')):
return
loop.default_exception_handler(context)
def run(self):
'''Run the server application:
- record start time
- set the event loop policy as specified by the environment
- install SIGINT and SIGKILL handlers to trigger shutdown_event
- set loop's exception handler to suppress unwanted messages
- run the event loop until start_servers() completes
- run the event loop until shutdown is signalled
'''
self.start_time = time.time()
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
loop.set_exception_handler(self.on_exception)
loop.run_until_complete(self.start_servers())
loop.run_until_complete(self._wait_for_shutdown_event())
loop.close()

161
server/controller.py

@ -8,9 +8,7 @@
import asyncio import asyncio
import json import json
import os import os
import signal
import ssl import ssl
import sys
import time import time
import traceback import traceback
from bisect import bisect_left from bisect import bisect_left
@ -23,6 +21,7 @@ import pylru
from lib.jsonrpc import JSONSessionBase, RPCError from lib.jsonrpc import JSONSessionBase, RPCError
from lib.hash import double_sha256, hash_to_str, hex_str_to_hash from lib.hash import double_sha256, hash_to_str, hex_str_to_hash
from lib.peer import Peer from lib.peer import Peer
from lib.server_base import ServerBase
import lib.util as util import lib.util as util
from server.daemon import DaemonError from server.daemon import DaemonError
from server.mempool import MemPool from server.mempool import MemPool
@ -30,50 +29,22 @@ from server.peers import PeerManager
from server.session import LocalRPC from server.session import LocalRPC
class Controller(util.LoggedClass): class Controller(ServerBase):
'''Manages the client servers, a mempool, and a block processor. '''Manages the client servers, a mempool, and a block processor.
Servers are started immediately the block processor first catches Servers are started immediately the block processor first catches
up with the daemon. up with the daemon.
''' '''
PYTHON_MIN_VERSION = (3, 5, 3)
BANDS = 5 BANDS = 5
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
SUPPRESS_MESSAGES = [
'Fatal read error on socket transport',
'Fatal write error on socket transport',
]
def __init__(self, env): def __init__(self, env):
super().__init__() '''Initialize everything that doesn't require the event loop.'''
super().__init__(env)
# Sanity checks
if sys.version_info < (3, 5, 3):
raise RuntimeError('Python >= 3.5.3 is required to run ElectrumX')
if os.geteuid() == 0 and not env.allow_root:
raise RuntimeError('RUNNING AS ROOT IS STRONGLY DISCOURAGED!\n'
'You shoud create an unprivileged user account '
'and use that.\n'
'To continue as root anyway, restart with '
'environment variable ALLOW_ROOT non-empty')
# Set the event loop policy before doing anything asyncio
self.logger.info('event loop policy: {}'.format(env.loop_policy))
asyncio.set_event_loop_policy(env.loop_policy)
self.loop = asyncio.get_event_loop()
# Set this event to cleanly shutdown
self.shutdown_event = asyncio.Event()
self.executor = ThreadPoolExecutor()
self.loop.set_default_executor(self.executor)
self.start_time = time.time()
self.coin = env.coin self.coin = env.coin
self.daemon = self.coin.DAEMON(env)
self.bp = self.coin.BLOCK_PROCESSOR(env, self, self.daemon)
self.mempool = MemPool(self.bp, self)
self.peer_mgr = PeerManager(env, self)
self.env = env
self.servers = {} self.servers = {}
# Map of session to the key of its list in self.groups # Map of session to the key of its list in self.groups
self.sessions = {} self.sessions = {}
@ -98,6 +69,47 @@ class Controller(util.LoggedClass):
'sessions stop'.split()) 'sessions stop'.split())
self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor()
self.loop.set_default_executor(self.executor)
# The complex objects. Note PeerManager references self.loop (ugh)
self.daemon = self.coin.DAEMON(env)
self.bp = self.coin.BLOCK_PROCESSOR(env, self, self.daemon)
self.mempool = MemPool(self.bp, self)
self.peer_mgr = PeerManager(env, self)
async def start_servers(self):
'''Start the RPC server and schedule the external servers to be
started once the block processor has caught up.
'''
if self.env.rpc_port is not None:
await self.start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
self.ensure_future(self.bp.main_loop())
self.ensure_future(self.wait_for_bp_catchup())
async def shutdown(self):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN
# Close servers and sessions
self.close_servers(list(self.servers.keys()))
for session in self.sessions:
self.close_session(session)
# Cancel pending futures
for future in self.futures:
future.cancel()
# Wait for all futures to finish
while not all(future.done() for future in self.futures):
await asyncio.sleep(0.1)
# Finally shut down the block processor and executor
self.bp.shutdown(self.executor)
async def mempool_transactions(self, hashX): async def mempool_transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hashX. entries for the hashX.
@ -195,58 +207,16 @@ class Controller(util.LoggedClass):
self.next_log_sessions = time.time() + self.env.log_sessions self.next_log_sessions = time.time() + self.env.log_sessions
async def wait_for_bp_catchup(self): async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.''' '''Wait for the block processor to catch up, then kick off server
background processes.'''
await self.bp.caught_up_event.wait() await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up') self.logger.info('block processor has caught up')
self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers()) self.ensure_future(self.log_start_external_servers())
self.ensure_future(self.housekeeping()) self.ensure_future(self.housekeeping())
self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.notify()) self.ensure_future(self.notify())
async def main_loop(self):
'''Controller main loop.'''
if self.env.rpc_port is not None:
await self.start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
self.ensure_future(self.bp.main_loop())
self.ensure_future(self.wait_for_bp_catchup())
# Shut down cleanly after waiting for shutdown to be signalled
await self.shutdown_event.wait()
self.logger.info('shutting down')
await self.shutdown()
# Avoid log spew on shutdown for partially opened SSL sockets
try:
del asyncio.sslproto._SSLProtocolTransport.__del__
except Exception:
pass
self.logger.info('shutdown complete')
def initiate_shutdown(self):
'''Call this function to start the shutdown process.'''
self.shutdown_event.set()
async def shutdown(self):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN
# Close servers and sessions
self.close_servers(list(self.servers.keys()))
for session in self.sessions:
self.close_session(session)
# Cancel pending futures
for future in self.futures:
future.cancel()
# Wait for all futures to finish
while not all(future.done() for future in self.futures):
await asyncio.sleep(0.1)
# Finally shut down the block processor and executor
self.bp.shutdown(self.executor)
def close_servers(self, kinds): def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).''' '''Close the servers of the given kinds (TCP etc.).'''
if kinds: if kinds:
@ -272,7 +242,7 @@ class Controller(util.LoggedClass):
self.logger.info('{} server listening on {}:{:d}' self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port)) .format(kind, host, port))
async def start_servers(self): async def log_start_external_servers(self):
'''Start TCP and SSL servers.''' '''Start TCP and SSL servers.'''
self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
@ -628,7 +598,7 @@ class Controller(util.LoggedClass):
def rpc_stop(self): def rpc_stop(self):
'''Shut down the server cleanly.''' '''Shut down the server cleanly.'''
self.initiate_shutdown() self.shutdown_event.set()
return 'stopping' return 'stopping'
def rpc_getinfo(self): def rpc_getinfo(self):
@ -907,32 +877,3 @@ class Controller(util.LoggedClass):
if index >= len(tx.outputs): if index >= len(tx.outputs):
return None return None
return self.coin.address_from_script(tx.outputs[index].pk_script) return self.coin.address_from_script(tx.outputs[index].pk_script)
# Signal, exception handlers.
def on_signal(self, signame):
'''Call on receipt of a signal to cleanly shutdown.'''
self.logger.warning('received {} signal, initiating shutdown'
.format(signame))
self.initiate_shutdown()
def on_exception(self, loop, context):
'''Suppress spurious messages it appears we cannot control.'''
message = context.get('message')
if message not in self.SUPPRESS_MESSAGES:
if not ('task' in context and
'accept_connection2()' in repr(context.get('task'))):
loop.default_exception_handler(context)
def run(self):
# Install signal handlers and exception handler
loop = self.loop
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
loop.set_exception_handler(self.on_exception)
# Run the main loop to completion
future = asyncio.ensure_future(self.main_loop())
loop.run_until_complete(future)
loop.close()

Loading…
Cancel
Save