
Update to aiorpcX 0.6

patch-2
Neil Booth, 7 years ago
parent commit 2c7d56d097

Changed files:
  1. electrumx/server/controller.py (6 lines changed)
  2. electrumx/server/peers.py (79 lines changed)
  3. electrumx/server/session.py (207 lines changed)
  4. setup.py (2 lines changed)

electrumx/server/controller.py (6 lines changed)

@@ -81,10 +81,8 @@ class Controller(ServerBase):
         '''Start the RPC server and wait for the mempool to synchronize.  Then
         start serving external clients.
         '''
-        reqd_version = (0, 5, 9)
-        if aiorpcx_version != reqd_version:
-            raise RuntimeError('ElectrumX requires aiorpcX version '
-                               f'{version_string(reqd_version)}')
+        if not (0, 6) <= aiorpcx_version < (0, 7):
+            raise RuntimeError('ElectrumX requires aiorpcX version 0.6.x')

         env = self.env
         min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
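Note: the new check relies on Python's lexicographic tuple comparison, so any 0.6.x release tuple satisfies it while 0.5.x and 0.7 do not. A quick illustration (plain Python, not part of the commit):

    >>> (0, 6) <= (0, 6, 3) < (0, 7)
    True
    >>> (0, 6) <= (0, 5, 9) < (0, 7)
    False
    >>> (0, 6) <= (0, 7) < (0, 7)
    False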

electrumx/server/peers.py (79 lines changed)

@@ -14,9 +14,10 @@ import ssl
 import time
 from collections import defaultdict, Counter

-from aiorpcx import (ClientSession, SOCKSProxy, SOCKSError,
-                     RPCError, ConnectionError,
-                     TaskGroup, run_in_thread, ignore_after)
+from aiorpcx import (ClientSession, SOCKSProxy,
+                     Notification, handler_invocation,
+                     SOCKSError, RPCError, TaskTimeout,
+                     TaskGroup, run_in_thread, ignore_after, timeout_after)

 from electrumx.lib.peer import Peer
 from electrumx.lib.util import class_logger, protocol_tuple
@ -39,14 +40,13 @@ def assert_good(message, result, instance):
class PeerSession(ClientSession): class PeerSession(ClientSession):
'''An outgoing session to a peer.''' '''An outgoing session to a peer.'''
def _header_notification(self, header): async def handle_request(self, request):
pass
def notification_handler(self, method):
# We subscribe so might be unlucky enough to get a notification... # We subscribe so might be unlucky enough to get a notification...
if method == 'blockchain.headers.subscribe': if (isinstance(request, Notification) and
return self._header_notification request.method == 'blockchain.headers.subscribe'):
return None pass
else:
await handler_invocation(None, request) # Raises
class PeerManager(object): class PeerManager(object):
@@ -222,34 +222,30 @@ class PeerManager(object):
                 # connections so our peers see the correct source.
                 kwargs['local_addr'] = (host, None)

+            peer_text = f'[{peer}:{port} {kind}]'
             try:
-                async with PeerSession(peer.host, port, **kwargs) as session:
-                    await self._verify_peer(session, peer)
+                async with timeout_after(120 if peer.is_tor else 30):
+                    async with PeerSession(peer.host, port,
+                                           **kwargs) as session:
+                        await self._verify_peer(session, peer)
                 is_good = True
                 break
             except BadPeerError as e:
-                self.logger.error(f'[{peer}] marking bad: ({e})')
+                self.logger.error(f'{peer_text} marking bad: ({e})')
                 peer.mark_bad()
                 break
             except RPCError as e:
-                self.logger.error(f'[{peer}] RPC error: {e.message} '
+                self.logger.error(f'{peer_text} RPC error: {e.message} '
                                   f'({e.code})')
-            except asyncio.TimeoutError as e:
-                self.logger.error(f'[{peer}] {e}')
+            except TaskTimeout as e:
+                self.logger.error(f'{peer_text} timed out after {e.args[0]}s')
             except (OSError, SOCKSError, ConnectionError) as e:
-                self.logger.info(f'[{peer}] {kind} connection to '
-                                 f'port {port} failed: {e}')
+                self.logger.info(f'{peer_text} {e}')

-        now = time.time()
-        if self.env.force_proxy or peer.is_tor:
-            how = f'via {kind} over Tor'
-        else:
-            how = f'via {kind} at {peer.ip_addr}'
-        status = 'verified' if is_good else 'failed to verify'
-        elapsed = now - peer.last_try
-        self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s')
-
         if is_good:
+            now = time.time()
+            elapsed = now - peer.last_try
+            self.logger.info(f'{peer_text} verified in {elapsed:.1f}s')
             peer.try_count = 0
             peer.last_good = now
             peer.source = 'peer'
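Note: the commit replaces per-request timeout= arguments and asyncio.TimeoutError with a single timeout_after block around the whole connection attempt; TaskTimeout carries the expired deadline in e.args[0], which the new log line uses. A minimal sketch of the idiom, assuming only the names imported in this file (fetch_features is illustrative, not ElectrumX code):

    from aiorpcx import TaskTimeout, timeout_after

    async def fetch_features(session, seconds=30):
        # Bound the whole round trip instead of passing timeout= per request.
        try:
            async with timeout_after(seconds):
                return await session.send_request('server.features')
        except TaskTimeout as e:
            # e.args[0] is the deadline that expired.
            return None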
@@ -283,12 +279,9 @@ class PeerManager(object):
         if address:
             peer.ip_addr = address[0]

-        timeout = 20 if peer.is_tor else 10
-
         # server.version goes first
         message = 'server.version'
-        result = await session.send_request(
-            message, self.server_version_args, timeout=timeout)
+        result = await session.send_request(message, self.server_version_args)
         assert_good(message, result, list)

         # Protocol version 1.1 returns a pair with the version first

@@ -299,13 +292,14 @@ class PeerManager(object):
         peer.features['server_version'] = server_version
         ptuple = protocol_tuple(protocol_version)

-        await self._send_headers_subscribe(session, peer, timeout, ptuple)
-        await self._send_server_features(session, peer, timeout)
-        await self._send_peers_subscribe(session, peer, timeout)
+        # FIXME: make these concurrent with first exception preserved
+        await self._send_headers_subscribe(session, peer, ptuple)
+        await self._send_server_features(session, peer)
+        await self._send_peers_subscribe(session, peer)

-    async def _send_headers_subscribe(self, session, peer, timeout, ptuple):
+    async def _send_headers_subscribe(self, session, peer, ptuple):
         message = 'blockchain.headers.subscribe'
-        result = await session.send_request(message, timeout=timeout)
+        result = await session.send_request(message)
         assert_good(message, result, dict)

         our_height = self.chain_state.db_height()

@@ -325,8 +319,7 @@ class PeerManager(object):
         if ptuple >= (1, 4):
             ours = raw_header.hex()
             message = 'blockchain.block.header'
-            theirs = await session.send_request(message, [check_height],
-                                                timeout=timeout)
+            theirs = await session.send_request(message, [check_height])
             assert_good(message, theirs, str)
             if ours != theirs:
                 raise BadPeerError(f'our header {ours} and '

@@ -335,17 +328,16 @@ class PeerManager(object):
             ours = self.env.coin.electrum_header(raw_header, check_height)
             ours = ours.get('prev_block_hash')
             message = 'blockchain.block.get_header'
-            theirs = await session.send_request(message, [check_height],
-                                                timeout=timeout)
+            theirs = await session.send_request(message, [check_height])
             assert_good(message, theirs, dict)
             theirs = theirs.get('prev_block_hash')
             if ours != theirs:
                 raise BadPeerError(f'our header hash {ours} and '
                                    f'theirs {theirs} differ')

-    async def _send_server_features(self, session, peer, timeout):
+    async def _send_server_features(self, session, peer):
         message = 'server.features'
-        features = await session.send_request(message, timeout=timeout)
+        features = await session.send_request(message)
         assert_good(message, features, dict)
         hosts = [host.lower() for host in features.get('hosts', {})]
         if self.env.coin.GENESIS_HASH != features.get('genesis_hash'):

@@ -355,9 +347,9 @@ class PeerManager(object):
         else:
             raise BadPeerError(f'not listed in own hosts list {hosts}')

-    async def _send_peers_subscribe(self, session, peer, timeout):
+    async def _send_peers_subscribe(self, session, peer):
         message = 'server.peers.subscribe'
-        raw_peers = await session.send_request(message, timeout=timeout)
+        raw_peers = await session.send_request(message)
         assert_good(message, raw_peers, list)

         # Check the peers list we got from a remote peer.

@@ -377,8 +369,7 @@ class PeerManager(object):
             return
         self.logger.info(f'registering ourself with {peer}')
         # We only care to wait for the response
-        await session.send_request('server.add_peer', [features],
-                                   timeout=timeout)
+        await session.send_request('server.add_peer', [features])

     #
     # External interface

electrumx/server/session.py (207 lines changed)

@@ -18,7 +18,10 @@ import time
 from collections import defaultdict
 from functools import partial

-from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError, TaskGroup
+from aiorpcx import (
+    ServerSession, JSONRPCAutoDetect, TaskGroup, handler_invocation,
+    RPCError, Request, ignore_after
+)

 import electrumx
 import electrumx.lib.text as text
@@ -112,7 +115,6 @@ class SessionManager(object):
         self.max_sessions = env.max_sessions
         self.low_watermark = self.max_sessions * 19 // 20
         self.max_subs = env.max_subs
-        self.next_log_sessions = 0
         self.cur_group = SessionGroup(0)
         self.state = self.CATCHING_UP
         self.txs_sent = 0

@@ -131,7 +133,8 @@ class SessionManager(object):
         # Set up the RPC request handlers
         cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
                 'query reorg sessions stop'.split())
-        self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
+        LocalRPC.request_handlers = {cmd: getattr(self, 'rpc_' + cmd)
+                                     for cmd in cmds}

     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()

@@ -169,7 +172,7 @@ class SessionManager(object):
             self.state = self.LISTENING
             self.server_listening.set()

-    def _close_servers(self, kinds):
+    async def _close_servers(self, kinds):
         '''Close the servers of the given kinds (TCP etc.).'''
         if kinds:
             self.logger.info('closing down {} listening servers'

@@ -178,30 +181,27 @@ class SessionManager(object):
             server = self.servers.pop(kind, None)
             if server:
                 server.close()
+                await server.wait_closed()

-    async def _housekeeping(self):
-        '''Regular housekeeping checks.'''
-        n = 0
+    async def _restart_if_paused(self):
         while True:
-            n += 1
             await asyncio.sleep(15)
-            if n % 10 == 0:
-                self._clear_stale_sessions()
-
             # Start listening for incoming connections if paused and
             # session count has fallen
             if (self.state == self.PAUSED and
                     len(self.sessions) <= self.low_watermark):
                 await self._start_external_servers()

-            # Periodically log sessions
-            if self.env.log_sessions and time.time() > self.next_log_sessions:
-                if self.next_log_sessions:
-                    data = self._session_data(for_log=True)
-                    for line in text.sessions_lines(data):
-                        self.logger.info(line)
-                    self.logger.info(json.dumps(self._get_info()))
-                self.next_log_sessions = time.time() + self.env.log_sessions
+    async def _log_sessions(self):
+        '''Periodically log sessions.'''
+        log_interval = self.env.log_sessions
+        if log_interval:
+            while True:
+                await asyncio.sleep(log_interval)
+                data = self._session_data(for_log=True)
+                for line in text.sessions_lines(data):
+                    self.logger.info(line)
+                self.logger.info(json.dumps(self._get_info()))

     def _group_map(self):
         group_map = defaultdict(list)
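Note: the old _housekeeping multiplexer is split into one coroutine per periodic job; the startup hunk at -424,17 below spawns each of them in a TaskGroup. A minimal sketch of that pattern, with illustrative names rather than ElectrumX code:

    import asyncio
    from aiorpcx import TaskGroup

    async def every(seconds, job):
        # One periodic job running on its own schedule.
        while True:
            await asyncio.sleep(seconds)
            await job()

    async def run_jobs(stale_job, restart_job, log_job):
        # Each job is an independent task; the group keeps them all alive.
        async with TaskGroup() as group:
            await group.spawn(every(60, stale_job))
            await group.spawn(every(15, restart_job))
            await group.spawn(every(300, log_job))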
@@ -223,7 +223,7 @@ class SessionManager(object):
                 return session
         return None

-    def _for_each_session(self, session_ids, operation):
+    async def _for_each_session(self, session_ids, operation):
         if not isinstance(session_ids, list):
             raise RPCError(BAD_REQUEST, 'expected a list of session IDs')

@@ -231,30 +231,29 @@ class SessionManager(object):
         for session_id in session_ids:
             session = self._lookup_session(session_id)
             if session:
-                result.append(operation(session))
+                result.append(await operation(session))
             else:
-                result.append('unknown session: {}'.format(session_id))
+                result.append(f'unknown session: {session_id}')
         return result

-    def _close_session(self, session):
-        '''Close the session's transport.'''
-        session.close()
-        return 'disconnected {:d}'.format(session.session_id)
-
-    def _clear_stale_sessions(self):
+    async def _clear_stale_sessions(self):
         '''Cut off sessions that haven't done anything for 10 minutes.'''
-        now = time.time()
-        stale_cutoff = now - self.env.session_timeout
-
-        stale = []
-        for session in self.sessions:
-            if session.is_closing():
-                session.abort()
-            elif session.last_recv < stale_cutoff:
-                self._close_session(session)
-                stale.append(session.session_id)
-        if stale:
-            self.logger.info('closing stale connections {}'.format(stale))
+        while True:
+            await asyncio.sleep(60)
+            stale_cutoff = time.time() - self.env.session_timeout
+            stale_sessions = [session for session in self.sessions
+                              if session.last_recv < stale_cutoff]
+            if stale_sessions:
+                text = ', '.join(str(session.session_id)
+                                 for session in stale_sessions)
+                self.logger.info(f'closing stale connections {text}')
+                # Give the sockets some time to close gracefully
+                async with ignore_after(20):
+                    async with TaskGroup() as group:
+                        for session in stale_sessions:
+                            group.spawn(session.close())
+                for session in stale_sessions:
+                    session.abort()

         # Consolidate small groups
         bw_limit = self.env.bandwidth_limit
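Note: this loop and rpc_disconnect below share the same aiorpcX 0.6 idiom for dropping connections: attempt a graceful close under a deadline with ignore_after, then fall back to abort(). A minimal sketch, assuming only the imports at the top of this file:

    from aiorpcx import ignore_after

    async def close_with_grace(session, grace=2):
        # ignore_after abandons the block silently once the deadline passes,
        # so a stuck peer cannot delay the close indefinitely.
        async with ignore_after(grace):
            await session.close()
        session.abort()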
@@ -275,7 +274,7 @@ class SessionManager(object):
         result.update({
             'version': electrumx.version,
             'closing': len([s for s in self.sessions if s.is_closing()]),
-            'errors': sum(s.rpc.errors for s in self.sessions),
+            'errors': sum(s.errors for s in self.sessions),
             'groups': len(group_map),
             'logged': len([s for s in self.sessions if s.log_me]),
             'paused': sum(s.paused for s in self.sessions),

@@ -334,26 +333,33 @@ class SessionManager(object):
             await self.peer_mgr.add_localRPC_peer(real_name)
         return "peer '{}' added".format(real_name)

-    def rpc_disconnect(self, session_ids):
+    async def rpc_disconnect(self, session_ids):
         '''Disconnect sesssions.

         session_ids: array of session IDs
         '''
-        return self._for_each_session(session_ids, self._close_session)
+        async def close(session):
+            '''Close the session's transport.'''
+            async with ignore_after(2):
+                await session.close()
+            session.abort()
+            return f'disconnected {session.session_id}'
+
+        return await self._for_each_session(session_ids, close)

-    def rpc_log(self, session_ids):
+    async def rpc_log(self, session_ids):
         '''Toggle logging of sesssions.

         session_ids: array of session IDs
         '''
-        def toggle_logging(session):
+        async def toggle_logging(session):
             '''Toggle logging of the session.'''
             session.toggle_logging()
-            return 'log {:d}: {}'.format(session.session_id, session.log_me)
+            return f'log {session.session_id}: {session.log_me}'

-        return self._for_each_session(session_ids, toggle_logging)
+        return await self._for_each_session(session_ids, toggle_logging)

-    def rpc_daemon_url(self, daemon_url):
+    async def rpc_daemon_url(self, daemon_url):
         '''Replace the daemon URL.'''
         daemon_url = daemon_url or self.env.daemon_url
         try:

@@ -362,20 +368,20 @@ class SessionManager(object):
             raise RPCError(BAD_REQUEST, f'an error occured: {e}')
         return f'now using daemon at {daemon_url}'

-    def rpc_stop(self):
+    async def rpc_stop(self):
         '''Shut down the server cleanly.'''
         self.shutdown_event.set()
         return 'stopping'

-    def rpc_getinfo(self):
+    async def rpc_getinfo(self):
         '''Return summary information about the server process.'''
         return self._get_info()

-    def rpc_groups(self):
+    async def rpc_groups(self):
         '''Return statistics about the session groups.'''
         return self._group_data()

-    def rpc_peers(self):
+    async def rpc_peers(self):
         '''Return a list of data about server peers.'''
         return self.peer_mgr.rpc_data()

@@ -383,11 +389,11 @@ class SessionManager(object):
         '''Return a list of data about server peers.'''
         return await self.chain_state.query(items, limit)

-    def rpc_sessions(self):
+    async def rpc_sessions(self):
         '''Return statistics about connected sessions.'''
         return self._session_data(for_log=False)

-    def rpc_reorg(self, count):
+    async def rpc_reorg(self, count):
         '''Force a reorg of the given number of blocks.

         count: number of blocks to reorg
@@ -424,17 +430,19 @@ class SessionManager(object):
             await self._start_external_servers()
             # Peer discovery should start after the external servers
             # because we connect to ourself
-            async with TaskGroup() as group:
+            async with TaskGroup(wait=object) as group:
                 await group.spawn(self.peer_mgr.discover_peers(group))
-                await group.spawn(self._housekeeping())
+                await group.spawn(self._clear_stale_sessions())
+                await group.spawn(self._log_sessions())
+                await group.spawn(self._restart_if_paused())
         finally:
             # Close servers and sessions
             self.state = self.SHUTTING_DOWN
-            self._close_servers(list(self.servers.keys()))
+            await self._close_servers(list(self.servers.keys()))
             for session in self.sessions:
                 session.abort()
             for session in list(self.sessions):
-                await session.wait_closed()
+                await session.close()

     def session_count(self):
         '''The number of connections that we've sent something to.'''

@@ -454,7 +462,8 @@ class SessionManager(object):
             session.logger.info('maximum sessions {:,d} reached, stopping new '
                                 'connections until count drops to {:,d}'
                                 .format(self.max_sessions, self.low_watermark))
-            self._close_servers(['TCP', 'SSL'])
+            loop = asyncio.get_event_loop()
+            loop.call_soon(self._close_servers(['TCP', 'SSL']))
         gid = int(session.start_time - self.start_time) // 900
         if self.cur_group.gid != gid:
             self.cur_group = SessionGroup(gid)
@@ -484,7 +493,7 @@ class SessionBase(ServerSession):
     session_counter = itertools.count()

     def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind):
-        super().__init__(rpc_protocol=JSONRPCAutoDetect)
+        super().__init__(protocol=JSONRPCAutoDetect)
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.session_mgr = session_mgr
         self.chain_state = chain_state

@@ -498,7 +507,7 @@ class SessionBase(ServerSession):
         self.txs_sent = 0
         self.log_me = False
         self.bw_limit = self.env.bandwidth_limit
-        self._orig_mr = self.rpc.message_received
+        self._crm_original = self.connection.receive_message

     async def notify(self, height, touched):
         pass

@@ -510,16 +519,16 @@ class SessionBase(ServerSession):
             return 'xx.xx.xx.xx:xx'
         return super().peer_address_str()

-    def message_received(self, message):
+    def receive_message(self, message):
         self.logger.info(f'processing {message}')
-        self._orig_mr(message)
+        self._crm_original(message)

     def toggle_logging(self):
         self.log_me = not self.log_me
         if self.log_me:
-            self.rpc.message_received = self.message_received
+            self.connection.receive_message = self.receive_message
         else:
-            self.rpc.message_received = self._orig_mr
+            self.connection.receive_message = self._crm_original

     def flags(self):
         '''Status flags.'''

@@ -537,7 +546,6 @@ class SessionBase(ServerSession):
         self.session_id = next(self.session_counter)
         context = {'conn_id': f'{self.session_id}'}
         self.logger = util.ConnectionLogger(self.logger, context)
-        self.rpc.logger = self.logger
         self.group = self.session_mgr.add_session(self)
         self.logger.info(f'{self.kind} {self.peer_address_str()}, '
                          f'{self.session_mgr.session_count():,d} total')

@@ -559,7 +567,7 @@ class SessionBase(ServerSession):
             self.logger.info(msg)

     def count_pending_items(self):
-        return self.rpc.pending_requests
+        return len(self.connection.pending_requests())

     def semaphore(self):
         return Semaphores([self.concurrency.semaphore, self.group.semaphore])

@@ -567,6 +575,15 @@ class SessionBase(ServerSession):
     def sub_count(self):
         return 0

+    async def handle_request(self, request):
+        '''Return the async handler for the given request method.'''
+        if isinstance(request, Request):
+            handler = self.request_handlers.get(request.method)
+        else:
+            handler = None
+        coro = handler_invocation(handler, request)()
+        return await coro
+

 class ElectrumX(SessionBase):
     '''A TCP server that handles incoming Electrum connections.'''
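Note: with aiorpcX 0.6 a session no longer supplies a request_handler(method) lookup; every incoming message is delivered to handle_request(), and handler_invocation() rejects unknown methods or bad arguments by raising an RPCError. A minimal standalone sketch of that dispatch model (EchoSession and its methods are illustrative, not ElectrumX code):

    from aiorpcx import Request, ServerSession, handler_invocation

    class EchoSession(ServerSession):
        '''Toy session with two request handlers.'''

        async def ping(self):
            return None

        async def echo(self, text):
            return text

        async def handle_request(self, request):
            handlers = {'ping': self.ping, 'echo': self.echo}
            # Unknown methods (and non-Request messages) leave handler as
            # None, and handler_invocation raises the matching RPCError.
            handler = None
            if isinstance(request, Request):
                handler = handlers.get(request.method)
            coro = handler_invocation(handler, request)()
            return await coro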
@@ -579,12 +596,12 @@ class ElectrumX(SessionBase):
         self.subscribe_headers = False
         self.subscribe_headers_raw = False
         self.notified_height = None
-        self.max_response_size = self.env.max_send
+        self.connection._max_response_size = self.env.max_send
         self.max_subs = self.env.max_session_subs
         self.hashX_subs = {}
         self.sv_seen = False
         self.mempool_statuses = {}
-        self.set_protocol_handlers(self.PROTOCOL_MIN)
+        self.set_request_handlers(self.PROTOCOL_MIN)
         self.db_height = self.chain_state.db_height

     @classmethod

@@ -606,6 +623,9 @@ class ElectrumX(SessionBase):
             'hash_function': 'sha256',
         }

+    async def server_features_async(self):
+        return self.server_features(self.env)
+
     @classmethod
     def server_version_args(cls):
         '''The arguments to a server.version RPC call to a peer.'''

@@ -647,7 +667,7 @@ class ElectrumX(SessionBase):
                 method = 'blockchain.scripthash.subscribe'
             else:
                 method = 'blockchain.address.subscribe'
-            self.send_notification(method, (alias, status))
+            await self.send_notification(method, (alias, status))

         if changed:
             es = '' if len(changed) == 1 else 'es'

@@ -669,7 +689,8 @@ class ElectrumX(SessionBase):
             self.notified_height = height
             if self.subscribe_headers:
                 args = (self.subscribe_headers_result(height), )
-                self.send_notification('blockchain.headers.subscribe', args)
+                await self.send_notification('blockchain.headers.subscribe',
+                                             args)

         touched = touched.intersection(self.hashX_subs)
         if touched or (height_changed and self.mempool_statuses):

@@ -707,15 +728,15 @@ class ElectrumX(SessionBase):
         self.notified_height = self.db_height()
         return self.subscribe_headers_result(self.notified_height)

-    def headers_subscribe(self):
+    async def headers_subscribe(self):
         '''Subscribe to get raw headers of new blocks.'''
         return self._headers_subscribe(True)

-    def headers_subscribe_True(self, raw=True):
+    async def headers_subscribe_True(self, raw=True):
         '''Subscribe to get headers of new blocks.'''
         return self._headers_subscribe(raw)

-    def headers_subscribe_False(self, raw=False):
+    async def headers_subscribe_False(self, raw=False):
         '''Subscribe to get headers of new blocks.'''
         return self._headers_subscribe(raw)

@@ -723,7 +744,7 @@ class ElectrumX(SessionBase):
         '''Add a peer (but only if the peer resolves to the source).'''
         return await self.peer_mgr.on_add_peer(features, self.peer_address())

-    def peers_subscribe(self):
+    async def peers_subscribe(self):
         '''Return the server peers as a list of (ip, host, details) tuples.'''
         return self.peer_mgr.on_peers_subscribe(self.is_tor())

@@ -875,7 +896,7 @@ class ElectrumX(SessionBase):
             'root': hash_to_hex_str(root),
         }

-    def block_header(self, height, cp_height=0):
+    async def block_header(self, height, cp_height=0):
         '''Return a raw block header as a hexadecimal string, or as a
         dictionary with a merkle proof.'''
         height = non_negative_integer(height)

@@ -887,13 +908,13 @@ class ElectrumX(SessionBase):
             result.update(self._merkle_proof(cp_height, height))
         return result

-    def block_header_13(self, height):
+    async def block_header_13(self, height):
         '''Return a raw block header as a hexadecimal string.

         height: the header's height'''
         return self.block_header(height)

-    def block_headers(self, start_height, count, cp_height=0):
+    async def block_headers(self, start_height, count, cp_height=0):
         '''Return count concatenated block headers as hex for the main chain;
         starting at start_height.

@@ -913,10 +934,10 @@ class ElectrumX(SessionBase):
             result.update(self._merkle_proof(cp_height, last_height))
         return result

-    def block_headers_12(self, start_height, count):
-        return self.block_headers(start_height, count)
+    async def block_headers_12(self, start_height, count):
+        return await self.block_headers(start_height, count)

-    def block_get_chunk(self, index):
+    async def block_get_chunk(self, index):
         '''Return a chunk of block headers as a hexadecimal string.

         index: the chunk index'''

@@ -926,7 +947,7 @@ class ElectrumX(SessionBase):
         headers, count = self.chain_state.read_headers(start_height, size)
         return headers.hex()

-    def block_get_header(self, height):
+    async def block_get_header(self, height):
         '''The deserialized header at a given height.

         height: the header's height'''

@@ -959,7 +980,7 @@ class ElectrumX(SessionBase):
             banner = banner.replace(*pair)
         return banner

-    def donation_address(self):
+    async def donation_address(self):
         '''Return the donation address as a string, empty if there is none.'''
         return self.env.donation_address

@@ -996,13 +1017,13 @@ class ElectrumX(SessionBase):
         number = non_negative_integer(number)
         return await self.daemon_request('estimatefee', [number])

-    def ping(self):
+    async def ping(self):
         '''Serves as a connection keep-alive mechanism and for the client to
         confirm the server is still responding.
         '''
         return None

-    def server_version(self, client_name='', protocol_version=None):
+    async def server_version(self, client_name='', protocol_version=None):
         '''Returns the server version as a string.

         client_name: a string identifying the client

@@ -1033,7 +1054,7 @@ class ElectrumX(SessionBase):
             self.close_after_send = True
             raise RPCError(BAD_REQUEST,
                            f'unsupported protocol version: {protocol_version}')
-        self.set_protocol_handlers(ptuple)
+        self.set_request_handlers(ptuple)

         return (electrumx.version, self.protocol_version_string())

@@ -1129,7 +1150,7 @@ class ElectrumX(SessionBase):
         else:
             return tx_hash

-    def set_protocol_handlers(self, ptuple):
+    def set_request_handlers(self, ptuple):
         self.protocol_tuple = ptuple

         handlers = {

@@ -1148,7 +1169,7 @@ class ElectrumX(SessionBase):
             'server.add_peer': self.add_peer,
             'server.banner': self.banner,
             'server.donation_address': self.donation_address,
-            'server.features': partial(self.server_features, self.env),
+            'server.features': self.server_features_async,
             'server.peers.subscribe': self.peers_subscribe,
             'server.version': self.server_version,
         }
@@ -1185,11 +1206,7 @@ class ElectrumX(SessionBase):
                 'blockchain.address.subscribe': self.address_subscribe,
             })

-        self.electrumx_handlers = handlers
-
-    def request_handler(self, method):
-        '''Return the async handler for the given request method.'''
-        return self.electrumx_handlers.get(method)
+        self.request_handlers = handlers


 class LocalRPC(SessionBase):

@@ -1198,15 +1215,11 @@ class LocalRPC(SessionBase):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.client = 'RPC'
-        self.max_response_size = 0
+        self.connection._max_response_size = 0

     def protocol_version_string(self):
         return 'RPC'

-    def request_handler(self, method):
-        '''Return the async handler for the given request method.'''
-        return self.session_mgr.rpc_handlers.get(method)
-

 class DashElectrumX(ElectrumX):
     '''A TCP server that handles incoming Electrum Dash connections.'''

@@ -1215,9 +1228,9 @@ class DashElectrumX(ElectrumX):
         super().__init__(*args, **kwargs)
         self.mns = set()

-    def set_protocol_handlers(self, ptuple):
-        super().set_protocol_handlers(ptuple)
-        self.electrumx_handlers.update({
+    def set_request_handlers(self, ptuple):
+        super().set_request_handlers(ptuple)
+        self.request_handlers.update({
             'masternode.announce.broadcast':
             self.masternode_announce_broadcast,
             'masternode.subscribe': self.masternode_subscribe,

@@ -1230,7 +1243,7 @@ class DashElectrumX(ElectrumX):
             for mn in self.mns:
                 status = await self.daemon_request('masternode_list',
                                                    ['status', mn])
-                self.send_notification('masternode.subscribe',
-                                       [mn, status.get(mn)])
+                await self.send_notification('masternode.subscribe',
+                                             [mn, status.get(mn)])

     # Masternode command handlers

setup.py (2 lines changed)

@@ -12,7 +12,7 @@ setuptools.setup(
     # "blake256" package is required to sync Decred network.
     # "xevan_hash" package is required to sync Xuez network.
     # "groestlcoin_hash" package is required to sync Groestlcoin network.
-    install_requires=['aiorpcX == 0.5.9', 'attrs>=15',
+    install_requires=['aiorpcX >= 0.6.0', 'aiorpcX < 0.7.0', 'attrs>=15',
                       'plyvel', 'pylru', 'aiohttp >= 2'],
     packages=setuptools.find_packages(include=('electrumx*',)),
     description='ElectrumX Server',
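Note: listing aiorpcX twice applies both bounds; the same range is usually written as one comma-separated specifier. A sketch of the equivalent line (not the committed text):

    install_requires=['aiorpcX>=0.6.0,<0.7.0', 'attrs>=15',
                      'plyvel', 'pylru', 'aiohttp >= 2'],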
