@ -38,6 +38,7 @@ class ServerManager(util.LoggedClass):
'''
BANDS = 5
CATCHING_UP , LISTENING , PAUSED , SHUTTING_DOWN = range ( 4 )
class NotificationRequest ( RequestBase ) :
def __init__ ( self , height , touched ) :
@ -60,11 +61,14 @@ class ServerManager(util.LoggedClass):
self . touched , self . touched_event )
self . irc = IRC ( env )
self . env = env
self . servers = [ ]
self . servers = { }
self . sessions = { }
self . groups = defaultdict ( set )
self . txs_sent = 0
self . next_log_sessions = 0
self . state = self . CATCHING_UP
self . max_sessions = env . max_sessions
self . low_watermark = self . max_sessions * 19 / / 20
self . max_subs = env . max_subs
self . subscription_count = 0
self . next_stale_check = 0
@ -77,6 +81,7 @@ class ServerManager(util.LoggedClass):
self . futures = [ ]
env . max_send = max ( 350000 , env . max_send )
self . setup_bands ( )
self . logger . info ( ' max session count: {:,d} ' . format ( self . max_sessions ) )
self . logger . info ( ' session timeout: {:,d} seconds '
. format ( env . session_timeout ) )
self . logger . info ( ' session bandwidth limit {:,d} bytes '
@ -123,16 +128,23 @@ class ServerManager(util.LoggedClass):
+ bisect_left ( self . bands , group_bandwidth ) + 1 ) / / 2
async def enqueue_delayed_sessions ( self ) :
now = time . time ( )
keep = [ ]
for pair in self . delayed_sessions :
timeout , session = pair
if timeout < = now :
self . queue . put_nowait ( session )
else :
keep . append ( pair )
self . delayed_sessions = keep
await asyncio . sleep ( 1 )
while True :
now = time . time ( )
keep = [ ]
for pair in self . delayed_sessions :
timeout , session = pair
if not session . pause and timeout < = now :
self . queue . put_nowait ( session )
else :
keep . append ( pair )
self . delayed_sessions = keep
# If paused and session count has fallen, start listening again
if ( len ( self . sessions ) < = self . low_watermark
and self . state == self . PAUSED ) :
await self . start_external_servers ( )
await asyncio . sleep ( 1 )
def enqueue_session ( self , session ) :
# Might have disconnected whilst waiting
@ -143,8 +155,6 @@ class ServerManager(util.LoggedClass):
self . next_queue_id + = 1
secs = int ( session . pause )
if secs :
session . log_info ( ' delaying processing whilst paused ' )
excess = priority - self . BANDS
if excess > 0 :
secs = excess
@ -185,6 +195,14 @@ class ServerManager(util.LoggedClass):
await self . shutdown ( )
await asyncio . sleep ( 1 )
def close_servers ( self , kinds ) :
''' Close the servers of the given kinds (TCP etc.). '''
for kind in kinds :
server = self . servers . pop ( kind , None )
if server :
server . close ( )
# Don't bother awaiting the close - we're not async
async def start_server ( self , kind , * args , * * kw_args ) :
protocol_class = LocalRPC if kind == ' RPC ' else ElectrumX
protocol = partial ( protocol_class , self , self . bp , self . env , kind )
@ -192,7 +210,7 @@ class ServerManager(util.LoggedClass):
host , port = args [ : 2 ]
try :
self . servers . append ( await server )
self . servers [ kind ] = await server
except Exception as e :
self . logger . error ( ' {} server failed to listen on {} : {:d} : {} '
. format ( kind , host , port , e ) )
@ -201,21 +219,22 @@ class ServerManager(util.LoggedClass):
. format ( kind , host , port ) )
async def start_servers ( self , caught_up ) :
''' Connect to IRC and start listening for incoming connections.
Only connect to IRC if enabled . Start listening on RCP , TCP
and SSL ports only if the port wasn ' t pecified. Waits for the
caught_up event to be signalled .
'''
''' Start RPC, TCP and SSL servers once caught up. '''
await caught_up . wait ( )
env = self . env
if env . rpc_port is not None :
await self . start_server ( ' RPC ' , ' localhost ' , env . rpc_port )
if self . env . rpc_port is not None :
await self . start_server ( ' RPC ' , ' localhost ' , self . env . rpc_port )
await self . start_external_servers ( )
async def start_external_servers ( self ) :
''' Start listening on TCP and SSL ports, but only if the respective
port was given in the environment .
'''
self . state = self . LISTENING
env = self . env
if env . tcp_port is not None :
await self . start_server ( ' TCP ' , env . host , env . tcp_port )
if env . ssl_port is not None :
# Python 3.5.3: use PROTOCOL_TLS
sslc = ssl . SSLContext ( ssl . PROTOCOL_SSLv23 )
@ -282,15 +301,13 @@ class ServerManager(util.LoggedClass):
return history
async def shutdown ( self ) :
''' Call to shutdown the servers. Returns when done. '''
''' Call to shutdown everything. Returns when done. '''
self . state = self . SHUTTING_DOWN
self . close_servers ( list ( self . servers . keys ( ) ) )
self . bp . shutdown ( )
# Don't cancel the block processor main loop - let it close itself
for future in self . futures [ 1 : ] :
future . cancel ( )
for server in self . servers :
server . close ( )
await server . wait_closed ( )
self . servers = [ ] # So add_session closes new sessions
if self . sessions :
await self . close_sessions ( )
@ -308,9 +325,6 @@ class ServerManager(util.LoggedClass):
. format ( len ( self . sessions ) ) )
def add_session ( self , session ) :
# Some connections are acknowledged after the servers are closed
if not self . servers :
return
now = time . time ( )
if now > self . next_stale_check :
self . next_stale_check = now + 300
@ -320,10 +334,16 @@ class ServerManager(util.LoggedClass):
self . sessions [ session ] = group
session . log_info ( ' connection from {} , {:,d} total '
. format ( session . peername ( ) , len ( self . sessions ) ) )
if ( len ( self . sessions ) > = self . max_sessions
and self . state == self . LISTENING ) :
self . state = self . PAUSED
session . log_info ( ' maximum sessions {:,d} reached, stopping new '
' connections until count drops to {:,d} '
. format ( self . max_sessions , self . low_watermark ) )
self . close_servers ( [ ' TCP ' , ' SSL ' ] )
def remove_session ( self , session ) :
# This test should always be True. However if a bug messes
# things up it prevents consequent log noise
''' Remove a session from our sessions list if there. '''
if session in self . sessions :
group = self . sessions . pop ( session )
group . remove ( session )