Browse Source

Implement IRC support and related queries from clients

master
Neil Booth 8 years ago
parent
commit
501807bf1a
  1. 6
      README.rst
  2. 6
      docs/RELEASE-NOTES
  3. 24
      samples/scripts/NOTES
  4. 3
      server/block_processor.py
  5. 6
      server/env.py
  6. 137
      server/irc.py
  7. 86
      server/protocol.py
  8. 2
      setup.py

6
README.rst

@ -68,8 +68,9 @@ All of the following likely play a part:
but I cannot think how they could be easily indexable on a filesystem.
- avoiding unnecessary or redundant computations
- more efficient memory usage
- asyncio and asynchronous prefetch of blocks. ElectrumX should not
have any need of threads.
- asyncio and asynchronous prefetch of blocks.
ElectrumX should not have any need of threads.
Roadmap
@ -81,7 +82,6 @@ Roadmap
- improve DB abstraction so LMDB is not penalized
- continue to clean up the code and remove layering violations
- store all UTXOs, not just those with addresses
- implement IRC connectivity
- potentially move some functionality to C or C++
The above are in no particular order.

6
docs/RELEASE-NOTES

@ -1,3 +1,9 @@
version 0.4
-----------
- IRC connectivity. See the notes.
- logging improvements
Version 0.3.2, 0.3.3
--------------------

24
samples/scripts/NOTES

@ -34,13 +34,23 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client.
DONATION_ADDRESS - server donation address. Defaults to none.
Your performance might change by tweaking the following cache
variables. Cache size is only checked roughly every minute, so the
caches can grow beyond the specified size. Also the Python process is
often quite a bit fatter than the combined cache size, because of
Python overhead and also because leveldb consumes a lot of memory
during UTXO flushing. So I recommend you set the sum of these to
nothing over half your available physical RAM:
If you want IRC connectivity to advertise your node:
IRC - set to anything non-empty
IRC_NICK - the nick to use when connecting to IRC. The default is a
hash of REPORT_HOST. Either way 'E_' will be prepended.
REPORT_HOST - the host to advertise. Defaults to HOST.
REPORT_SSL_PORT - the SSL port to advertise. Defaults to SSL_PORT.
REPORT_TCP_PORT - the TCP port to advertise. Defaults to TCP_PORT.
If synchronizing from the Genesis block your performance might change
by tweaking the following cache variables. Cache size is only checked
roughly every minute, so the caches can grow beyond the specified
size. Also the Python process is often quite a bit fatter than the
combined cache size, because of Python overhead and also because
leveldb consumes a lot of memory during UTXO flushing. So I recommend
you set the sum of these to nothing over half your available physical
RAM:
HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger

3
server/block_processor.py

@ -201,7 +201,8 @@ class MemPool(LoggedClass):
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
if initial:
self.logger.info('all fetched, now analysing...')
self.logger.info('analysing {:,d} mempool txs...'
.format(len(raw_txs)))
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes

6
server/env.py

@ -45,6 +45,12 @@ class Env(LoggedClass):
self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.debug = self.default('DEBUG', '')
self.debug = [item.lower() for item in self.debug.split()]
# IRC
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
self.report_host = self.default('REPORT_HOST', self.host)
self.irc_nick = self.default('IRC_NICK', None)
self.irc = self.default('IRC', False)
def default(self, envvar, default):
return environ.get(envvar, default)

137
server/irc.py

@ -0,0 +1,137 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''IRC connectivity to discover peers.
Only calling start() requires the IRC Python module.
'''
import asyncio
import re
import socket
from collections import namedtuple
from lib.hash import double_sha256
from lib.util import LoggedClass
from server.version import VERSION
def port_text(letter, port, default):
if not port:
return ''
if port == default:
return letter
return letter + str(port)
class IRC(LoggedClass):
PEER_REGEXP = re.compile('(E_[^!]*)!')
Peer = namedtuple('Peer', 'ip_addr host ports')
class DisconnectedError(Exception):
pass
def __init__(self, env):
super().__init__()
tcp_text = port_text('t', env.report_tcp_port, 50001)
ssl_text = port_text('s', env.report_ssl_port, 50002)
version = 'X{}'.format(VERSION.split()[1])
self.real_name = '{} v{} {} {}'.format(env.report_host, version,
tcp_text, ssl_text)
self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else
double_sha256(env.report_host.encode())
[:5].hex())
self.peers = {}
async def start(self):
import irc.client as irc_client
self.logger.info('joining IRC with nick "{}" and real name "{}"'
.format(self.nick, self.real_name))
reactor = irc_client.Reactor()
for event in ['welcome', 'join', 'quit', 'kick', 'whoreply',
'namreply', 'disconnect']:
reactor.add_global_handler(event, getattr(self, 'on_' + event))
while True:
try:
connection = reactor.server()
connection.connect('irc.freenode.net', 6667,
self.nick, ircname=self.real_name)
connection.set_keepalive(60)
while True:
reactor.process_once()
await asyncio.sleep(2)
except irc_client.ServerConnectionError as e:
self.logger.error('connection error: {}'.format(e))
except self.DisconnectedError:
self.logger.error('disconnected')
await asyncio.sleep(10)
def log_event(self, event):
self.logger.info('IRC event type {} source {} args {}'
.format(event.type, event.source, event.arguments))
def on_welcome(self, connection, event):
'''Called when we connect to freenode.'''
connection.join('#electrum')
def on_disconnect(self, connection, event):
'''Called if we are disconnected.'''
self.log_event(event)
raise self.DisconnectedError
def on_join(self, connection, event):
'''Called when someone new connects to our channel, including us.'''
match = self.PEER_REGEXP.match(event.source)
if match:
connection.who(match.group(1))
def on_quit(self, connection, event):
'''Called when someone leaves our channel.'''
match = self.PEER_REGEXP.match(event.source)
if match:
self.peers.pop(match.group(1), None)
def on_kick(self, connection, event):
'''Called when someone is kicked from our channel.'''
self.log_event(event)
match = self.PEER_REGEXP.match(event.arguments[0])
if match:
self.peers.pop(match.group(1), None)
def on_namreply(self, connection, event):
'''Called repeatedly when we first connect to inform us of all users
in the channel.
The users are space-separated in the 2nd argument.
'''
for peer in event.arguments[2].split():
if peer.startswith("E_"):
connection.who(peer)
def on_whoreply(self, connection, event):
'''Called when a response to our who requests arrives.
The nick is the 4th argument, and real name is in the 6th
argument preceeded by '0 ' for some reason.
'''
try:
nick = event.arguments[4]
line = event.arguments[6].split()
try:
ip_addr = socket.gethostbyname(line[1])
except socket.error:
# No IPv4 address could be resolved. Could be .onion or IPv6.
ip_addr = line[1]
peer = self.Peer(ip_addr, line[1], line[2:])
self.peers[nick] = peer
self.logger.info('new {}'.format(peer))
except IndexError:
pass

86
server/protocol.py

@ -12,14 +12,16 @@ import asyncio
import codecs
import json
import ssl
import time
import traceback
from collections import namedtuple
from functools import partial
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from server.irc import IRC
from server.version import VERSION
@ -38,15 +40,19 @@ class BlockServer(BlockProcessor):
def __init__(self, env):
super().__init__(env)
self.servers = []
self.irc = IRC(env)
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
if self.env.irc:
asyncio.ensure_future(self.irc.start())
ElectrumX.notify(self.height, self.touched)
async def start_server(self, name, protocol, host, port, *, ssl=None):
async def start_server(self, class_name, kind, host, port, *, ssl=None):
loop = asyncio.get_event_loop()
protocol = partial(class_name, self.env, kind)
server = loop.create_server(protocol, host, port, ssl=ssl)
try:
self.servers.append(await server)
@ -54,10 +60,10 @@ class BlockServer(BlockProcessor):
raise
except Exception as e:
self.logger.error('{} server failed to listen on {}:{:d} :{}'
.format(name, host, port, e))
.format(kind, host, port, e))
else:
self.logger.info('{} server listening on {}:{:d}'
.format(name, host, port))
.format(kind, host, port))
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
@ -66,27 +72,27 @@ class BlockServer(BlockProcessor):
'''
env = self.env
JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None:
await self.start_server('RPC', protocol, 'localhost', env.rpc_port)
await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port)
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
await self.start_server('TCP', protocol, env.host, env.tcp_port)
await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port)
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', protocol, env.host, env.ssl_port,
ssl=sslc)
await self.start_server(ElectrumX, 'SSL', env.host,
env.ssl_port, ssl=sslc)
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()
def irc_peers(self):
return self.irc.peers
AsyncTask = namedtuple('AsyncTask', 'session job')
@ -107,7 +113,7 @@ class SessionManager(LoggedClass):
self.sessions.remove(session)
if self.current_task and session == self.current_task.session:
self.logger.info('cancelling running task')
self.current_task.cancel()
self.current_task.job.cancel()
def add_task(self, session, job):
assert session in self.sessions
@ -142,11 +148,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_count = 0
self.send_size = 0
self.error_count = 0
self.hash168s = set()
self.start = time.time()
self.client = 'unknown'
self.peername = 'unknown'
def connection_made(self, transport):
'''Handle an incoming client connection.'''
self.transport = transport
self.peername = transport.get_extra_info('peername')
peer = transport.get_extra_info('peername')
self.peername = '{}:{}'.format(peer[0], peer[1])
self.logger.info('connection from {}'.format(self.peername))
self.SESSION_MGR.add_session(self)
@ -284,6 +295,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
cls.COIN = coin
cls.SESSION_MGR = SessionManager()
@classmethod
def irc_peers(cls):
return cls.BLOCK_PROCESSOR.irc_peers()
@classmethod
def height(cls):
'''Return the current height.'''
@ -306,10 +321,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env):
def __init__(self, env, kind):
super().__init__()
self.env = env
self.hash168s = set()
self.kind = kind
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
@ -331,8 +346,7 @@ class ElectrumX(JSONRPC):
@classmethod
def watched_address_count(cls):
sessions = cls.SESSION_MGR.sessions
return sum(len(session.hash168s) for session in sessions
if isinstance(session, cls))
return sum(len(session.hash168s) for session in sessions)
@classmethod
def notify(cls, height, touched):
@ -349,6 +363,9 @@ class ElectrumX(JSONRPC):
hash168_to_address = cls.COIN.hash168_to_address
for session in cls.SESSION_MGR.sessions:
if not isinstance(session, ElectrumX):
continue
if height != session.notified_height:
session.notified_height = height
if session.subscribe_headers:
@ -402,12 +419,6 @@ class ElectrumX(JSONRPC):
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
@classmethod
def irc_peers(cls):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return {}
@classmethod
def height(cls):
return cls.BLOCK_PROCESSOR.height
@ -590,39 +601,52 @@ class ElectrumX(JSONRPC):
subscription.
'''
self.require_empty_params(params)
peers = ElectrumX.irc_peers()
return tuple(peers.values())
return list(self.irc_peers().values())
async def version(self, params):
'''Return the server version as a string.'''
if len(params) == 2:
self.client = str(params[0])
self.protocol_version = params[1]
return VERSION
class LocalRPC(JSONRPC):
'''A local TCP RPC server for querying status.'''
def __init__(self):
def __init__(self, env, kind):
super().__init__()
cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds}
self.env = env
self.kind = kind
async def getinfo(self, params):
return {
'blocks': self.height(),
'peers': len(ElectrumX.irc_peers()),
'peers': len(self.irc_peers()),
'sessions': len(self.SESSION_MGR.sessions),
'watched': ElectrumX.watched_address_count(),
'cached': 0,
}
async def sessions(self, params):
return []
now = time.time()
fmt = '{:<4} {:>21} {:>7} {:>12} {:>7}'
result = []
result.append(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time'))
for session in self.SESSION_MGR.sessions:
result.append(fmt.format(session.kind, session.peername,
'{:,d}'.format(len(session.hash168s)),
session.client,
'{:,d}'.format(int(now - session.start))))
return result
async def numsessions(self, params):
return len(self.SESSION_MGR.sessions)
async def peers(self, params):
return tuple(ElectrumX.irc_peers().keys())
return self.irc_peers()
async def numpeers(self, params):
return len(ElectrumX.irc_peers())
return len(self.irc_peers())

2
setup.py

@ -7,6 +7,8 @@ setuptools.setup(
version=VERSION.split()[-1],
scripts=['electrumx_server.py', 'electrumx_rpc.py'],
python_requires='>=3.5',
# "irc" package is only required if IRC connectivity is enabled
# via environment variables, in which case I've tested with 15.0.4
install_requires=['plyvel', 'aiohttp >= 1'],
packages=setuptools.find_packages(),
description='ElectrumX Server',

Loading…
Cancel
Save