Browse Source

Create SessionManager class

Break out controller functionality for session management.
patch-2
Neil Booth 7 years ago
parent
commit
e41d1db25c
  1. 82
      electrumx/lib/text.py
  2. 466
      electrumx/server/controller.py
  3. 407
      electrumx/server/session.py
  4. 6
      electrumx_rpc

82
electrumx/lib/text.py

@ -0,0 +1,82 @@
import time
import electrumx.lib.util as util
def sessions_lines(data):
'''A generator returning lines for a list of sessions.
data is the return value of rpc_sessions().'''
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}')
yield fmt.format('ID', 'Flags', 'Client', 'Proto',
'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer')
for (id_, flags, peer, client, proto, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size, time) in data:
yield fmt.format(id_, flags, client, proto,
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024),
util.formatted_time(time, sep=''), peer)
def groups_lines(data):
'''A generator returning lines for a list of groups.
data is the return value of rpc_groups().'''
fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}'
'{:>7} {:>9} {:>7} {:>9}')
yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB')
for (id_, session_count, bandwidth, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size) in data:
yield fmt.format(id_,
'{:,d}'.format(session_count),
'{:,d}'.format(bandwidth // 1024),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024))
def peers_lines(data):
'''A generator returning lines for a list of peers.
data is the return value of rpc_peers().'''
def time_fmt(t):
if not t:
return 'Never'
return util.formatted_time(now - t)
now = time.time()
fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>4} '
'{:>4} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}')
yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min',
'Max', 'Pruning', 'Last Good', 'Last Try',
'Tries', 'Source', 'IP Address')
for item in data:
features = item['features']
hostname = item['host']
host = features['hosts'][hostname]
yield fmt.format(hostname[:30],
item['status'],
host.get('tcp_port') or '',
host.get('ssl_port') or '',
features['server_version'] or 'unknown',
features['protocol_min'],
features['protocol_max'],
features['pruning'] or '',
time_fmt(item['last_good']),
time_fmt(item['last_try']),
item['try_count'],
item['source'][:20],
item['ip_addr'] or '')

466
electrumx/server/controller.py

@ -6,38 +6,21 @@
# and warranty status of this software.
import asyncio
import itertools
import json
import os
import ssl
import time
import traceback
from bisect import bisect_left
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import pylru
from aiorpcx import RPCError, TaskSet, _version as aiorpcx_version
import electrumx
from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash
from electrumx.lib.peer import Peer
from electrumx.lib.server_base import ServerBase
import electrumx.lib.util as util
from electrumx.server.daemon import DaemonError
from electrumx.server.mempool import MemPool
from electrumx.server.peers import PeerManager
from electrumx.server.session import (LocalRPC, BAD_REQUEST, DAEMON_ERROR,
non_negative_integer)
class SessionGroup(object):
def __init__(self, gid):
self.gid = gid
# Concurrency per group
self.semaphore = asyncio.Semaphore(20)
from electrumx.server.session import (BAD_REQUEST, DAEMON_ERROR,
SessionManager, non_negative_integer)
class Controller(ServerBase):
@ -47,7 +30,6 @@ class Controller(ServerBase):
up with the daemon.
'''
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
AIORPCX_MIN = (0, 5, 6)
def __init__(self, env):
@ -66,69 +48,39 @@ class Controller(ServerBase):
self.logger.info(f'event loop policy: {env.loop_policy}')
self.coin = env.coin
self.servers = {}
self.tasks = TaskSet()
self.sessions = set()
self.cur_group = SessionGroup(0)
self.txs_sent = 0
self.next_log_sessions = 0
self.state = self.CATCHING_UP
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0
self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.cache_height = 0
self.cache_mn_height = 0
self.mn_cache = pylru.lrucache(256)
env.max_send = max(350000, env.max_send)
# Set up the RPC request handlers
cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
'reorg sessions stop'.split())
self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor()
self.loop.set_default_executor(self.executor)
# The complex objects. Note PeerManager references self.loop (ugh)
self.session_mgr = SessionManager(env, self)
self.daemon = self.coin.DAEMON(env)
self.bp = self.coin.BLOCK_PROCESSOR(env, self, self.daemon)
self.mempool = MemPool(self.bp, self)
self.peer_mgr = PeerManager(env, self)
# Event triggered when electrumx is listening for incoming requests.
self.server_listening = asyncio.Event()
async def start_servers(self):
'''Start the RPC server and schedule the external servers to be
started once the block processor has caught up.
'''
if self.env.rpc_port is not None:
await self.start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
await self.session_mgr.start_rpc_server()
self.create_task(self.bp.main_loop())
self.create_task(self.wait_for_bp_catchup())
async def shutdown(self):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN
# Close servers and sessions, and cancel all tasks
self.close_servers(list(self.servers.keys()))
for session in self.sessions:
session.abort()
# Not certain of ordering here
self.tasks.cancel_all()
# Wait for the above to take effect
await self.session_mgr.shutdown()
await self.tasks.wait()
for session in list(self.sessions):
await session.wait_closed()
# Finally shut down the block processor and executor
self.bp.shutdown(self.executor)
@ -147,10 +99,6 @@ class Controller(ServerBase):
'''
return self.mempool.value(hashX)
def sent_tx(self, tx_hash):
'''Call when a TX is sent.'''
self.txs_sent += 1
async def run_in_executor(self, func, *args):
'''Wait whilst running func in the executor.'''
return await self.loop.run_in_executor(None, func, *args)
@ -173,30 +121,6 @@ class Controller(ServerBase):
except Exception as e:
self.logger.exception(f'uncaught task exception: {e}')
async def housekeeping(self):
'''Regular housekeeping checks.'''
n = 0
while True:
n += 1
await asyncio.sleep(15)
if n % 10 == 0:
self.clear_stale_sessions()
# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self.start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
async def wait_for_bp_catchup(self):
'''Wait for the block processor to catch up, and for the mempool to
synchronize, then kick off server background processes.'''
@ -204,67 +128,8 @@ class Controller(ServerBase):
self.create_task(self.mempool.main_loop())
await self.mempool.synchronized_event.wait()
self.create_task(self.peer_mgr.main_loop())
self.create_task(self.log_start_external_servers())
self.create_task(self.housekeeping())
def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).'''
if kinds:
self.logger.info('closing down {} listening servers'
.format(', '.join(kinds)))
for kind in kinds:
server = self.servers.pop(kind, None)
if server:
server.close()
async def start_server(self, kind, *args, **kw_args):
protocol_class = LocalRPC if kind == 'RPC' else self.coin.SESSIONCLS
protocol_factory = partial(protocol_class, self, kind)
server = self.loop.create_server(protocol_factory, *args, **kw_args)
host, port = args[:2]
try:
self.servers[kind] = await server
except Exception as e:
self.logger.error('{} server failed to listen on {}:{:d} :{}'
.format(kind, host, port, e))
else:
self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port))
async def log_start_external_servers(self):
'''Start TCP and SSL servers.'''
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds'
.format(self.env.session_timeout))
self.logger.info('session bandwidth limit {:,d} bytes'
.format(self.env.bandwidth_limit))
self.logger.info('max response size {:,d} bytes'
.format(self.env.max_send))
self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs))
self.logger.info('max subscriptions per session: {:,d}'
.format(self.env.max_session_subs))
if self.env.drop_client is not None:
self.logger.info('drop clients matching: {}'
.format(self.env.drop_client.pattern))
await self.start_external_servers()
async def start_external_servers(self):
'''Start listening on TCP and SSL ports, but only if the respective
port was given in the environment.
'''
self.state = self.LISTENING
env = self.env
host = env.cs_host(for_rpc=False)
if env.tcp_port is not None:
await self.start_server('TCP', host, env.tcp_port)
if env.ssl_port is not None:
sslc = ssl.SSLContext(ssl.PROTOCOL_TLS)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', host, env.ssl_port, ssl=sslc)
self.server_listening.set()
self.create_task(self.session_mgr.start_serving())
self.create_task(self.session_mgr.housekeeping())
def notify_sessions(self, touched):
'''Notify sessions about height changes and touched addresses.'''
@ -278,19 +143,7 @@ class Controller(ServerBase):
self.cache_height = height
self.header_cache.clear()
# Height notifications are synchronous. Those sessions with
# touched addresses are scheduled for asynchronous completion
for session in self.sessions:
if isinstance(session, LocalRPC):
continue
session_touched = session.notify(height, touched)
if session_touched is not None:
self.create_task(session.notify_async(session_touched))
def notify_peers(self, updates):
'''Notify of peer updates.'''
for session in self.sessions:
session.notify_peers(updates)
self.session_mgr.notify(height, touched)
def raw_header(self, height):
'''Return the binary header at the given height.'''
@ -307,299 +160,6 @@ class Controller(ServerBase):
height)
return self.header_cache[height]
def add_session(self, session):
self.sessions.add(session)
if (len(self.sessions) >= self.max_sessions
and self.state == self.LISTENING):
self.state = self.PAUSED
session.logger.info('maximum sessions {:,d} reached, stopping new '
'connections until count drops to {:,d}'
.format(self.max_sessions, self.low_watermark))
self.close_servers(['TCP', 'SSL'])
gid = int(session.start_time - self.start_time) // 900
if self.cur_group.gid != gid:
self.cur_group = SessionGroup(gid)
return self.cur_group
def remove_session(self, session):
'''Remove a session from our sessions list if there.'''
self.sessions.remove(session)
def close_session(self, session):
'''Close the session's transport.'''
session.close()
return 'disconnected {:d}'.format(session.session_id)
def toggle_logging(self, session):
'''Toggle logging of the session.'''
session.toggle_logging()
return 'log {:d}: {}'.format(session.session_id, session.log_me)
def _group_map(self):
group_map = defaultdict(list)
for session in self.sessions:
group_map[session.group].append(session)
return group_map
def clear_stale_sessions(self):
'''Cut off sessions that haven't done anything for 10 minutes.'''
now = time.time()
stale_cutoff = now - self.env.session_timeout
stale = []
for session in self.sessions:
if session.is_closing():
session.abort()
elif session.last_recv < stale_cutoff:
self.close_session(session)
stale.append(session.session_id)
if stale:
self.logger.info('closing stale connections {}'.format(stale))
# Consolidate small groups
bw_limit = self.env.bandwidth_limit
group_map = self._group_map()
groups = [group for group, sessions in group_map.items()
if len(sessions) <= 5 and
sum(s.bw_charge for s in sessions) < bw_limit]
if len(groups) > 1:
new_group = groups[-1]
for group in groups:
for session in group_map[group]:
session.group = new_group
def session_count(self):
'''The number of connections that we've sent something to.'''
return len(self.sessions)
def getinfo(self):
'''A one-line summary of server state.'''
group_map = self._group_map()
return {
'version': electrumx.version,
'daemon': self.daemon.logged_url(),
'daemon_height': self.daemon.cached_height(),
'db_height': self.bp.db_height,
'closing': len([s for s in self.sessions if s.is_closing()]),
'errors': sum(s.rpc.errors for s in self.sessions),
'groups': len(group_map),
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.paused for s in self.sessions),
'pid': os.getpid(),
'peers': self.peer_mgr.info(),
'requests': sum(s.count_pending_items() for s in self.sessions),
'sessions': self.session_count(),
'subs': self.sub_count(),
'txs_sent': self.txs_sent,
'uptime': util.formatted_time(time.time() - self.start_time),
}
def sub_count(self):
return sum(s.sub_count() for s in self.sessions)
@staticmethod
def groups_text_lines(data):
'''A generator returning lines for a list of groups.
data is the return value of rpc_groups().'''
fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}'
'{:>7} {:>9} {:>7} {:>9}')
yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB')
for (id_, session_count, bandwidth, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size) in data:
yield fmt.format(id_,
'{:,d}'.format(session_count),
'{:,d}'.format(bandwidth // 1024),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024))
def group_data(self):
'''Returned to the RPC 'groups' call.'''
result = []
group_map = self._group_map()
for group, sessions in group_map.items():
result.append([group.gid,
len(sessions),
sum(s.bw_charge for s in sessions),
sum(s.count_pending_items() for s in sessions),
sum(s.txs_sent for s in sessions),
sum(s.sub_count() for s in sessions),
sum(s.recv_count for s in sessions),
sum(s.recv_size for s in sessions),
sum(s.send_count for s in sessions),
sum(s.send_size for s in sessions),
])
return result
@staticmethod
def peers_text_lines(data):
'''A generator returning lines for a list of peers.
data is the return value of rpc_peers().'''
def time_fmt(t):
if not t:
return 'Never'
return util.formatted_time(now - t)
now = time.time()
fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>4} '
'{:>4} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}')
yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min',
'Max', 'Pruning', 'Last Good', 'Last Try',
'Tries', 'Source', 'IP Address')
for item in data:
features = item['features']
hostname = item['host']
host = features['hosts'][hostname]
yield fmt.format(hostname[:30],
item['status'],
host.get('tcp_port') or '',
host.get('ssl_port') or '',
features['server_version'] or 'unknown',
features['protocol_min'],
features['protocol_max'],
features['pruning'] or '',
time_fmt(item['last_good']),
time_fmt(item['last_try']),
item['try_count'],
item['source'][:20],
item['ip_addr'] or '')
@staticmethod
def sessions_text_lines(data):
'''A generator returning lines for a list of sessions.
data is the return value of rpc_sessions().'''
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}')
yield fmt.format('ID', 'Flags', 'Client', 'Proto',
'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer')
for (id_, flags, peer, client, proto, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size, time) in data:
yield fmt.format(id_, flags, client, proto,
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
'{:,d}'.format(send_count),
'{:,d}'.format(send_size // 1024),
util.formatted_time(time, sep=''), peer)
def session_data(self, for_log):
'''Returned to the RPC 'sessions' call.'''
now = time.time()
sessions = sorted(self.sessions, key=lambda s: s.start_time)
return [(session.session_id,
session.flags(),
session.peer_address_str(for_log=for_log),
session.client,
session.protocol_version_string(),
session.count_pending_items(),
session.txs_sent,
session.sub_count(),
session.recv_count, session.recv_size,
session.send_count, session.send_size,
now - session.start_time)
for session in sessions]
def lookup_session(self, session_id):
try:
session_id = int(session_id)
except Exception:
pass
else:
for session in self.sessions:
if session.session_id == session_id:
return session
return None
def for_each_session(self, session_ids, operation):
if not isinstance(session_ids, list):
raise RPCError(BAD_REQUEST, 'expected a list of session IDs')
result = []
for session_id in session_ids:
session = self.lookup_session(session_id)
if session:
result.append(operation(session))
else:
result.append('unknown session: {}'.format(session_id))
return result
# Local RPC command handlers
def rpc_add_peer(self, real_name):
'''Add a peer.
real_name: a real name, as would appear on IRC
'''
peer = Peer.from_real_name(real_name, 'RPC')
self.peer_mgr.add_peers([peer])
return "peer '{}' added".format(real_name)
def rpc_disconnect(self, session_ids):
'''Disconnect sesssions.
session_ids: array of session IDs
'''
return self.for_each_session(session_ids, self.close_session)
def rpc_log(self, session_ids):
'''Toggle logging of sesssions.
session_ids: array of session IDs
'''
return self.for_each_session(session_ids, self.toggle_logging)
def rpc_daemon_url(self, daemon_url=None):
'''Replace the daemon URL.'''
daemon_url = daemon_url or self.env.daemon_url
try:
self.daemon.set_urls(self.coin.daemon_urls(daemon_url))
except Exception as e:
raise RPCError(BAD_REQUEST, f'an error occured: {e}')
return 'now using daemon at {}'.format(self.daemon.logged_url())
def rpc_stop(self):
'''Shut down the server cleanly.'''
self.loop.call_soon(self.shutdown_event.set)
return 'stopping'
def rpc_getinfo(self):
'''Return summary information about the server process.'''
return self.getinfo()
def rpc_groups(self):
'''Return statistics about the session groups.'''
return self.group_data()
def rpc_peers(self):
'''Return a list of data about server peers.'''
return self.peer_mgr.rpc_data()
def rpc_sessions(self):
'''Return statistics about connected sessions.'''
return self.session_data(for_log=False)
def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.
count: number of blocks to reorg (default 3)
'''
count = non_negative_integer(count)
if not self.bp.force_chain_reorg(count):
raise RPCError(BAD_REQUEST, 'still catching up with daemon')
return 'scheduled a reorg of {:,d} blocks'.format(count)
# Helpers for RPC "blockchain" command handlers
def assert_tx_hash(self, value):
@ -619,14 +179,6 @@ class Controller(ServerBase):
except DaemonError as e:
raise RPCError(DAEMON_ERROR, f'daemon error: {e}')
def new_subscription(self):
if self.subs_room <= 0:
self.subs_room = self.max_subs - self.sub_count()
if self.subs_room <= 0:
raise RPCError(BAD_REQUEST, f'server subscription limit '
f'{self.max_subs:,d} reached')
self.subs_room -= 1
async def get_history(self, hashX):
'''Get history asynchronously to reduce latency.'''
if hashX in self.history_cache:

407
electrumx/server/session.py

@ -7,18 +7,25 @@
'''Classes for local RPC server and remote client TCP/SSL servers.'''
import asyncio
import codecs
import datetime
import itertools
import json
import os
import ssl
import time
import datetime
from collections import defaultdict
from functools import partial
from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError
import electrumx
import electrumx.lib.text as text
import electrumx.lib.util as util
from electrumx.lib.hash import (sha256, hash_to_hex_str, hex_str_to_hash,
HASHX_LEN)
import electrumx.lib.util as util
from electrumx.lib.peer import Peer
from electrumx.server.daemon import DaemonError
@ -50,6 +57,7 @@ def non_negative_integer(value):
class Semaphores(object):
'''For aiorpcX's semaphore handling.'''
def __init__(self, semaphores):
self.semaphores = semaphores
@ -65,6 +73,384 @@ class Semaphores(object):
semaphore.release()
class SessionGroup(object):
def __init__(self, gid):
self.gid = gid
# Concurrency per group
self.semaphore = asyncio.Semaphore(20)
class SessionManager(object):
'''Holds global state about all sessions.'''
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
def __init__(self, env, controller):
self.env = env
self.controller = controller
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {}
self.sessions = set()
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
self.next_log_sessions = 0
self.cur_group = SessionGroup(0)
self.state = self.CATCHING_UP
self.txs_sent = 0
self.start_time = time.time()
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
# Event triggered when electrumx is listening for incoming requests.
self.server_listening = asyncio.Event()
# Set up the RPC request handlers
cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
'reorg sessions stop'.split())
self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
async def _start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop()
if kind == 'RPC':
protocol_class = LocalRPC
else:
protocol_class = self.env.coin.SESSIONCLS
protocol_factory = partial(protocol_class, self, self.controller, kind)
server = loop.create_server(protocol_factory, *args, **kw_args)
host, port = args[:2]
try:
self.servers[kind] = await server
except Exception as e:
self.logger.error('{} server failed to listen on {}:{:d} :{}'
.format(kind, host, port, e))
else:
self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port))
async def _start_external_servers(self):
'''Start listening on TCP and SSL ports, but only if the respective
port was given in the environment.
'''
env = self.env
host = env.cs_host(for_rpc=False)
if env.tcp_port is not None:
await self._start_server('TCP', host, env.tcp_port)
if env.ssl_port is not None:
sslc = ssl.SSLContext(ssl.PROTOCOL_TLS)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self._start_server('SSL', host, env.ssl_port, ssl=sslc)
# Change state
self.state = self.LISTENING
self.server_listening.set()
def _close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).'''
if kinds:
self.logger.info('closing down {} listening servers'
.format(', '.join(kinds)))
for kind in kinds:
server = self.servers.pop(kind, None)
if server:
server.close()
def _group_map(self):
group_map = defaultdict(list)
for session in self.sessions:
group_map[session.group].append(session)
return group_map
def _sub_count(self):
return sum(s.sub_count() for s in self.sessions)
def _lookup_session(self, session_id):
try:
session_id = int(session_id)
except Exception:
pass
else:
for session in self.sessions:
if session.session_id == session_id:
return session
return None
def _for_each_session(self, session_ids, operation):
if not isinstance(session_ids, list):
raise RPCError(BAD_REQUEST, 'expected a list of session IDs')
result = []
for session_id in session_ids:
session = self._lookup_session(session_id)
if session:
result.append(operation(session))
else:
result.append('unknown session: {}'.format(session_id))
return result
def _close_session(self, session):
'''Close the session's transport.'''
session.close()
return 'disconnected {:d}'.format(session.session_id)
def _clear_stale_sessions(self):
'''Cut off sessions that haven't done anything for 10 minutes.'''
now = time.time()
stale_cutoff = now - self.env.session_timeout
stale = []
for session in self.sessions:
if session.is_closing():
session.abort()
elif session.last_recv < stale_cutoff:
self._close_session(session)
stale.append(session.session_id)
if stale:
self.logger.info('closing stale connections {}'.format(stale))
# Consolidate small groups
bw_limit = self.env.bandwidth_limit
group_map = self._group_map()
groups = [group for group, sessions in group_map.items()
if len(sessions) <= 5 and
sum(s.bw_charge for s in sessions) < bw_limit]
if len(groups) > 1:
new_group = groups[-1]
for group in groups:
for session in group_map[group]:
session.group = new_group
def _getinfo(self):
'''A one-line summary of server state.'''
group_map = self._group_map()
daemon = self.controller.daemon
bp = self.controller.bp
peer_mgr = self.controller.peer_mgr
return {
'version': electrumx.version,
'daemon': daemon.logged_url(),
'daemon_height': daemon.cached_height(),
'db_height': bp.db_height,
'closing': len([s for s in self.sessions if s.is_closing()]),
'errors': sum(s.rpc.errors for s in self.sessions),
'groups': len(group_map),
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.paused for s in self.sessions),
'pid': os.getpid(),
'peers': peer_mgr.info(),
'requests': sum(s.count_pending_items() for s in self.sessions),
'sessions': self._session_count(),
'subs': self._sub_count(),
'txs_sent': self.txs_sent,
'uptime': util.formatted_time(time.time() - self.start_time),
}
def _session_data(self, for_log):
'''Returned to the RPC 'sessions' call.'''
now = time.time()
sessions = sorted(self.sessions, key=lambda s: s.start_time)
return [(session.session_id,
session.flags(),
session.peer_address_str(for_log=for_log),
session.client,
session.protocol_version_string(),
session.count_pending_items(),
session.txs_sent,
session.sub_count(),
session.recv_count, session.recv_size,
session.send_count, session.send_size,
now - session.start_time)
for session in sessions]
def _group_data(self):
'''Returned to the RPC 'groups' call.'''
result = []
group_map = self._group_map()
for group, sessions in group_map.items():
result.append([group.gid,
len(sessions),
sum(s.bw_charge for s in sessions),
sum(s.count_pending_items() for s in sessions),
sum(s.txs_sent for s in sessions),
sum(s.sub_count() for s in sessions),
sum(s.recv_count for s in sessions),
sum(s.recv_size for s in sessions),
sum(s.send_count for s in sessions),
sum(s.send_size for s in sessions),
])
return result
# --- LocalRPC command handlers
def rpc_add_peer(self, real_name):
'''Add a peer.
real_name: a real name, as would appear on IRC
'''
peer = Peer.from_real_name(real_name, 'RPC')
self.controller.peer_mgr.add_peers([peer])
return "peer '{}' added".format(real_name)
def rpc_disconnect(self, session_ids):
'''Disconnect sesssions.
session_ids: array of session IDs
'''
return self._for_each_session(session_ids, self._close_session)
def rpc_log(self, session_ids):
'''Toggle logging of sesssions.
session_ids: array of session IDs
'''
def toggle_logging(session):
'''Toggle logging of the session.'''
session.toggle_logging()
return 'log {:d}: {}'.format(session.session_id, session.log_me)
return self._for_each_session(session_ids, toggle_logging)
def rpc_daemon_url(self, daemon_url=None):
'''Replace the daemon URL.'''
daemon_url = daemon_url or self.env.daemon_url
daemon = self.controller.daemon
try:
daemon.set_urls(self.env.coin.daemon_urls(daemon_url))
except Exception as e:
raise RPCError(BAD_REQUEST, f'an error occured: {e}')
return 'now using daemon at {}'.format(daemon.logged_url())
def rpc_stop(self):
'''Shut down the server cleanly.'''
loop = asyncio.get_event_loop()
loop.call_soon(self.controller.shutdown_event.set)
return 'stopping'
def rpc_getinfo(self):
'''Return summary information about the server process.'''
return self._getinfo()
def rpc_groups(self):
'''Return statistics about the session groups.'''
return self._group_data()
def rpc_peers(self):
'''Return a list of data about server peers.'''
return self.controller.peer_mgr.rpc_data()
def rpc_sessions(self):
'''Return statistics about connected sessions.'''
return self._session_data(for_log=False)
def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.
count: number of blocks to reorg (default 3)
'''
count = non_negative_integer(count)
if not self.controller.bp.force_chain_reorg(count):
raise RPCError(BAD_REQUEST, 'still catching up with daemon')
return 'scheduled a reorg of {:,d} blocks'.format(count)
# --- External Interface
async def start_serving(self):
'''Start TCP and SSL servers.'''
self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds'
.format(self.env.session_timeout))
self.logger.info('session bandwidth limit {:,d} bytes'
.format(self.env.bandwidth_limit))
self.logger.info('max response size {:,d} bytes'
.format(self.env.max_send))
self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs))
self.logger.info('max subscriptions per session: {:,d}'
.format(self.env.max_session_subs))
if self.env.drop_client is not None:
self.logger.info('drop clients matching: {}'
.format(self.env.drop_client.pattern))
await self._start_external_servers()
async def start_rpc_server(self):
if self.env.rpc_port is not None:
await self._start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
async def shutdown(self):
'''Close servers and sessions.'''
self.state = self.SHUTTING_DOWN
self._close_servers(list(self.servers.keys()))
for session in self.sessions:
session.abort()
for session in list(self.sessions):
await session.wait_closed()
def session_count(self):
'''The number of connections that we've sent something to.'''
return len(self.sessions)
def notify(self, height, touched):
# Height notifications are synchronous. Those sessions with
# touched addresses are scheduled for asynchronous completion
create_task = self.controller.create_task
for session in self.sessions:
if isinstance(session, LocalRPC):
continue
session_touched = session.notify(height, touched)
if session_touched is not None:
create_task(session.notify_async(session_touched))
async def housekeeping(self):
'''Regular housekeeping checks.'''
n = 0
while True:
n += 1
await asyncio.sleep(15)
if n % 10 == 0:
self._clear_stale_sessions()
# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self._start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self._session_data(for_log=True)
for line in text.sessions_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self._getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
def add_session(self, session):
self.sessions.add(session)
if (len(self.sessions) >= self.max_sessions
and self.state == self.LISTENING):
self.state = self.PAUSED
session.logger.info('maximum sessions {:,d} reached, stopping new '
'connections until count drops to {:,d}'
.format(self.max_sessions, self.low_watermark))
self._close_servers(['TCP', 'SSL'])
gid = int(session.start_time - self.start_time) // 900
if self.cur_group.gid != gid:
self.cur_group = SessionGroup(gid)
return self.cur_group
def remove_session(self, session):
'''Remove a session from our sessions list if there.'''
self.sessions.remove(session)
def new_subscription(self):
if self.subs_room <= 0:
self.subs_room = self.max_subs - self._sub_count()
if self.subs_room <= 0:
raise RPCError(BAD_REQUEST, f'server subscription limit '
f'{self.max_subs:,d} reached')
self.subs_room -= 1
class SessionBase(ServerSession):
'''Base class of ElectrumX JSON sessions.
@ -75,11 +461,12 @@ class SessionBase(ServerSession):
MAX_CHUNK_SIZE = 2016
session_counter = itertools.count()
def __init__(self, controller, kind):
def __init__(self, session_mgr, controller, kind):
super().__init__(rpc_protocol=JSONRPCAutoDetect)
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.kind = kind # 'RPC', 'TCP' etc.
self.session_mgr = session_mgr
self.controller = controller
self.kind = kind # 'RPC', 'TCP' etc.
self.bp = controller.bp
self.env = controller.env
self.coin = self.env.coin
@ -126,14 +513,14 @@ class SessionBase(ServerSession):
context = {'conn_id': f'{self.session_id}'}
self.logger = util.ConnectionLogger(self.logger, context)
self.rpc.logger = self.logger
self.group = self.controller.add_session(self)
self.group = self.session_mgr.add_session(self)
self.logger.info(f'{self.kind} {self.peer_address_str()}, '
f'{len(self.controller.sessions):,d} total')
f'{self.session_mgr.session_count():,d} total')
def connection_lost(self, exc):
'''Handle client disconnection.'''
super().connection_lost(exc)
self.controller.remove_session(self)
self.session_mgr.remove_session(self)
msg = ''
if self.paused:
msg += ' whilst paused'
@ -349,7 +736,7 @@ class ElectrumX(SessionBase):
f'{self.max_subs:,d} reached')
# Now let the controller check its limit
self.controller.new_subscription()
self.session_mgr.new_subscription()
self.hashX_subs[hashX] = alias
return await self.address_status(hashX)
@ -621,8 +1008,8 @@ class ElectrumX(SessionBase):
try:
tx_hash = await self.daemon.sendrawtransaction([raw_tx])
self.txs_sent += 1
self.session_mgr.txs_sent += 1
self.logger.info('sent tx: {}'.format(tx_hash))
self.controller.sent_tx(tx_hash)
return tx_hash
except DaemonError as e:
error, = e.args
@ -706,7 +1093,7 @@ class LocalRPC(SessionBase):
def request_handler(self, method):
'''Return the async handler for the given request method.'''
return self.controller.rpc_handlers.get(method)
return self.session_mgr.rpc_handlers.get(method)
class DashElectrumX(ElectrumX):

6
electrumx_rpc

@ -15,9 +15,9 @@ import asyncio
import json
from os import environ
from aiorpcx import ClientSession
import electrumx.lib.text as text
from electrumx import Controller
from aiorpcx import ClientSession
def main():
@ -46,7 +46,7 @@ def main():
async with ClientSession('localhost', port) as session:
result = await session.send_request(method, params, timeout=15)
if method in ('groups', 'peers', 'sessions'):
lines_func = getattr(Controller, f'{method}_text_lines')
lines_func = getattr(text, f'{method}_lines')
for line in lines_func(result):
print(line)
else:

Loading…
Cancel
Save