Browse Source

Merge branch 'release-0.4'

master 0.4
Neil Booth 8 years ago
parent
commit
2de3325152
  1. 6
      README.rst
  2. 6
      docs/RELEASE-NOTES
  3. 25
      electrumx_rpc.py
  4. 24
      samples/scripts/NOTES
  5. 50
      server/block_processor.py
  6. 2
      server/daemon.py
  7. 6
      server/env.py
  8. 137
      server/irc.py
  9. 81
      server/protocol.py
  10. 2
      server/version.py
  11. 2
      setup.py

6
README.rst

@ -68,8 +68,9 @@ All of the following likely play a part:
but I cannot think how they could be easily indexable on a filesystem.
- avoiding unnecessary or redundant computations
- more efficient memory usage
- asyncio and asynchronous prefetch of blocks. ElectrumX should not
have any need of threads.
- asyncio and asynchronous prefetch of blocks.
ElectrumX should not have any need of threads.
Roadmap
@ -81,7 +82,6 @@ Roadmap
- improve DB abstraction so LMDB is not penalized
- continue to clean up the code and remove layering violations
- store all UTXOs, not just those with addresses
- implement IRC connectivity
- potentially move some functionality to C or C++
The above are in no particular order.

6
docs/RELEASE-NOTES

@ -1,3 +1,9 @@
Version 0.4
-----------
- IRC connectivity. See the notes for environment variables, etc.
- logging improvements
Version 0.3.2, 0.3.3
--------------------

25
electrumx_rpc.py

@ -13,6 +13,7 @@
import argparse
import asyncio
import json
import pprint
from functools import partial
from os import environ
@ -21,6 +22,7 @@ class RPCClient(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.method = None
def connection_made(self, transport):
self.transport = transport
@ -28,15 +30,28 @@ class RPCClient(asyncio.Protocol):
def connection_lost(self, exc):
self.loop.stop()
def send(self, payload):
def send(self, method, params):
self.method = method
payload = {'method': method, 'params': params}
data = json.dumps(payload) + '\n'
self.transport.write(data.encode())
def data_received(self, data):
payload = json.loads(data.decode())
self.transport.close()
print(json.dumps(payload, indent=4, sort_keys=True))
result = payload['result']
error = payload['error']
if error:
print("ERROR: {}".format(error))
else:
if self.method == 'sessions':
fmt = '{:<4} {:>23} {:>7} {:>15} {:>7}'
print(fmt.format('Type', 'Peer', 'Subs', 'Client', 'Time'))
for kind, peer, subs, client, time in result:
print(fmt.format(kind, peer, '{:,d}'.format(subs),
client, '{:,d}'.format(int(time))))
else:
pprint.pprint(result, indent=4)
def main():
'''Send the RPC command to the server and print the result.'''
@ -52,14 +67,12 @@ def main():
if args.port is None:
args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
payload = {'method': args.command[0], 'params': args.param}
loop = asyncio.get_event_loop()
proto_factory = partial(RPCClient, loop)
coro = loop.create_connection(proto_factory, 'localhost', args.port)
try:
transport, protocol = loop.run_until_complete(coro)
protocol.send(payload)
protocol.send(args.command[0], args.param)
loop.run_forever()
except OSError:
print('error connecting - is ElectrumX catching up or not running?')

24
samples/scripts/NOTES

@ -34,13 +34,23 @@ BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client.
DONATION_ADDRESS - server donation address. Defaults to none.
Your performance might change by tweaking the following cache
variables. Cache size is only checked roughly every minute, so the
caches can grow beyond the specified size. Also the Python process is
often quite a bit fatter than the combined cache size, because of
Python overhead and also because leveldb consumes a lot of memory
during UTXO flushing. So I recommend you set the sum of these to
nothing over half your available physical RAM:
If you want IRC connectivity to advertise your node:
IRC - set to anything non-empty
IRC_NICK - the nick to use when connecting to IRC. The default is a
hash of REPORT_HOST. Either way 'E_' will be prepended.
REPORT_HOST - the host to advertise. Defaults to HOST.
REPORT_SSL_PORT - the SSL port to advertise. Defaults to SSL_PORT.
REPORT_TCP_PORT - the TCP port to advertise. Defaults to TCP_PORT.
If synchronizing from the Genesis block your performance might change
by tweaking the following cache variables. Cache size is only checked
roughly every minute, so the caches can grow beyond the specified
size. Also the Python process is often quite a bit fatter than the
combined cache size, because of Python overhead and also because
leveldb consumes a lot of memory during UTXO flushing. So I recommend
you set the sum of these to nothing over half your available physical
RAM:
HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger

50
server/block_processor.py

@ -201,7 +201,8 @@ class MemPool(LoggedClass):
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
if initial:
self.logger.info('all fetched, now analysing...')
self.logger.info('analysing {:,d} mempool txs...'
.format(len(raw_txs)))
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes
@ -551,7 +552,6 @@ class BlockProcessor(server.db.DB):
# as it reads the wall time.
flush_history(batch)
if flush_utxos:
self.fs_flush()
self.flush_utxos(batch)
self.flush_state(batch)
self.logger.info('committing transaction...')
@ -604,7 +604,6 @@ class BlockProcessor(server.db.DB):
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
flush_start = time.time()
blocks_done = len(self.headers)
prior_tx_count = (self.tx_counts[self.db_height]
if self.db_height >= 0 else 0)
@ -649,8 +648,6 @@ class BlockProcessor(server.db.DB):
self.tx_hashes = []
self.headers = []
self.logger.info('FS flush took {:.1f} seconds'
.format(time.time() - flush_start))
def backup_history(self, batch, hash168s):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
@ -694,19 +691,16 @@ class BlockProcessor(server.db.DB):
utxo_cache_size = len(self.utxo_cache) * 187
db_cache_size = len(self.db_cache) * 105
hist_cache_size = len(self.history) * 180 + self.history_size * 4
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
tx_hash_size = (self.tx_count - self.db_tx_count) * 74
utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB
hist_MB = hist_cache_size // one_MB
self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
self.logger.info('UTXOs: {:,d} deletes: {:,d} '
'UTXOs {:,d}MB hist {:,d}MB'
.format(len(self.utxo_cache), self.db_deletes,
utxo_MB, hist_MB))
self.logger.info('our height: {:,d} daemon height: {:,d}'
.format(self.height, self.daemon.cached_height()))
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
'hist addrs: {:,d} hist size {:,d}'
.format(len(self.utxo_cache),
len(self.db_cache),
len(self.history),
self.history_size))
self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)'
.format(utxo_MB + hist_MB, utxo_MB, hist_MB))
return utxo_MB, hist_MB
def undo_key(self, height):
@ -989,9 +983,21 @@ class BlockProcessor(server.db.DB):
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks'
.format(self.tx_count - self.db_tx_count,
self.height - self.db_height))
self.logger.info('flushing {:,d} blocks with {:,d} txs'
.format(self.height - self.db_height,
self.tx_count - self.db_tx_count))
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
'DB spends: {:,d}'
.format(len(self.utxo_cache) + self.utxo_cache_spends,
self.utxo_cache_spends,
self.db_deletes))
fs_flush_start = time.time()
self.fs_flush()
fs_flush_end = time.time()
self.logger.info('FS flush took {:.1f} seconds'
.format(fs_flush_end - fs_flush_start))
collisions = 0
new_utxos = len(self.utxo_cache)
@ -1022,11 +1028,6 @@ class BlockProcessor(server.db.DB):
adds = new_utxos + self.utxo_cache_spends
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
.format(adds, self.utxo_cache_spends))
self.logger.info('DB adds: {:,d} spends: {:,d}, collisions: {:,d}'
.format(new_utxos, self.db_deletes, collisions))
self.db_cache = {}
self.utxo_cache_spends = self.db_deletes = 0
self.utxo_flush_count = self.flush_count
@ -1034,6 +1035,9 @@ class BlockProcessor(server.db.DB):
self.db_height = self.height
self.db_tip = self.tip
self.logger.info('UTXO flush took {:.1f} seconds'
.format(time.time() - fs_flush_end))
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, self.db_height + 1 - start)

2
server/daemon.py

@ -31,7 +31,7 @@ class Daemon(util.LoggedClass):
super().__init__()
self.url = url
self._height = None
self.logger.info('connecting to daemon at URL {}'.format(url))
self.logger.info('connecting at URL {}'.format(url))
self.debug_caught_up = 'caught_up' in debug
# Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16

6
server/env.py

@ -45,6 +45,12 @@ class Env(LoggedClass):
self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.debug = self.default('DEBUG', '')
self.debug = [item.lower() for item in self.debug.split()]
# IRC
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
self.report_host = self.default('REPORT_HOST', self.host)
self.irc_nick = self.default('IRC_NICK', None)
self.irc = self.default('IRC', False)
def default(self, envvar, default):
return environ.get(envvar, default)

137
server/irc.py

@ -0,0 +1,137 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''IRC connectivity to discover peers.
Only calling start() requires the IRC Python module.
'''
import asyncio
import re
import socket
from collections import namedtuple
from lib.hash import double_sha256
from lib.util import LoggedClass
from server.version import VERSION
def port_text(letter, port, default):
    '''Return the text advertising one service port.

    letter  - the single-character service tag to emit
    port    - the port being advertised; any falsy value (None, 0)
              yields the empty string
    default - if port equals this value the tag is emitted on its own

    Otherwise the result is the tag followed by the port number.
    '''
    if port:
        # The number is omitted when it is the default for this service.
        suffix = '' if port == default else str(port)
        return letter + suffix
    return ''
class IRC(LoggedClass):
    '''Maintains a dictionary of peers discovered over IRC.

    start() connects to irc.freenode.net, joins #electrum and issues a
    WHO request for every nick beginning with "E_".  The replies are
    recorded in self.peers, a dictionary mapping nick to a Peer
    namedtuple.  Peers are dropped again when they quit or are kicked.
    '''

    # Captures the nick from an event source of the form "nick!...",
    # but only for nicks beginning with "E_".
    PEER_REGEXP = re.compile(r'(E_[^!]*)!')
    Peer = namedtuple('Peer', 'ip_addr host ports')

    class DisconnectedError(Exception):
        '''Raised by on_disconnect() to unwind the processing loop in
        start() so that it reconnects.'''
        pass

    def __init__(self, env):
        '''Build our nick and real name from the environment.

        env must provide report_host, report_tcp_port, report_ssl_port
        and irc_nick.
        '''
        super().__init__()
        tcp_text = port_text('t', env.report_tcp_port, 50001)
        ssl_text = port_text('s', env.report_ssl_port, 50002)
        # VERSION is split on whitespace and its second word is used.
        version = 'X{}'.format(VERSION.split()[1])
        self.real_name = '{} v{} {} {}'.format(env.report_host, version,
                                               tcp_text, ssl_text)
        # Without an explicit nick, use the hex of the first 5 bytes of
        # the double SHA256 of the reported host.
        self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else
                                  double_sha256(env.report_host.encode())
                                  [:5].hex())
        self.peers = {}

    async def start(self):
        '''Connect to IRC and process events forever.

        The irc module is imported here so that it is only required
        when this coroutine is actually run.  After a connection error
        or a disconnection we wait 10 seconds and connect again.
        '''
        import irc.client as irc_client

        self.logger.info('joining IRC with nick "{}" and real name "{}"'
                         .format(self.nick, self.real_name))

        reactor = irc_client.Reactor()
        for event in ['welcome', 'join', 'quit', 'kick', 'whoreply',
                      'namreply', 'disconnect']:
            reactor.add_global_handler(event, getattr(self, 'on_' + event))

        while True:
            try:
                connection = reactor.server()
                connection.connect('irc.freenode.net', 6667,
                                   self.nick, ircname=self.real_name)
                connection.set_keepalive(60)
                while True:
                    # Handlers run inside process_once(); yield to the
                    # event loop between calls.
                    reactor.process_once()
                    await asyncio.sleep(2)
            except irc_client.ServerConnectionError as e:
                self.logger.error('connection error: {}'.format(e))
            except self.DisconnectedError:
                self.logger.error('disconnected')
            await asyncio.sleep(10)

    def log_event(self, event):
        '''Log the type, source and arguments of an IRC event.'''
        self.logger.info('IRC event type {} source {} args {}'
                         .format(event.type, event.source, event.arguments))

    def on_welcome(self, connection, event):
        '''Called when we connect to freenode.'''
        connection.join('#electrum')

    def on_disconnect(self, connection, event):
        '''Called if we are disconnected.'''
        self.log_event(event)
        raise self.DisconnectedError

    def on_join(self, connection, event):
        '''Called when someone new connects to our channel, including us.'''
        match = self.PEER_REGEXP.match(event.source)
        if match:
            connection.who(match.group(1))

    def on_quit(self, connection, event):
        '''Called when someone leaves our channel.'''
        match = self.PEER_REGEXP.match(event.source)
        if match:
            self.peers.pop(match.group(1), None)

    def on_kick(self, connection, event):
        '''Called when someone is kicked from our channel.

        The first argument names the kicked user.  It is normally a
        bare nick with no "!user@host" suffix, which PEER_REGEXP cannot
        match, so the bare form is accepted as well as the full one.
        '''
        self.log_event(event)
        if not event.arguments:
            return
        kicked = event.arguments[0]
        match = self.PEER_REGEXP.match(kicked)
        nick = match.group(1) if match else kicked
        # self.peers is keyed by nick; popping an unknown nick is a no-op.
        self.peers.pop(nick, None)

    def on_namreply(self, connection, event):
        '''Called repeatedly when we first connect to inform us of all users
        in the channel.

        The users are space-separated in the argument at index 2.
        '''
        for peer in event.arguments[2].split():
            if peer.startswith("E_"):
                connection.who(peer)

    def on_whoreply(self, connection, event):
        '''Called when a response to our who requests arrives.

        The nick is the argument at index 4.  The argument at index 6
        is split on whitespace: its first word is ignored, the second
        is the host and the remaining words are kept as the ports.
        Replies with too few arguments or words are ignored.
        '''
        try:
            nick = event.arguments[4]
            line = event.arguments[6].split()
            host = line[1]
        except IndexError:
            return

        # NOTE(review): gethostbyname() is a blocking call made from
        # within the event loop - consider resolving asynchronously.
        try:
            ip_addr = socket.gethostbyname(host)
        except (socket.error, UnicodeError):
            # No IPv4 address could be resolved.  Could be .onion or IPv6.
            # UnicodeError is raised for names that cannot be IDNA-encoded,
            # such as those with an empty or over-long label; the host
            # comes from a remote peer so it must not abort processing.
            ip_addr = host

        peer = self.Peer(ip_addr, host, line[2:])
        self.peers[nick] = peer
        self.logger.info('new {}'.format(peer))

81
server/protocol.py

@ -12,14 +12,16 @@ import asyncio
import codecs
import json
import ssl
import time
import traceback
from collections import namedtuple
from functools import partial
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from server.irc import IRC
from server.version import VERSION
@ -38,15 +40,19 @@ class BlockServer(BlockProcessor):
def __init__(self, env):
super().__init__(env)
self.servers = []
self.irc = IRC(env)
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
if self.env.irc:
asyncio.ensure_future(self.irc.start())
ElectrumX.notify(self.height, self.touched)
async def start_server(self, name, protocol, host, port, *, ssl=None):
async def start_server(self, class_name, kind, host, port, *, ssl=None):
loop = asyncio.get_event_loop()
protocol = partial(class_name, self.env, kind)
server = loop.create_server(protocol, host, port, ssl=ssl)
try:
self.servers.append(await server)
@ -54,10 +60,10 @@ class BlockServer(BlockProcessor):
raise
except Exception as e:
self.logger.error('{} server failed to listen on {}:{:d} :{}'
.format(name, host, port, e))
.format(kind, host, port, e))
else:
self.logger.info('{} server listening on {}:{:d}'
.format(name, host, port))
.format(kind, host, port))
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
@ -66,27 +72,27 @@ class BlockServer(BlockProcessor):
'''
env = self.env
JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None:
await self.start_server('RPC', protocol, 'localhost', env.rpc_port)
await self.start_server(LocalRPC, 'RPC', 'localhost', env.rpc_port)
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
await self.start_server('TCP', protocol, env.host, env.tcp_port)
await self.start_server(ElectrumX, 'TCP', env.host, env.tcp_port)
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', protocol, env.host, env.ssl_port,
ssl=sslc)
await self.start_server(ElectrumX, 'SSL', env.host,
env.ssl_port, ssl=sslc)
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()
def irc_peers(self):
return self.irc.peers
AsyncTask = namedtuple('AsyncTask', 'session job')
@ -107,7 +113,7 @@ class SessionManager(LoggedClass):
self.sessions.remove(session)
if self.current_task and session == self.current_task.session:
self.logger.info('cancelling running task')
self.current_task.cancel()
self.current_task.job.cancel()
def add_task(self, session, job):
assert session in self.sessions
@ -142,11 +148,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.send_count = 0
self.send_size = 0
self.error_count = 0
self.hash168s = set()
self.start = time.time()
self.client = 'unknown'
self.peername = 'unknown'
def connection_made(self, transport):
'''Handle an incoming client connection.'''
self.transport = transport
self.peername = transport.get_extra_info('peername')
peer = transport.get_extra_info('peername')
self.peername = '{}:{}'.format(peer[0], peer[1])
self.logger.info('connection from {}'.format(self.peername))
self.SESSION_MGR.add_session(self)
@ -284,6 +295,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
cls.COIN = coin
cls.SESSION_MGR = SessionManager()
@classmethod
def irc_peers(cls):
return cls.BLOCK_PROCESSOR.irc_peers()
@classmethod
def height(cls):
'''Return the current height.'''
@ -306,10 +321,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env):
def __init__(self, env, kind):
super().__init__()
self.env = env
self.hash168s = set()
self.kind = kind
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
@ -331,8 +346,7 @@ class ElectrumX(JSONRPC):
@classmethod
def watched_address_count(cls):
sessions = cls.SESSION_MGR.sessions
return sum(len(session.hash168s) for session in sessions
if isinstance(session, cls))
return sum(len(session.hash168s) for session in sessions)
@classmethod
def notify(cls, height, touched):
@ -349,6 +363,9 @@ class ElectrumX(JSONRPC):
hash168_to_address = cls.COIN.hash168_to_address
for session in cls.SESSION_MGR.sessions:
if not isinstance(session, ElectrumX):
continue
if height != session.notified_height:
session.notified_height = height
if session.subscribe_headers:
@ -402,12 +419,6 @@ class ElectrumX(JSONRPC):
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
@classmethod
def irc_peers(cls):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return {}
@classmethod
def height(cls):
return cls.BLOCK_PROCESSOR.height
@ -590,39 +601,47 @@ class ElectrumX(JSONRPC):
subscription.
'''
self.require_empty_params(params)
peers = ElectrumX.irc_peers()
return tuple(peers.values())
return list(self.irc_peers().values())
async def version(self, params):
'''Return the server version as a string.'''
if len(params) == 2:
self.client = str(params[0])
self.protocol_version = params[1]
return VERSION
class LocalRPC(JSONRPC):
'''A local TCP RPC server for querying status.'''
def __init__(self):
def __init__(self, env, kind):
super().__init__()
cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds}
self.env = env
self.kind = kind
async def getinfo(self, params):
return {
'blocks': self.height(),
'peers': len(ElectrumX.irc_peers()),
'peers': len(self.irc_peers()),
'sessions': len(self.SESSION_MGR.sessions),
'watched': ElectrumX.watched_address_count(),
'cached': 0,
}
async def sessions(self, params):
return []
now = time.time()
return [(session.kind,
'this RPC client' if session == self else session.peername,
len(session.hash168s), session.client, now - session.start)
for session in self.SESSION_MGR.sessions]
async def numsessions(self, params):
return len(self.SESSION_MGR.sessions)
async def peers(self, params):
return tuple(ElectrumX.irc_peers().keys())
return self.irc_peers()
async def numpeers(self, params):
return len(ElectrumX.irc_peers())
return len(self.irc_peers())

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.3.3"
VERSION = "ElectrumX 0.4"

2
setup.py

@ -7,6 +7,8 @@ setuptools.setup(
version=VERSION.split()[-1],
scripts=['electrumx_server.py', 'electrumx_rpc.py'],
python_requires='>=3.5',
# "irc" package is only required if IRC connectivity is enabled
# via environment variables, in which case I've tested with 15.0.4
install_requires=['plyvel', 'aiohttp >= 1'],
packages=setuptools.find_packages(),
description='ElectrumX Server',

Loading…
Cancel
Save