@ -75,6 +75,10 @@ NUM_TARGET_CONNECTED_SERVERS = 10
NUM_STICKY_SERVERS = 4
NUM_STICKY_SERVERS = 4
NUM_RECENT_SERVERS = 20
NUM_RECENT_SERVERS = 20
_KNOWN_NETWORK_PROTOCOLS = { ' t ' , ' s ' }
PREFERRED_NETWORK_PROTOCOL = ' s '
assert PREFERRED_NETWORK_PROTOCOL in _KNOWN_NETWORK_PROTOCOLS
def parse_servers ( result : Sequence [ Tuple [ str , str , List [ str ] ] ] ) - > Dict [ str , dict ] :
def parse_servers ( result : Sequence [ Tuple [ str , str , List [ str ] ] ] ) - > Dict [ str , dict ] :
""" parse servers list into dict format """
""" parse servers list into dict format """
@ -115,23 +119,27 @@ def filter_noonion(servers):
return { k : v for k , v in servers . items ( ) if not k . endswith ( ' .onion ' ) }
return { k : v for k , v in servers . items ( ) if not k . endswith ( ' .onion ' ) }
def filter_protocol ( hostmap , protocol = ' s ' ) - > Sequence [ ServerAddr ] :
def filter_protocol ( hostmap , * , allowed_protocols : Iterable [ str ] = None ) - > Sequence [ ServerAddr ] :
""" Filters the hostmap for those implementing protocol. """
""" Filters the hostmap for those implementing protocol. """
if allowed_protocols is None :
allowed_protocols = { PREFERRED_NETWORK_PROTOCOL }
eligible = [ ]
eligible = [ ]
for host , portmap in hostmap . items ( ) :
for host , portmap in hostmap . items ( ) :
for protocol in allowed_protocols :
port = portmap . get ( protocol )
port = portmap . get ( protocol )
if port :
if port :
eligible . append ( ServerAddr ( host , port , protocol = protocol ) )
eligible . append ( ServerAddr ( host , port , protocol = protocol ) )
return eligible
return eligible
def pick_random_server ( hostmap = None , * , protocol = ' s ' ,
def pick_random_server ( hostmap = None , * , allowed_protocols : Iterable [ str ] ,
exclude_set : Set [ ServerAddr ] = None ) - > Optional [ ServerAddr ] :
exclude_set : Set [ ServerAddr ] = None ) - > Optional [ ServerAddr ] :
if hostmap is None :
if hostmap is None :
hostmap = constants . net . DEFAULT_SERVERS
hostmap = constants . net . DEFAULT_SERVERS
if exclude_set is None :
if exclude_set is None :
exclude_set = set ( )
exclude_set = set ( )
eligible = list ( set ( filter_protocol ( hostmap , protocol ) ) - exclude_set )
servers = set ( filter_protocol ( hostmap , allowed_protocols = allowed_protocols ) )
eligible = list ( servers - exclude_set )
return random . choice ( eligible ) if eligible else None
return random . choice ( eligible ) if eligible else None
@ -273,6 +281,9 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
self . logger . info ( f " blockchains { list ( map ( lambda b : b . forkpoint , blockchain . blockchains . values ( ) ) ) } " )
self . logger . info ( f " blockchains { list ( map ( lambda b : b . forkpoint , blockchain . blockchains . values ( ) ) ) } " )
self . _blockchain_preferred_block = self . config . get ( ' blockchain_preferred_block ' , None ) # type: Optional[Dict]
self . _blockchain_preferred_block = self . config . get ( ' blockchain_preferred_block ' , None ) # type: Optional[Dict]
self . _blockchain = blockchain . get_best_chain ( )
self . _blockchain = blockchain . get_best_chain ( )
self . _allowed_protocols = { PREFERRED_NETWORK_PROTOCOL }
# Server for addresses and transactions
# Server for addresses and transactions
self . default_server = self . config . get ( ' server ' , None )
self . default_server = self . config . get ( ' server ' , None )
# Sanitize default server
# Sanitize default server
@ -283,7 +294,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
self . logger . warning ( ' failed to parse server-string; falling back to localhost. ' )
self . logger . warning ( ' failed to parse server-string; falling back to localhost. ' )
self . default_server = ServerAddr . from_str ( " localhost:50002:s " )
self . default_server = ServerAddr . from_str ( " localhost:50002:s " )
else :
else :
self . default_server = pick_random_server ( )
self . default_server = pick_random_server ( allowed_protocols = self . _allowed_protocols )
assert isinstance ( self . default_server , ServerAddr ) , f " invalid type for default_server: { self . default_server !r} "
assert isinstance ( self . default_server , ServerAddr ) , f " invalid type for default_server: { self . default_server !r} "
self . taskgroup = None
self . taskgroup = None
@ -549,7 +560,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
# we only give priority to recent_servers up to NUM_STICKY_SERVERS.
# we only give priority to recent_servers up to NUM_STICKY_SERVERS.
with self . recent_servers_lock :
with self . recent_servers_lock :
recent_servers = list ( self . _recent_servers )
recent_servers = list ( self . _recent_servers )
recent_servers = [ s for s in recent_servers if s . protocol == self . protocol ]
recent_servers = [ s for s in recent_servers if s . protocol in self . _allowed_ protocols ]
if len ( connected_servers & set ( recent_servers ) ) < NUM_STICKY_SERVERS :
if len ( connected_servers & set ( recent_servers ) ) < NUM_STICKY_SERVERS :
for server in recent_servers :
for server in recent_servers :
if server in connected_servers :
if server in connected_servers :
@ -559,7 +570,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
return server
return server
# try all servers we know about, pick one at random
# try all servers we know about, pick one at random
hostmap = self . get_servers ( )
hostmap = self . get_servers ( )
servers = list ( set ( filter_protocol ( hostmap , self . protocol ) ) - connected_servers )
servers = list ( set ( filter_protocol ( hostmap , allowed_protocols = self . _allowed_ protocols ) ) - connected_servers )
random . shuffle ( servers )
random . shuffle ( servers )
for server in servers :
for server in servers :
if not self . _can_retry_addr ( server , now = now ) :
if not self . _can_retry_addr ( server , now = now ) :
@ -574,7 +585,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
util . trigger_callback ( ' proxy_set ' , self . proxy )
util . trigger_callback ( ' proxy_set ' , self . proxy )
@log_exceptions
@log_exceptions
async def set_parameters ( self , net_params : NetworkParameters ) :
async def set_parameters ( self , net_params : NetworkParameters ) : # TODO
proxy = net_params . proxy
proxy = net_params . proxy
proxy_str = serialize_proxy ( proxy )
proxy_str = serialize_proxy ( proxy )
host , port , protocol = net_params . host , net_params . port , net_params . protocol
host , port , protocol = net_params . host , net_params . port , net_params . protocol
@ -598,7 +609,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
async with self . restart_lock :
async with self . restart_lock :
self . auto_connect = net_params . auto_connect
self . auto_connect = net_params . auto_connect
if self . proxy != proxy or self . protocol != protocol or self . oneserver != net_params . oneserver :
if self . proxy != proxy or self . oneserver != net_params . oneserver :
# Restart the network defaulting to the given server
# Restart the network defaulting to the given server
await self . _stop ( )
await self . _stop ( )
self . default_server = server
self . default_server = server
@ -1138,7 +1149,6 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
assert not self . _connecting
assert not self . _connecting
self . logger . info ( ' starting network ' )
self . logger . info ( ' starting network ' )
self . _clear_addr_retry_times ( )
self . _clear_addr_retry_times ( )
self . protocol = self . default_server . protocol
self . _set_proxy ( deserialize_proxy ( self . config . get ( ' proxy ' ) ) )
self . _set_proxy ( deserialize_proxy ( self . config . get ( ' proxy ' ) ) )
self . _set_oneserver ( self . config . get ( ' oneserver ' , False ) )
self . _set_oneserver ( self . config . get ( ' oneserver ' , False ) )
await self . taskgroup . spawn ( self . _run_new_interface ( self . default_server ) )
await self . taskgroup . spawn ( self . _run_new_interface ( self . default_server ) )
@ -1282,7 +1292,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
session = self . interface . session
session = self . interface . session
return parse_servers ( await session . send_request ( ' server.peers.subscribe ' ) )
return parse_servers ( await session . send_request ( ' server.peers.subscribe ' ) )
async def send_multiple_requests ( self , servers : List [ st r] , method : str , params : Sequence ) :
async def send_multiple_requests ( self , servers : Sequence [ ServerAdd r] , method : str , params : Sequence ) :
responses = dict ( )
responses = dict ( )
async def get_response ( server : ServerAddr ) :
async def get_response ( server : ServerAddr ) :
interface = Interface ( network = self , server = server , proxy = self . proxy )
interface = Interface ( network = self , server = server , proxy = self . proxy )
@ -1299,6 +1309,5 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
responses [ interface . server ] = res
responses [ interface . server ] = res
async with TaskGroup ( ) as group :
async with TaskGroup ( ) as group :
for server in servers :
for server in servers :
server = ServerAddr . from_str ( server )
await group . spawn ( get_response ( server ) )
await group . spawn ( get_response ( server ) )
return responses
return responses