From eac214e508748ee3fb1c8107109e0300ebdb4175 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 24 Jul 2018 20:33:15 +0800 Subject: [PATCH] Rework peer discovery Make it more naturally async --- electrumx/__init__.py | 2 +- electrumx/server/peers.py | 376 ++++++++++++++++---------------------- 2 files changed, 155 insertions(+), 223 deletions(-) diff --git a/electrumx/__init__.py b/electrumx/__init__.py index 71cc7a5..11c2e48 100644 --- a/electrumx/__init__.py +++ b/electrumx/__init__.py @@ -1,4 +1,4 @@ -version = 'ElectrumX 1.6a' +version = 'ElectrumX 1.6b' version_short = version.split()[-1] from electrumx.server.controller import Controller diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 5367bb4..5a03a8b 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -13,12 +13,11 @@ import socket import ssl import time from collections import defaultdict, Counter -from functools import partial from aiorpcx import ClientSession, RPCError, SOCKSProxy, ConnectionError from electrumx.lib.peer import Peer -from electrumx.lib.util import ConnectionLogger, class_logger, protocol_tuple +from electrumx.lib.util import class_logger, protocol_tuple PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4) @@ -26,38 +25,23 @@ STALE_SECS = 24 * 3600 WAKEUP_SECS = 300 -class PeerSession(ClientSession): - '''An outgoing session to a peer.''' +class RequestError(Exception): + pass - sessions = set() - def __init__(self, peer, peer_mgr, kind, host, port, **kwargs): - super().__init__(host, port, **kwargs) - self.peer = peer - self.peer_mgr = peer_mgr - self.kind = kind - self.timeout = 20 if self.peer.is_tor else 10 - self.logger = class_logger(__name__, self.__class__.__name__) - self.logger = ConnectionLogger(self.logger, {'conn_id': f'{host}'}) +class BadPeerError(Exception): + pass - def connection_made(self, transport): - super().connection_made(transport) - self.sessions.add(self) - # Update IP address if not Tor - if not self.peer.is_tor: - address = self.peer_address() - if address: - self.peer.ip_addr = address[0] +def assert_good(request, instance): + result = request.result() + if not isinstance(result, instance): + raise RequestError(f'{request} returned bad result type ' + f'{type(result).__name__}') - # Send server.version first - self.send_request('server.version', self.peer_mgr.server_version_args, - self.on_version, timeout=self.timeout) - def connection_lost(self, exc): - '''Handle an incoming client connection.''' - super().connection_lost(exc) - self.sessions.remove(self) +class PeerSession(ClientSession): + '''An outgoing session to a peer.''' def _header_notification(self, header): pass @@ -68,171 +52,6 @@ class PeerSession(ClientSession): return self._header_notification return None - def is_good(self, request, instance): - try: - result = request.result() - except (asyncio.CancelledError, ConnectionError): - return False - except asyncio.TimeoutError as e: - self.fail(request, str(e)) - return False - except RPCError as error: - self.fail(request, f'{error.message} ({error.code})') - return False - - if isinstance(result, instance): - return True - - self.fail(request, f'{request} returned bad result type ' - f'{type(result).__name__}') - return False - - def fail(self, request, reason): - self.logger.error(f'{request} failed: {reason}') - self.peer_mgr._set_verification_status(self.peer, self.kind, False) - self.close() - - def bad(self, reason): - self.logger.error(f'marking bad: {reason}') - self.peer.mark_bad() - self.peer_mgr._set_verification_status(self.peer, self.kind, False) - self.close() - - def on_version(self, request): - '''Handle the response to the version message.''' - if not self.is_good(request, list): - return - - result = request.result() - # Protocol version 1.1 returns a pair with the version first - if len(result) != 2 or not all(isinstance(x, str) for x in result): - self.fail(request, 'result array bad format') - return - version = result[0] - self.peer.server_version = version - self.peer.features['server_version'] = version - self.ptuple = protocol_tuple(result[1]) - - for method, on_done in [ - ('blockchain.headers.subscribe', self.on_height), - ('server.features', self.on_features), - ('server.peers.subscribe', self.on_peers_subscribe), - ]: - self.send_request(method, on_done=on_done, timeout=self.timeout) - - def on_features(self, request): - if not self.is_good(request, dict): - return - - features = request.result() - hosts = [host.lower() for host in features.get('hosts', {})] - our_hash = self.peer_mgr.env.coin.GENESIS_HASH - if our_hash != features.get('genesis_hash'): - self.bad('incorrect genesis hash') - elif self.peer.host.lower() in hosts: - self.peer.update_features(features) - self.maybe_close() - else: - self.bad('ignoring - not listed in host list {}'.format(hosts)) - - def on_height(self, request): - '''Handle the response to blockchain.headers.subscribe message.''' - if not self.is_good(request, dict): - return - - result = request.result() - our_height = self.peer_mgr.chain_state.db_height() - if self.ptuple < (1, 3): - their_height = result.get('block_height') - else: - their_height = result.get('height') - if not isinstance(their_height, int): - self.bad('invalid height {}'.format(their_height)) - return - if abs(our_height - their_height) > 5: - self.bad('bad height {:,d} (ours: {:,d})' - .format(their_height, our_height)) - return - # Check prior header too in case of hard fork. - check_height = min(our_height, their_height) - raw_header = self.peer_mgr.chain_state.raw_header(check_height) - if self.ptuple >= (1, 4): - self.send_request('blockchain.block.header', [check_height], - partial(self.on_header, raw_header.hex()), - timeout=self.timeout) - else: - expected_header = self.peer_mgr.env.coin.electrum_header( - raw_header, check_height) - self.send_request('blockchain.block.get_header', [check_height], - partial(self.on_legacy_header, expected_header), - timeout=self.timeout) - - def on_header(self, ours, request): - '''Handle the response to blockchain.block.get_header message. - Compare hashes of prior header in attempt to determine if forked.''' - if not self.is_good(request, str): - return - - theirs = request.result() - if ours == theirs: - self.maybe_close() - else: - self.bad('our header {} and theirs {} differ'.format(ours, theirs)) - - def on_legacy_header(self, expected_header, request): - '''Handle the response to blockchain.block.get_header message. - Compare hashes of prior header in attempt to determine if forked.''' - if not self.is_good(request, dict): - return - - result = request.result() - theirs = result.get('prev_block_hash') - ours = expected_header.get('prev_block_hash') - if ours == theirs: - self.maybe_close() - else: - self.bad('our header hash {} and theirs {} differ' - .format(ours, theirs)) - - def on_peers_subscribe(self, request): - '''Handle the response to the peers.subcribe message.''' - if not self.is_good(request, list): - return - - # Check the peers list we got from a remote peer. - # Each is expected to be of the form: - # [ip_addr, hostname, ['v1.0', 't51001', 's51002']] - # Call add_peer if the remote doesn't appear to know about us. - raw_peers = request.result() - try: - real_names = [' '.join([u[1]] + u[2]) for u in raw_peers] - peers = [Peer.from_real_name(real_name, str(self.peer)) - for real_name in real_names] - except Exception: - self.bad('bad server.peers.subscribe response') - return - - features = self.peer_mgr._features_to_register(self.peer, peers) - if features: - self.logger.info(f'registering ourself with "server.add_peer"') - self.send_request('server.add_peer', [features], - self.on_add_peer, timeout=self.timeout) - else: - self.maybe_close() - - def on_add_peer(self, request): - '''We got a response the add_peer message. Don't care about its - form.''' - self.maybe_close() - - def maybe_close(self): - '''Close the connection if no requests are outstanding, and mark peer - as good. - ''' - if not self.all_requests(): - self.close() - self.peer_mgr._set_verification_status(self.peer, self.kind, True) - class PeerManager(object): '''Looks after the DB of peer network servers. @@ -356,19 +175,13 @@ class PeerManager(object): ''' self._import_peers() - try: - while True: - await self._maybe_detect_proxy() - await self._retry_peers() - timeout = self.loop.call_later(WAKEUP_SECS, - self.retry_event.set) - await self.retry_event.wait() - self.retry_event.clear() - timeout.cancel() - finally: - for session in list(PeerSession.sessions): - session.abort() - await session.wait_closed() + while True: + await self._maybe_detect_proxy() + await self._retry_peers() + timeout = self.loop.call_later(WAKEUP_SECS, self.retry_event.set) + await self.retry_event.wait() + self.retry_event.clear() + timeout.cancel() async def _retry_peers(self): '''Retry peers that are close to getting stale.''' @@ -392,6 +205,7 @@ class PeerManager(object): async def _retry_peer(self, peer): peer.try_count += 1 + success = False for kind, port in peer.connection_port_pairs(): peer.last_try = time.time() @@ -414,19 +228,137 @@ class PeerManager(object): # connections so our peers see the correct source. kwargs['local_addr'] = (host, None) - session = PeerSession(peer, self, kind, peer.host, port, **kwargs) try: - await session.create_connection() - return - except Exception as e: - elapsed = time.time() - peer.last_try - self.logger.info(f'failed connecting to {peer} at {kind} port ' - f'{port} in {elapsed:.1f}s: {e}') - # Try the next port pair + async with PeerSession(peer.host, port, **kwargs) as session: + await self._verify_peer(session, peer) + success = True + except RPCError as e: + self.logger.error(f'[{peer}] RPC error: {e.message} ' + f'({e.code})') + except (RequestError, asyncio.TimeoutError) as e: + self.logger.error(f'[{peer}] {e}') + except BadPeerError as e: + self.logger.error(f'[{peer}] marking bad: ({e})') + peer.mark_bad() + except (OSError, ConnectionError) as e: + self.logger.info(f'[{peer}] {kind} connection to ' + f'port {port} failed: {e}') continue + self._set_verification_status(peer, kind, success) + if success: + return + self._maybe_forget_peer(peer) + async def _verify_peer(self, session, peer): + if not peer.is_tor: + address = session.peer_address() + if address: + peer.ip_addr = address[0] + + timeout = 20 if peer.is_tor else 10 + + # server.version goes first + request = session.send_request( + 'server.version', self.server_version_args, timeout=timeout) + result = await request + assert_good(request, list) + + # Protocol version 1.1 returns a pair with the version first + if len(result) != 2 or not all(isinstance(x, str) for x in result): + raise RequestFailure(f'bad server.version result: {result}') + server_version, protocol_version = result + peer.server_version = server_version + peer.features['server_version'] = server_version + ptuple = protocol_tuple(protocol_version) + + jobs = [self.tasks.create_task(message) for message in ( + self._send_headers_subscribe(session, peer, timeout, ptuple), + self._send_server_features(session, peer, timeout), + self._send_peers_subscribe(session, peer, timeout) + )] + await asyncio.wait(jobs) + + async def _send_headers_subscribe(self, session, peer, timeout, ptuple): + request = session.send_request('blockchain.headers.subscribe', + timeout=timeout) + result = await request + assert_good(request, dict) + + our_height = self.chain_state.db_height() + if ptuple < (1, 3): + their_height = result.get('block_height') + else: + their_height = result.get('height') + if not isinstance(their_height, int): + raise BadPeerError(f'invalid height {their_height}') + if abs(our_height - their_height) > 5: + raise BadPeerError(f'bad height {their_height:,d} ' + f'(ours: {our_height:,d})') + + # Check prior header too in case of hard fork. + check_height = min(our_height, their_height) + raw_header = self.chain_state.raw_header(check_height) + if ptuple >= (1, 4): + ours = raw_header.hex() + request = session.send_request('blockchain.block.header', + [check_height], timeout=timeout) + theirs = await request + assert_good(request, str) + if ours != theirs: + raise BadPeerError(f'our header {ours} and ' + f'theirs {theirs} differ') + else: + ours = self.env.coin.electrum_header(raw_header, check_height) + request = session.send_request('blockchain.block.get_header', + [check_height], timeout=timeout) + result = await request + assert_good(request, dict) + theirs = result.get('prev_block_hash') + ours = ours.get('prev_block_hash') + if ours != theirs: + raise BadPeerError(f'our header hash {ours} and ' + f'theirs {theirs} differ') + + async def _send_server_features(self, session, peer, timeout): + request = session.send_request('server.features', timeout=timeout) + features = await request + assert_good(request, dict) + hosts = [host.lower() for host in features.get('hosts', {})] + if self.env.coin.GENESIS_HASH != features.get('genesis_hash'): + raise BadPeerError('incorrect genesis hash') + elif peer.host.lower() in hosts: + peer.update_features(features) + else: + raise BadPeerError(f'not listed in own hosts list {hosts}') + + async def _send_peers_subscribe(self, session, peer, timeout): + request = session.send_request('server.peers.subscribe', + timeout=timeout) + raw_peers = await request + assert_good(request, list) + + # Check the peers list we got from a remote peer. + # Each is expected to be of the form: + # [ip_addr, hostname, ['v1.0', 't51001', 's51002']] + # Call add_peer if the remote doesn't appear to know about us. + try: + real_names = [' '.join([u[1]] + u[2]) for u in raw_peers] + peers = [Peer.from_real_name(real_name, str(peer)) + for real_name in real_names] + except Exception: + raise BadPeerError('bad server.peers.subscribe response') + + features = self._features_to_register(peer, peers) + if not features: + return + self.logger.info(f'registering ourself with {peer}') + request = session.send_request('server.add_peer', [features], + timeout=timeout) + # We only care to wait for the response + await request + def _set_verification_status(self, peer, kind, good): '''Called when a verification succeeded or failed.''' now = time.time() @@ -464,7 +396,7 @@ class PeerManager(object): if forget: desc = 'bad' if peer.bad else 'unreachable' - self.logger.info('forgetting {} peer: {}'.format(desc, peer)) + self.logger.info(f'forgetting {desc} peer: {peer}') self.peers.discard(peer) # @@ -492,7 +424,7 @@ class PeerManager(object): elif check_ports: for match in matches: if match.check_ports(peer): - self.logger.info('ports changed for {}'.format(peer)) + self.logger.info(f'ports changed for {peer}') retry = True if new_peers: @@ -504,8 +436,8 @@ class PeerManager(object): else: use_peers = new_peers for n, peer in enumerate(use_peers): - self.logger.info('accepted new peer {:d}/{:d} {} from {} ' - .format(n + 1, len(use_peers), peer, source)) + self.logger.info(f'accepted new peer {n+1}/len(use_peers) ' + f'{peer} from {source}') self.peers.update(use_peers) if retry: @@ -552,12 +484,12 @@ class PeerManager(object): reason = 'source-destination mismatch' if permit: - self.logger.info('accepted add_peer request from {} for {}' - .format(source, host)) + self.logger.info(f'accepted add_peer request from {source} ' + f'for {host}') self.add_peers([peer], check_ports=True) else: - self.logger.warning('rejected add_peer request from {} for {} ({})' - .format(source, host, reason)) + self.logger.warning(f'rejected add_peer request from {source} ' + f'for {host} ({reason})') return permit