From 501807bf1a4d615b725826096122d73a1e7d2e78 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 12 Nov 2016 16:02:57 +0900 Subject: [PATCH] Implement IRC support and related queries from clients --- README.rst | 6 +- docs/RELEASE-NOTES | 6 ++ samples/scripts/NOTES | 24 +++++-- server/block_processor.py | 3 +- server/env.py | 6 ++ server/irc.py | 137 ++++++++++++++++++++++++++++++++++++++ server/protocol.py | 86 +++++++++++++++--------- setup.py | 2 + 8 files changed, 228 insertions(+), 42 deletions(-) create mode 100644 server/irc.py diff --git a/README.rst b/README.rst index ea75618..3efb463 100644 --- a/README.rst +++ b/README.rst @@ -68,8 +68,9 @@ All of the following likely play a part: but I cannot think how they could be easily indexable on a filesystem. - avoiding unnecessary or redundant computations - more efficient memory usage -- asyncio and asynchronous prefetch of blocks. ElectrumX should not - have any need of threads. +- asyncio and asynchronous prefetch of blocks. + +ElectrumX should not have any need of threads. Roadmap @@ -81,7 +82,6 @@ Roadmap - improve DB abstraction so LMDB is not penalized - continue to clean up the code and remove layering violations - store all UTXOs, not just those with addresses -- implement IRC connectivity - potentially move some functionality to C or C++ The above are in no particular order. diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 92748c8..1b1d8eb 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.4 +----------- + +- IRC connectivity. See the notes. +- logging improvements + Version 0.3.2, 0.3.3 -------------------- diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES index 4b86bde..acc5898 100644 --- a/samples/scripts/NOTES +++ b/samples/scripts/NOTES @@ -34,13 +34,23 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file is re-read for each new client. DONATION_ADDRESS - server donation address. Defaults to none. -Your performance might change by tweaking the following cache -variables. Cache size is only checked roughly every minute, so the -caches can grow beyond the specified size. Also the Python process is -often quite a bit fatter than the combined cache size, because of -Python overhead and also because leveldb consumes a lot of memory -during UTXO flushing. So I recommend you set the sum of these to -nothing over half your available physical RAM: +If you want IRC connectivity to advertise your node: + +IRC - set to anything non-empty +IRC_NICK - the nick to use when connecting to IRC. The default is a + hash of REPORT_HOST. Either way 'E_' will be prepended. +REPORT_HOST - the host to advertise. Defaults to HOST. +REPORT_SSL_PORT - the SSL port to advertise. Defaults to SSL_PORT. +REPORT_TCP_PORT - the TCP port to advertise. Defaults to TCP_PORT. + +If synchronizing from the Genesis block your performance might change +by tweaking the following cache variables. Cache size is only checked +roughly every minute, so the caches can grow beyond the specified +size. Also the Python process is often quite a bit fatter than the +combined cache size, because of Python overhead and also because +leveldb consumes a lot of memory during UTXO flushing. So I recommend +you set the sum of these to nothing over half your available physical +RAM: HIST_MB - amount of history cache, in MB, to retain before flushing to disk. Default is 250; probably no benefit being much larger diff --git a/server/block_processor.py b/server/block_processor.py index 5e62ee9..c595fd6 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -201,7 +201,8 @@ class MemPool(LoggedClass): hex_hashes.difference_update(self.txs) raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) if initial: - self.logger.info('all fetched, now analysing...') + self.logger.info('analysing {:,d} mempool txs...' + .format(len(raw_txs))) new_txs = {hex_hash: Deserializer(raw_tx).read_tx() for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} del raw_txs, hex_hashes diff --git a/server/env.py b/server/env.py index 403a277..4a08eb3 100644 --- a/server/env.py +++ b/server/env.py @@ -45,6 +45,12 @@ class Env(LoggedClass): self.db_engine = self.default('DB_ENGINE', 'leveldb') self.debug = self.default('DEBUG', '') self.debug = [item.lower() for item in self.debug.split()] + # IRC + self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) + self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) + self.report_host = self.default('REPORT_HOST', self.host) + self.irc_nick = self.default('IRC_NICK', None) + self.irc = self.default('IRC', False) def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/irc.py b/server/irc.py new file mode 100644 index 0000000..8a52f75 --- /dev/null +++ b/server/irc.py @@ -0,0 +1,137 @@ +# Copyright (c) 2016, Neil Booth +# +# All rights reserved. +# +# See the file "LICENCE" for information about the copyright +# and warranty status of this software. + +'''IRC connectivity to discover peers. + +Only calling start() requires the IRC Python module. +''' + +import asyncio +import re +import socket + +from collections import namedtuple + +from lib.hash import double_sha256 +from lib.util import LoggedClass +from server.version import VERSION + +def port_text(letter, port, default): + if not port: + return '' + if port == default: + return letter + return letter + str(port) + + +class IRC(LoggedClass): + + PEER_REGEXP = re.compile('(E_[^!]*)!') + Peer = namedtuple('Peer', 'ip_addr host ports') + + class DisconnectedError(Exception): + pass + + def __init__(self, env): + super().__init__() + tcp_text = port_text('t', env.report_tcp_port, 50001) + ssl_text = port_text('s', env.report_ssl_port, 50002) + version = 'X{}'.format(VERSION.split()[1]) + self.real_name = '{} v{} {} {}'.format(env.report_host, version, + tcp_text, ssl_text) + self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else + double_sha256(env.report_host.encode()) + [:5].hex()) + self.peers = {} + + async def start(self): + import irc.client as irc_client + + self.logger.info('joining IRC with nick "{}" and real name "{}"' + .format(self.nick, self.real_name)) + + reactor = irc_client.Reactor() + for event in ['welcome', 'join', 'quit', 'kick', 'whoreply', + 'namreply', 'disconnect']: + reactor.add_global_handler(event, getattr(self, 'on_' + event)) + + while True: + try: + connection = reactor.server() + connection.connect('irc.freenode.net', 6667, + self.nick, ircname=self.real_name) + connection.set_keepalive(60) + while True: + reactor.process_once() + await asyncio.sleep(2) + except irc_client.ServerConnectionError as e: + self.logger.error('connection error: {}'.format(e)) + except self.DisconnectedError: + self.logger.error('disconnected') + await asyncio.sleep(10) + + def log_event(self, event): + self.logger.info('IRC event type {} source {} args {}' + .format(event.type, event.source, event.arguments)) + + def on_welcome(self, connection, event): + '''Called when we connect to freenode.''' + connection.join('#electrum') + + def on_disconnect(self, connection, event): + '''Called if we are disconnected.''' + self.log_event(event) + raise self.DisconnectedError + + def on_join(self, connection, event): + '''Called when someone new connects to our channel, including us.''' + match = self.PEER_REGEXP.match(event.source) + if match: + connection.who(match.group(1)) + + def on_quit(self, connection, event): + '''Called when someone leaves our channel.''' + match = self.PEER_REGEXP.match(event.source) + if match: + self.peers.pop(match.group(1), None) + + def on_kick(self, connection, event): + '''Called when someone is kicked from our channel.''' + self.log_event(event) + match = self.PEER_REGEXP.match(event.arguments[0]) + if match: + self.peers.pop(match.group(1), None) + + def on_namreply(self, connection, event): + '''Called repeatedly when we first connect to inform us of all users + in the channel. + + The users are space-separated in the 2nd argument. + ''' + for peer in event.arguments[2].split(): + if peer.startswith("E_"): + connection.who(peer) + + def on_whoreply(self, connection, event): + '''Called when a response to our who requests arrives. + + The nick is the 4th argument, and real name is in the 6th + argument preceeded by '0 ' for some reason. + ''' + try: + nick = event.arguments[4] + line = event.arguments[6].split() + try: + ip_addr = socket.gethostbyname(line[1]) + except socket.error: + # No IPv4 address could be resolved. Could be .onion or IPv6. + ip_addr = line[1] + peer = self.Peer(ip_addr, line[1], line[2:]) + self.peers[nick] = peer + self.logger.info('new {}'.format(peer)) + except IndexError: + pass diff --git a/server/protocol.py b/server/protocol.py index 2d9946e..28aa07f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -12,14 +12,16 @@ import asyncio import codecs import json import ssl +import time import traceback from collections import namedtuple from functools import partial -from server.block_processor import BlockProcessor -from server.daemon import DaemonError from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.util import LoggedClass +from server.block_processor import BlockProcessor +from server.daemon import DaemonError +from server.irc import IRC from server.version import VERSION @@ -38,15 +40,19 @@ class BlockServer(BlockProcessor): def __init__(self, env): super().__init__(env) self.servers = [] + self.irc = IRC(env) async def caught_up(self, mempool_hashes): await super().caught_up(mempool_hashes) if not self.servers: await self.start_servers() + if self.env.irc: + asyncio.ensure_future(self.irc.start()) ElectrumX.notify(self.height, self.touched) - async def start_server(self, name, protocol, host, port, *, ssl=None): + async def start_server(self, class_name, kind, host, port, *, ssl=None): loop = asyncio.get_event_loop() + protocol = partial(class_name, self.env, kind) server = loop.create_server(protocol, host, port, ssl=ssl) try: self.servers.append(await server) @@ -54,10 +60,10 @@ class BlockServer(BlockProcessor): raise except Exception as e: self.logger.error('{} server failed to listen on {}:{:d} :{}' - .format(name, host, port, e)) + .format(kind, host, port, e)) else: self.logger.info('{} server listening on {}:{:d}' - .format(name, host, port)) + .format(kind, host, port)) async def start_servers(self): '''Start listening on RPC, TCP and SSL ports. @@ -66,27 +72,27 @@ class BlockServer(BlockProcessor): ''' env = self.env JSONRPC.init(self, self.daemon, self.coin) - - protocol = LocalRPC if env.rpc_port is not None: - await self.start_server('RPC', protocol, 'localhost', env.rpc_port) + await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port) - protocol = partial(ElectrumX, env) if env.tcp_port is not None: - await self.start_server('TCP', protocol, env.host, env.tcp_port) + await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port) if env.ssl_port is not None: # FIXME: update if we want to require Python >= 3.5.3 sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) - await self.start_server('SSL', protocol, env.host, env.ssl_port, - ssl=sslc) + await self.start_server(ElectrumX, 'SSL', env.host, + env.ssl_port, ssl=sslc) def stop(self): '''Close the listening servers.''' for server in self.servers: server.close() + def irc_peers(self): + return self.irc.peers + AsyncTask = namedtuple('AsyncTask', 'session job') @@ -107,7 +113,7 @@ class SessionManager(LoggedClass): self.sessions.remove(session) if self.current_task and session == self.current_task.session: self.logger.info('cancelling running task') - self.current_task.cancel() + self.current_task.job.cancel() def add_task(self, session, job): assert session in self.sessions @@ -142,11 +148,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_count = 0 self.send_size = 0 self.error_count = 0 + self.hash168s = set() + self.start = time.time() + self.client = 'unknown' + self.peername = 'unknown' def connection_made(self, transport): '''Handle an incoming client connection.''' self.transport = transport - self.peername = transport.get_extra_info('peername') + peer = transport.get_extra_info('peername') + self.peername = '{}:{}'.format(peer[0], peer[1]) self.logger.info('connection from {}'.format(self.peername)) self.SESSION_MGR.add_session(self) @@ -284,6 +295,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): cls.COIN = coin cls.SESSION_MGR = SessionManager() + @classmethod + def irc_peers(cls): + return cls.BLOCK_PROCESSOR.irc_peers() + @classmethod def height(cls): '''Return the current height.''' @@ -306,10 +321,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): class ElectrumX(JSONRPC): '''A TCP server that handles incoming Electrum connections.''' - def __init__(self, env): + def __init__(self, env, kind): super().__init__() self.env = env - self.hash168s = set() + self.kind = kind self.subscribe_headers = False self.subscribe_height = False self.notified_height = None @@ -331,8 +346,7 @@ class ElectrumX(JSONRPC): @classmethod def watched_address_count(cls): sessions = cls.SESSION_MGR.sessions - return sum(len(session.hash168s) for session in sessions - if isinstance(session, cls)) + return sum(len(session.hash168s) for session in sessions) @classmethod def notify(cls, height, touched): @@ -349,6 +363,9 @@ class ElectrumX(JSONRPC): hash168_to_address = cls.COIN.hash168_to_address for session in cls.SESSION_MGR.sessions: + if not isinstance(session, ElectrumX): + continue + if height != session.notified_height: session.notified_height = height if session.subscribe_headers: @@ -402,12 +419,6 @@ class ElectrumX(JSONRPC): return {"block_height": height, "merkle": merkle_branch, "pos": pos} - @classmethod - def irc_peers(cls): - '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one - per peer.''' - return {} - @classmethod def height(cls): return cls.BLOCK_PROCESSOR.height @@ -590,39 +601,52 @@ class ElectrumX(JSONRPC): subscription. ''' self.require_empty_params(params) - peers = ElectrumX.irc_peers() - return tuple(peers.values()) + return list(self.irc_peers().values()) async def version(self, params): '''Return the server version as a string.''' + if len(params) == 2: + self.client = str(params[0]) + self.protocol_version = params[1] return VERSION class LocalRPC(JSONRPC): '''A local TCP RPC server for querying status.''' - def __init__(self): + def __init__(self, env, kind): super().__init__() cmds = 'getinfo sessions numsessions peers numpeers'.split() self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} + self.env = env + self.kind = kind async def getinfo(self, params): return { 'blocks': self.height(), - 'peers': len(ElectrumX.irc_peers()), + 'peers': len(self.irc_peers()), 'sessions': len(self.SESSION_MGR.sessions), 'watched': ElectrumX.watched_address_count(), 'cached': 0, } async def sessions(self, params): - return [] + now = time.time() + fmt = '{:<4} {:>21} {:>7} {:>12} {:>7}' + result = [] + result.append(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time')) + for session in self.SESSION_MGR.sessions: + result.append(fmt.format(session.kind, session.peername, + '{:,d}'.format(len(session.hash168s)), + session.client, + '{:,d}'.format(int(now - session.start)))) + return result async def numsessions(self, params): return len(self.SESSION_MGR.sessions) async def peers(self, params): - return tuple(ElectrumX.irc_peers().keys()) + return self.irc_peers() async def numpeers(self, params): - return len(ElectrumX.irc_peers()) + return len(self.irc_peers()) diff --git a/setup.py b/setup.py index 827e807..be23287 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,8 @@ setuptools.setup( version=VERSION.split()[-1], scripts=['electrumx_server.py', 'electrumx_rpc.py'], python_requires='>=3.5', + # "irc" package is only required if IRC connectivity is enabled + # via environment variables, in which case I've tested with 15.0.4 install_requires=['plyvel', 'aiohttp >= 1'], packages=setuptools.find_packages(), description='ElectrumX Server',