Browse Source

Move peer management to peers.py from irc

It's cleaner and will be useful for peer-to-peer comms later
master
Neil Booth 8 years ago
parent
commit
05a6da1920
  1. 5
      docs/ENVIRONMENT.rst
  2. 30
      server/controller.py
  3. 2
      server/env.py
  4. 122
      server/irc.py
  5. 139
      server/peers.py

5
docs/ENVIRONMENT.rst

@ -239,8 +239,9 @@ connectivity on IRC:
* **REPORT_HOST_TOR**
The tor .onion address to advertise. If set, an additional
connection to IRC happens with '_tor" appended to **IRC_NICK**.
The tor address to advertise; must end with `.onion`. If set, an
additional connection to IRC happens with '_tor' appended to
**IRC_NICK**.
* **REPORT_TCP_PORT_TOR**

30
server/controller.py

@ -23,8 +23,8 @@ from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
import lib.util as util
from server.block_processor import BlockProcessor
from server.daemon import Daemon, DaemonError
from server.irc import IRC
from server.session import LocalRPC, ElectrumX
from server.peers import PeerManager
from server.mempool import MemPool
from server.version import VERSION
@ -61,7 +61,7 @@ class Controller(util.LoggedClass):
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.bp = BlockProcessor(env, self.daemon)
self.mempool = MemPool(self.bp)
self.irc = IRC(env)
self.peers = PeerManager(env)
self.env = env
self.servers = {}
# Map of session to the key of its list in self.groups
@ -96,12 +96,14 @@ class Controller(util.LoggedClass):
'block.get_header block.get_chunk estimatefee relayfee '
'transaction.get transaction.get_merkle utxo.get_address'),
('server',
'banner donation_address peers.subscribe'),
'banner donation_address'),
]
self.electrumx_handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
handlers['server.peers.subscribe'] = self.peers.subscribe
self.electrumx_handlers = handlers
async def mempool_transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -207,11 +209,11 @@ class Controller(util.LoggedClass):
async def await_bp_catchup():
'''Wait for the block processor to catch up.
When it has, start the servers and connect to IRC.
Then start the servers and the peer manager.
'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
add_future(self.irc.start())
add_future(self.peers.main_loop())
add_future(self.start_servers())
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
@ -433,7 +435,7 @@ class Controller(util.LoggedClass):
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.pause for s in self.sessions),
'pid': os.getpid(),
'peers': len(self.irc.peers),
'peers': self.peers.count(),
'requests': sum(s.requests_remaining() for s in self.sessions),
'sessions': self.session_count(),
'subs': self.sub_count(),
@ -593,7 +595,7 @@ class Controller(util.LoggedClass):
async def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.'''
return self.irc.peers
return self.peers.peer_list()
async def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.
@ -880,9 +882,3 @@ class Controller(util.LoggedClass):
async def donation_address(self):
'''Return the donation address as a string, empty if there is none.'''
return self.env.donation_address
async def peers_subscribe(self):
'''Returns the server peers as a list of (ip, host, ports) tuples.
Despite the name this is not currently treated as a subscription.'''
return list(self.irc.peers.values())

2
server/env.py

@ -66,7 +66,7 @@ class Env(LoggedClass):
self.report_ssl_port
if self.report_ssl_port else
self.ssl_port)
self.report_host_tor = self.default('REPORT_HOST_TOR', None)
self.report_host_tor = self.default('REPORT_HOST_TOR', '')
def default(self, envvar, default):
return environ.get(envvar, default)

122
server/irc.py

@ -12,7 +12,6 @@ Only calling start() requires the IRC Python module.
import asyncio
import re
import socket
from collections import namedtuple
@ -22,52 +21,26 @@ from lib.util import LoggedClass
class IRC(LoggedClass):
Peer = namedtuple('Peer', 'ip_addr host ports')
class DisconnectedError(Exception):
pass
def __init__(self, env):
def __init__(self, env, peer_mgr):
super().__init__()
self.env = env
self.coin = env.coin
self.peer_mgr = peer_mgr
# If this isn't something a peer or client expects
# then you won't appear in the client's network dialog box
irc_address = (env.coin.IRC_SERVER, env.coin.IRC_PORT)
self.channel = env.coin.IRC_CHANNEL
self.prefix = env.coin.IRC_PREFIX
self.clients = []
self.nick = '{}{}'.format(self.prefix,
env.irc_nick if env.irc_nick else
double_sha256(env.report_host.encode())
[:5].hex())
self.clients.append(IrcClient(irc_address, self.nick,
env.report_host,
env.report_tcp_port,
env.report_ssl_port))
if env.report_host_tor:
self.clients.append(IrcClient(irc_address, self.nick + '_tor',
env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor))
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {}
async def start(self):
async def start(self, name_pairs):
'''Start IRC connections if enabled in environment.'''
try:
if self.env.irc:
await self.join()
else:
self.logger.info('IRC is disabled')
except asyncio.CancelledError:
pass
except Exception as e:
self.logger.error(str(e))
async def join(self):
import irc.client as irc_client
from jaraco.stream import buffer
@ -77,21 +50,18 @@ class IRC(LoggedClass):
# Register handlers for events we're interested in
reactor = irc_client.Reactor()
for event in 'welcome join quit kick whoreply disconnect'.split():
for event in 'welcome join quit whoreply disconnect'.split():
reactor.add_global_handler(event, getattr(self, 'on_' + event))
# Note: Multiple nicks in same channel will trigger duplicate events
for client in self.clients:
client.connection = reactor.server()
clients = [IrcClient(self.coin, real_name, self.nick + suffix,
reactor.server())
for (real_name, suffix) in name_pairs]
while True:
try:
for client in self.clients:
self.logger.info('Joining IRC in {} as "{}" with '
'real name "{}"'
.format(self.channel, client.nick,
client.realname))
client.connect()
for client in clients:
client.connect(self)
while True:
reactor.process_once()
await asyncio.sleep(2)
@ -130,14 +100,7 @@ class IRC(LoggedClass):
'''Called when someone leaves our channel.'''
match = self.peer_regexp.match(event.source)
if match:
self.peers.pop(match.group(1), None)
def on_kick(self, connection, event):
'''Called when someone is kicked from our channel.'''
self.log_event(event)
match = self.peer_regexp.match(event.arguments[0])
if match:
self.peers.pop(match.group(1), None)
self.peer_mgr.remove_irc_peer(match.group(1))
def on_whoreply(self, connection, event):
'''Called when a response to our who requests arrives.
@ -145,50 +108,25 @@ class IRC(LoggedClass):
The nick is the 4th argument, and real name is in the 6th
argument preceeded by '0 ' for some reason.
'''
try:
nick = event.arguments[4]
if nick.startswith(self.prefix):
line = event.arguments[6].split()
try:
ip_addr = socket.gethostbyname(line[1])
except socket.error:
# Could be .onion or IPv6.
ip_addr = line[1]
peer = self.Peer(ip_addr, line[1], line[2:])
self.peers[nick] = peer
except (IndexError, UnicodeError):
# UnicodeError comes from invalid domains (issue #68)
pass
class IrcClient(LoggedClass):
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
def __init__(self, irc_address, nick, host, tcp_port, ssl_port):
super().__init__()
self.irc_host, self.irc_port = irc_address
nick = event.arguments[4]
if nick.startswith(self.prefix):
line = event.arguments[6].split()
hostname, details = line[1], line[2:]
self.peer_mgr.add_irc_peer(nick, hostname, details)
class IrcClient(object):
def __init__(self, coin, real_name, nick, server):
self.irc_host = coin.IRC_SERVER
self.irc_port = coin.IRC_PORT
self.nick = nick
self.realname = self.create_realname(host, tcp_port, ssl_port)
self.connection = None
self.real_name = real_name
self.server = server
def connect(self, keepalive=60):
def connect(self, irc):
'''Connect this client to its IRC server'''
self.connection.connect(self.irc_host, self.irc_port, self.nick,
ircname=self.realname)
self.connection.set_keepalive(keepalive)
@classmethod
def create_realname(cls, host, tcp_port, ssl_port):
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
else:
return ' ' + letter + str(port)
tcp = port_text('t', tcp_port)
ssl = port_text('s', ssl_port)
return '{} v{}{}{}'.format(host, cls.VERSION, tcp, ssl)
irc.logger.info('joining {} as "{}" with real name "{}"'
.format(irc.channel, self.nick, self.real_name))
self.server.connect(self.irc_host, self.irc_port, self.nick,
ircname=self.real_name)

139
server/peers.py

@ -0,0 +1,139 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Peer management.'''
import asyncio
import socket
import traceback
from collections import namedtuple
from functools import partial
import lib.util as util
from server.irc import IRC
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
IRCPeer = namedtuple('IRCPeer', 'ip_addr host details')
class PeerManager(util.LoggedClass):
'''Looks after the DB of peer network servers.
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
'''
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
def __init__(self, env):
super().__init__()
self.env = env
self.loop = asyncio.get_event_loop()
self.irc = IRC(env, self)
self.futures = set()
self.identities = []
# Keyed by nick
self.irc_peers = {}
# We can have a Tor identity inaddition to a normal one
self.identities.append(NetIdentity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
if env.report_host_tor.endswith('.onion'):
self.identities.append(NetIdentity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.'''
await self.loop.run_in_executor(None, partial(func, *args, **kwargs))
@classmethod
def real_name(cls, identity):
'''Real name as used on IRC.'''
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
else:
return ' ' + letter + str(port)
tcp = port_text('t', identity.tcp_port)
ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def ensure_future(self, coro):
'''Convert a coro into a future and add it to our pending list
to be waited for.'''
self.futures.add(asyncio.ensure_future(coro))
def start_irc(self):
'''Start up the IRC connections if enabled.'''
if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self.identities]
self.ensure_future(self.irc.start(name_pairs))
else:
self.logger.info('IRC is disabled')
async def main_loop(self):
'''Start and then enter the main loop.'''
self.start_irc()
try:
while True:
await asyncio.sleep(10)
done = [future for future in self.futures if future.done()]
self.futures.difference_update(done)
for future in done:
try:
future.result()
except:
self.log_error(traceback.format_exc())
finally:
for future in self.futures:
future.cancel()
def dns_lookup_peer(self, nick, hostname, details):
try:
ip_addr = None
try:
ip_addr = socket.gethostbyname(hostname)
except socket.error:
pass # IPv6?
ip_addr = ip_addr or hostname
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details))
except UnicodeError:
# UnicodeError comes from invalid domains (issue #68)
self.logger.info('IRC peer domain {} invalid'.format(hostname))
def add_irc_peer(self, *args):
'''Schedule DNS lookup of peer.'''
self.ensure_future(self.executor(self.dns_lookup_peer, *args))
def remove_irc_peer(self, nick):
'''Remove a peer from our IRC peers map.'''
self.logger.info('removing IRC peer {}'.format(nick))
self.irc_peers.pop(nick, None)
def count(self):
return len(self.irc_peers)
def peer_list(self):
return self.irc_peers
async def subscribe(self):
'''Returns the server peers as a list of (ip, host, details) tuples.
Despite the name this is not currently treated as a subscription.'''
return list(self.irc_peers.values())
Loading…
Cancel
Save