From 05a6da1920e0c5ab1051bcaf7310c307ae53fe01 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 22 Jan 2017 19:21:55 +0900 Subject: [PATCH] Move peer management to peers.py from irc It's cleaner and will be useful for peer-to-peer comms later --- docs/ENVIRONMENT.rst | 5 +- server/controller.py | 30 ++++------ server/env.py | 2 +- server/irc.py | 122 ++++++++++--------------------------- server/peers.py | 139 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 186 insertions(+), 112 deletions(-) create mode 100644 server/peers.py diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 24e4993..f323356 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -239,8 +239,9 @@ connectivity on IRC: * **REPORT_HOST_TOR** - The tor .onion address to advertise. If set, an additional - connection to IRC happens with '_tor" appended to **IRC_NICK**. + The tor address to advertise; must end with `.onion`. If set, an + additional connection to IRC happens with '_tor' appended to + **IRC_NICK**. * **REPORT_TCP_PORT_TOR** diff --git a/server/controller.py b/server/controller.py index 015b2d1..f2a8a6d 100644 --- a/server/controller.py +++ b/server/controller.py @@ -23,8 +23,8 @@ from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor from server.daemon import Daemon, DaemonError -from server.irc import IRC from server.session import LocalRPC, ElectrumX +from server.peers import PeerManager from server.mempool import MemPool from server.version import VERSION @@ -61,7 +61,7 @@ class Controller(util.LoggedClass): self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) self.bp = BlockProcessor(env, self.daemon) self.mempool = MemPool(self.bp) - self.irc = IRC(env) + self.peers = PeerManager(env) self.env = env self.servers = {} # Map of session to the key of its list in self.groups @@ -96,12 +96,14 @@ class Controller(util.LoggedClass): 'block.get_header block.get_chunk estimatefee relayfee ' 'transaction.get transaction.get_merkle utxo.get_address'), ('server', - 'banner donation_address peers.subscribe'), + 'banner donation_address'), ] - self.electrumx_handlers = {'.'.join([prefix, suffix]): - getattr(self, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} + handlers = {'.'.join([prefix, suffix]): + getattr(self, suffix.replace('.', '_')) + for prefix, suffixes in rpcs + for suffix in suffixes.split()} + handlers['server.peers.subscribe'] = self.peers.subscribe + self.electrumx_handlers = handlers async def mempool_transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -207,11 +209,11 @@ class Controller(util.LoggedClass): async def await_bp_catchup(): '''Wait for the block processor to catch up. - When it has, start the servers and connect to IRC. + Then start the servers and the peer manager. ''' await self.bp.caught_up_event.wait() self.logger.info('block processor has caught up') - add_future(self.irc.start()) + add_future(self.peers.main_loop()) add_future(self.start_servers()) add_future(self.mempool.main_loop()) add_future(self.enqueue_delayed_sessions()) @@ -433,7 +435,7 @@ class Controller(util.LoggedClass): 'logged': len([s for s in self.sessions if s.log_me]), 'paused': sum(s.pause for s in self.sessions), 'pid': os.getpid(), - 'peers': len(self.irc.peers), + 'peers': self.peers.count(), 'requests': sum(s.requests_remaining() for s in self.sessions), 'sessions': self.session_count(), 'subs': self.sub_count(), @@ -593,7 +595,7 @@ class Controller(util.LoggedClass): async def rpc_peers(self): '''Return a list of server peers, currently taken from IRC.''' - return self.irc.peers + return self.peers.peer_list() async def rpc_reorg(self, count=3): '''Force a reorg of the given number of blocks. @@ -880,9 +882,3 @@ class Controller(util.LoggedClass): async def donation_address(self): '''Return the donation address as a string, empty if there is none.''' return self.env.donation_address - - async def peers_subscribe(self): - '''Returns the server peers as a list of (ip, host, ports) tuples. - - Despite the name this is not currently treated as a subscription.''' - return list(self.irc.peers.values()) diff --git a/server/env.py b/server/env.py index 196d24c..2304284 100644 --- a/server/env.py +++ b/server/env.py @@ -66,7 +66,7 @@ class Env(LoggedClass): self.report_ssl_port if self.report_ssl_port else self.ssl_port) - self.report_host_tor = self.default('REPORT_HOST_TOR', None) + self.report_host_tor = self.default('REPORT_HOST_TOR', '') def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/irc.py b/server/irc.py index 31154d6..469adc8 100644 --- a/server/irc.py +++ b/server/irc.py @@ -12,7 +12,6 @@ Only calling start() requires the IRC Python module. import asyncio import re -import socket from collections import namedtuple @@ -22,52 +21,26 @@ from lib.util import LoggedClass class IRC(LoggedClass): - Peer = namedtuple('Peer', 'ip_addr host ports') - class DisconnectedError(Exception): pass - def __init__(self, env): + def __init__(self, env, peer_mgr): super().__init__() - self.env = env + self.coin = env.coin + self.peer_mgr = peer_mgr # If this isn't something a peer or client expects # then you won't appear in the client's network dialog box - irc_address = (env.coin.IRC_SERVER, env.coin.IRC_PORT) self.channel = env.coin.IRC_CHANNEL self.prefix = env.coin.IRC_PREFIX - - self.clients = [] self.nick = '{}{}'.format(self.prefix, env.irc_nick if env.irc_nick else double_sha256(env.report_host.encode()) [:5].hex()) - self.clients.append(IrcClient(irc_address, self.nick, - env.report_host, - env.report_tcp_port, - env.report_ssl_port)) - if env.report_host_tor: - self.clients.append(IrcClient(irc_address, self.nick + '_tor', - env.report_host_tor, - env.report_tcp_port_tor, - env.report_ssl_port_tor)) - self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) - self.peers = {} - async def start(self): + async def start(self, name_pairs): '''Start IRC connections if enabled in environment.''' - try: - if self.env.irc: - await self.join() - else: - self.logger.info('IRC is disabled') - except asyncio.CancelledError: - pass - except Exception as e: - self.logger.error(str(e)) - - async def join(self): import irc.client as irc_client from jaraco.stream import buffer @@ -77,21 +50,18 @@ class IRC(LoggedClass): # Register handlers for events we're interested in reactor = irc_client.Reactor() - for event in 'welcome join quit kick whoreply disconnect'.split(): + for event in 'welcome join quit whoreply disconnect'.split(): reactor.add_global_handler(event, getattr(self, 'on_' + event)) # Note: Multiple nicks in same channel will trigger duplicate events - for client in self.clients: - client.connection = reactor.server() + clients = [IrcClient(self.coin, real_name, self.nick + suffix, + reactor.server()) + for (real_name, suffix) in name_pairs] while True: try: - for client in self.clients: - self.logger.info('Joining IRC in {} as "{}" with ' - 'real name "{}"' - .format(self.channel, client.nick, - client.realname)) - client.connect() + for client in clients: + client.connect(self) while True: reactor.process_once() await asyncio.sleep(2) @@ -130,14 +100,7 @@ class IRC(LoggedClass): '''Called when someone leaves our channel.''' match = self.peer_regexp.match(event.source) if match: - self.peers.pop(match.group(1), None) - - def on_kick(self, connection, event): - '''Called when someone is kicked from our channel.''' - self.log_event(event) - match = self.peer_regexp.match(event.arguments[0]) - if match: - self.peers.pop(match.group(1), None) + self.peer_mgr.remove_irc_peer(match.group(1)) def on_whoreply(self, connection, event): '''Called when a response to our who requests arrives. @@ -145,50 +108,25 @@ class IRC(LoggedClass): The nick is the 4th argument, and real name is in the 6th argument preceeded by '0 ' for some reason. ''' - try: - nick = event.arguments[4] - if nick.startswith(self.prefix): - line = event.arguments[6].split() - try: - ip_addr = socket.gethostbyname(line[1]) - except socket.error: - # Could be .onion or IPv6. - ip_addr = line[1] - peer = self.Peer(ip_addr, line[1], line[2:]) - self.peers[nick] = peer - except (IndexError, UnicodeError): - # UnicodeError comes from invalid domains (issue #68) - pass - - -class IrcClient(LoggedClass): - - VERSION = '1.0' - DEFAULT_PORTS = {'t': 50001, 's': 50002} - - def __init__(self, irc_address, nick, host, tcp_port, ssl_port): - super().__init__() - self.irc_host, self.irc_port = irc_address + nick = event.arguments[4] + if nick.startswith(self.prefix): + line = event.arguments[6].split() + hostname, details = line[1], line[2:] + self.peer_mgr.add_irc_peer(nick, hostname, details) + + +class IrcClient(object): + + def __init__(self, coin, real_name, nick, server): + self.irc_host = coin.IRC_SERVER + self.irc_port = coin.IRC_PORT self.nick = nick - self.realname = self.create_realname(host, tcp_port, ssl_port) - self.connection = None + self.real_name = real_name + self.server = server - def connect(self, keepalive=60): + def connect(self, irc): '''Connect this client to its IRC server''' - self.connection.connect(self.irc_host, self.irc_port, self.nick, - ircname=self.realname) - self.connection.set_keepalive(keepalive) - - @classmethod - def create_realname(cls, host, tcp_port, ssl_port): - def port_text(letter, port): - if not port: - return '' - if port == cls.DEFAULT_PORTS.get(letter): - return ' ' + letter - else: - return ' ' + letter + str(port) - - tcp = port_text('t', tcp_port) - ssl = port_text('s', ssl_port) - return '{} v{}{}{}'.format(host, cls.VERSION, tcp, ssl) + irc.logger.info('joining {} as "{}" with real name "{}"' + .format(irc.channel, self.nick, self.real_name)) + self.server.connect(self.irc_host, self.irc_port, self.nick, + ircname=self.real_name) diff --git a/server/peers.py b/server/peers.py new file mode 100644 index 0000000..541c983 --- /dev/null +++ b/server/peers.py @@ -0,0 +1,139 @@ +# Copyright (c) 2017, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''Peer management.''' + +import asyncio +import socket +import traceback +from collections import namedtuple +from functools import partial + +import lib.util as util +from server.irc import IRC + + +NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix') +IRCPeer = namedtuple('IRCPeer', 'ip_addr host details') + + +class PeerManager(util.LoggedClass): + '''Looks after the DB of peer network servers. + + Attempts to maintain a connection with up to 8 peers. + Issues a 'peers.subscribe' RPC to them and tells them our data. + ''' + VERSION = '1.0' + DEFAULT_PORTS = {'t': 50001, 's': 50002} + + def __init__(self, env): + super().__init__() + self.env = env + self.loop = asyncio.get_event_loop() + self.irc = IRC(env, self) + self.futures = set() + self.identities = [] + # Keyed by nick + self.irc_peers = {} + + # We can have a Tor identity inaddition to a normal one + self.identities.append(NetIdentity(env.report_host, + env.report_tcp_port, + env.report_ssl_port, + '')) + if env.report_host_tor.endswith('.onion'): + self.identities.append(NetIdentity(env.report_host_tor, + env.report_tcp_port_tor, + env.report_ssl_port_tor, + '_tor')) + + async def executor(self, func, *args, **kwargs): + '''Run func taking args in the executor.''' + await self.loop.run_in_executor(None, partial(func, *args, **kwargs)) + + @classmethod + def real_name(cls, identity): + '''Real name as used on IRC.''' + def port_text(letter, port): + if not port: + return '' + if port == cls.DEFAULT_PORTS.get(letter): + return ' ' + letter + else: + return ' ' + letter + str(port) + + tcp = port_text('t', identity.tcp_port) + ssl = port_text('s', identity.ssl_port) + return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl) + + def ensure_future(self, coro): + '''Convert a coro into a future and add it to our pending list + to be waited for.''' + self.futures.add(asyncio.ensure_future(coro)) + + def start_irc(self): + '''Start up the IRC connections if enabled.''' + if self.env.irc: + name_pairs = [(self.real_name(identity), identity.nick_suffix) + for identity in self.identities] + self.ensure_future(self.irc.start(name_pairs)) + else: + self.logger.info('IRC is disabled') + + async def main_loop(self): + '''Start and then enter the main loop.''' + self.start_irc() + + try: + while True: + await asyncio.sleep(10) + done = [future for future in self.futures if future.done()] + self.futures.difference_update(done) + for future in done: + try: + future.result() + except: + self.log_error(traceback.format_exc()) + finally: + for future in self.futures: + future.cancel() + + def dns_lookup_peer(self, nick, hostname, details): + try: + ip_addr = None + try: + ip_addr = socket.gethostbyname(hostname) + except socket.error: + pass # IPv6? + ip_addr = ip_addr or hostname + self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) + self.logger.info('new IRC peer {} at {} ({})' + .format(nick, hostname, details)) + except UnicodeError: + # UnicodeError comes from invalid domains (issue #68) + self.logger.info('IRC peer domain {} invalid'.format(hostname)) + + def add_irc_peer(self, *args): + '''Schedule DNS lookup of peer.''' + self.ensure_future(self.executor(self.dns_lookup_peer, *args)) + + def remove_irc_peer(self, nick): + '''Remove a peer from our IRC peers map.''' + self.logger.info('removing IRC peer {}'.format(nick)) + self.irc_peers.pop(nick, None) + + def count(self): + return len(self.irc_peers) + + def peer_list(self): + return self.irc_peers + + async def subscribe(self): + '''Returns the server peers as a list of (ip, host, details) tuples. + + Despite the name this is not currently treated as a subscription.''' + return list(self.irc_peers.values())