Browse Source

network: do not connect to multiple servers on same /16

maintain a healthy spread of (IP addresses of) connected servers
regtest_lnd
SomberNight 6 years ago
parent
commit
d8f3ab0917
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 39
      electrum/interface.py
  2. 36
      electrum/network.py

39
electrum/interface.py

@ -30,6 +30,7 @@ import traceback
import asyncio import asyncio
from typing import Tuple, Union, List, TYPE_CHECKING, Optional from typing import Tuple, Union, List, TYPE_CHECKING, Optional
from collections import defaultdict from collections import defaultdict
from ipaddress import IPv4Network, IPv6Network, ip_address
import aiorpcx import aiorpcx
from aiorpcx import RPCSession, Notification from aiorpcx import RPCSession, Notification
@ -51,6 +52,8 @@ if TYPE_CHECKING:
ca_path = certifi.where() ca_path = certifi.where()
BUCKET_NAME_OF_ONION_SERVERS = 'onion'
class NetworkTimeout: class NetworkTimeout:
# seconds # seconds
@ -187,6 +190,7 @@ class Interface(PrintError):
self.network = network self.network = network
self._set_proxy(proxy) self._set_proxy(proxy)
self.session = None # type: NotificationSession self.session = None # type: NotificationSession
self._ipaddr_bucket = None
self.tip_header = None self.tip_header = None
self.tip = 0 self.tip = 0
@ -395,6 +399,9 @@ class Interface(PrintError):
return conn, 0 return conn, 0
return conn, res['count'] return conn, res['count']
def is_main_server(self) -> bool:
return self.network.default_server == self.server
async def open_session(self, sslc, exit_early=False): async def open_session(self, sslc, exit_early=False):
async with aiorpcx.Connector(NotificationSession, async with aiorpcx.Connector(NotificationSession,
host=self.host, port=self.port, host=self.host, port=self.port,
@ -408,6 +415,9 @@ class Interface(PrintError):
raise GracefulDisconnect(e) # probably 'unsupported protocol version' raise GracefulDisconnect(e) # probably 'unsupported protocol version'
if exit_early: if exit_early:
return return
if not self.network.check_interface_against_healthy_spread_of_connected_servers(self):
raise GracefulDisconnect(f'too many connected servers already '
f'in bucket {self.bucket_based_on_ipaddress()}')
self.print_error("connection established. version: {}".format(ver)) self.print_error("connection established. version: {}".format(ver))
async with self.group as group: async with self.group as group:
@ -604,6 +614,35 @@ class Interface(PrintError):
def client_name(cls) -> str: def client_name(cls) -> str:
return f'electrum/{version.ELECTRUM_VERSION}' return f'electrum/{version.ELECTRUM_VERSION}'
def is_tor(self):
return self.host.endswith('.onion')
def ip_addr(self) -> Optional[str]:
session = self.session
if not session: return None
peer_addr = session.peer_address()
if not peer_addr: return None
return peer_addr[0]
def bucket_based_on_ipaddress(self) -> str:
def do_bucket():
if self.is_tor():
return BUCKET_NAME_OF_ONION_SERVERS
ip_addr = ip_address(self.ip_addr())
if not ip_addr:
return ''
if ip_addr.version == 4:
slash16 = IPv4Network(ip_addr).supernet(prefixlen_diff=32-16)
return str(slash16)
elif ip_addr.version == 6:
slash48 = IPv6Network(ip_addr).supernet(prefixlen_diff=128-48)
return str(slash48)
return ''
if not self._ipaddr_bucket:
self._ipaddr_bucket = do_bucket()
return self._ipaddr_bucket
def _assert_header_does_not_check_against_any_chain(header: dict) -> None: def _assert_header_does_not_check_against_any_chain(header: dict) -> None:
chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header)

36
electrum/network.py

@ -52,7 +52,7 @@ from . import blockchain
from . import bitcoin from . import bitcoin
from .blockchain import Blockchain, HEADER_SIZE from .blockchain import Blockchain, HEADER_SIZE
from .interface import (Interface, serialize_server, deserialize_server, from .interface import (Interface, serialize_server, deserialize_server,
RequestTimedOut, NetworkTimeout) RequestTimedOut, NetworkTimeout, BUCKET_NAME_OF_ONION_SERVERS)
from .version import PROTOCOL_VERSION from .version import PROTOCOL_VERSION
from .simple_config import SimpleConfig from .simple_config import SimpleConfig
from .i18n import _ from .i18n import _
@ -756,6 +756,30 @@ class Network(PrintError):
self._add_recent_server(server) self._add_recent_server(server)
self.trigger_callback('network_updated') self.trigger_callback('network_updated')
def check_interface_against_healthy_spread_of_connected_servers(self, iface_to_check) -> bool:
# main interface is exempt. this makes switching servers easier
if iface_to_check.is_main_server():
return True
# bucket connected interfaces
with self.interfaces_lock:
interfaces = list(self.interfaces.values())
if iface_to_check in interfaces:
interfaces.remove(iface_to_check)
buckets = defaultdict(list)
for iface in interfaces:
buckets[iface.bucket_based_on_ipaddress()].append(iface)
# check proposed server against buckets
onion_servers = buckets[BUCKET_NAME_OF_ONION_SERVERS]
if iface_to_check.is_tor():
# keep number of onion servers below half of all connected servers
if len(onion_servers) > NUM_TARGET_CONNECTED_SERVERS // 2:
return False
else:
bucket = iface_to_check.bucket_based_on_ipaddress()
if len(buckets[bucket]) > 0:
return False
return True
async def _init_headers_file(self): async def _init_headers_file(self):
b = blockchain.get_best_chain() b = blockchain.get_best_chain()
filename = b.path() filename = b.path()
@ -1149,11 +1173,20 @@ class Network(PrintError):
async def maybe_queue_new_interfaces_to_be_launched_later(): async def maybe_queue_new_interfaces_to_be_launched_later():
now = time.time() now = time.time()
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)): for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
# FIXME this should try to honour "healthy spread of connected servers"
self._start_random_interface() self._start_random_interface()
if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
self.print_error('network: retrying connections') self.print_error('network: retrying connections')
self.disconnected_servers = set([]) self.disconnected_servers = set([])
self.nodes_retry_time = now self.nodes_retry_time = now
async def maintain_healthy_spread_of_connected_servers():
with self.interfaces_lock: interfaces = list(self.interfaces.values())
random.shuffle(interfaces)
for iface in interfaces:
if not self.check_interface_against_healthy_spread_of_connected_servers(iface):
self.print_error(f"disconnecting from {iface.server}. too many connected "
f"servers already in bucket {iface.bucket_based_on_ipaddress()}")
await self._close_interface(iface)
async def maintain_main_interface(): async def maintain_main_interface():
await self._ensure_there_is_a_main_interface() await self._ensure_there_is_a_main_interface()
if self.is_connected(): if self.is_connected():
@ -1164,6 +1197,7 @@ class Network(PrintError):
try: try:
await launch_already_queued_up_new_interfaces() await launch_already_queued_up_new_interfaces()
await maybe_queue_new_interfaces_to_be_launched_later() await maybe_queue_new_interfaces_to_be_launched_later()
await maintain_healthy_spread_of_connected_servers()
await maintain_main_interface() await maintain_main_interface()
except asyncio.CancelledError: except asyncio.CancelledError:
# suppress spurious cancellations # suppress spurious cancellations

Loading…
Cancel
Save