Browse Source

Rework peer discovery

Make it more naturally async
patch-2
Neil Booth 7 years ago
parent
commit
eac214e508
  1. 2
      electrumx/__init__.py
  2. 376
      electrumx/server/peers.py

2
electrumx/__init__.py

@ -1,4 +1,4 @@
version = 'ElectrumX 1.6a' version = 'ElectrumX 1.6b'
version_short = version.split()[-1] version_short = version.split()[-1]
from electrumx.server.controller import Controller from electrumx.server.controller import Controller

376
electrumx/server/peers.py

@ -13,12 +13,11 @@ import socket
import ssl import ssl
import time import time
from collections import defaultdict, Counter from collections import defaultdict, Counter
from functools import partial
from aiorpcx import ClientSession, RPCError, SOCKSProxy, ConnectionError from aiorpcx import ClientSession, RPCError, SOCKSProxy, ConnectionError
from electrumx.lib.peer import Peer from electrumx.lib.peer import Peer
from electrumx.lib.util import ConnectionLogger, class_logger, protocol_tuple from electrumx.lib.util import class_logger, protocol_tuple
PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4) PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4)
@ -26,38 +25,23 @@ STALE_SECS = 24 * 3600
WAKEUP_SECS = 300 WAKEUP_SECS = 300
class PeerSession(ClientSession): class RequestError(Exception):
'''An outgoing session to a peer.''' pass
sessions = set()
def __init__(self, peer, peer_mgr, kind, host, port, **kwargs): class BadPeerError(Exception):
super().__init__(host, port, **kwargs) pass
self.peer = peer
self.peer_mgr = peer_mgr
self.kind = kind
self.timeout = 20 if self.peer.is_tor else 10
self.logger = class_logger(__name__, self.__class__.__name__)
self.logger = ConnectionLogger(self.logger, {'conn_id': f'{host}'})
def connection_made(self, transport):
super().connection_made(transport)
self.sessions.add(self)
# Update IP address if not Tor def assert_good(request, instance):
if not self.peer.is_tor: result = request.result()
address = self.peer_address() if not isinstance(result, instance):
if address: raise RequestError(f'{request} returned bad result type '
self.peer.ip_addr = address[0] f'{type(result).__name__}')
# Send server.version first
self.send_request('server.version', self.peer_mgr.server_version_args,
self.on_version, timeout=self.timeout)
def connection_lost(self, exc): class PeerSession(ClientSession):
'''Handle an incoming client connection.''' '''An outgoing session to a peer.'''
super().connection_lost(exc)
self.sessions.remove(self)
def _header_notification(self, header): def _header_notification(self, header):
pass pass
@ -68,171 +52,6 @@ class PeerSession(ClientSession):
return self._header_notification return self._header_notification
return None return None
def is_good(self, request, instance):
try:
result = request.result()
except (asyncio.CancelledError, ConnectionError):
return False
except asyncio.TimeoutError as e:
self.fail(request, str(e))
return False
except RPCError as error:
self.fail(request, f'{error.message} ({error.code})')
return False
if isinstance(result, instance):
return True
self.fail(request, f'{request} returned bad result type '
f'{type(result).__name__}')
return False
def fail(self, request, reason):
self.logger.error(f'{request} failed: {reason}')
self.peer_mgr._set_verification_status(self.peer, self.kind, False)
self.close()
def bad(self, reason):
self.logger.error(f'marking bad: {reason}')
self.peer.mark_bad()
self.peer_mgr._set_verification_status(self.peer, self.kind, False)
self.close()
def on_version(self, request):
'''Handle the response to the version message.'''
if not self.is_good(request, list):
return
result = request.result()
# Protocol version 1.1 returns a pair with the version first
if len(result) != 2 or not all(isinstance(x, str) for x in result):
self.fail(request, 'result array bad format')
return
version = result[0]
self.peer.server_version = version
self.peer.features['server_version'] = version
self.ptuple = protocol_tuple(result[1])
for method, on_done in [
('blockchain.headers.subscribe', self.on_height),
('server.features', self.on_features),
('server.peers.subscribe', self.on_peers_subscribe),
]:
self.send_request(method, on_done=on_done, timeout=self.timeout)
def on_features(self, request):
if not self.is_good(request, dict):
return
features = request.result()
hosts = [host.lower() for host in features.get('hosts', {})]
our_hash = self.peer_mgr.env.coin.GENESIS_HASH
if our_hash != features.get('genesis_hash'):
self.bad('incorrect genesis hash')
elif self.peer.host.lower() in hosts:
self.peer.update_features(features)
self.maybe_close()
else:
self.bad('ignoring - not listed in host list {}'.format(hosts))
def on_height(self, request):
'''Handle the response to blockchain.headers.subscribe message.'''
if not self.is_good(request, dict):
return
result = request.result()
our_height = self.peer_mgr.chain_state.db_height()
if self.ptuple < (1, 3):
their_height = result.get('block_height')
else:
their_height = result.get('height')
if not isinstance(their_height, int):
self.bad('invalid height {}'.format(their_height))
return
if abs(our_height - their_height) > 5:
self.bad('bad height {:,d} (ours: {:,d})'
.format(their_height, our_height))
return
# Check prior header too in case of hard fork.
check_height = min(our_height, their_height)
raw_header = self.peer_mgr.chain_state.raw_header(check_height)
if self.ptuple >= (1, 4):
self.send_request('blockchain.block.header', [check_height],
partial(self.on_header, raw_header.hex()),
timeout=self.timeout)
else:
expected_header = self.peer_mgr.env.coin.electrum_header(
raw_header, check_height)
self.send_request('blockchain.block.get_header', [check_height],
partial(self.on_legacy_header, expected_header),
timeout=self.timeout)
def on_header(self, ours, request):
'''Handle the response to blockchain.block.get_header message.
Compare hashes of prior header in attempt to determine if forked.'''
if not self.is_good(request, str):
return
theirs = request.result()
if ours == theirs:
self.maybe_close()
else:
self.bad('our header {} and theirs {} differ'.format(ours, theirs))
def on_legacy_header(self, expected_header, request):
'''Handle the response to blockchain.block.get_header message.
Compare hashes of prior header in attempt to determine if forked.'''
if not self.is_good(request, dict):
return
result = request.result()
theirs = result.get('prev_block_hash')
ours = expected_header.get('prev_block_hash')
if ours == theirs:
self.maybe_close()
else:
self.bad('our header hash {} and theirs {} differ'
.format(ours, theirs))
def on_peers_subscribe(self, request):
'''Handle the response to the peers.subcribe message.'''
if not self.is_good(request, list):
return
# Check the peers list we got from a remote peer.
# Each is expected to be of the form:
# [ip_addr, hostname, ['v1.0', 't51001', 's51002']]
# Call add_peer if the remote doesn't appear to know about us.
raw_peers = request.result()
try:
real_names = [' '.join([u[1]] + u[2]) for u in raw_peers]
peers = [Peer.from_real_name(real_name, str(self.peer))
for real_name in real_names]
except Exception:
self.bad('bad server.peers.subscribe response')
return
features = self.peer_mgr._features_to_register(self.peer, peers)
if features:
self.logger.info(f'registering ourself with "server.add_peer"')
self.send_request('server.add_peer', [features],
self.on_add_peer, timeout=self.timeout)
else:
self.maybe_close()
def on_add_peer(self, request):
'''We got a response the add_peer message. Don't care about its
form.'''
self.maybe_close()
def maybe_close(self):
'''Close the connection if no requests are outstanding, and mark peer
as good.
'''
if not self.all_requests():
self.close()
self.peer_mgr._set_verification_status(self.peer, self.kind, True)
class PeerManager(object): class PeerManager(object):
'''Looks after the DB of peer network servers. '''Looks after the DB of peer network servers.
@ -356,19 +175,13 @@ class PeerManager(object):
''' '''
self._import_peers() self._import_peers()
try: while True:
while True: await self._maybe_detect_proxy()
await self._maybe_detect_proxy() await self._retry_peers()
await self._retry_peers() timeout = self.loop.call_later(WAKEUP_SECS, self.retry_event.set)
timeout = self.loop.call_later(WAKEUP_SECS, await self.retry_event.wait()
self.retry_event.set) self.retry_event.clear()
await self.retry_event.wait() timeout.cancel()
self.retry_event.clear()
timeout.cancel()
finally:
for session in list(PeerSession.sessions):
session.abort()
await session.wait_closed()
async def _retry_peers(self): async def _retry_peers(self):
'''Retry peers that are close to getting stale.''' '''Retry peers that are close to getting stale.'''
@ -392,6 +205,7 @@ class PeerManager(object):
async def _retry_peer(self, peer): async def _retry_peer(self, peer):
peer.try_count += 1 peer.try_count += 1
success = False
for kind, port in peer.connection_port_pairs(): for kind, port in peer.connection_port_pairs():
peer.last_try = time.time() peer.last_try = time.time()
@ -414,19 +228,137 @@ class PeerManager(object):
# connections so our peers see the correct source. # connections so our peers see the correct source.
kwargs['local_addr'] = (host, None) kwargs['local_addr'] = (host, None)
session = PeerSession(peer, self, kind, peer.host, port, **kwargs)
try: try:
await session.create_connection() async with PeerSession(peer.host, port, **kwargs) as session:
return await self._verify_peer(session, peer)
except Exception as e: success = True
elapsed = time.time() - peer.last_try except RPCError as e:
self.logger.info(f'failed connecting to {peer} at {kind} port ' self.logger.error(f'[{peer}] RPC error: {e.message} '
f'{port} in {elapsed:.1f}s: {e}') f'({e.code})')
# Try the next port pair except (RequestError, asyncio.TimeoutError) as e:
self.logger.error(f'[{peer}] {e}')
except BadPeerError as e:
self.logger.error(f'[{peer}] marking bad: ({e})')
peer.mark_bad()
except (OSError, ConnectionError) as e:
self.logger.info(f'[{peer}] {kind} connection to '
f'port {port} failed: {e}')
continue continue
self._set_verification_status(peer, kind, success)
if success:
return
self._maybe_forget_peer(peer) self._maybe_forget_peer(peer)
async def _verify_peer(self, session, peer):
if not peer.is_tor:
address = session.peer_address()
if address:
peer.ip_addr = address[0]
timeout = 20 if peer.is_tor else 10
# server.version goes first
request = session.send_request(
'server.version', self.server_version_args, timeout=timeout)
result = await request
assert_good(request, list)
# Protocol version 1.1 returns a pair with the version first
if len(result) != 2 or not all(isinstance(x, str) for x in result):
raise RequestFailure(f'bad server.version result: {result}')
server_version, protocol_version = result
peer.server_version = server_version
peer.features['server_version'] = server_version
ptuple = protocol_tuple(protocol_version)
jobs = [self.tasks.create_task(message) for message in (
self._send_headers_subscribe(session, peer, timeout, ptuple),
self._send_server_features(session, peer, timeout),
self._send_peers_subscribe(session, peer, timeout)
)]
await asyncio.wait(jobs)
async def _send_headers_subscribe(self, session, peer, timeout, ptuple):
request = session.send_request('blockchain.headers.subscribe',
timeout=timeout)
result = await request
assert_good(request, dict)
our_height = self.chain_state.db_height()
if ptuple < (1, 3):
their_height = result.get('block_height')
else:
their_height = result.get('height')
if not isinstance(their_height, int):
raise BadPeerError(f'invalid height {their_height}')
if abs(our_height - their_height) > 5:
raise BadPeerError(f'bad height {their_height:,d} '
f'(ours: {our_height:,d})')
# Check prior header too in case of hard fork.
check_height = min(our_height, their_height)
raw_header = self.chain_state.raw_header(check_height)
if ptuple >= (1, 4):
ours = raw_header.hex()
request = session.send_request('blockchain.block.header',
[check_height], timeout=timeout)
theirs = await request
assert_good(request, str)
if ours != theirs:
raise BadPeerError(f'our header {ours} and '
f'theirs {theirs} differ')
else:
ours = self.env.coin.electrum_header(raw_header, check_height)
request = session.send_request('blockchain.block.get_header',
[check_height], timeout=timeout)
result = await request
assert_good(request, dict)
theirs = result.get('prev_block_hash')
ours = ours.get('prev_block_hash')
if ours != theirs:
raise BadPeerError(f'our header hash {ours} and '
f'theirs {theirs} differ')
async def _send_server_features(self, session, peer, timeout):
request = session.send_request('server.features', timeout=timeout)
features = await request
assert_good(request, dict)
hosts = [host.lower() for host in features.get('hosts', {})]
if self.env.coin.GENESIS_HASH != features.get('genesis_hash'):
raise BadPeerError('incorrect genesis hash')
elif peer.host.lower() in hosts:
peer.update_features(features)
else:
raise BadPeerError(f'not listed in own hosts list {hosts}')
async def _send_peers_subscribe(self, session, peer, timeout):
request = session.send_request('server.peers.subscribe',
timeout=timeout)
raw_peers = await request
assert_good(request, list)
# Check the peers list we got from a remote peer.
# Each is expected to be of the form:
# [ip_addr, hostname, ['v1.0', 't51001', 's51002']]
# Call add_peer if the remote doesn't appear to know about us.
try:
real_names = [' '.join([u[1]] + u[2]) for u in raw_peers]
peers = [Peer.from_real_name(real_name, str(peer))
for real_name in real_names]
except Exception:
raise BadPeerError('bad server.peers.subscribe response')
features = self._features_to_register(peer, peers)
if not features:
return
self.logger.info(f'registering ourself with {peer}')
request = session.send_request('server.add_peer', [features],
timeout=timeout)
# We only care to wait for the response
await request
def _set_verification_status(self, peer, kind, good): def _set_verification_status(self, peer, kind, good):
'''Called when a verification succeeded or failed.''' '''Called when a verification succeeded or failed.'''
now = time.time() now = time.time()
@ -464,7 +396,7 @@ class PeerManager(object):
if forget: if forget:
desc = 'bad' if peer.bad else 'unreachable' desc = 'bad' if peer.bad else 'unreachable'
self.logger.info('forgetting {} peer: {}'.format(desc, peer)) self.logger.info(f'forgetting {desc} peer: {peer}')
self.peers.discard(peer) self.peers.discard(peer)
# #
@ -492,7 +424,7 @@ class PeerManager(object):
elif check_ports: elif check_ports:
for match in matches: for match in matches:
if match.check_ports(peer): if match.check_ports(peer):
self.logger.info('ports changed for {}'.format(peer)) self.logger.info(f'ports changed for {peer}')
retry = True retry = True
if new_peers: if new_peers:
@ -504,8 +436,8 @@ class PeerManager(object):
else: else:
use_peers = new_peers use_peers = new_peers
for n, peer in enumerate(use_peers): for n, peer in enumerate(use_peers):
self.logger.info('accepted new peer {:d}/{:d} {} from {} ' self.logger.info(f'accepted new peer {n+1}/len(use_peers) '
.format(n + 1, len(use_peers), peer, source)) f'{peer} from {source}')
self.peers.update(use_peers) self.peers.update(use_peers)
if retry: if retry:
@ -552,12 +484,12 @@ class PeerManager(object):
reason = 'source-destination mismatch' reason = 'source-destination mismatch'
if permit: if permit:
self.logger.info('accepted add_peer request from {} for {}' self.logger.info(f'accepted add_peer request from {source} '
.format(source, host)) f'for {host}')
self.add_peers([peer], check_ports=True) self.add_peers([peer], check_ports=True)
else: else:
self.logger.warning('rejected add_peer request from {} for {} ({})' self.logger.warning(f'rejected add_peer request from {source} '
.format(source, host, reason)) f'for {host} ({reason})')
return permit return permit

Loading…
Cancel
Save