diff --git a/README.rst b/README.rst
index ea75618..3efb463 100644
--- a/README.rst
+++ b/README.rst
@@ -68,8 +68,9 @@ All of the following likely play a part:
   but I cannot think how they could be easily indexable on a filesystem.
 - avoiding unnecessary or redundant computations
 - more efficient memory usage
-- asyncio and asynchronous prefetch of blocks.  ElectrumX should not
-  have any need of threads.
+- asyncio and asynchronous prefetch of blocks.
+
+ElectrumX should not have any need of threads.
 
 
 Roadmap
@@ -81,7 +82,6 @@ Roadmap
 - improve DB abstraction so LMDB is not penalized
 - continue to clean up the code and remove layering violations
 - store all UTXOs, not just those with addresses
-- implement IRC connectivity
 - potentially move some functionality to C or C++
 
 The above are in no particular order.
diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES
index 92748c8..551811f 100644
--- a/docs/RELEASE-NOTES
+++ b/docs/RELEASE-NOTES
@@ -1,3 +1,9 @@
+Version 0.4
+-----------
+
+- IRC connectivity.  See samples/scripts/NOTES for the environment variables.
+- logging improvements
+
 Version 0.3.2, 0.3.3
 --------------------
 
diff --git a/electrumx_rpc.py b/electrumx_rpc.py
index 03e25c1..9d3a9c2 100755
--- a/electrumx_rpc.py
+++ b/electrumx_rpc.py
@@ -13,6 +13,7 @@
 import argparse
 import asyncio
 import json
+import pprint
 from functools import partial
 from os import environ
 
@@ -21,6 +22,7 @@ class RPCClient(asyncio.Protocol):
 
     def __init__(self, loop):
         self.loop = loop
+        self.method = None
 
     def connection_made(self, transport):
         self.transport = transport
@@ -28,15 +30,28 @@ class RPCClient(asyncio.Protocol):
     def connection_lost(self, exc):
         self.loop.stop()
 
-    def send(self, payload):
+    def send(self, method, params):
+        self.method = method
+        payload = {'method': method, 'params': params}
         data = json.dumps(payload) + '\n'
         self.transport.write(data.encode())
 
     def data_received(self, data):
         payload = json.loads(data.decode())
         self.transport.close()
-        print(json.dumps(payload, indent=4, sort_keys=True))
-
+        result = payload['result']
+        error = payload['error']
+        if error:
+            print("ERROR: {}".format(error))
+        else:
+            if self.method == 'sessions':
+                fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}'
+                print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time'))
+                for kind, peer, subs, client, time in result:
+                    print(fmt.format(kind, peer, '{:,d}'.format(subs),
+                                     client, '{:,d}'.format(int(time))))
+            else:
+                pprint.pprint(result, indent=4)
 
 def main():
     '''Send the RPC command to the server and print the result.'''
@@ -52,14 +67,12 @@ def main():
     if args.port is None:
         args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
 
-    payload = {'method': args.command[0], 'params': args.param}
-
     loop = asyncio.get_event_loop()
     proto_factory = partial(RPCClient, loop)
     coro = loop.create_connection(proto_factory, 'localhost', args.port)
     try:
         transport, protocol = loop.run_until_complete(coro)
-        protocol.send(payload)
+        protocol.send(args.command[0], args.param)
         loop.run_forever()
     except OSError:
         print('error connecting - is ElectrumX catching up or not running?')
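The refactored client above now builds the JSON-RPC payload itself and interprets the `result`/`error` fields of the reply.  A minimal blocking sketch of the same newline-delimited framing, handy for poking the RPC port by hand; the plain socket and the `getinfo` example are illustrative only, not part of ElectrumX:

```python
# Sketch of the line-delimited JSON framing electrumx_rpc.py speaks after this
# change.  A blocking socket is used here purely for brevity.
import json
import socket

def rpc_call(method, params, host='localhost', port=8000):
    payload = {'method': method, 'params': params}
    with socket.create_connection((host, port)) as sock:
        sock.sendall((json.dumps(payload) + '\n').encode())
        reply = sock.makefile().readline()      # one JSON object per line
    response = json.loads(reply)
    return response['error'], response['result']

# error, result = rpc_call('getinfo', [])
```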
diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES
index 4b86bde..acc5898 100644
--- a/samples/scripts/NOTES
+++ b/samples/scripts/NOTES
@@ -34,13 +34,23 @@ BANNER_FILE - a path to a banner file to serve to clients.  The banner
     file is re-read for each new client.
 DONATION_ADDRESS - server donation address.  Defaults to none.
 
-Your performance might change by tweaking the following cache
-variables.  Cache size is only checked roughly every minute, so the
-caches can grow beyond the specified size.  Also the Python process is
-often quite a bit fatter than the combined cache size, because of
-Python overhead and also because leveldb consumes a lot of memory
-during UTXO flushing.  So I recommend you set the sum of these to
-nothing over half your available physical RAM:
+If you want to advertise your node on IRC, set the following:
+
+IRC - set to anything non-empty to enable IRC connectivity
+IRC_NICK - the nick to use when connecting to IRC.  The default is a
+    hash of REPORT_HOST.  Either way 'E_' will be prepended.
+REPORT_HOST - the host to advertise.  Defaults to HOST.
+REPORT_SSL_PORT - the SSL port to advertise.  Defaults to SSL_PORT.
+REPORT_TCP_PORT - the TCP port to advertise.  Defaults to TCP_PORT.
+
+If synchronizing from the Genesis block, you may be able to improve
+performance by tweaking the following cache variables.  Cache size is
+only checked roughly once a minute, so the caches can grow beyond the
+sizes given.  Also, the Python process is often quite a bit fatter
+than the combined cache size, because of Python overhead and because
+leveldb consumes a lot of memory during UTXO flushing.  So I recommend
+the sum of these be set to no more than half your available physical
+RAM:
 
 HIST_MB - amount of history cache, in MB, to retain before flushing to
     disk.  Default is 250; probably no benefit being much larger
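The default nick mentioned above is derived from REPORT_HOST by server/irc.py (further down in this diff): the first five bytes of its double-SHA256, hex-encoded, with 'E_' prepended.  A rough stand-alone sketch, with hashlib standing in for lib.hash.double_sha256 and an invented host name:

```python
# Illustrative only: mirrors how server/irc.py picks a nick when IRC_NICK is
# unset.  hashlib stands in for lib.hash.double_sha256.
import hashlib

def default_irc_nick(report_host):
    first = hashlib.sha256(report_host.encode()).digest()
    return 'E_' + hashlib.sha256(first).digest()[:5].hex()

# default_irc_nick('example.com') -> 'E_' followed by ten hex characters
```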
diff --git a/server/block_processor.py b/server/block_processor.py
index d3ec0e0..28d067c 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -201,7 +201,8 @@ class MemPool(LoggedClass):
         hex_hashes.difference_update(self.txs)
         raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
         if initial:
-            self.logger.info('all fetched, now analysing...')
+            self.logger.info('analysing {:,d} mempool txs...'
+                             .format(len(raw_txs)))
         new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
                    for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
         del raw_txs, hex_hashes
@@ -551,7 +552,6 @@ class BlockProcessor(server.db.DB):
             # as it reads the wall time.
             self.flush_history(batch)
             if flush_utxos:
-                self.fs_flush()
                 self.flush_utxos(batch)
             self.flush_state(batch)
             self.logger.info('committing transaction...')
@@ -604,7 +604,6 @@ class BlockProcessor(server.db.DB):
 
     def fs_flush(self):
         '''Flush the things stored on the filesystem.'''
-        flush_start = time.time()
         blocks_done = len(self.headers)
         prior_tx_count = (self.tx_counts[self.db_height]
                           if self.db_height >= 0 else 0)
@@ -649,8 +648,6 @@ class BlockProcessor(server.db.DB):
 
         self.tx_hashes = []
         self.headers = []
-        self.logger.info('FS flush took {:.1f} seconds'
-                         .format(time.time() - flush_start))
 
     def backup_history(self, batch, hash168s):
         self.logger.info('backing up history to height {:,d} tx_count {:,d}'
@@ -694,19 +691,16 @@ class BlockProcessor(server.db.DB):
         utxo_cache_size = len(self.utxo_cache) * 187
         db_cache_size = len(self.db_cache) * 105
         hist_cache_size = len(self.history) * 180 + self.history_size * 4
-        utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
+        tx_hash_size = (self.tx_count - self.db_tx_count) * 74
+        utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB
         hist_MB = hist_cache_size // one_MB
 
-        self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
+        self.logger.info('UTXOs: {:,d} deletes: {:,d} '
+                         'UTXOs {:,d}MB hist {:,d}MB'
+                         .format(len(self.utxo_cache), self.db_deletes,
+                                 utxo_MB, hist_MB))
+        self.logger.info('our height: {:,d} daemon height: {:,d}'
                          .format(self.height, self.daemon.cached_height()))
-        self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
-                         'hist addrs: {:,d} hist size {:,d}'
-                         .format(len(self.utxo_cache),
-                                 len(self.db_cache),
-                                 len(self.history),
-                                 self.history_size))
-        self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)'
-                         .format(utxo_MB + hist_MB, utxo_MB, hist_MB))
         return utxo_MB, hist_MB
 
     def undo_key(self, height):
@@ -989,9 +983,21 @@ class BlockProcessor(server.db.DB):
         # Care is needed because the writes generated by flushing the
         # UTXO state may have keys in common with our write cache or
         # may be in the DB already.
-        self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks'
-                         .format(self.tx_count - self.db_tx_count,
-                                 self.height - self.db_height))
+        self.logger.info('flushing {:,d} blocks with {:,d} txs'
+                         .format(self.height - self.db_height,
+                                 self.tx_count - self.db_tx_count))
+        self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
+                         'DB spends: {:,d}'
+                         .format(len(self.utxo_cache) + self.utxo_cache_spends,
+                                 self.utxo_cache_spends,
+                                 self.db_deletes))
+
+        fs_flush_start = time.time()
+        self.fs_flush()
+        fs_flush_end = time.time()
+        self.logger.info('FS flush took {:.1f} seconds'
+                         .format(fs_flush_end - fs_flush_start))
+
         collisions = 0
         new_utxos = len(self.utxo_cache)
 
@@ -1022,11 +1028,6 @@ class BlockProcessor(server.db.DB):
 
         adds = new_utxos + self.utxo_cache_spends
 
-        self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
-                         .format(adds, self.utxo_cache_spends))
-        self.logger.info('DB adds: {:,d} spends: {:,d}, collisions: {:,d}'
-                         .format(new_utxos, self.db_deletes, collisions))
-
         self.db_cache = {}
         self.utxo_cache_spends = self.db_deletes = 0
         self.utxo_flush_count = self.flush_count
@@ -1034,6 +1035,9 @@ class BlockProcessor(server.db.DB):
         self.db_height = self.height
         self.db_tip = self.tip
 
+        self.logger.info('UTXO flush took {:.1f} seconds'
+                         .format(time.time() - fs_flush_end))
+
     def read_headers(self, start, count):
         # Read some from disk
         disk_count = min(count, self.db_height + 1 - start)
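The reworked logging above reports estimated cache sizes rather than raw entry counts, using fixed per-entry byte costs: 187 per cached UTXO, 105 per DB cache entry, 180 per history address plus 4 per history item, and 74 per not-yet-flushed tx hash.  A sketch of that arithmetic with invented entry counts; `one_MB`, defined elsewhere in block_processor.py, is taken here as 1,000,000:

```python
# Sketch of the size estimate behind the new 'UTXOs ... MB hist ... MB' log
# line; the entry counts in the example call are made up.
ONE_MB = 1000 * 1000

def cache_mb(utxo_entries, db_entries, hist_addrs, hist_items, unflushed_txs):
    utxo_bytes = utxo_entries * 187 + db_entries * 105 + unflushed_txs * 74
    hist_bytes = hist_addrs * 180 + hist_items * 4
    return utxo_bytes // ONE_MB, hist_bytes // ONE_MB

# cache_mb(2000000, 500000, 800000, 5000000, 300000) -> (448, 164)
```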
diff --git a/server/daemon.py b/server/daemon.py
index a5e006e..241b85e 100644
--- a/server/daemon.py
+++ b/server/daemon.py
@@ -31,7 +31,7 @@ class Daemon(util.LoggedClass):
         super().__init__()
         self.url = url
         self._height = None
-        self.logger.info('connecting to daemon at URL {}'.format(url))
+        self.logger.info('connecting at URL {}'.format(url))
         self.debug_caught_up = 'caught_up' in debug
         # Limit concurrent RPC calls to this number.
         # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
diff --git a/server/env.py b/server/env.py
index 403a277..4a08eb3 100644
--- a/server/env.py
+++ b/server/env.py
@@ -45,6 +45,12 @@ class Env(LoggedClass):
         self.db_engine = self.default('DB_ENGINE', 'leveldb')
         self.debug = self.default('DEBUG', '')
         self.debug = [item.lower() for item in self.debug.split()]
+        # IRC
+        self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
+        self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
+        self.report_host = self.default('REPORT_HOST', self.host)
+        self.irc_nick = self.default('IRC_NICK', None)
+        self.irc = self.default('IRC', False)
 
     def default(self, envvar, default):
         return environ.get(envvar, default)
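The new Env attributes above make the advertised host and ports default to the listening ones, and treat any non-empty IRC value as "enabled" (the empty-string default is falsy).  A stand-alone illustration of those fallbacks; the helper itself is not part of ElectrumX:

```python
# Illustrative helper only: shows the fallback behaviour of the new settings.
from os import environ

def irc_report_settings(host, tcp_port, ssl_port):
    return {
        'irc': bool(environ.get('IRC', '')),     # any non-empty value enables IRC
        'irc_nick': environ.get('IRC_NICK'),     # None -> nick derived from the host
        'report_host': environ.get('REPORT_HOST', host),
        'report_tcp_port': environ.get('REPORT_TCP_PORT', tcp_port),
        'report_ssl_port': environ.get('REPORT_SSL_PORT', ssl_port),
    }
```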
diff --git a/server/irc.py b/server/irc.py
new file mode 100644
index 0000000..8a52f75
--- /dev/null
+++ b/server/irc.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2016, Neil Booth
+#
+# All rights reserved.
+#
+# See the file "LICENCE" for information about the copyright
+# and warranty status of this software.
+
+'''IRC connectivity to discover peers.
+
+Only calling start() requires the IRC Python module.
+'''
+
+import asyncio
+import re
+import socket
+
+from collections import namedtuple
+
+from lib.hash import double_sha256
+from lib.util import LoggedClass
+from server.version import VERSION
+
+def port_text(letter, port, default):
+    if not port:
+        return ''
+    if port == default:
+        return letter
+    return letter + str(port)
+
+
+class IRC(LoggedClass):
+
+    PEER_REGEXP = re.compile('(E_[^!]*)!')
+    Peer = namedtuple('Peer', 'ip_addr host ports')
+
+    class DisconnectedError(Exception):
+        pass
+
+    def __init__(self, env):
+        super().__init__()
+        tcp_text = port_text('t', env.report_tcp_port, 50001)
+        ssl_text = port_text('s', env.report_ssl_port, 50002)
+        version = 'X{}'.format(VERSION.split()[1])
+        self.real_name = '{} v{} {} {}'.format(env.report_host, version,
+                                               tcp_text, ssl_text)
+        self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else
+                                  double_sha256(env.report_host.encode())
+                                  [:5].hex())
+        self.peers = {}
+
+    async def start(self):
+        import irc.client as irc_client
+
+        self.logger.info('joining IRC with nick "{}" and real name "{}"'
+                         .format(self.nick, self.real_name))
+
+        reactor = irc_client.Reactor()
+        for event in ['welcome', 'join', 'quit', 'kick', 'whoreply',
+                      'namreply', 'disconnect']:
+            reactor.add_global_handler(event, getattr(self, 'on_' + event))
+
+        while True:
+            try:
+                connection = reactor.server()
+                connection.connect('irc.freenode.net', 6667,
+                                   self.nick, ircname=self.real_name)
+                connection.set_keepalive(60)
+                while True:
+                    reactor.process_once()
+                    await asyncio.sleep(2)
+            except irc_client.ServerConnectionError as e:
+                self.logger.error('connection error: {}'.format(e))
+            except self.DisconnectedError:
+                self.logger.error('disconnected')
+            await asyncio.sleep(10)
+
+    def log_event(self, event):
+        self.logger.info('IRC event type {} source {} args {}'
+                         .format(event.type, event.source, event.arguments))
+
+    def on_welcome(self, connection, event):
+        '''Called when we connect to freenode.'''
+        connection.join('#electrum')
+
+    def on_disconnect(self, connection, event):
+        '''Called if we are disconnected.'''
+        self.log_event(event)
+        raise self.DisconnectedError
+
+    def on_join(self, connection, event):
+        '''Called when someone new connects to our channel, including us.'''
+        match = self.PEER_REGEXP.match(event.source)
+        if match:
+            connection.who(match.group(1))
+
+    def on_quit(self, connection, event):
+        '''Called when someone leaves our channel.'''
+        match = self.PEER_REGEXP.match(event.source)
+        if match:
+            self.peers.pop(match.group(1), None)
+
+    def on_kick(self, connection, event):
+        '''Called when someone is kicked from our channel.'''
+        self.log_event(event)
+        match = self.PEER_REGEXP.match(event.arguments[0])
+        if match:
+            self.peers.pop(match.group(1), None)
+
+    def on_namreply(self, connection, event):
+        '''Called repeatedly when we first connect to inform us of all users
+        in the channel.
+
+        The users are space-separated in the 2nd argument.
+        '''
+        for peer in event.arguments[2].split():
+            if peer.startswith("E_"):
+                connection.who(peer)
+
+    def on_whoreply(self, connection, event):
+        '''Called when a response to our who requests arrives.
+
+        The nick is the 4th argument, and real name is in the 6th
+        argument preceded by '0 ' for some reason.
+        '''
+        try:
+            nick = event.arguments[4]
+            line = event.arguments[6].split()
+            try:
+                ip_addr = socket.gethostbyname(line[1])
+            except socket.error:
+                # No IPv4 address could be resolved. Could be .onion or IPv6.
+                ip_addr = line[1]
+            peer = self.Peer(ip_addr, line[1], line[2:])
+            self.peers[nick] = peer
+            self.logger.info('new {}'.format(peer))
+        except IndexError:
+            pass
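The real name advertised on IRC above packs the host, an 'X'-prefixed version, and abbreviated port letters into one string, with the default ports 50001/50002 reduced to a bare 't'/'s'.  A few worked examples; `port_text` is copied from server/irc.py above and the host and non-default port are invented:

```python
# Worked examples of the advertised real-name string.
def port_text(letter, port, default):
    if not port:
        return ''
    if port == default:
        return letter
    return letter + str(port)

print(port_text('t', 50001, 50001))    # 't'      - default TCP port, letter only
print(port_text('s', 50102, 50002))    # 's50102' - non-default port is spelled out
print(port_text('s', None, 50002))     # ''       - port not advertised at all

real_name = '{} v{} {} {}'.format('example.com', 'X0.4', 't', 's50102')
# -> 'example.com vX0.4 t s50102'
```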
diff --git a/server/protocol.py b/server/protocol.py
index 2d9946e..d5f2999 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -12,14 +12,16 @@ import asyncio
 import codecs
 import json
 import ssl
+import time
 import traceback
 from collections import namedtuple
 from functools import partial
 
-from server.block_processor import BlockProcessor
-from server.daemon import DaemonError
 from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 from lib.util import LoggedClass
+from server.block_processor import BlockProcessor
+from server.daemon import DaemonError
+from server.irc import IRC
 from server.version import VERSION
 
 
@@ -38,15 +40,19 @@ class BlockServer(BlockProcessor):
     def __init__(self, env):
         super().__init__(env)
         self.servers = []
+        self.irc = IRC(env)
 
     async def caught_up(self, mempool_hashes):
         await super().caught_up(mempool_hashes)
         if not self.servers:
             await self.start_servers()
+            if self.env.irc:
+                asyncio.ensure_future(self.irc.start())
         ElectrumX.notify(self.height, self.touched)
 
-    async def start_server(self, name, protocol, host, port, *, ssl=None):
+    async def start_server(self, class_name, kind, host, port, *, ssl=None):
         loop = asyncio.get_event_loop()
+        protocol = partial(class_name, self.env, kind)
         server = loop.create_server(protocol, host, port, ssl=ssl)
         try:
             self.servers.append(await server)
@@ -54,10 +60,10 @@ class BlockServer(BlockProcessor):
             raise
         except Exception as e:
             self.logger.error('{} server failed to listen on {}:{:d} :{}'
-                              .format(name, host, port, e))
+                              .format(kind, host, port, e))
         else:
             self.logger.info('{} server listening on {}:{:d}'
-                             .format(name, host, port))
+                             .format(kind, host, port))
 
     async def start_servers(self):
         '''Start listening on RPC, TCP and SSL ports.
@@ -66,27 +72,27 @@ class BlockServer(BlockProcessor):
         '''
         env = self.env
         JSONRPC.init(self, self.daemon, self.coin)
-
-        protocol = LocalRPC
         if env.rpc_port is not None:
-            await self.start_server('RPC', protocol, 'localhost', env.rpc_port)
+            await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port)
 
-        protocol = partial(ElectrumX, env)
         if env.tcp_port is not None:
-            await self.start_server('TCP', protocol, env.host, env.tcp_port)
+            await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port)
 
         if env.ssl_port is not None:
             # FIXME: update if we want to require Python >= 3.5.3
             sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
             sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
-            await self.start_server('SSL', protocol, env.host, env.ssl_port,
-                                    ssl=sslc)
+            await self.start_server(ElectrumX, 'SSL', env.host,
+                                    env.ssl_port, ssl=sslc)
 
     def stop(self):
         '''Close the listening servers.'''
         for server in self.servers:
             server.close()
 
+    def irc_peers(self):
+        return self.irc.peers
+
 
 AsyncTask = namedtuple('AsyncTask', 'session job')
 
@@ -107,7 +113,7 @@ class SessionManager(LoggedClass):
         self.sessions.remove(session)
         if self.current_task and session == self.current_task.session:
             self.logger.info('cancelling running task')
-            self.current_task.cancel()
+            self.current_task.job.cancel()
 
     def add_task(self, session, job):
         assert session in self.sessions
@@ -142,11 +148,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         self.send_count = 0
         self.send_size = 0
         self.error_count = 0
+        self.hash168s = set()
+        self.start = time.time()
+        self.client = 'unknown'
+        self.peername = 'unknown'
 
     def connection_made(self, transport):
         '''Handle an incoming client connection.'''
         self.transport = transport
-        self.peername = transport.get_extra_info('peername')
+        peer = transport.get_extra_info('peername')
+        self.peername = '{}:{}'.format(peer[0], peer[1])
         self.logger.info('connection from {}'.format(self.peername))
         self.SESSION_MGR.add_session(self)
 
@@ -284,6 +295,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         cls.COIN = coin
         cls.SESSION_MGR = SessionManager()
 
+    @classmethod
+    def irc_peers(cls):
+        return cls.BLOCK_PROCESSOR.irc_peers()
+
     @classmethod
     def height(cls):
         '''Return the current height.'''
@@ -306,10 +321,10 @@
 class ElectrumX(JSONRPC):
     '''A TCP server that handles incoming Electrum connections.'''
 
-    def __init__(self, env):
+    def __init__(self, env, kind):
         super().__init__()
         self.env = env
-        self.hash168s = set()
+        self.kind = kind
         self.subscribe_headers = False
         self.subscribe_height = False
         self.notified_height = None
@@ -331,8 +346,7 @@ class ElectrumX(JSONRPC):
     @classmethod
     def watched_address_count(cls):
         sessions = cls.SESSION_MGR.sessions
-        return sum(len(session.hash168s) for session in sessions
-                   if isinstance(session, cls))
+        return sum(len(session.hash168s) for session in sessions)
 
     @classmethod
     def notify(cls, height, touched):
@@ -349,6 +363,9 @@ class ElectrumX(JSONRPC):
 
         hash168_to_address = cls.COIN.hash168_to_address
         for session in cls.SESSION_MGR.sessions:
+            if not isinstance(session, ElectrumX):
+                continue
+
             if height != session.notified_height:
                 session.notified_height = height
                 if session.subscribe_headers:
@@ -402,12 +419,6 @@ class ElectrumX(JSONRPC):
         return {"block_height": height, "merkle": merkle_branch,
                 "pos": pos}
 
-    @classmethod
-    def irc_peers(cls):
-        '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
-        per peer.'''
-        return {}
-
     @classmethod
     def height(cls):
         return cls.BLOCK_PROCESSOR.height
@@ -590,39 +601,47 @@ class ElectrumX(JSONRPC):
         subscription.
         '''
         self.require_empty_params(params)
-        peers = ElectrumX.irc_peers()
-        return tuple(peers.values())
+        return list(self.irc_peers().values())
 
     async def version(self, params):
         '''Return the server version as a string.'''
+        if len(params) == 2:
+            self.client = str(params[0])
+            self.protocol_version = params[1]
         return VERSION
 
 
 class LocalRPC(JSONRPC):
     '''A local TCP RPC server for querying status.'''
 
-    def __init__(self):
+    def __init__(self, env, kind):
         super().__init__()
         cmds = 'getinfo sessions numsessions peers numpeers'.split()
         self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds}
+        self.env = env
+        self.kind = kind
 
     async def getinfo(self, params):
         return {
             'blocks': self.height(),
-            'peers': len(ElectrumX.irc_peers()),
+            'peers': len(self.irc_peers()),
             'sessions': len(self.SESSION_MGR.sessions),
             'watched': ElectrumX.watched_address_count(),
             'cached': 0,
         }
 
     async def sessions(self, params):
-        return []
+        now = time.time()
+        return [(session.kind,
+                 'this RPC client' if session == self else session.peername,
+                 len(session.hash168s), session.client, now - session.start)
+                for session in self.SESSION_MGR.sessions]
 
     async def numsessions(self, params):
         return len(self.SESSION_MGR.sessions)
 
     async def peers(self, params):
-        return tuple(ElectrumX.irc_peers().keys())
+        return self.irc_peers()
 
     async def numpeers(self, params):
-        return len(ElectrumX.irc_peers())
+        return len(self.irc_peers())
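The LocalRPC `sessions` command above now returns one (kind, peer, subscription count, client, connection age) tuple per session, which electrumx_rpc.py renders as a table.  A sketch of that rendering with a single invented session row:

```python
# Sketch of how electrumx_rpc.py formats one row of the new 'sessions' result;
# the session values below are invented.
fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}'
print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time'))

kind, peer, subs, client, age = 'TCP', '203.0.113.5:50231', 12, '2.7.1', 3602.4
print(fmt.format(kind, peer, '{:,d}'.format(subs),
                 client, '{:,d}'.format(int(age))))
# prints an aligned row ending in the thousands-separated age, e.g. 3,602
```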
diff --git a/server/version.py b/server/version.py
index a05ff67..9cd652a 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.3.3"
+VERSION = "ElectrumX 0.4"
diff --git a/setup.py b/setup.py
index 827e807..be23287 100644
--- a/setup.py
+++ b/setup.py
@@ -7,6 +7,8 @@ setuptools.setup(
     version=VERSION.split()[-1],
     scripts=['electrumx_server.py', 'electrumx_rpc.py'],
     python_requires='>=3.5',
+    # The "irc" package is only required if IRC connectivity is enabled
+    # via environment variables, in which case I've tested with version 15.0.4
     install_requires=['plyvel', 'aiohttp >= 1'],
     packages=setuptools.find_packages(),
     description='ElectrumX Server',
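setup.py deliberately leaves the `irc` package out of install_requires; server/irc.py only imports it inside start(), so the dependency is needed only when IRC is switched on.  A guard in the same spirit, for illustration only:

```python
# Illustration of the optional dependency: the import is deferred, so a server
# running without IRC enabled never needs the package installed.
try:
    import irc.client as irc_client
except ImportError:
    irc_client = None   # "pip install irc" (15.0.4 was tested) enables IRC support
```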