diff --git a/electrum/interface.py b/electrum/interface.py index 516ef6680..309c656f3 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -30,6 +30,7 @@ import traceback import asyncio from typing import Tuple, Union, List, TYPE_CHECKING, Optional from collections import defaultdict +from ipaddress import IPv4Network, IPv6Network, ip_address import aiorpcx from aiorpcx import RPCSession, Notification @@ -51,6 +52,8 @@ if TYPE_CHECKING: ca_path = certifi.where() +BUCKET_NAME_OF_ONION_SERVERS = 'onion' + class NetworkTimeout: # seconds @@ -187,6 +190,7 @@ class Interface(PrintError): self.network = network self._set_proxy(proxy) self.session = None # type: NotificationSession + self._ipaddr_bucket = None self.tip_header = None self.tip = 0 @@ -395,6 +399,9 @@ class Interface(PrintError): return conn, 0 return conn, res['count'] + def is_main_server(self) -> bool: + return self.network.default_server == self.server + async def open_session(self, sslc, exit_early=False): async with aiorpcx.Connector(NotificationSession, host=self.host, port=self.port, @@ -408,6 +415,9 @@ class Interface(PrintError): raise GracefulDisconnect(e) # probably 'unsupported protocol version' if exit_early: return + if not self.network.check_interface_against_healthy_spread_of_connected_servers(self): + raise GracefulDisconnect(f'too many connected servers already ' + f'in bucket {self.bucket_based_on_ipaddress()}') self.print_error("connection established. version: {}".format(ver)) async with self.group as group: @@ -604,6 +614,35 @@ class Interface(PrintError): def client_name(cls) -> str: return f'electrum/{version.ELECTRUM_VERSION}' + def is_tor(self): + return self.host.endswith('.onion') + + def ip_addr(self) -> Optional[str]: + session = self.session + if not session: return None + peer_addr = session.peer_address() + if not peer_addr: return None + return peer_addr[0] + + def bucket_based_on_ipaddress(self) -> str: + def do_bucket(): + if self.is_tor(): + return BUCKET_NAME_OF_ONION_SERVERS + ip_addr = ip_address(self.ip_addr()) + if not ip_addr: + return '' + if ip_addr.version == 4: + slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16) + return str(slash16) + elif ip_addr.version == 6: + slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48) + return str(slash48) + return '' + + if not self._ipaddr_bucket: + self._ipaddr_bucket = do_bucket() + return self._ipaddr_bucket + def _assert_header_does_not_check_against_any_chain(header: dict) -> None: chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) diff --git a/electrum/network.py b/electrum/network.py index 4ac337299..ed5892af9 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -52,7 +52,7 @@ from . import blockchain from . import bitcoin from .blockchain import Blockchain, HEADER_SIZE from .interface import (Interface, serialize_server, deserialize_server, - RequestTimedOut, NetworkTimeout) + RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS) from .version import PROTOCOL_VERSION from .simple_config import SimpleConfig from .i18n import _ @@ -756,6 +756,30 @@ class Network(PrintError): self._add_recent_server(server) self.trigger_callback('network_updated') + def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool: + # main interface is exempt. this makes switching servers easier + if iface_to_check.is_main_server(): + return True + # bucket connected interfaces + with self.interfaces_lock: + interfaces = list(self.interfaces.values()) + if iface_to_check in interfaces: + interfaces.remove(iface_to_check) + buckets = defaultdict(list) + for iface in interfaces: + buckets[iface.bucket_based_on_ipaddress()].append(iface) + # check proposed server against buckets + onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS] + if iface_to_check.is_tor(): + # keep number of onion servers below half of all connected servers + if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2: + return False + else: + bucket = iface_to_check.bucket_based_on_ipaddress() + if len(buckets[bucket]) > 0: + return False + return True + async def _init_headers_file(self): b = blockchain.get_best_chain() filename = b.path() @@ -1149,11 +1173,20 @@ class Network(PrintError): async def maybe_queue_new_interfaces_to_be_launched_later(): now = time.time() for i in range(self.num_server - len(self.interfaces) - len(self.connecting)): + # FIXME this should try to honour "healthy spread of connected servers" self._start_random_interface() if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: self.print_error('network: retrying connections') self.disconnected_servers = set([]) self.nodes_retry_time = now + async def maintain_healthy_spread_of_connected_servers(): + with self.interfaces_lock: interfaces = list(self.interfaces.values()) + random.shuffle(interfaces) + for iface in interfaces: + if not self.check_interface_against_healthy_spread_of_connected_servers(iface): + self.print_error(f"disconnecting from {iface.server}. too many connected " + f"servers already in bucket {iface.bucket_based_on_ipaddress()}") + await self._close_interface(iface) async def maintain_main_interface(): await self._ensure_there_is_a_main_interface() if self.is_connected(): @@ -1164,6 +1197,7 @@ class Network(PrintError): try: await launch_already_queued_up_new_interfaces() await maybe_queue_new_interfaces_to_be_launched_later() + await maintain_healthy_spread_of_connected_servers() await maintain_main_interface() except asyncio.CancelledError: # suppress spurious cancellations