Browse Source

Merge branch 'irc' into develop

master
Neil Booth 8 years ago
parent
commit
1d52e6ffb5
  1. 6
      README.rst
  2. 6
      docs/RELEASE-NOTES
  3. 25
      electrumx_rpc.py
  4. 24
      samples/scripts/NOTES
  5. 9
      server/block_processor.py
  6. 2
      server/daemon.py
  7. 6
      server/env.py
  8. 137
      server/irc.py
  9. 83
      server/protocol.py
  10. 2
      setup.py

6
README.rst

@ -68,8 +68,9 @@ All of the following likely play a part:
but I cannot think how they could be easily indexable on a filesystem. but I cannot think how they could be easily indexable on a filesystem.
- avoiding unnecessary or redundant computations - avoiding unnecessary or redundant computations
- more efficient memory usage - more efficient memory usage
- asyncio and asynchronous prefetch of blocks. ElectrumX should not - asyncio and asynchronous prefetch of blocks.
have any need of threads.
ElectrumX should not have any need of threads.
Roadmap Roadmap
@ -81,7 +82,6 @@ Roadmap
- improve DB abstraction so LMDB is not penalized - improve DB abstraction so LMDB is not penalized
- continue to clean up the code and remove layering violations - continue to clean up the code and remove layering violations
- store all UTXOs, not just those with addresses - store all UTXOs, not just those with addresses
- implement IRC connectivity
- potentially move some functionality to C or C++ - potentially move some functionality to C or C++
The above are in no particular order. The above are in no particular order.

6
docs/RELEASE-NOTES

@ -1,3 +1,9 @@
version 0.4
-----------
- IRC connectivity. See the notes.
- logging improvements
Version 0.3.2, 0.3.3 Version 0.3.2, 0.3.3
-------------------- --------------------

25
electrumx_rpc.py

@ -13,6 +13,7 @@
import argparse import argparse
import asyncio import asyncio
import json import json
import pprint
from functools import partial from functools import partial
from os import environ from os import environ
@ -21,6 +22,7 @@ class RPCClient(asyncio.Protocol):
def __init__(self, loop): def __init__(self, loop):
self.loop = loop self.loop = loop
self.method = None
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
@ -28,15 +30,28 @@ class RPCClient(asyncio.Protocol):
def connection_lost(self, exc): def connection_lost(self, exc):
self.loop.stop() self.loop.stop()
def send(self, payload): def send(self, method, params):
self.method = method
payload = {'method': method, 'params': params}
data = json.dumps(payload) + '\n' data = json.dumps(payload) + '\n'
self.transport.write(data.encode()) self.transport.write(data.encode())
def data_received(self, data): def data_received(self, data):
payload = json.loads(data.decode()) payload = json.loads(data.decode())
self.transport.close() self.transport.close()
print(json.dumps(payload, indent=4, sort_keys=True)) result = payload['result']
error = payload['error']
if error:
print("ERROR: {}".format(error))
else:
if self.method == 'sessions':
fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}'
print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time'))
for kind, peer, subs, client, time in result:
print(fmt.format(kind, peer, '{:,d}'.format(subs),
client, '{:,d}'.format(int(time))))
else:
pprint.pprint(result, indent=4)
def main(): def main():
'''Send the RPC command to the server and print the result.''' '''Send the RPC command to the server and print the result.'''
@ -52,14 +67,12 @@ def main():
if args.port is None: if args.port is None:
args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
payload = {'method': args.command[0], 'params': args.param}
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
proto_factory = partial(RPCClient, loop) proto_factory = partial(RPCClient, loop)
coro = loop.create_connection(proto_factory, 'localhost', args.port) coro = loop.create_connection(proto_factory, 'localhost', args.port)
try: try:
transport, protocol = loop.run_until_complete(coro) transport, protocol = loop.run_until_complete(coro)
protocol.send(payload) protocol.send(args.command[0], args.param)
loop.run_forever() loop.run_forever()
except OSError: except OSError:
print('error connecting - is ElectrumX catching up or not running?') print('error connecting - is ElectrumX catching up or not running?')

24
samples/scripts/NOTES

@ -34,13 +34,23 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client. is re-read for each new client.
DONATION_ADDRESS - server donation address. Defaults to none. DONATION_ADDRESS - server donation address. Defaults to none.
Your performance might change by tweaking the following cache If you want IRC connectivity to advertise your node:
variables. Cache size is only checked roughly every minute, so the
caches can grow beyond the specified size. Also the Python process is IRC - set to anything non-empty
often quite a bit fatter than the combined cache size, because of IRC_NICK - the nick to use when connecting to IRC. The default is a
Python overhead and also because leveldb consumes a lot of memory hash of REPORT_HOST. Either way 'E_' will be prepended.
during UTXO flushing. So I recommend you set the sum of these to REPORT_HOST - the host to advertise. Defaults to HOST.
nothing over half your available physical RAM: REPORT_SSL_PORT - the SSL port to advertise. Defaults to SSL_PORT.
REPORT_TCP_PORT - the TCP port to advertise. Defaults to TCP_PORT.
If synchronizing from the Genesis block your performance might change
by tweaking the following cache variables. Cache size is only checked
roughly every minute, so the caches can grow beyond the specified
size. Also the Python process is often quite a bit fatter than the
combined cache size, because of Python overhead and also because
leveldb consumes a lot of memory during UTXO flushing. So I recommend
you set the sum of these to nothing over half your available physical
RAM:
HIST_MB - amount of history cache, in MB, to retain before flushing to HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger disk. Default is 250; probably no benefit being much larger

9
server/block_processor.py

@ -201,7 +201,8 @@ class MemPool(LoggedClass):
hex_hashes.difference_update(self.txs) hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes) raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
if initial: if initial:
self.logger.info('all fetched, now analysing...') self.logger.info('analysing {:,d} mempool txs...'
.format(len(raw_txs)))
new_txs = {hex_hash: Deserializer(raw_tx).read_tx() new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx} for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes del raw_txs, hex_hashes
@ -691,12 +692,12 @@ class BlockProcessor(server.db.DB):
db_cache_size = len(self.db_cache) * 105 db_cache_size = len(self.db_cache) * 105
hist_cache_size = len(self.history) * 180 + self.history_size * 4 hist_cache_size = len(self.history) * 180 + self.history_size * 4
tx_hash_size = (self.tx_count - self.db_tx_count) * 74 tx_hash_size = (self.tx_count - self.db_tx_count) * 74
utxo_MB = (db_deletes_size + utxo_cache_size + tx_hash_size) // one_MB utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB
hist_MB = hist_cache_size // one_MB hist_MB = hist_cache_size // one_MB
self.logger.info('UTXOs: {:,d} deletes: {:,d} ' self.logger.info('UTXOs: {:,d} deletes: {:,d} '
'UTXOs {:,d}MB hist {:,d}MB' 'UTXOs {:,d}MB hist {:,d}MB'
.format(len(self.utxo_cache), len(self.db_deletes), .format(len(self.utxo_cache), self.db_deletes,
utxo_MB, hist_MB)) utxo_MB, hist_MB))
self.logger.info('our height: {:,d} daemon height: {:,d}' self.logger.info('our height: {:,d} daemon height: {:,d}'
.format(self.height, self.daemon.cached_height())) .format(self.height, self.daemon.cached_height()))
@ -989,7 +990,7 @@ class BlockProcessor(server.db.DB):
'DB spends: {:,d}' 'DB spends: {:,d}'
.format(len(self.utxo_cache) + self.utxo_cache_spends, .format(len(self.utxo_cache) + self.utxo_cache_spends,
self.utxo_cache_spends, self.utxo_cache_spends,
len(self.db_deletes) // 2)) self.db_deletes))
fs_flush_start = time.time() fs_flush_start = time.time()
self.fs_flush() self.fs_flush()

2
server/daemon.py

@ -31,7 +31,7 @@ class Daemon(util.LoggedClass):
super().__init__() super().__init__()
self.url = url self.url = url
self._height = None self._height = None
self.logger.info('connecting to daemon at URL {}'.format(url)) self.logger.info('connecting at URL {}'.format(url))
self.debug_caught_up = 'caught_up' in debug self.debug_caught_up = 'caught_up' in debug
# Limit concurrent RPC calls to this number. # Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16

6
server/env.py

@ -45,6 +45,12 @@ class Env(LoggedClass):
self.db_engine = self.default('DB_ENGINE', 'leveldb') self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.debug = self.default('DEBUG', '') self.debug = self.default('DEBUG', '')
self.debug = [item.lower() for item in self.debug.split()] self.debug = [item.lower() for item in self.debug.split()]
# IRC
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
self.report_host = self.default('REPORT_HOST', self.host)
self.irc_nick = self.default('IRC_NICK', None)
self.irc = self.default('IRC', False)
def default(self, envvar, default): def default(self, envvar, default):
return environ.get(envvar, default) return environ.get(envvar, default)

137
server/irc.py

@ -0,0 +1,137 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''IRC connectivity to discover peers.
Only calling start() requires the IRC Python module.
'''
import asyncio
import re
import socket
from collections import namedtuple
from lib.hash import double_sha256
from lib.util import LoggedClass
from server.version import VERSION
def port_text(letter, port, default):
if not port:
return ''
if port == default:
return letter
return letter + str(port)
class IRC(LoggedClass):
PEER_REGEXP = re.compile('(E_[^!]*)!')
Peer = namedtuple('Peer', 'ip_addr host ports')
class DisconnectedError(Exception):
pass
def __init__(self, env):
super().__init__()
tcp_text = port_text('t', env.report_tcp_port, 50001)
ssl_text = port_text('s', env.report_ssl_port, 50002)
version = 'X{}'.format(VERSION.split()[1])
self.real_name = '{} v{} {} {}'.format(env.report_host, version,
tcp_text, ssl_text)
self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else
double_sha256(env.report_host.encode())
[:5].hex())
self.peers = {}
async def start(self):
import irc.client as irc_client
self.logger.info('joining IRC with nick "{}" and real name "{}"'
.format(self.nick, self.real_name))
reactor = irc_client.Reactor()
for event in ['welcome', 'join', 'quit', 'kick', 'whoreply',
'namreply', 'disconnect']:
reactor.add_global_handler(event, getattr(self, 'on_' + event))
while True:
try:
connection = reactor.server()
connection.connect('irc.freenode.net', 6667,
self.nick, ircname=self.real_name)
connection.set_keepalive(60)
while True:
reactor.process_once()
await asyncio.sleep(2)
except irc_client.ServerConnectionError as e:
self.logger.error('connection error: {}'.format(e))
except self.DisconnectedError:
self.logger.error('disconnected')
await asyncio.sleep(10)
def log_event(self, event):
self.logger.info('IRC event type {} source {} args {}'
.format(event.type, event.source, event.arguments))
def on_welcome(self, connection, event):
'''Called when we connect to freenode.'''
connection.join('#electrum')
def on_disconnect(self, connection, event):
'''Called if we are disconnected.'''
self.log_event(event)
raise self.DisconnectedError
def on_join(self, connection, event):
'''Called when someone new connects to our channel, including us.'''
match = self.PEER_REGEXP.match(event.source)
if match:
connection.who(match.group(1))
def on_quit(self, connection, event):
'''Called when someone leaves our channel.'''
match = self.PEER_REGEXP.match(event.source)
if match:
self.peers.pop(match.group(1), None)
def on_kick(self, connection, event):
'''Called when someone is kicked from our channel.'''
self.log_event(event)
match = self.PEER_REGEXP.match(event.arguments[0])
if match:
self.peers.pop(match.group(1), None)
def on_namreply(self, connection, event):
'''Called repeatedly when we first connect to inform us of all users
in the channel.
The users are space-separated in the 2nd argument.
'''
for peer in event.arguments[2].split():
if peer.startswith("E_"):
connection.who(peer)
def on_whoreply(self, connection, event):
'''Called when a response to our who requests arrives.
The nick is the 4th argument, and real name is in the 6th
argument preceeded by '0 ' for some reason.
'''
try:
nick = event.arguments[4]
line = event.arguments[6].split()
try:
ip_addr = socket.gethostbyname(line[1])
except socket.error:
# No IPv4 address could be resolved. Could be .onion or IPv6.
ip_addr = line[1]
peer = self.Peer(ip_addr, line[1], line[2:])
self.peers[nick] = peer
self.logger.info('new {}'.format(peer))
except IndexError:
pass

83
server/protocol.py

@ -12,14 +12,16 @@ import asyncio
import codecs import codecs
import json import json
import ssl import ssl
import time
import traceback import traceback
from collections import namedtuple from collections import namedtuple
from functools import partial from functools import partial
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass from lib.util import LoggedClass
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from server.irc import IRC
from server.version import VERSION from server.version import VERSION
@ -38,15 +40,19 @@ class BlockServer(BlockProcessor):
def __init__(self, env): def __init__(self, env):
super().__init__(env) super().__init__(env)
self.servers = [] self.servers = []
self.irc = IRC(env)
async def caught_up(self, mempool_hashes): async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes) await super().caught_up([]) #mempool_hashes)
if not self.servers: if not self.servers:
await self.start_servers() await self.start_servers()
if self.env.irc:
asyncio.ensure_future(self.irc.start())
ElectrumX.notify(self.height, self.touched) ElectrumX.notify(self.height, self.touched)
async def start_server(self, name, protocol, host, port, *, ssl=None): async def start_server(self, class_name, kind, host, port, *, ssl=None):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
protocol = partial(class_name, self.env, kind)
server = loop.create_server(protocol, host, port, ssl=ssl) server = loop.create_server(protocol, host, port, ssl=ssl)
try: try:
self.servers.append(await server) self.servers.append(await server)
@ -54,10 +60,10 @@ class BlockServer(BlockProcessor):
raise raise
except Exception as e: except Exception as e:
self.logger.error('{} server failed to listen on {}:{:d} :{}' self.logger.error('{} server failed to listen on {}:{:d} :{}'
.format(name, host, port, e)) .format(kind, host, port, e))
else: else:
self.logger.info('{} server listening on {}:{:d}' self.logger.info('{} server listening on {}:{:d}'
.format(name, host, port)) .format(kind, host, port))
async def start_servers(self): async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports. '''Start listening on RPC, TCP and SSL ports.
@ -66,27 +72,27 @@ class BlockServer(BlockProcessor):
''' '''
env = self.env env = self.env
JSONRPC.init(self, self.daemon, self.coin) JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None: if env.rpc_port is not None:
await self.start_server('RPC', protocol, 'localhost', env.rpc_port) await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port)
protocol = partial(ElectrumX, env)
if env.tcp_port is not None: if env.tcp_port is not None:
await self.start_server('TCP', protocol, env.host, env.tcp_port) await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port)
if env.ssl_port is not None: if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3 # FIXME: update if we want to require Python >= 3.5.3
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', protocol, env.host, env.ssl_port, await self.start_server(ElectrumX, 'SSL', env.host,
ssl=sslc) env.ssl_port, ssl=sslc)
def stop(self): def stop(self):
'''Close the listening servers.''' '''Close the listening servers.'''
for server in self.servers: for server in self.servers:
server.close() server.close()
def irc_peers(self):
return self.irc.peers
AsyncTask = namedtuple('AsyncTask', 'session job') AsyncTask = namedtuple('AsyncTask', 'session job')
@ -107,7 +113,7 @@ class SessionManager(LoggedClass):
self.sessions.remove(session) self.sessions.remove(session)
if self.current_task and session == self.current_task.session: if self.current_task and session == self.current_task.session:
self.logger.info('cancelling running task') self.logger.info('cancelling running task')
self.current_task.cancel() self.current_task.job.cancel()
def add_task(self, session, job): def add_task(self, session, job):
assert session in self.sessions assert session in self.sessions
@ -142,11 +148,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_count = 0 self.send_count = 0
self.send_size = 0 self.send_size = 0
self.error_count = 0 self.error_count = 0
self.hash168s = set()
self.start = time.time()
self.client = 'unknown'
self.peername = 'unknown'
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
self.transport = transport self.transport = transport
self.peername = transport.get_extra_info('peername') peer = transport.get_extra_info('peername')
self.peername = '{}:{}'.format(peer[0], peer[1])
self.logger.info('connection from {}'.format(self.peername)) self.logger.info('connection from {}'.format(self.peername))
self.SESSION_MGR.add_session(self) self.SESSION_MGR.add_session(self)
@ -284,6 +295,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
cls.COIN = coin cls.COIN = coin
cls.SESSION_MGR = SessionManager() cls.SESSION_MGR = SessionManager()
@classmethod
def irc_peers(cls):
return cls.BLOCK_PROCESSOR.irc_peers()
@classmethod @classmethod
def height(cls): def height(cls):
'''Return the current height.''' '''Return the current height.'''
@ -306,10 +321,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
class ElectrumX(JSONRPC): class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.''' '''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env): def __init__(self, env, kind):
super().__init__() super().__init__()
self.env = env self.env = env
self.hash168s = set() self.kind = kind
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_height = False self.subscribe_height = False
self.notified_height = None self.notified_height = None
@ -331,8 +346,7 @@ class ElectrumX(JSONRPC):
@classmethod @classmethod
def watched_address_count(cls): def watched_address_count(cls):
sessions = cls.SESSION_MGR.sessions sessions = cls.SESSION_MGR.sessions
return sum(len(session.hash168s) for session in sessions return sum(len(session.hash168s) for session in sessions)
if isinstance(session, cls))
@classmethod @classmethod
def notify(cls, height, touched): def notify(cls, height, touched):
@ -349,6 +363,9 @@ class ElectrumX(JSONRPC):
hash168_to_address = cls.COIN.hash168_to_address hash168_to_address = cls.COIN.hash168_to_address
for session in cls.SESSION_MGR.sessions: for session in cls.SESSION_MGR.sessions:
if not isinstance(session, ElectrumX):
continue
if height != session.notified_height: if height != session.notified_height:
session.notified_height = height session.notified_height = height
if session.subscribe_headers: if session.subscribe_headers:
@ -402,12 +419,6 @@ class ElectrumX(JSONRPC):
return {"block_height": height, "merkle": merkle_branch, "pos": pos} return {"block_height": height, "merkle": merkle_branch, "pos": pos}
@classmethod
def irc_peers(cls):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return {}
@classmethod @classmethod
def height(cls): def height(cls):
return cls.BLOCK_PROCESSOR.height return cls.BLOCK_PROCESSOR.height
@ -590,39 +601,47 @@ class ElectrumX(JSONRPC):
subscription. subscription.
''' '''
self.require_empty_params(params) self.require_empty_params(params)
peers = ElectrumX.irc_peers() return list(self.irc_peers().values())
return tuple(peers.values())
async def version(self, params): async def version(self, params):
'''Return the server version as a string.''' '''Return the server version as a string.'''
if len(params) == 2:
self.client = str(params[0])
self.protocol_version = params[1]
return VERSION return VERSION
class LocalRPC(JSONRPC): class LocalRPC(JSONRPC):
'''A local TCP RPC server for querying status.''' '''A local TCP RPC server for querying status.'''
def __init__(self): def __init__(self, env, kind):
super().__init__() super().__init__()
cmds = 'getinfo sessions numsessions peers numpeers'.split() cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds} self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds}
self.env = env
self.kind = kind
async def getinfo(self, params): async def getinfo(self, params):
return { return {
'blocks': self.height(), 'blocks': self.height(),
'peers': len(ElectrumX.irc_peers()), 'peers': len(self.irc_peers()),
'sessions': len(self.SESSION_MGR.sessions), 'sessions': len(self.SESSION_MGR.sessions),
'watched': ElectrumX.watched_address_count(), 'watched': ElectrumX.watched_address_count(),
'cached': 0, 'cached': 0,
} }
async def sessions(self, params): async def sessions(self, params):
return [] now = time.time()
return [(session.kind,
'this RPC client' if session == self else session.peername,
len(session.hash168s), session.client, now - session.start)
for session in self.SESSION_MGR.sessions]
async def numsessions(self, params): async def numsessions(self, params):
return len(self.SESSION_MGR.sessions) return len(self.SESSION_MGR.sessions)
async def peers(self, params): async def peers(self, params):
return tuple(ElectrumX.irc_peers().keys()) return self.irc_peers()
async def numpeers(self, params): async def numpeers(self, params):
return len(ElectrumX.irc_peers()) return len(self.irc_peers())

2
setup.py

@ -7,6 +7,8 @@ setuptools.setup(
version=VERSION.split()[-1], version=VERSION.split()[-1],
scripts=['electrumx_server.py', 'electrumx_rpc.py'], scripts=['electrumx_server.py', 'electrumx_rpc.py'],
python_requires='>=3.5', python_requires='>=3.5',
# "irc" package is only required if IRC connectivity is enabled
# via environment variables, in which case I've tested with 15.0.4
install_requires=['plyvel', 'aiohttp >= 1'], install_requires=['plyvel', 'aiohttp >= 1'],
packages=setuptools.find_packages(), packages=setuptools.find_packages(),
description='ElectrumX Server', description='ElectrumX Server',

Loading…
Cancel
Save