Browse Source

Switch to aiorpcX for SOCKS proxying.

patch-2
Neil Booth 7 years ago
parent
commit
80fc334449
  1. 241
      lib/socks.py
  2. 60
      server/peers.py
  3. 6
      server/session.py
  4. 6
      setup.py

241
lib/socks.py

@ -1,241 +0,0 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# and warranty status of this software.
'''Socks proxying.'''
import asyncio
import ipaddress
import logging
import socket
import struct
from functools import partial
import lib.util as util
class Socks(util.LoggedClass):
'''Socks protocol wrapper.'''
SOCKS5_ERRORS = {
1: 'general SOCKS server failure',
2: 'connection not allowed by ruleset',
3: 'network unreachable',
4: 'host unreachable',
5: 'connection refused',
6: 'TTL expired',
7: 'command not supported',
8: 'address type not supported',
}
class Error(Exception):
pass
def __init__(self, loop, sock, host, port):
super().__init__()
self.loop = loop
self.sock = sock
self.host = host
self.port = port
try:
self.ip_address = ipaddress.ip_address(host)
except ValueError:
self.ip_address = None
self.debug = False
async def _socks4_handshake(self):
if self.ip_address:
# Socks 4
ip_addr = self.ip_address
host_bytes = b''
else:
# Socks 4a
ip_addr = ipaddress.ip_address('0.0.0.1')
host_bytes = self.host.encode() + b'\0'
user_id = ''
data = b'\4\1' + struct.pack('>H', self.port) + ip_addr.packed
data += user_id.encode() + b'\0' + host_bytes
await self.loop.sock_sendall(self.sock, data)
data = await self.loop.sock_recv(self.sock, 8)
if data[0] != 0:
raise self.Error('proxy sent bad initial Socks4 byte')
if data[1] != 0x5a:
raise self.Error('proxy request failed or rejected')
async def _socks5_handshake(self):
await self.loop.sock_sendall(self.sock, b'\5\1\0')
data = await self.loop.sock_recv(self.sock, 2)
if data[0] != 5:
raise self.Error('proxy sent bad SOCKS5 initial byte')
if data[1] != 0:
raise self.Error('proxy rejected SOCKS5 authentication method')
if self.ip_address:
if self.ip_address.version == 4:
addr = b'\1' + self.ip_address.packed
else:
addr = b'\4' + self.ip_address.packed
else:
host = self.host.encode()
addr = b'\3' + bytes([len(host)]) + host
data = b'\5\1\0' + addr + struct.pack('>H', self.port)
await self.loop.sock_sendall(self.sock, data)
data = await self.loop.sock_recv(self.sock, 5)
if data[0] != 5:
raise self.Error('proxy sent bad SOSCK5 response initial byte')
if data[1] != 0:
msg = self.SOCKS5_ERRORS.get(data[1], 'unknown SOCKS5 error {:d}'
.format(data[1]))
raise self.Error(msg)
if data[3] == 1:
addr_len, data = 3, data[4:]
elif data[3] == 3:
addr_len, data = data[4], b''
elif data[3] == 4:
addr_len, data = 15, data[4:]
data = await self.loop.sock_recv(self.sock, addr_len + 2)
addr = data[:addr_len]
port, = struct.unpack('>H', data[-2:])
async def handshake(self):
'''Write the proxy handshake sequence.'''
if self.ip_address and self.ip_address.version == 6:
await self._socks5_handshake()
else:
await self._socks4_handshake()
if self.debug:
address = (self.host, self.port)
self.log_info('successful connection via proxy to {}'
.format(util.address_string(address)))
class SocksProxy(util.LoggedClass):
def __init__(self, host, port, loop):
'''Host can be an IPv4 address, IPv6 address, or a host name.
Port can be None, in which case one is auto-detected.'''
super().__init__()
# Host and port of the proxy
self.host = host
self.try_ports = [port, 9050, 9150, 1080]
self.errors = 0
self.ip_addr = None
self.lost_event = asyncio.Event()
self.tried_event = asyncio.Event()
self.loop = loop
self.set_lost()
async def auto_detect_loop(self):
'''Try to detect a proxy at regular intervals until one is found.
If one is found, do nothing until one is lost.'''
while True:
await self.lost_event.wait()
self.lost_event.clear()
tries = 0
while True:
tries += 1
log_failure = tries % 10 == 1
await self.detect_proxy(log_failure=log_failure)
if self.is_up():
break
await asyncio.sleep(600)
def is_up(self):
'''Returns True if we have a good proxy.'''
return self.port is not None
def set_lost(self):
'''Called when the proxy appears lost/down.'''
self.port = None
self.lost_event.set()
async def connect_via_proxy(self, host, port, proxy_address=None):
'''Connect to a (host, port) pair via the proxy. Returns the
connected socket on success.'''
proxy_address = proxy_address or (self.host, self.port)
sock = socket.socket()
sock.setblocking(False)
try:
await self.loop.sock_connect(sock, proxy_address)
socks = Socks(self.loop, sock, host, port)
await socks.handshake()
return sock
except Exception:
sock.close()
raise
async def detect_proxy(self, host='www.google.com', port=80,
log_failure=True):
'''Attempt to detect a proxy by establishing a connection through it
to the given target host / port pair.
'''
if self.is_up():
return
sock = None
for proxy_port in self.try_ports:
if proxy_port is None:
continue
paddress = (self.host, proxy_port)
try:
sock = await self.connect_via_proxy(host, port, paddress)
break
except Exception as e:
if log_failure:
self.logger.info('failed to detect proxy at {}: {}'
.format(util.address_string(paddress), e))
self.tried_event.set()
# Failed all ports?
if sock is None:
return
peername = sock.getpeername()
sock.close()
self.ip_addr = peername[0]
self.port = proxy_port
self.errors = 0
self.logger.info('detected proxy at {} ({})'
.format(util.address_string(paddress), self.ip_addr))
async def create_connection(self, protocol_factory, host, port, **kwargs):
'''All arguments are as to asyncio's create_connection method.'''
try:
sock = await self.connect_via_proxy(host, port)
self.errors = 0
except Exception:
self.errors += 1
# If we have 3 consecutive errors, consider the proxy undetected
if self.errors == 3:
self.set_lost()
raise
hostname = host if kwargs.get('ssl') else None
return await self.loop.create_connection(
protocol_factory, sock=sock, server_hostname=hostname, **kwargs)

60
server/peers.py

@ -15,9 +15,10 @@ import time
from collections import defaultdict, Counter
from functools import partial
import aiorpcx
from lib.jsonrpc import JSONSession
from lib.peer import Peer
from lib.socks import SocksProxy
import lib.util as util
import server.version as version
@ -238,8 +239,9 @@ class PeerManager(util.LoggedClass):
# any other peers with the same host name or IP address.
self.peers = set()
self.permit_onion_peer_time = time.time()
self.proxy = SocksProxy(env.tor_proxy_host, env.tor_proxy_port,
loop=self.loop)
self.proxy_tried_event = asyncio.Event()
self.detect_proxy_event = asyncio.Event()
self.proxy = None
def my_clearnet_peer(self):
'''Returns the clearnet peer representing this server, if any.'''
@ -415,6 +417,43 @@ class PeerManager(util.LoggedClass):
'''Schedule the coro to be run.'''
return self.controller.ensure_future(coro, callback=callback)
async def detect_proxy_loop(self):
'''Detect a proxy. If found, returns with self.proxy set to an
aiorpcX.SOCKSProxy instance. Otherwise retries occasionally.'''
host = self.env.tor_proxy_host
if self.env.tor_proxy_port is None:
ports = [9050, 9150, 1080]
else:
ports = [self.env.tor_proxy_port]
cls = aiorpcx.SOCKSProxy
self.detect_proxy_event.set()
while True:
await self.detect_proxy_event.wait()
self.detect_proxy_event.clear()
if self.proxy:
continue
self.log_info(f'trying to detect proxy on "{host}" ports {ports}')
result = await cls.auto_detect_host(host, ports, None,
loop=self.loop)
self.proxy_tried_event.set()
if isinstance(result, cls):
self.proxy = result
self.log_info(f'detected {self.proxy}')
continue
for failure_msg in result:
self.log_info(failure_msg)
pause = 600
self.log_info(f'will retry proxy detection in {pause} seconds')
self.loop.call_later(pause, self.detect_proxy_event.set)
def proxy_peername(self):
'''Return the peername of the proxy, if there is a proxy, otherwise
None.'''
return self.proxy.peername if self.proxy else None
async def main_loop(self):
'''Main loop performing peer maintenance. This includes
@ -426,15 +465,14 @@ class PeerManager(util.LoggedClass):
self.logger.info('peer discovery is disabled')
return
self.import_peers()
self.logger.info('beginning peer discovery. Force use of proxy: {}'
.format(self.env.force_proxy))
# Wait a few seconds after starting the proxy detection loop
# for proxy detection to succeed
self.ensure_future(self.proxy.auto_detect_loop())
await self.proxy.tried_event.wait()
# Wait a few moments while trying to detect a proxy
self.ensure_future(self.detect_proxy_loop())
await self.proxy_tried_event.wait()
self.logger.info('beginning peer discovery; force use of proxy: {}'
.format(self.env.force_proxy))
self.import_peers()
while True:
timeout = self.loop.call_later(WAKEUP_SECS, self.retry_event.set)
@ -481,7 +519,7 @@ class PeerManager(util.LoggedClass):
if self.env.force_proxy or peer.is_tor:
# Only attempt a proxy connection if the proxy is up
if not self.proxy.is_up():
if not self.proxy:
return
create_connection = self.proxy.create_connection
else:

6
server/session.py

@ -296,9 +296,11 @@ class ElectrumX(SessionBase):
def is_tor(self):
'''Try to detect if the connection is to a tor hidden service we are
running.'''
proxy = self.controller.peer_mgr.proxy
peername = self.controller.peer_mgr.proxy_peername()
if not peername:
return False
peer_info = self.peer_info()
return peer_info and peer_info[0] == proxy.ip_addr
return peer_info and peer_info[0] == peername[0]
async def replaced_banner(self, banner):
network_info = await self.controller.daemon_request('getnetworkinfo')

6
setup.py

@ -11,7 +11,7 @@ setuptools.setup(
# "x11_hash" package (1.4) is required to sync DASH network.
# "tribus_hash" package is required to sync Denarius network.
# "blake256" package is required to sync Decred network.
install_requires=['plyvel', 'pylru', 'aiohttp >= 1'],
install_requires=['aiorpcX >= 0.4.1', 'plyvel', 'pylru', 'aiohttp >= 1'],
packages=setuptools.find_packages(exclude=['tests']),
description='ElectrumX Server',
author='Neil Booth',
@ -20,9 +20,11 @@ setuptools.setup(
url='https://github.com/kyuupichan/electrumx/',
long_description='Server implementation for the Electrum wallet',
classifiers=[
'Development Status :: 3 - Alpha',
'Development Status :: 5 - Production/Stable',
'Framework :: AsyncIO',
'Topic :: Internet',
'License :: OSI Approved :: MIT License',
'Operating System :: Unix',
"Programming Language :: Python :: 3.6",
],
)

Loading…
Cancel
Save