|
|
@ -185,8 +185,8 @@ class PeerManager(object): |
|
|
|
async def _monitor_peer(self, peer): |
|
|
|
# Stop monitoring if we were dropped (a duplicate peer) |
|
|
|
while peer in self.peers: |
|
|
|
is_good = await self._is_peer_good(peer) |
|
|
|
if self._should_drop_peer(peer, is_good): |
|
|
|
if await self._should_drop_peer(peer): |
|
|
|
self.peers.discard(peer) |
|
|
|
break |
|
|
|
# Figure out how long to sleep before retrying. Retry a |
|
|
|
# good connection when it is about to turn stale, otherwise |
|
|
@ -198,47 +198,9 @@ class PeerManager(object): |
|
|
|
async with ignore_after(pause): |
|
|
|
await peer.retry_event.wait() |
|
|
|
|
|
|
|
async def _should_drop_peer(self, peer, is_good): |
|
|
|
now = time.time() |
|
|
|
if self.env.force_proxy or peer.is_tor: |
|
|
|
how = f'via {kind} over Tor' |
|
|
|
else: |
|
|
|
how = f'via {kind} at {peer.ip_addr}' |
|
|
|
status = 'verified' if good else 'failed to verify' |
|
|
|
elapsed = now - peer.last_try |
|
|
|
self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s') |
|
|
|
|
|
|
|
if good: |
|
|
|
peer.try_count = 0 |
|
|
|
peer.last_good = now |
|
|
|
peer.source = 'peer' |
|
|
|
# At most 2 matches if we're a host name, potentially |
|
|
|
# several if we're an IP address (several instances |
|
|
|
# can share a NAT). |
|
|
|
matches = peer.matches(self.peers) |
|
|
|
for match in matches: |
|
|
|
if match.ip_address: |
|
|
|
if len(matches) > 1: |
|
|
|
self.peers.remove(match) |
|
|
|
# Force the peer's monitoring task to exit |
|
|
|
match.retry_event.set() |
|
|
|
elif peer.host in match.features['hosts']: |
|
|
|
match.update_features_from_peer(peer) |
|
|
|
else: |
|
|
|
# Forget the peer if long-term unreachable |
|
|
|
if peer.last_good and not peer.bad: |
|
|
|
try_limit = 10 |
|
|
|
else: |
|
|
|
try_limit = 3 |
|
|
|
if peer.try_count >= try_limit: |
|
|
|
desc = 'bad' if peer.bad else 'unreachable' |
|
|
|
self.logger.info(f'forgetting {desc} peer: {peer}') |
|
|
|
self.peers.discard(peer) |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
|
|
async def _is_peer_good(self, peer): |
|
|
|
async def _should_drop_peer(self, peer): |
|
|
|
peer.try_count += 1 |
|
|
|
is_good = False |
|
|
|
for kind, port in peer.connection_port_pairs(): |
|
|
|
peer.last_try = time.time() |
|
|
|
|
|
|
@ -263,7 +225,8 @@ class PeerManager(object): |
|
|
|
try: |
|
|
|
async with PeerSession(peer.host, port, **kwargs) as session: |
|
|
|
await self._verify_peer(session, peer) |
|
|
|
return True |
|
|
|
is_good = True |
|
|
|
break |
|
|
|
except BadPeerError as e: |
|
|
|
self.logger.error(f'[{peer}] marking bad: ({e})') |
|
|
|
peer.mark_bad() |
|
|
@ -277,6 +240,43 @@ class PeerManager(object): |
|
|
|
self.logger.info(f'[{peer}] {kind} connection to ' |
|
|
|
f'port {port} failed: {e}') |
|
|
|
|
|
|
|
now = time.time() |
|
|
|
if self.env.force_proxy or peer.is_tor: |
|
|
|
how = f'via {kind} over Tor' |
|
|
|
else: |
|
|
|
how = f'via {kind} at {peer.ip_addr}' |
|
|
|
status = 'verified' if is_good else 'failed to verify' |
|
|
|
elapsed = now - peer.last_try |
|
|
|
self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s') |
|
|
|
|
|
|
|
if is_good: |
|
|
|
peer.try_count = 0 |
|
|
|
peer.last_good = now |
|
|
|
peer.source = 'peer' |
|
|
|
# At most 2 matches if we're a host name, potentially |
|
|
|
# several if we're an IP address (several instances |
|
|
|
# can share a NAT). |
|
|
|
matches = peer.matches(self.peers) |
|
|
|
for match in matches: |
|
|
|
if match.ip_address: |
|
|
|
if len(matches) > 1: |
|
|
|
self.peers.remove(match) |
|
|
|
# Force the peer's monitoring task to exit |
|
|
|
match.retry_event.set() |
|
|
|
elif peer.host in match.features['hosts']: |
|
|
|
match.update_features_from_peer(peer) |
|
|
|
else: |
|
|
|
# Forget the peer if long-term unreachable |
|
|
|
if peer.last_good and not peer.bad: |
|
|
|
try_limit = 10 |
|
|
|
else: |
|
|
|
try_limit = 3 |
|
|
|
if peer.try_count >= try_limit: |
|
|
|
desc = 'bad' if peer.bad else 'unreachable' |
|
|
|
self.logger.info(f'forgetting {desc} peer: {peer}') |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
|
|
async def _verify_peer(self, session, peer): |
|
|
|
if not peer.is_tor: |
|
|
|
address = session.peer_address() |
|
|
|