@ -246,7 +246,8 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
taskgroup : Optional [ TaskGroup ]
taskgroup : Optional [ TaskGroup ]
interface : Optional [ Interface ]
interface : Optional [ Interface ]
interfaces : Dict [ ServerAddr , Interface ]
interfaces : Dict [ ServerAddr , Interface ]
_connecting : Set [ ServerAddr ]
_connecting_ifaces : Set [ ServerAddr ]
_closing_ifaces : Set [ ServerAddr ]
default_server : ServerAddr
default_server : ServerAddr
_recent_servers : List [ ServerAddr ]
_recent_servers : List [ ServerAddr ]
@ -321,10 +322,16 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
# the main server we are currently communicating with
# the main server we are currently communicating with
self . interface = None
self . interface = None
self . default_server_changed_event = asyncio . Event ( )
self . default_server_changed_event = asyncio . Event ( )
# set of servers we have an ongoing connection with
# Set of servers we have an ongoing connection with.
self . interfaces = { }
# For any ServerAddr, at most one corresponding Interface object
# can exist at any given time. Depending on the state of that Interface,
# the ServerAddr can be found in one of the following sets.
# Note: during a transition, the ServerAddr can appear in two sets momentarily.
self . _connecting_ifaces = set ( )
self . interfaces = { } # these are the ifaces in "initialised and usable" state
self . _closing_ifaces = set ( )
self . auto_connect = self . config . get ( ' auto_connect ' , True )
self . auto_connect = self . config . get ( ' auto_connect ' , True )
self . _connecting = set ( )
self . proxy = None
self . proxy = None
self . _maybe_set_oneserver ( )
self . _maybe_set_oneserver ( )
@ -551,7 +558,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
def _get_next_server_to_try ( self ) - > Optional [ ServerAddr ] :
def _get_next_server_to_try ( self ) - > Optional [ ServerAddr ] :
now = time . time ( )
now = time . time ( )
with self . interfaces_lock :
with self . interfaces_lock :
connected_servers = set ( self . interfaces ) | self . _connecting
connected_servers = set ( self . interfaces ) | self . _connecting_ifaces | self . _closing_ifaces
# First try from recent servers. (which are persisted)
# First try from recent servers. (which are persisted)
# As these are servers we successfully connected to recently, they are
# As these are servers we successfully connected to recently, they are
# most likely to work. This also makes servers "sticky".
# most likely to work. This also makes servers "sticky".
@ -680,13 +687,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
# Stop any current interface in order to terminate subscriptions,
# Stop any current interface in order to terminate subscriptions,
# and to cancel tasks in interface.taskgroup.
# and to cancel tasks in interface.taskgroup.
# However, for headers sub, give preference to this interface
# over unknown ones, i.e. start it again right away.
if old_server and old_server != server :
if old_server and old_server != server :
# don't wait for old_interface to close as that might be slow:
# don't wait for old_interface to close as that might be slow:
await self . taskgroup . spawn ( self . _close_interface ( old_interface ) )
await self . taskgroup . spawn ( self . _close_interface ( old_interface ) )
if len ( self . interfaces ) < = self . num_server :
await self . taskgroup . spawn ( self . _run_new_interface ( old_server ) )
if server not in self . interfaces :
if server not in self . interfaces :
self . interface = None
self . interface = None
@ -708,15 +711,23 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
if blockchain_updated :
if blockchain_updated :
util . trigger_callback ( ' blockchain_updated ' )
util . trigger_callback ( ' blockchain_updated ' )
async def _close_interface ( self , interface : Interface ) :
async def _close_interface ( self , interface : Optional [ Interface ] ) :
if interface :
if not interface :
with self . interfaces_lock :
return
if self . interfaces . get ( interface . server ) == interface :
if interface . server in self . _closing_ifaces :
self . interfaces . pop ( interface . server )
return
if interface == self . interface :
self . _closing_ifaces . add ( interface . server )
self . interface = None
with self . interfaces_lock :
if self . interfaces . get ( interface . server ) == interface :
self . interfaces . pop ( interface . server )
if interface == self . interface :
self . interface = None
try :
# this can take some time if server/connection is slow:
# this can take some time if server/connection is slow:
await interface . close ( )
await interface . close ( )
await interface . got_disconnected . wait ( )
finally :
self . _closing_ifaces . discard ( interface . server )
@with_recent_servers_lock
@with_recent_servers_lock
def _add_recent_server ( self , server : ServerAddr ) - > None :
def _add_recent_server ( self , server : ServerAddr ) - > None :
@ -732,8 +743,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
''' A connection to server either went down, or was never made.
''' A connection to server either went down, or was never made.
We distinguish by whether it is in self . interfaces . '''
We distinguish by whether it is in self . interfaces . '''
if not interface : return
if not interface : return
# note: don't rely on interface.server for comparisons here
if interface . server == self . default_server :
if interface == self . interface :
self . _set_status ( ' disconnected ' )
self . _set_status ( ' disconnected ' )
await self . _close_interface ( interface )
await self . _close_interface ( interface )
util . trigger_callback ( ' network_updated ' )
util . trigger_callback ( ' network_updated ' )
@ -748,9 +758,11 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
@ignore_exceptions # do not kill outer taskgroup
@ignore_exceptions # do not kill outer taskgroup
@log_exceptions
@log_exceptions
async def _run_new_interface ( self , server : ServerAddr ) :
async def _run_new_interface ( self , server : ServerAddr ) :
if server in self . interfaces or server in self . _connecting :
if ( server in self . interfaces
or server in self . _connecting_ifaces
or server in self . _closing_ifaces ) :
return
return
self . _connecting . add ( server )
self . _connecting_ifaces . add ( server )
if server == self . default_server :
if server == self . default_server :
self . logger . info ( f " connecting to { server } as new interface " )
self . logger . info ( f " connecting to { server } as new interface " )
self . _set_status ( ' connecting ' )
self . _set_status ( ' connecting ' )
@ -770,8 +782,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
assert server not in self . interfaces
assert server not in self . interfaces
self . interfaces [ server ] = interface
self . interfaces [ server ] = interface
finally :
finally :
try : self . _connecting . remove ( server )
self . _connecting_ifaces . discard ( server )
except KeyError : pass
if server == self . default_server :
if server == self . default_server :
await self . switch_to_interface ( server )
await self . switch_to_interface ( server )
@ -1129,7 +1140,8 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
assert not self . taskgroup
assert not self . taskgroup
self . taskgroup = taskgroup = SilentTaskGroup ( )
self . taskgroup = taskgroup = SilentTaskGroup ( )
assert not self . interface and not self . interfaces
assert not self . interface and not self . interfaces
assert not self . _connecting
assert not self . _connecting_ifaces
assert not self . _closing_ifaces
self . logger . info ( ' starting network ' )
self . logger . info ( ' starting network ' )
self . _clear_addr_retry_times ( )
self . _clear_addr_retry_times ( )
self . _set_proxy ( deserialize_proxy ( self . config . get ( ' proxy ' ) ) )
self . _set_proxy ( deserialize_proxy ( self . config . get ( ' proxy ' ) ) )
@ -1173,7 +1185,8 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
self . taskgroup = None
self . taskgroup = None
self . interface = None
self . interface = None
self . interfaces = { }
self . interfaces = { }
self . _connecting . clear ( )
self . _connecting_ifaces . clear ( )
self . _closing_ifaces . clear ( )
if not full_shutdown :
if not full_shutdown :
util . trigger_callback ( ' network_updated ' )
util . trigger_callback ( ' network_updated ' )
@ -1197,7 +1210,8 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async def _maintain_sessions ( self ) :
async def _maintain_sessions ( self ) :
async def maybe_start_new_interfaces ( ) :
async def maybe_start_new_interfaces ( ) :
for i in range ( self . num_server - len ( self . interfaces ) - len ( self . _connecting ) ) :
num_existing_ifaces = len ( self . interfaces ) + len ( self . _connecting_ifaces ) + len ( self . _closing_ifaces )
for i in range ( self . num_server - num_existing_ifaces ) :
# FIXME this should try to honour "healthy spread of connected servers"
# FIXME this should try to honour "healthy spread of connected servers"
server = self . _get_next_server_to_try ( )
server = self . _get_next_server_to_try ( )
if server :
if server :