|
|
@ -17,7 +17,7 @@ from collections import defaultdict, Counter |
|
|
|
from aiorpcx import (Connector, RPCSession, SOCKSProxy, |
|
|
|
Notification, handler_invocation, |
|
|
|
SOCKSError, RPCError, TaskTimeout, TaskGroup, Event, |
|
|
|
sleep, run_in_thread, ignore_after, timeout_after) |
|
|
|
sleep, ignore_after, timeout_after) |
|
|
|
|
|
|
|
from electrumx.lib.peer import Peer |
|
|
|
from electrumx.lib.util import class_logger, protocol_tuple |
|
|
@ -152,22 +152,28 @@ class PeerManager(object): |
|
|
|
self.logger.info('no proxy detected, will try later') |
|
|
|
await sleep(900) |
|
|
|
|
|
|
|
async def _note_peers(self, peers, limit=2, check_ports=False, |
|
|
|
async def _note_peers(self, peers, limit=2, check_ports=False, check_matches=False, |
|
|
|
source=None): |
|
|
|
'''Add a limited number of peers that are not already present.''' |
|
|
|
new_peers = [] |
|
|
|
known = [] |
|
|
|
for peer in peers: |
|
|
|
if not peer.is_public or (peer.is_tor and not self.proxy): |
|
|
|
continue |
|
|
|
|
|
|
|
matches = peer.matches(self.peers) |
|
|
|
if not matches: |
|
|
|
if matches: |
|
|
|
known.append(peer) |
|
|
|
if check_ports: |
|
|
|
for match in matches: |
|
|
|
if match.check_ports(peer): |
|
|
|
self.logger.info(f'ports changed for {peer}') |
|
|
|
match.retry_event.set() |
|
|
|
else: |
|
|
|
new_peers.append(peer) |
|
|
|
elif check_ports: |
|
|
|
for match in matches: |
|
|
|
if match.check_ports(peer): |
|
|
|
self.logger.info(f'ports changed for {peer}') |
|
|
|
match.retry_event.set() |
|
|
|
|
|
|
|
if check_matches and len(self.peers) >= 6 and len(known) <= len(self.peers) // 2: |
|
|
|
return False |
|
|
|
|
|
|
|
if new_peers: |
|
|
|
source = source or new_peers[0].source |
|
|
@ -182,6 +188,8 @@ class PeerManager(object): |
|
|
|
self.peers.add(peer) |
|
|
|
await self.group.spawn(self._monitor_peer(peer)) |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
async def _monitor_peer(self, peer): |
|
|
|
# Stop monitoring if we were dropped (a duplicate peer) |
|
|
|
while peer in self.peers: |
|
|
@ -299,12 +307,14 @@ class PeerManager(object): |
|
|
|
|
|
|
|
# Process reported peers if remote peer is good |
|
|
|
peers = peers_task.result() |
|
|
|
await self._note_peers(peers) |
|
|
|
features = self._features_to_register(peer, peers) |
|
|
|
if features: |
|
|
|
self.logger.info(f'registering ourself with {peer}') |
|
|
|
# We only care to wait for the response |
|
|
|
await session.send_request('server.add_peer', [features]) |
|
|
|
if await self._note_peers(peers, check_matches=True): |
|
|
|
features = self._features_to_register(peer, peers) |
|
|
|
if features: |
|
|
|
self.logger.info(f'registering ourself with {peer}') |
|
|
|
# We only care to wait for the response |
|
|
|
await session.send_request('server.add_peer', [features]) |
|
|
|
else: |
|
|
|
raise BadPeerError('potential sybil detected') |
|
|
|
|
|
|
|
async def _send_headers_subscribe(self, session, peer, ptuple): |
|
|
|
message = 'blockchain.headers.subscribe' |
|
|
@ -446,8 +456,7 @@ class PeerManager(object): |
|
|
|
reason = 'source-destination mismatch' |
|
|
|
|
|
|
|
if permit: |
|
|
|
self.logger.info(f'accepted add_peer request from {source} ' |
|
|
|
f'for {host}') |
|
|
|
self.logger.info(f'accepted add_peer request from {source} for {host}') |
|
|
|
await self._note_peers([peer], check_ports=True) |
|
|
|
else: |
|
|
|
self.logger.warning(f'rejected add_peer request from {source} ' |
|
|
|