@ -56,7 +56,9 @@ class PeerSession(JSONSession):
self . peer = peer
self . peer_mgr = peer_mgr
self . kind = kind
self . failed = False
self . bad = False
self . remote_peers = None
self . log_prefix = ' [ {} ] ' . format ( self . peer )
async def wait_on_items ( self ) :
@ -92,44 +94,12 @@ class PeerSession(JSONSession):
def on_peers_subscribe ( self , result , error ) :
''' Handle the response to the peers.subcribe message. '''
if error :
self . ba d = True
self . faile d = True
self . log_error ( ' server.peers.subscribe: {} ' . format ( error ) )
else :
self . check_remote_peers ( result )
self . remote_peers = result
self . close_if_done ( )
def check_remote_peers ( self , updates ) :
''' When a peer gives us a peer update.
Each update is expected to be of the form :
[ ip_addr , hostname , [ ' v1.0 ' , ' t51001 ' , ' s51002 ' ] ]
'''
try :
real_names = [ ' ' . join ( [ u [ 1 ] ] + u [ 2 ] ) for u in updates ]
peers = [ Peer . from_real_name ( real_name , str ( self . peer ) )
for real_name in real_names ]
except Exception :
self . log_error ( ' bad server.peers.subscribe response ' )
return
self . peer_mgr . add_peers ( peers )
# Announce ourself if not present. Don't if disabled, we
# are a non-public IP address, or to ourselves.
if not self . peer_mgr . env . peer_announce :
return
if self . peer in self . peer_mgr . myselves :
return
my = self . peer_mgr . my_clearnet_peer ( )
if not my . is_public :
return
for peer in my . matches ( peers ) :
if peer . tcp_port == my . tcp_port and peer . ssl_port == my . ssl_port :
return
self . log_info ( ' registering ourself with server.add_peer ' )
self . send_request ( self . on_add_peer , ' server.add_peer ' , [ my . features ] )
def on_add_peer ( self , result , error ) :
''' Handle the response to the add_peer message. '''
self . close_if_done ( )
@ -149,7 +119,7 @@ class PeerSession(JSONSession):
def on_headers ( self , result , error ) :
''' Handle the response to the version message. '''
if error :
self . ba d = True
self . faile d = True
self . log_error ( ' blockchain.headers.subscribe returned an error ' )
elif not isinstance ( result , dict ) :
self . bad = True
@ -157,34 +127,72 @@ class PeerSession(JSONSession):
else :
our_height = self . peer_mgr . controller . bp . db_height
their_height = result . get ( ' block_height ' )
is_good = ( isinstance ( their_height , int ) and
abs ( our_height - their_height ) < = 5 )
if not is_good :
if not isinstance ( their_height , int ) :
self . log_warning ( ' invalid height {} ' . format ( their_height ) )
self . bad = True
elif abs ( our_height - their_height ) > 5 :
self . log_warning ( ' bad height {:,d} (ours: {:,d} ) '
. format ( their_height , our_height ) )
self . bad = True
self . log_warning ( ' bad height {} ' . format ( their_height ) )
self . close_if_done ( )
def on_version ( self , result , error ) :
''' Handle the response to the version message. '''
if error :
self . ba d = True
self . faile d = True
self . log_error ( ' server.version returned an error ' )
elif isinstance ( result , str ) :
self . peer . server_version = result
self . peer . features [ ' server_version ' ] = result
self . close_if_done ( )
def check_remote_peers ( self ) :
''' When a peer gives us a peer update.
Each update is expected to be of the form :
[ ip_addr , hostname , [ ' v1.0 ' , ' t51001 ' , ' s51002 ' ] ]
'''
try :
real_names = [ ' ' . join ( [ u [ 1 ] ] + u [ 2 ] ) for u in self . remote_peers ]
peers = [ Peer . from_real_name ( real_name , str ( self . peer ) )
for real_name in real_names ]
except Exception :
self . log_error ( ' bad server.peers.subscribe response ' )
return
self . peer_mgr . add_peers ( peers )
# Announce ourself if not present. Don't if disabled, we
# are a non-public IP address, or to ourselves.
if not self . peer_mgr . env . peer_announce :
return
if self . peer in self . peer_mgr . myselves :
return
my = self . peer_mgr . my_clearnet_peer ( )
if not my . is_public :
return
for peer in my . matches ( peers ) :
if peer . tcp_port == my . tcp_port and peer . ssl_port == my . ssl_port :
return
self . log_info ( ' registering ourself with server.add_peer ' )
self . send_request ( self . on_add_peer , ' server.add_peer ' , [ my . features ] )
def close_if_done ( self ) :
if not self . has_pending_requests ( ) :
if self . bad :
self . peer . mark_bad ( )
self . peer_mgr . set_connection_status ( self . peer , not self . bad )
elif self . remote_peers :
self . check_remote_peers ( )
self . peer . last_connect = time . time ( )
is_good = not ( self . failed or self . bad )
self . peer_mgr . set_connection_status ( self . peer , is_good )
if self . peer . is_tor :
how = ' via {} over Tor ' . format ( self . kind )
else :
how = ' via {} at {} ' . format ( self . kind ,
self . peer_addr ( anon = False ) )
status = ' failed to verify ' if self . bad else ' verified '
status = ' verified ' if is_goo d else ' failed to verify '
elapsed = time . time ( ) - self . peer . last_try
self . log_info ( ' {} {} in {:.1f} s ' . format ( status , how , elapsed ) )
self . close_connection ( )
@ -536,7 +544,6 @@ class PeerManager(util.LoggedClass):
''' Called when a connection succeeded or failed. '''
if good :
peer . try_count = 0
peer . last_connect = time . time ( )
peer . source = ' peer '
# Remove matching IP addresses
for match in peer . matches ( self . peers ) :
@ -547,7 +554,10 @@ class PeerManager(util.LoggedClass):
def maybe_forget_peer ( self , peer ) :
''' Forget the peer if appropriate, e.g. long-term unreachable. '''
try_limit = 10 if peer . last_connect else 3
if peer . last_connect and not peer . bad :
try_limit = 10
else :
try_limit = 3
forget = peer . try_count > = try_limit
if forget :