diff --git a/server/peers.py b/server/peers.py index 97adb2d..47f54c6 100644 --- a/server/peers.py +++ b/server/peers.py @@ -17,7 +17,6 @@ from functools import partial import aiorpcx -from lib.jsonrpc import JSONSession from lib.peer import Peer import lib.util as util import server.version as version @@ -28,191 +27,178 @@ STALE_SECS = 24 * 3600 WAKEUP_SECS = 300 -class PeerSession(JSONSession): +class PeerSession(aiorpcx.ClientSession, util.LoggedClass): '''An outgoing session to a peer.''' - def __init__(self, peer, peer_mgr, kind): - super().__init__() - self.max_send = 0 + def __init__(self, peer, peer_mgr, kind, host, port, **kwargs): + super().__init__(host, port, **kwargs) + util.LoggedClass.__init__(self) self.peer = peer self.peer_mgr = peer_mgr self.kind = kind - self.failed = False - self.bad = False - self.remote_peers = None - self.log_prefix = '[{}] '.format(self.peer) - - async def wait_on_items(self): - while True: - await self.items_event.wait() - await self.process_pending_items() + self.timeout = 20 if self.peer.is_tor else 10 def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) - self.log_prefix = '[{}] '.format(str(self.peer)[:25]) - self.future = self.peer_mgr.ensure_future(self.wait_on_items()) - # Update IP address + # Update IP address if not Tor if not self.peer.is_tor: - peer_info = self.peer_info() - if peer_info: - self.peer.ip_addr = peer_info[0] - - # Collect data - proto_ver = (version.PROTOCOL_MIN, version.PROTOCOL_MAX) - self.send_request(self.on_version, 'server.version', - [version.VERSION, proto_ver]) - self.send_request(self.on_features, 'server.features') - self.send_request(self.on_height, 'blockchain.headers.subscribe') - self.send_request(self.on_peers_subscribe, 'server.peers.subscribe') - - def connection_lost(self, exc): - '''Handle disconnection.''' - super().connection_lost(exc) - self.future.cancel() - - def on_peers_subscribe(self, result, error): - '''Handle the response to the peers.subcribe message.''' - if error: - self.failed = True - self.log_error('server.peers.subscribe: {}'.format(error)) - else: - # Save for later analysis - self.remote_peers = result - self.close_if_done() - - def on_add_peer(self, result, error): - '''We got a response the add_peer message.''' - # This is the last thing we were waiting for; shutdown the connection - self.shutdown_connection() - - def on_features(self, features, error): - # Several peers don't implement this. If they do, check they are - # the same network with the genesis hash. - if not error and isinstance(features, dict): - hosts = [host.lower() for host in features.get('hosts', {})] - our_hash = self.peer_mgr.env.coin.GENESIS_HASH - if our_hash != features.get('genesis_hash'): - self.bad = True - self.log_warning('incorrect genesis hash') - elif self.peer.host.lower() in hosts: - self.peer.update_features(features) - else: - self.bad = True - self.log_warning('ignoring - not listed in host list {}' - .format(hosts)) - self.close_if_done() + address = self.peer_address() + if address: + self.peer.ip_addr = address[0] - def on_height(self, result, error): - '''Handle the response to blockchain.headers.subscribe message.''' - if error: - self.failed = True - self.log_error('blockchain.headers.subscribe returned an error') - elif not isinstance(result, dict): - self.bad = True - self.log_error('bad blockchain.headers.subscribe response') - else: - controller = self.peer_mgr.controller - our_height = controller.bp.db_height - their_height = result.get('block_height') - if not isinstance(their_height, int): - self.log_warning('invalid height {}'.format(their_height)) - self.bad = True - elif abs(our_height - their_height) > 5: - self.log_warning('bad height {:,d} (ours: {:,d})' - .format(their_height, our_height)) - self.bad = True - - # Check prior header too in case of hard fork. - if not self.bad: - check_height = min(our_height, their_height) - self.send_request(self.on_header, 'blockchain.block.get_header', - [check_height]) - self.expected_header = controller.electrum_header(check_height) - self.close_if_done() - - def on_header(self, result, error): - '''Handle the response to blockchain.block.get_header message. - Compare hashes of prior header in attempt to determine if forked.''' - if error: - self.failed = True - self.log_error('blockchain.block.get_header returned an error') - elif not isinstance(result, dict): - self.bad = True - self.log_error('bad blockchain.block.get_header response') - else: - theirs = result.get('prev_block_hash') - ours = self.expected_header.get('prev_block_hash') - if ours != theirs: - self.log_error('our header hash {} and theirs {} differ' - .format(ours, theirs)) - self.bad = True + # Send server.version first + args = [version.VERSION, [version.PROTOCOL_MIN, version.PROTOCOL_MAX]] + self.send_request('server.version', args, self.on_version, + timeout=self.timeout) + + def is_good(self, request, instance): + try: + result = request.result() + except asyncio.CancelledError: + return False + except asyncio.TimeoutError as e: + self.fail(request, str(e)) + return False + except RPCError as error: + self.fail(request, f'{error.message} ({error.code})') + return False - self.close_if_done() + if isinstance(result, instance): + return True - def on_version(self, result, error): + self.fail(request, f'{request} returned bad result type ' + f'{type(result).__name__}') + return False + + def fail(self, request, reason): + self.logger.error(f'[{self.peer.host}] {request} failed: {reason}') + self.peer_mgr.set_verification_status(self.peer, self.kind, False) + self.close() + + def bad(self, reason): + self.logger.error(f'[{self.peer.host}] marking bad: {reason}') + self.peer.mark_bad() + self.peer_mgr.set_verification_status(self.peer, self.kind, False) + self.close() + + def on_version(self, request): '''Handle the response to the version message.''' - if error: - self.failed = True - self.log_error('server.version returned an error') + if not self.is_good(request, (list, str)): + return + + result = request.result() + if isinstance(result, str): + version = result else: # Protocol version 1.1 returns a pair with the version first - if isinstance(result, list) and len(result) == 2: - result = result[0] - if isinstance(result, str): - self.peer.server_version = result - self.peer.features['server_version'] = result - self.close_if_done() + if len(result) < 2 or not isinstance(result[0], str): + self.fail(request, 'result array bad format') + return + version = result[0] + self.peer.server_version = version + self.peer.features['server_version'] = version + + for method, on_done in [ + ('blockchain.headers.subscribe', self.on_height), + ('server.features', self.on_features), + ('server.peers.subscribe', self.on_peers_subscribe), + ]: + self.send_request(method, on_done=on_done, timeout=self.timeout) + + def on_features(self, request): + if not self.is_good(request, dict): + return - def check_remote_peers(self): - '''Check the peers list we got from a remote peer. + features = request.result() + hosts = [host.lower() for host in features.get('hosts', {})] + our_hash = self.peer_mgr.env.coin.GENESIS_HASH + if our_hash != features.get('genesis_hash'): + self.bad('incorrect genesis hash') + elif self.peer.host.lower() in hosts: + self.peer.update_features(features) + self.maybe_close() + else: + self.bad('ignoring - not listed in host list {}'.format(hosts)) - Each update is expected to be of the form: - [ip_addr, hostname, ['v1.0', 't51001', 's51002']] + def on_height(self, request): + '''Handle the response to blockchain.headers.subscribe message.''' + if not self.is_good(request, dict): + return - Call add_peer if the remote doesn't appear to know about us. - ''' - try: - real_names = [' '.join([u[1]] + u[2]) for u in self.remote_peers] - peers = [Peer.from_real_name(real_name, str(self.peer)) - for real_name in real_names] - except Exception: - self.log_error('bad server.peers.subscribe response') + result = request.result() + controller = self.peer_mgr.controller + our_height = controller.bp.db_height + their_height = result.get('block_height') + if not isinstance(their_height, int): + self.bad('invalid height {}'.format(their_height)) + return + if abs(our_height - their_height) > 5: + self.bad('bad height {:,d} (ours: {:,d})' + .format(their_height, our_height)) + return + # Check prior header too in case of hard fork. + check_height = min(our_height, their_height) + expected_header = controller.electrum_header(check_height) + self.send_request('blockchain.block.get_header', [check_height], + partial(self.on_header, expected_header), + timeout=self.timeout) + + def on_header(self, expected_header, request): + '''Handle the response to blockchain.block.get_header message. + Compare hashes of prior header in attempt to determine if forked.''' + if not self.is_good(request, dict): return - self.peer_mgr.add_peers(peers) + result = request.result() + theirs = result.get('prev_block_hash') + ours = expected_header.get('prev_block_hash') + if ours == theirs: + self.maybe_close() + else: + self.bad('our header hash {} and theirs {} differ' + .format(ours, theirs)) - # Announce ourself if not present. Don't if disabled, we - # are a non-public IP address, or to ourselves. - if not self.peer_mgr.env.peer_announce: - return - if self.peer in self.peer_mgr.myselves: + def on_peers_subscribe(self, request): + '''Handle the response to the peers.subcribe message.''' + if not self.is_good(request, list): return - my = self.peer_mgr.my_clearnet_peer() - if not my or not my.is_public: + + # Check the peers list we got from a remote peer. + # Each is expected to be of the form: + # [ip_addr, hostname, ['v1.0', 't51001', 's51002']] + # Call add_peer if the remote doesn't appear to know about us. + raw_peers = request.result() + try: + real_names = [' '.join([u[1]] + u[2]) for u in raw_peers] + peers = [Peer.from_real_name(real_name, str(self.peer)) + for real_name in real_names] + except Exception: + self.bad('bad server.peers.subscribe response') return - for peer in my.matches(peers): - if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port: - return - self.log_info('registering ourself with server.add_peer') - self.send_request(self.on_add_peer, 'server.add_peer', [my.features]) + features = self.peer_mgr.features_to_register(self.peer, peers) + if features: + self.logger.info(f'[{self.peer.host}] registering ourself with ' + '"server.add_peer"') + self.send_request('server.add_peer', [features], + self.on_add_peer, timeout=self.timeout) + else: + self.maybe_close() - def close_if_done(self): - if not self.has_pending_requests(): - if self.bad: - self.peer.mark_bad() - elif self.remote_peers: - self.check_remote_peers() - # We might now be waiting for an add_peer response - if not self.has_pending_requests(): - self.shutdown_connection() + def on_add_peer(self, request): + '''We got a response the add_peer message. Don't care about its + form.''' + self.maybe_close() - def shutdown_connection(self): - is_good = not (self.failed or self.bad) - self.peer_mgr.set_verification_status(self.peer, self.kind, is_good) - self.close_connection() + def maybe_close(self): + '''Close the connection if no requests are outstanding, and mark peer + as good. + ''' + if not self.all_requests(): + self.close() + self.peer_mgr.set_verification_status(self.peer, self.kind, True) class PeerManager(util.LoggedClass): @@ -287,6 +273,26 @@ class PeerManager(util.LoggedClass): return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)] + def features_to_register(self, peer, remote_peers): + '''If we should register ourselves to the remote peer, which has + reported the given list of known peers, return the clearnet + identity features to register, otherwise None. + ''' + self.add_peers(remote_peers) + + # Announce ourself if not present. Don't if disabled, we + # are a non-public IP address, or to ourselves. + if not self.env.peer_announce or peer in self.myselves: + return None + my = self.my_clearnet_peer() + if not my or not my.is_public: + return None + # Register if no matches, or ports have changed + for peer in my.matches(remote_peers): + if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port: + return None + return my.features + def add_peers(self, peers, limit=2, check_ports=False, source=None): '''Add a limited number of peers that are not already present.''' retry = False @@ -505,45 +511,43 @@ class PeerManager(util.LoggedClass): def retry_peer(self, peer, port_pairs): peer.last_try = time.time() + + kwargs = {'loop': self.loop} + kind, port = port_pairs[0] - sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) if kind == 'SSL' else None + if kind == 'SSL': + kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS) host = self.env.cs_host(for_rpc=False) if isinstance(host, list): host = host[0] - kwargs = {'ssl': sslc} if self.env.force_proxy or peer.is_tor: - # Only attempt a proxy connection if we have one if not self.proxy: return - create_connection = self.proxy.create_connection - else: - create_connection = self.loop.create_connection - # Use our listening Host/IP for outgoing connections so - # our peers see the correct source. - if host: - kwargs['local_addr'] = (host, None) - - protocol_factory = partial(PeerSession, peer, self, kind) - coro = create_connection(protocol_factory, peer.host, port, **kwargs) - callback = partial(self.connection_done, peer, port_pairs) - self.ensure_future(coro, callback) - - def connection_done(self, peer, port_pairs, future): + kwargs['proxy'] = self.proxy + elif host: + # Use our listening Host/IP for outgoing non-proxy + # connections so our peers see the correct source. + kwargs['local_addr'] = (host, None) + + session = PeerSession(peer, self, kind, peer.host, port, **kwargs) + callback = partial(self.on_connected, session, peer, port_pairs) + self.ensure_future(session.create_connection(), callback) + + def on_connected(self, session, peer, port_pairs, future): '''Called when a connection attempt succeeds or fails. - If failed, log it and try remaining port pairs. If none, - release the connection count semaphore. + If failed, close the session, log it and try remaining port pairs. ''' exception = future.exception() if exception: - kind, port = port_pairs[0] - self.logger.info('failed connecting to {} at {} port {:d} ' - 'in {:.1f}s: {}' - .format(peer, kind, port, - time.time() - peer.last_try, exception)) - port_pairs = port_pairs[1:] + session.close() + kind, port = port_pairs.pop(0) + self.log_info('failed connecting to {} at {} port {:d} ' + 'in {:.1f}s: {}' + .format(peer, kind, port, + time.time() - peer.last_try, exception)) if port_pairs: self.retry_peer(peer, port_pairs) else: