
Merge branch 'release-0.5.1'

master 0.5.1
Neil Booth 8 years ago
parent commit 399cd8fb2c
  1. docs/RELEASE-NOTES (40 lines changed)
  2. electrumx_rpc.py (54 lines changed)
  3. electrumx_server.py (17 lines changed)
  4. lib/jsonrpc.py (6 lines changed)
  5. server/block_processor.py (83 lines changed)
  6. server/irc.py (6 lines changed)
  7. server/protocol.py (161 lines changed)
  8. server/version.py (2 lines changed)
  9. tests/test_util.py (11 lines changed)

docs/RELEASE-NOTES (40 lines changed)

@@ -1,3 +1,43 @@
version 0.5.1
-------------
- 0.5 changed some cache defaults, only partially intentionally. For
  some users, including me, the result was a regression (a 15hr HDD
  sync became a 20hr sync); another user reported their fastest sync
  yet (a sub-10hr SSD sync). What changed was memory accounting: no
  release before 0.5 properly accounted for the memory used by
  unflushed transaction hashes. In 0.5 they were counted towards the
  UTXO cache, which resulted in much earlier flushes. 0.5.1 flushes
  the hashes at the same time as the history, so they now count
  towards the history cache limit (see the accounting sketch after
  this list). To get a reasonable comparison with prior releases,
  bump your HIST_MB environment variable by about 15% from its 0.4
  and earlier value. This will not increase memory consumption - the
  memory was being used all along; it is simply now being counted.
- 0.5.1 is the first release where Electrum client requests are queued
  on a per-session basis; previously they went into a global queue.
  This is the beginning of ensuring that expensive / DoS requests
  mostly affect that user's session and not those of other users (a
  sketch of the per-session queue follows this list). The goal is
  that each session's requests run asynchronously, in parallel with
  every other session's requests. The missing piece of the puzzle is
  that Python's asyncio is co-operative, and at the moment ElectrumX
  does not yield during expensive requests. I intend that an upcoming
  release will make expensive requests yield the CPU at regular,
  fine-grained intervals. The intended result is that expensive
  requests mainly delay that request and later requests from the same
  session, with minimal impact on the legitimate requests of other
  sessions. The extent to which this goal is achieved will only be
  verifiable in practice.
- more robust tracking and handling of asynchronous tasks. I hope
  this will reduce asyncio's logging messages, some of which I'm
  becoming increasingly convinced I have no control over. In
  particular, I learned that earlier releases were unintentionally
  limiting the set of acceptable SSL protocols, so I restored the
  default that had been intended (see the SSL sketch below).
- I added logging of expensive tasks, though I don't expect much real
  information from this.
- various RPC improvements
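
The new accounting in a few lines - a minimal sketch using the
per-entry byte estimates from server/block_processor.py (the function
wrapper and its parameter names are mine, for illustration):

    ONE_MB = 1024 * 1024

    def cache_mb(utxo_entries, db_entries, hist_addrs, hist_size,
                 unflushed_txs):
        # Per-entry byte estimates from server/block_processor.py.
        utxo_cache_size = utxo_entries * 187
        db_cache_size = db_entries * 105
        hist_cache_size = hist_addrs * 180 + hist_size * 4
        # Unflushed tx hashes: counted towards UTXO_MB in 0.5, towards
        # HIST_MB from 0.5.1 - hence the suggested ~15% HIST_MB bump.
        tx_hash_size = unflushed_txs * 74
        utxo_mb = (utxo_cache_size + db_cache_size) // ONE_MB
        hist_mb = (hist_cache_size + tx_hash_size) // ONE_MB
        return utxo_mb, hist_mb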
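
A minimal sketch of the per-session queueing, condensed from the
Session changes in server/protocol.py below (logging and error
handling elided; handle_json_request is stubbed):

    import asyncio

    class Session:
        def __init__(self):
            self.requests = asyncio.Queue()

        def on_json_request(self, request):
            # Called synchronously by the protocol: just queue it.
            self.requests.put_nowait(request)

        async def serve_requests(self):
            # One of these coroutines runs per session, so a slow
            # request delays only later requests of the same session.
            while True:
                request = await self.requests.get()
                await self.handle_json_request(request)

        async def handle_json_request(self, request):
            pass  # dispatch to the method handler; elided here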
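
The SSL change in server/protocol.py amounts to one line; the
certificate paths here are placeholders:

    import ssl

    # Despite its name, PROTOCOL_SSLv23 negotiates the highest TLS
    # version both ends support; the previous PROTOCOL_TLSv1_2 pinned
    # connections to TLS 1.2 only.  From Python 3.5.3 the clearer
    # spelling is ssl.PROTOCOL_TLS.
    sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    sslc.load_cert_chain('server.crt', keyfile='server.key')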
version 0.5
-----------

electrumx_rpc.py (54 lines changed)

@@ -13,7 +13,6 @@
import argparse
import asyncio
import json
import pprint
from functools import partial
from os import environ
@@ -36,38 +35,39 @@ class RPCClient(asyncio.Protocol):
data = json.dumps(payload) + '\n'
self.transport.write(data.encode())
def print_sessions(self, result):
def data_fmt(count, size):
return '{:,d}/{:,d}KB'.format(count, size // 1024)
def time_fmt(t):
t = int(t)
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<4} {:>23} {:>15} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
print(fmt.format('Type', 'Peer', 'Client', 'Subs',
'Recv #', 'Recv KB', 'Sent #', 'Sent KB',
'Errs', 'Time'))
for (kind, peer, subs, client, recv_count, recv_size,
send_count, send_size, error_count, time) in result:
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024),
'{:,d}'.format(error_count),
time_fmt(time)))
def data_received(self, data):
payload = json.loads(data.decode())
self.transport.close()
result = payload['result']
error = payload['error']
if error:
print("ERROR: {}".format(error))
if not error and self.method == 'sessions':
self.print_sessions(result)
else:
def data_fmt(count, size):
return '{:,d}/{:,d}KB'.format(count, size // 1024)
def time_fmt(t):
t = int(t)
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
if self.method == 'sessions':
fmt = ('{:<4} {:>23} {:>15} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
print(fmt.format('Type', 'Peer', 'Client', 'Subs',
'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB',
'Errs', 'Time'))
for (kind, peer, subs, client, recv_count, recv_size,
send_count, send_size, error_count, time) in result:
print(fmt.format(kind, peer, client, '{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,.1f}'.format(recv_size / 1048576),
'{:,d}'.format(send_count),
'{:,.1f}'.format(send_size / 1048576),
'{:,d}'.format(error_count),
time_fmt(time)))
else:
pprint.pprint(result, indent=4)
value = {'error': error} if error else result
print(json.dumps(value, indent=4, sort_keys=True))
def main():
'''Send the RPC command to the server and print the result.'''
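
The script speaks newline-terminated JSON to the server's local RPC
port. A rough stream-based equivalent of what the Protocol class above
does (the payload fields and port are illustrative assumptions, not
the script's exact framing):

    import asyncio
    import json

    async def rpc_call(method, params, port=8000):
        reader, writer = await asyncio.open_connection('localhost', port)
        payload = {'id': 0, 'method': method, 'params': params}
        writer.write((json.dumps(payload) + '\n').encode())
        response = json.loads((await reader.readline()).decode())
        writer.close()
        return response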

electrumx_server.py (17 lines changed)

@@ -32,23 +32,18 @@ def main_loop():
def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, shutting down'.format(signame))
for task in asyncio.Task.all_tasks():
task.cancel()
future.cancel()
server = BlockServer(Env())
future = asyncio.ensure_future(server.main_loop())
# Install signal handlers
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(on_signal, signame))
server = BlockServer(Env())
future = server.start()
try:
loop.run_until_complete(future)
except asyncio.CancelledError:
pass
finally:
server.stop()
loop.close()
loop.run_until_complete(future)
loop.close()
def main():

lib/jsonrpc.py (6 lines changed)

@@ -63,7 +63,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self, msg, code=-1, **kw_args):
super().__init__(**kw_args)
self.msg = msg
self.code
self.code = code
def __init__(self):
@@ -172,7 +172,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
the connection is closing.
'''
if isinstance(request, list):
payload = self.batch_request_payload(request)
payload = await self.batch_request_payload(request)
else:
payload = await self.single_request_payload(request)
@@ -231,7 +231,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
handler = self.method_handler(method)
if not handler:
raise self.RPCError('unknown method: {}'.format(method),
raise self.RPCError('unknown method: "{}"'.format(method),
self.METHOD_NOT_FOUND)
return await handler(params)
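
Of the three fixes in this file, the missing await is the subtle one:
calling a coroutine function without awaiting it returns a coroutine
object, not its result. A standalone illustration (the name is
borrowed from lib/jsonrpc.py; the body is invented for the demo):

    import asyncio

    async def batch_request_payload(request):
        return {'result': len(request)}

    async def demo():
        broken = batch_request_payload([1, 2, 3])       # coroutine object
        print(type(broken).__name__)                    # 'coroutine'
        broken.close()                  # avoid 'never awaited' warning
        fixed = await batch_request_payload([1, 2, 3])  # {'result': 3}
        print(fixed)

    asyncio.get_event_loop().run_until_complete(demo())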

server/block_processor.py (83 lines changed)

@@ -91,6 +91,8 @@ class Prefetcher(LoggedClass):
await asyncio.sleep(0)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
break
async def _caught_up(self):
'''Poll for new blocks and mempool state.
@@ -215,7 +217,7 @@ class MemPool(LoggedClass):
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 500 == 0:
if n % 100 == 0:
await asyncio.sleep(0)
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
self.txs[hex_hash] = (None, txout_pairs, None)
@@ -236,7 +238,7 @@ class MemPool(LoggedClass):
# Now add the inputs
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 50 == 0:
if n % 10 == 0:
await asyncio.sleep(0)
if initial and time.time() > next_log:
@@ -319,6 +321,8 @@ class BlockProcessor(server.db.DB):
super().__init__(env)
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
self.tx_count = self.db_tx_count
@@ -327,6 +331,7 @@ class BlockProcessor(server.db.DB):
self.daemon.debug_set_height(self.height)
self.mempool = MemPool(self)
self.touched = set()
self.futures = []
# Meta
self.utxo_MB = env.utxo_MB
@@ -369,23 +374,28 @@ class BlockProcessor(server.db.DB):
self.clean_db()
def start(self):
'''Returns a future that starts the block processor when awaited.'''
return asyncio.gather(self.main_loop(),
self.prefetcher.main_loop())
async def main_loop(self):
'''Main loop for block processing.
Safely flushes the DB on clean shutdown.
'''
self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop()))
try:
while True:
await self._wait_for_update()
await asyncio.sleep(0) # Yield
except asyncio.CancelledError:
self.flush(True)
raise
self.on_cancel()
# This lets the asyncio subsystem process futures cancellations
await asyncio.sleep(0)
def on_cancel(self):
'''Called when the main loop is cancelled.
Intended to be overridden in derived classes.'''
for future in self.futures:
future.cancel()
self.flush(True)
async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks or a mempool update.
@@ -526,7 +536,8 @@ class BlockProcessor(server.db.DB):
def assert_flushed(self):
'''Asserts state is fully flushed.'''
assert self.tx_count == self.db_tx_count
assert self.tx_count == self.fs_tx_count == self.db_tx_count
assert self.height == self.fs_height == self.db_height
assert not self.history
assert not self.utxo_cache
assert not self.db_cache
@@ -563,9 +574,10 @@ class BlockProcessor(server.db.DB):
# time it took to commit the batch
self.flush_state(self.db)
self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s'
self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} '
'took {:,.1f}s'
.format(self.flush_count, self.height, self.tx_count,
int(self.last_flush - flush_start)))
self.last_flush - flush_start))
# Catch-up stats
if show_stats:
@@ -591,7 +603,12 @@ class BlockProcessor(server.db.DB):
formatted_time(tx_est / this_tx_per_sec)))
def flush_history(self, batch):
flush_start = time.time()
fs_flush_start = time.time()
self.fs_flush()
fs_flush_end = time.time()
self.logger.info('FS flush took {:.1f} seconds'
.format(fs_flush_end - fs_flush_start))
flush_id = pack('>H', self.flush_count)
for hash168, hist in self.history.items():
@@ -599,21 +616,21 @@ class BlockProcessor(server.db.DB):
batch.put(key, hist.tobytes())
self.logger.info('flushed {:,d} history entries for {:,d} addrs '
'in {:,d}s'
'in {:.1f}s'
.format(self.history_size, len(self.history),
int(time.time() - flush_start)))
time.time() - fs_flush_end))
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
blocks_done = len(self.headers)
prior_tx_count = (self.tx_counts[self.db_height]
if self.db_height >= 0 else 0)
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert self.db_height + blocks_done == self.height
assert self.fs_height + blocks_done == self.height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == self.height + 1
assert cur_tx_count == self.tx_count, \
@@ -622,13 +639,13 @@ class BlockProcessor(server.db.DB):
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.db_height + 1) * header_len)
self.headers_file.seek((self.fs_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.db_height + 1:])
self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
@@ -648,7 +665,8 @@ class BlockProcessor(server.db.DB):
file_pos += size
os.sync()
self.fs_height = self.height
self.fs_tx_count = self.tx_count
self.tx_hashes = []
self.headers = []
@@ -692,9 +710,9 @@ class BlockProcessor(server.db.DB):
utxo_cache_size = len(self.utxo_cache) * 187
db_cache_size = len(self.db_cache) * 105
hist_cache_size = len(self.history) * 180 + self.history_size * 4
tx_hash_size = (self.tx_count - self.db_tx_count) * 74
utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB
hist_MB = hist_cache_size // one_MB
tx_hash_size = (self.tx_count - self.fs_tx_count) * 74
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
hist_MB = (hist_cache_size + tx_hash_size) // one_MB
self.logger.info('UTXOs: {:,d} deletes: {:,d} '
'UTXOs {:,d}MB hist {:,d}MB'
@@ -978,6 +996,7 @@ class BlockProcessor(server.db.DB):
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
flush_start = time.time()
self.logger.info('flushing {:,d} blocks with {:,d} txs'
.format(self.height - self.db_height,
self.tx_count - self.db_tx_count))
@@ -987,12 +1006,6 @@ class BlockProcessor(server.db.DB):
self.utxo_cache_spends,
self.db_deletes))
fs_flush_start = time.time()
self.fs_flush()
fs_flush_end = time.time()
self.logger.info('FS flush took {:.1f} seconds'
.format(fs_flush_end - fs_flush_start))
collisions = 0
new_utxos = len(self.utxo_cache)
@@ -1031,18 +1044,18 @@ class BlockProcessor(server.db.DB):
self.db_tip = self.tip
self.logger.info('UTXO flush took {:.1f} seconds'
.format(time.time() - fs_flush_end))
.format(time.time() - flush_start))
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, self.db_height + 1 - start)
disk_count = min(count, self.fs_height + 1 - start)
result = self.fs_read_headers(start, disk_count)
count -= disk_count
start += disk_count
# The rest from memory
if count:
start -= self.db_height + 1
start -= self.fs_height + 1
if not (count >= 0 and start + count <= len(self.headers)):
raise ChainError('{:,d} headers starting at {:,d} not on disk'
.format(count, start))
@@ -1056,7 +1069,7 @@ class BlockProcessor(server.db.DB):
# Is this unflushed?
if tx_hash is None:
tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)]
tx_hashes = self.tx_hashes[tx_height - (self.fs_height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
return tx_hash, tx_height
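
The thread running through this file: filesystem state (headers, tx
hashes, tx counts) now flushes together with the history rather than
with the UTXOs, so it gets its own markers. A summary of the three
height markers after this change (the comments are mine; the asserts
mirror assert_flushed() above):

    # self.height     - chain position reached in memory
    # self.fs_height  - last height flushed to the filesystem files
    # self.db_height  - last height flushed with the UTXO state
    #
    # fs_flush() now runs inside flush_history(), so between UTXO
    # flushes fs_height can run ahead of db_height; filesystem-backed
    # lookups therefore test against fs_height, not db_height.

    def assert_flushed(self):
        assert self.tx_count == self.fs_tx_count == self.db_tx_count
        assert self.height == self.fs_height == self.db_height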

server/irc.py (6 lines changed)

@@ -51,6 +51,12 @@ class IRC(LoggedClass):
self.peers = {}
async def start(self):
try:
await self.join()
except asyncio.CancelledError:
pass
async def join(self):
import irc.client as irc_client
self.logger.info('joining IRC with nick "{}" and real name "{}"'

server/protocol.py (161 lines changed)

@@ -42,15 +42,16 @@ class BlockServer(BlockProcessor):
self.bs_caught_up = True
self.server_mgr.notify(self.height, self.touched)
def stop(self):
'''Close the listening servers.'''
def on_cancel(self):
'''Called when the main loop is cancelled.'''
self.server_mgr.stop()
super().on_cancel()
class ServerManager(LoggedClass):
'''Manages the servers.'''
AsyncTask = namedtuple('AsyncTask', 'session job')
MgrTask = namedtuple('MgrTask', 'session task')
def __init__(self, bp, env):
super().__init__()
@@ -58,9 +59,8 @@ class ServerManager(LoggedClass):
self.env = env
self.servers = []
self.irc = IRC(env)
self.sessions = set()
self.tasks = asyncio.Queue()
self.current_task = None
self.sessions = {}
self.futures = [] # At present just the IRC future, if any
async def start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop()
@@ -95,16 +95,14 @@ class ServerManager(LoggedClass):
await self.start_server('TCP', env.host, env.tcp_port)
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
# Python 3.5.3: use PROTOCOL_TLS
sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
asyncio.ensure_future(self.run_tasks())
if env.irc:
self.logger.info('starting IRC coroutine')
asyncio.ensure_future(self.irc.start())
self.futures.append(asyncio.ensure_future(self.irc.start()))
else:
self.logger.info('IRC disabled')
@@ -115,67 +113,54 @@ class ServerManager(LoggedClass):
ElectrumX.notify(sessions, height, touched)
def stop(self):
'''Close the listening servers.'''
'''Close listening servers.'''
for server in self.servers:
server.close()
self.servers = []
for future in self.futures:
future.cancel()
self.futures = []
sessions = list(self.sessions.keys()) # A copy
for session in sessions:
self.remove_session(session)
def add_session(self, session):
assert session not in self.sessions
self.sessions.add(session)
coro = session.serve_requests()
self.sessions[session] = asyncio.ensure_future(coro)
def remove_session(self, session):
self.sessions.remove(session)
if self.current_task and session == self.current_task.session:
self.logger.info('cancelling running task')
self.current_task.job.cancel()
def add_task(self, session, job):
assert session in self.sessions
task = asyncio.ensure_future(job)
self.tasks.put_nowait(self.AsyncTask(session, task))
async def run_tasks(self):
'''Asynchronously run through the task queue.'''
while True:
task = await self.tasks.get()
try:
if task.session in self.sessions:
self.current_task = task
await task.job
else:
task.job.cancel()
except asyncio.CancelledError:
self.logger.info('cancelled task noted')
except Exception:
# Getting here should probably be considered a bug and fixed
traceback.print_exc()
finally:
self.current_task = None
future = self.sessions.pop(session)
future.cancel()
def irc_peers(self):
return self.irc.peers
def session_count(self):
return len(self.sessions)
'''Returns a dictionary.'''
active = len([s for s in self.sessions if s.send_count])
total = len(self.sessions)
return {'active': active, 'inert': total - active, 'total': total}
def info(self):
'''Returned in the RPC 'getinfo' call.'''
address_count = sum(len(session.hash168s)
for session in self.sessions
if isinstance(session, ElectrumX))
def address_count(self):
return sum(len(session.hash168s) for session in self.sessions
if isinstance(session, ElectrumX))
async def rpc_getinfo(self, params):
'''The RPC 'getinfo' call.'''
return {
'blocks': self.bp.height,
'peers': len(self.irc_peers()),
'peers': len(self.irc.peers),
'sessions': self.session_count(),
'watched': address_count,
'watched': self.address_count(),
'cached': 0,
}
def sessions_info(self):
async def rpc_sessions(self, params):
'''Returned to the RPC 'sessions' call.'''
now = time.time()
return [(session.kind,
session.peername(),
session.peername(for_log=False),
len(session.hash168s),
'RPC' if isinstance(session, LocalRPC) else session.client,
session.recv_count, session.recv_size,
@@ -184,9 +169,23 @@ class ServerManager(LoggedClass):
now - session.start)
for session in self.sessions]
async def rpc_numsessions(self, params):
return self.session_count()
async def rpc_peers(self, params):
return self.irc.peers
async def rpc_numpeers(self, params):
return len(self.irc.peers)
class Session(JSONRPC):
'''Base class of ElectrumX JSON session protocols.'''
'''Base class of ElectrumX JSON session protocols.
Each session runs its tasks in asynchronous parallelism with other
sessions. To prevent some sessions blocking others, potentially
long-running requests should yield (not yet implemented).
'''
def __init__(self, manager, bp, env, kind):
super().__init__()
@@ -197,12 +196,14 @@ class Session(JSONRPC):
self.coin = bp.coin
self.kind = kind
self.hash168s = set()
self.requests = asyncio.Queue()
self.current_task = None
self.client = 'unknown'
def connection_made(self, transport):
'''Handle an incoming client connection.'''
super().connection_made(transport)
self.logger.info('connection from {}'.format(self.peername(True)))
self.logger.info('connection from {}'.format(self.peername()))
self.manager.add_session(self)
def connection_lost(self, exc):
@@ -211,7 +212,7 @@ class Session(JSONRPC):
if self.error_count or self.send_size >= 250000:
self.logger.info('{} disconnected. '
'Sent {:,d} bytes in {:,d} messages {:,d} errors'
.format(self.peername(True), self.send_size,
.format(self.peername(), self.send_size,
self.send_count, self.error_count))
self.manager.remove_session(self)
@@ -221,15 +222,35 @@ class Session(JSONRPC):
def on_json_request(self, request):
'''Queue the request for asynchronous handling.'''
self.manager.add_task(self, self.handle_json_request(request))
self.requests.put_nowait(request)
def peername(self, for_log=False):
# Anonymi{z, s}e all IP addresses that will be stored in a log
if for_log and self.env.anon_logs and self.peer_info:
info = ["XX.XX.XX.XX", "XX"]
else:
info = self.peer_info
return 'unknown' if not info else '{}:{}'.format(info[0], info[1])
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
await asyncio.sleep(0)
request = await self.requests.get()
try:
start = time.time()
await self.handle_json_request(request)
secs = time.time() - start
if secs > 1:
self.logger.warning('slow request for {} took {:.1f}s: {}'
.format(self.peername(), secs,
request))
except asyncio.CancelledError:
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(request))
traceback.print_exc()
def peername(self, *, for_log=True):
if not self.peer_info:
return 'unknown'
# Anonymize IP addresses that will be logged
if for_log and self.env.anon_logs:
return 'xx.xx.xx.xx:xx'
return '{}:{}'.format(self.peer_info[0], self.peer_info[1])
def tx_hash_from_param(self, param):
'''Raise an RPCError if the parameter is not a valid transaction
@@ -576,19 +597,5 @@ class LocalRPC(Session):
def __init__(self, *args):
super().__init__(*args)
cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self, cmd) for cmd in cmds}
async def getinfo(self, params):
return self.manager.info()
async def sessions(self, params):
return self.manager.sessions_info()
async def numsessions(self, params):
return self.manager.session_count()
async def peers(self, params):
return self.manager.irc_peers()
async def numpeers(self, params):
return len(self.manager.irc_peers())
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds}
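
Putting the ServerManager pieces together: sessions is now a
dictionary mapping each session to the future running its
serve_requests() loop, so removing a session cancels exactly that
session's work. A condensed sketch (other members elided):

    import asyncio

    class ServerManager:
        def __init__(self):
            self.sessions = {}  # session -> serve_requests() future

        def add_session(self, session):
            assert session not in self.sessions
            coro = session.serve_requests()
            self.sessions[session] = asyncio.ensure_future(coro)

        def remove_session(self, session):
            # Cancelling the future stops any in-flight request for
            # this session without disturbing other sessions.
            future = self.sessions.pop(session)
            future.cancel()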

server/version.py (2 lines changed)

@@ -1 +1 @@
VERSION = "ElectrumX 0.5"
VERSION = "ElectrumX 0.5.1"

tests/test_util.py (11 lines changed)

@@ -3,6 +3,9 @@ from lib import util
def test_cachedproperty():
class Target:
CALL_COUNT = 0
def __init__(self):
self.call_count = 0
@@ -11,8 +14,15 @@ def test_cachedproperty():
self.call_count += 1
return self.call_count
@util.cachedproperty
def cls_prop(cls):
cls.CALL_COUNT += 1
return cls.CALL_COUNT
t = Target()
assert t.prop == t.prop == 1
assert Target.cls_prop == Target.cls_prop == 1
def test_deep_getsizeof():
@@ -36,6 +46,7 @@ class B(Base):
def test_subclasses():
assert util.subclasses(Base) == [A, B]
assert util.subclasses(Base, strict=False) == [A, B, Base]
def test_chunks():
