diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py
index f6b0c15..1587b19 100644
--- a/electrumx/server/controller.py
+++ b/electrumx/server/controller.py
@@ -81,10 +81,8 @@ class Controller(ServerBase):
         '''Start the RPC server and wait for the mempool to synchronize.  Then
         start serving external clients.
         '''
-        reqd_version = (0, 5, 9)
-        if aiorpcx_version != reqd_version:
-            raise RuntimeError('ElectrumX requires aiorpcX version '
-                               f'{version_string(reqd_version)}')
+        if not (0, 6) <= aiorpcx_version < (0, 7):
+            raise RuntimeError('ElectrumX requires aiorpcX version 0.6.x')
 
         env = self.env
         min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py
index f045885..abcee0b 100644
--- a/electrumx/server/peers.py
+++ b/electrumx/server/peers.py
@@ -14,9 +14,10 @@ import ssl
 import time
 from collections import defaultdict, Counter
 
-from aiorpcx import (ClientSession, SOCKSProxy, SOCKSError,
-                     RPCError, ConnectionError,
-                     TaskGroup, run_in_thread, ignore_after)
+from aiorpcx import (ClientSession, SOCKSProxy,
+                     Notification, handler_invocation,
+                     SOCKSError, RPCError, TaskTimeout,
+                     TaskGroup, run_in_thread, ignore_after, timeout_after)
 
 from electrumx.lib.peer import Peer
 from electrumx.lib.util import class_logger, protocol_tuple
@@ -39,14 +40,13 @@ def assert_good(message, result, instance):
 class PeerSession(ClientSession):
     '''An outgoing session to a peer.'''
 
-    def _header_notification(self, header):
-        pass
-
-    def notification_handler(self, method):
+    async def handle_request(self, request):
         # We subscribe so might be unlucky enough to get a notification...
-        if method == 'blockchain.headers.subscribe':
-            return self._header_notification
-        return None
+        if (isinstance(request, Notification) and
+                request.method == 'blockchain.headers.subscribe'):
+            pass
+        else:
+            await handler_invocation(None, request)   # Raises
 
 
 class PeerManager(object):
@@ -222,34 +222,30 @@ class PeerManager(object):
                 # connections so our peers see the correct source.
                 kwargs['local_addr'] = (host, None)
 
+            peer_text = f'[{peer}:{port} {kind}]'
             try:
-                async with PeerSession(peer.host, port, **kwargs) as session:
-                    await self._verify_peer(session, peer)
+                async with timeout_after(120 if peer.is_tor else 30):
+                    async with PeerSession(peer.host, port,
+                                           **kwargs) as session:
+                        await self._verify_peer(session, peer)
                 is_good = True
                 break
             except BadPeerError as e:
-                self.logger.error(f'[{peer}] marking bad: ({e})')
+                self.logger.error(f'{peer_text} marking bad: ({e})')
                 peer.mark_bad()
                 break
             except RPCError as e:
-                self.logger.error(f'[{peer}] RPC error: {e.message} '
+                self.logger.error(f'{peer_text} RPC error: {e.message} '
                                   f'({e.code})')
-            except asyncio.TimeoutError as e:
-                self.logger.error(f'[{peer}] {e}')
+            except TaskTimeout as e:
+                self.logger.error(f'{peer_text} timed out after {e.args[0]}s')
             except (OSError, SOCKSError, ConnectionError) as e:
-                self.logger.info(f'[{peer}] {kind} connection to '
-                                 f'port {port} failed: {e}')
-
-        now = time.time()
-        if self.env.force_proxy or peer.is_tor:
-            how = f'via {kind} over Tor'
-        else:
-            how = f'via {kind} at {peer.ip_addr}'
-        status = 'verified' if is_good else 'failed to verify'
-        elapsed = now - peer.last_try
-        self.logger.info(f'{status} {peer} {how} in {elapsed:.1f}s')
+                self.logger.info(f'{peer_text} {e}')
 
         if is_good:
+            now = time.time()
+            elapsed = now - peer.last_try
+            self.logger.info(f'{peer_text} verified in {elapsed:.1f}s')
             peer.try_count = 0
             peer.last_good = now
             peer.source = 'peer'
@@ -283,12 +279,9 @@ class PeerManager(object):
         if address:
             peer.ip_addr = address[0]
 
-        timeout = 20 if peer.is_tor else 10
-
         # server.version goes first
         message = 'server.version'
-        result = await session.send_request(
-            message, self.server_version_args, timeout=timeout)
+        result = await session.send_request(message, self.server_version_args)
         assert_good(message, result, list)
 
         # Protocol version 1.1 returns a pair with the version first
@@ -299,13 +292,14 @@ class PeerManager(object):
         peer.features['server_version'] = server_version
         ptuple = protocol_tuple(protocol_version)
 
-        await self._send_headers_subscribe(session, peer, timeout, ptuple)
-        await self._send_server_features(session, peer, timeout)
-        await self._send_peers_subscribe(session, peer, timeout)
+        # FIXME: make these concurrent with first exception preserved
+        await self._send_headers_subscribe(session, peer, ptuple)
+        await self._send_server_features(session, peer)
+        await self._send_peers_subscribe(session, peer)
 
-    async def _send_headers_subscribe(self, session, peer, timeout, ptuple):
+    async def _send_headers_subscribe(self, session, peer, ptuple):
         message = 'blockchain.headers.subscribe'
-        result = await session.send_request(message, timeout=timeout)
+        result = await session.send_request(message)
         assert_good(message, result, dict)
 
         our_height = self.chain_state.db_height()
@@ -325,8 +319,7 @@ class PeerManager(object):
         if ptuple >= (1, 4):
             ours = raw_header.hex()
             message = 'blockchain.block.header'
-            theirs = await session.send_request(message, [check_height],
-                                                timeout=timeout)
+            theirs = await session.send_request(message, [check_height])
             assert_good(message, theirs, str)
             if ours != theirs:
                 raise BadPeerError(f'our header {ours} and '
@@ -335,17 +328,16 @@ class PeerManager(object):
             ours = self.env.coin.electrum_header(raw_header, check_height)
             ours = ours.get('prev_block_hash')
             message = 'blockchain.block.get_header'
-            theirs = await session.send_request(message, [check_height],
-                                                timeout=timeout)
+            theirs = await session.send_request(message, [check_height])
             assert_good(message, theirs, dict)
             theirs = theirs.get('prev_block_hash')
             if ours != theirs:
                 raise BadPeerError(f'our header hash {ours} and '
                                    f'theirs {theirs} differ')
 
-    async def _send_server_features(self, session, peer, timeout):
+    async def _send_server_features(self, session, peer):
         message = 'server.features'
-        features = await session.send_request(message, timeout=timeout)
+        features = await session.send_request(message)
         assert_good(message, features, dict)
         hosts = [host.lower() for host in features.get('hosts', {})]
         if self.env.coin.GENESIS_HASH != features.get('genesis_hash'):
@@ -355,9 +347,9 @@ class PeerManager(object):
         else:
             raise BadPeerError(f'not listed in own hosts list {hosts}')
 
-    async def _send_peers_subscribe(self, session, peer, timeout):
+    async def _send_peers_subscribe(self, session, peer):
         message = 'server.peers.subscribe'
-        raw_peers = await session.send_request(message, timeout=timeout)
+        raw_peers = await session.send_request(message)
         assert_good(message, raw_peers, list)
 
         # Check the peers list we got from a remote peer.
@@ -377,8 +369,7 @@ class PeerManager(object):
                 return
         self.logger.info(f'registering ourself with {peer}')
         # We only care to wait for the response
-        await session.send_request('server.add_peer', [features],
-                                   timeout=timeout)
+        await session.send_request('server.add_peer', [features])
 
     #
     # External interface
diff --git a/electrumx/server/session.py b/electrumx/server/session.py
index 98a4cb4..e64fbdb 100644
--- a/electrumx/server/session.py
+++ b/electrumx/server/session.py
@@ -18,7 +18,10 @@ import time
 from collections import defaultdict
 from functools import partial
 
-from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError, TaskGroup
+from aiorpcx import (
+    ServerSession, JSONRPCAutoDetect, TaskGroup, handler_invocation,
+    RPCError, Request, ignore_after
+)
 
 import electrumx
 import electrumx.lib.text as text
@@ -112,7 +115,6 @@ class SessionManager(object):
         self.max_sessions = env.max_sessions
         self.low_watermark = self.max_sessions * 19 // 20
         self.max_subs = env.max_subs
-        self.next_log_sessions = 0
         self.cur_group = SessionGroup(0)
         self.state = self.CATCHING_UP
         self.txs_sent = 0
@@ -131,7 +133,8 @@ class SessionManager(object):
         # Set up the RPC request handlers
         cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
                 'query reorg sessions stop'.split())
-        self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
+        LocalRPC.request_handlers = {cmd: getattr(self, 'rpc_' + cmd)
+                                     for cmd in cmds}
 
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
@@ -169,7 +172,7 @@ class SessionManager(object):
         self.state = self.LISTENING
         self.server_listening.set()
 
-    def _close_servers(self, kinds):
+    async def _close_servers(self, kinds):
         '''Close the servers of the given kinds (TCP etc.).'''
         if kinds:
             self.logger.info('closing down {} listening servers'
@@ -178,30 +181,27 @@ class SessionManager(object):
             server = self.servers.pop(kind, None)
             if server:
                 server.close()
+                await server.wait_closed()
 
-    async def _housekeeping(self):
-        '''Regular housekeeping checks.'''
-        n = 0
+    async def _restart_if_paused(self):
         while True:
-            n += 1
             await asyncio.sleep(15)
-            if n % 10 == 0:
-                self._clear_stale_sessions()
-
             # Start listening for incoming connections if paused and
             # session count has fallen
             if (self.state == self.PAUSED and
                     len(self.sessions) <= self.low_watermark):
                 await self._start_external_servers()
 
-            # Periodically log sessions
-            if self.env.log_sessions and time.time() > self.next_log_sessions:
-                if self.next_log_sessions:
-                    data = self._session_data(for_log=True)
-                    for line in text.sessions_lines(data):
-                        self.logger.info(line)
-                    self.logger.info(json.dumps(self._get_info()))
-                self.next_log_sessions = time.time() + self.env.log_sessions
+    async def _log_sessions(self):
+        '''Periodically log sessions.'''
+        log_interval = self.env.log_sessions
+        if log_interval:
+            while True:
+                await asyncio.sleep(log_interval)
+                data = self._session_data(for_log=True)
+                for line in text.sessions_lines(data):
+                    self.logger.info(line)
+                self.logger.info(json.dumps(self._get_info()))
 
     def _group_map(self):
         group_map = defaultdict(list)
@@ -223,7 +223,7 @@ class SessionManager(object):
                 return session
         return None
 
-    def _for_each_session(self, session_ids, operation):
+    async def _for_each_session(self, session_ids, operation):
         if not isinstance(session_ids, list):
             raise RPCError(BAD_REQUEST, 'expected a list of session IDs')
 
@@ -231,42 +231,41 @@ class SessionManager(object):
         for session_id in session_ids:
             session = self._lookup_session(session_id)
             if session:
-                result.append(operation(session))
+                result.append(await operation(session))
             else:
-                result.append('unknown session: {}'.format(session_id))
+                result.append(f'unknown session: {session_id}')
         return result
 
-    def _close_session(self, session):
-        '''Close the session's transport.'''
-        session.close()
-        return 'disconnected {:d}'.format(session.session_id)
-
-    def _clear_stale_sessions(self):
+    async def _clear_stale_sessions(self):
         '''Cut off sessions that haven't done anything for 10 minutes.'''
-        now = time.time()
-        stale_cutoff = now - self.env.session_timeout
-
-        stale = []
-        for session in self.sessions:
-            if session.is_closing():
-                session.abort()
-            elif session.last_recv < stale_cutoff:
-                self._close_session(session)
-                stale.append(session.session_id)
-        if stale:
-            self.logger.info('closing stale connections {}'.format(stale))
-
-        # Consolidate small groups
-        bw_limit = self.env.bandwidth_limit
-        group_map = self._group_map()
-        groups = [group for group, sessions in group_map.items()
-                  if len(sessions) <= 5 and
-                  sum(s.bw_charge for s in sessions) < bw_limit]
-        if len(groups) > 1:
-            new_group = groups[-1]
-            for group in groups:
-                for session in group_map[group]:
-                    session.group = new_group
+        while True:
+            await asyncio.sleep(60)
+            stale_cutoff = time.time() - self.env.session_timeout
+            stale_sessions = [session for session in self.sessions
+                              if session.last_recv < stale_cutoff]
+            if stale_sessions:
+                text = ', '.join(str(session.session_id)
+                                 for session in stale_sessions)
+                self.logger.info(f'closing stale connections {text}')
+                # Give the sockets some time to close gracefully
+                async with ignore_after(20):
+                    async with TaskGroup() as group:
+                        for session in stale_sessions:
+                            group.spawn(session.close())
+                for session in stale_sessions:
+                    session.abort()
+
+            # Consolidate small groups
+            bw_limit = self.env.bandwidth_limit
+            group_map = self._group_map()
+            groups = [group for group, sessions in group_map.items()
+                      if len(sessions) <= 5 and
+                      sum(s.bw_charge for s in sessions) < bw_limit]
+            if len(groups) > 1:
+                new_group = groups[-1]
+                for group in groups:
+                    for session in group_map[group]:
+                        session.group = new_group
 
     def _get_info(self):
         '''A summary of server state.'''
@@ -275,7 +274,7 @@ class SessionManager(object):
         result.update({
             'version': electrumx.version,
             'closing': len([s for s in self.sessions if s.is_closing()]),
-            'errors': sum(s.rpc.errors for s in self.sessions),
+            'errors': sum(s.errors for s in self.sessions),
             'groups': len(group_map),
             'logged': len([s for s in self.sessions if s.log_me]),
             'paused': sum(s.paused for s in self.sessions),
@@ -334,26 +333,33 @@ class SessionManager(object):
         await self.peer_mgr.add_localRPC_peer(real_name)
         return "peer '{}' added".format(real_name)
 
-    def rpc_disconnect(self, session_ids):
+    async def rpc_disconnect(self, session_ids):
         '''Disconnect sesssions.
 
         session_ids: array of session IDs
         '''
-        return self._for_each_session(session_ids, self._close_session)
+        async def close(session):
+            '''Close the session's transport.'''
+            async with ignore_after(2):
+                await session.close()
+            session.abort()
+            return f'disconnected {session.session_id}'
+
+        return await self._for_each_session(session_ids, close)
 
-    def rpc_log(self, session_ids):
+    async def rpc_log(self, session_ids):
         '''Toggle logging of sesssions.
 
         session_ids: array of session IDs
         '''
-        def toggle_logging(session):
+        async def toggle_logging(session):
             '''Toggle logging of the session.'''
             session.toggle_logging()
-            return 'log {:d}: {}'.format(session.session_id, session.log_me)
+            return f'log {session.session_id}: {session.log_me}'
 
-        return self._for_each_session(session_ids, toggle_logging)
+        return await self._for_each_session(session_ids, toggle_logging)
 
-    def rpc_daemon_url(self, daemon_url):
+    async def rpc_daemon_url(self, daemon_url):
         '''Replace the daemon URL.'''
         daemon_url = daemon_url or self.env.daemon_url
         try:
@@ -362,20 +368,20 @@ class SessionManager(object):
             raise RPCError(BAD_REQUEST, f'an error occured: {e}')
         return f'now using daemon at {daemon_url}'
 
-    def rpc_stop(self):
+    async def rpc_stop(self):
         '''Shut down the server cleanly.'''
        self.shutdown_event.set()
         return 'stopping'
 
-    def rpc_getinfo(self):
+    async def rpc_getinfo(self):
         '''Return summary information about the server process.'''
         return self._get_info()
 
-    def rpc_groups(self):
+    async def rpc_groups(self):
         '''Return statistics about the session groups.'''
         return self._group_data()
 
-    def rpc_peers(self):
+    async def rpc_peers(self):
         '''Return a list of data about server peers.'''
         return self.peer_mgr.rpc_data()
 
@@ -383,11 +389,11 @@ class SessionManager(object):
         '''Return a list of data about server peers.'''
         return await self.chain_state.query(items, limit)
 
-    def rpc_sessions(self):
+    async def rpc_sessions(self):
         '''Return statistics about connected sessions.'''
         return self._session_data(for_log=False)
 
-    def rpc_reorg(self, count):
+    async def rpc_reorg(self, count):
         '''Force a reorg of the given number of blocks.
 
         count: number of blocks to reorg
@@ -424,17 +430,19 @@ class SessionManager(object):
             await self._start_external_servers()
             # Peer discovery should start after the external servers
             # because we connect to ourself
-            async with TaskGroup() as group:
+            async with TaskGroup(wait=object) as group:
                 await group.spawn(self.peer_mgr.discover_peers(group))
-                await group.spawn(self._housekeeping())
+                await group.spawn(self._clear_stale_sessions())
+                await group.spawn(self._log_sessions())
+                await group.spawn(self._restart_if_paused())
         finally:
             # Close servers and sessions
             self.state = self.SHUTTING_DOWN
-            self._close_servers(list(self.servers.keys()))
+            await self._close_servers(list(self.servers.keys()))
             for session in self.sessions:
                 session.abort()
             for session in list(self.sessions):
-                await session.wait_closed()
+                await session.close()
 
     def session_count(self):
         '''The number of connections that we've sent something to.'''
@@ -454,7 +462,8 @@ class SessionManager(object):
             session.logger.info('maximum sessions {:,d} reached, stopping new '
                                 'connections until count drops to {:,d}'
                                 .format(self.max_sessions, self.low_watermark))
-            self._close_servers(['TCP', 'SSL'])
+            loop = asyncio.get_event_loop()
+            loop.call_soon(self._close_servers(['TCP', 'SSL']))
         gid = int(session.start_time - self.start_time) // 900
         if self.cur_group.gid != gid:
             self.cur_group = SessionGroup(gid)
@@ -484,7 +493,7 @@ class SessionBase(ServerSession):
     session_counter = itertools.count()
 
     def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind):
-        super().__init__(rpc_protocol=JSONRPCAutoDetect)
+        super().__init__(protocol=JSONRPCAutoDetect)
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.session_mgr = session_mgr
         self.chain_state = chain_state
@@ -498,7 +507,7 @@ class SessionBase(ServerSession):
         self.txs_sent = 0
         self.log_me = False
         self.bw_limit = self.env.bandwidth_limit
-        self._orig_mr = self.rpc.message_received
+        self._crm_original = self.connection.receive_message
 
     async def notify(self, height, touched):
         pass
@@ -510,16 +519,16 @@ class SessionBase(ServerSession):
             return 'xx.xx.xx.xx:xx'
         return super().peer_address_str()
 
-    def message_received(self, message):
+    def receive_message(self, message):
         self.logger.info(f'processing {message}')
-        self._orig_mr(message)
+        self._crm_original(message)
 
     def toggle_logging(self):
         self.log_me = not self.log_me
         if self.log_me:
-            self.rpc.message_received = self.message_received
+            self.connection.receive_message = self.receive_message
         else:
-            self.rpc.message_received = self._orig_mr
+            self.connection.receive_message = self._crm_original
 
     def flags(self):
         '''Status flags.'''
@@ -537,7 +546,6 @@ class SessionBase(ServerSession):
         self.session_id = next(self.session_counter)
         context = {'conn_id': f'{self.session_id}'}
         self.logger = util.ConnectionLogger(self.logger, context)
-        self.rpc.logger = self.logger
         self.group = self.session_mgr.add_session(self)
         self.logger.info(f'{self.kind} {self.peer_address_str()}, '
                          f'{self.session_mgr.session_count():,d} total')
@@ -559,7 +567,7 @@ class SessionBase(ServerSession):
             self.logger.info(msg)
 
     def count_pending_items(self):
-        return self.rpc.pending_requests
+        return len(self.connection.pending_requests())
 
     def semaphore(self):
         return Semaphores([self.concurrency.semaphore, self.group.semaphore])
@@ -567,6 +575,15 @@ class SessionBase(ServerSession):
     def sub_count(self):
         return 0
 
+    async def handle_request(self, request):
+        '''Return the async handler for the given request method.'''
+        if isinstance(request, Request):
+            handler = self.request_handlers.get(request.method)
+        else:
+            handler = None
+        coro = handler_invocation(handler, request)()
+        return await coro
+
 
 class ElectrumX(SessionBase):
     '''A TCP server that handles incoming Electrum connections.'''
@@ -579,12 +596,12 @@ class ElectrumX(SessionBase):
         self.subscribe_headers = False
         self.subscribe_headers_raw = False
         self.notified_height = None
-        self.max_response_size = self.env.max_send
+        self.connection._max_response_size = self.env.max_send
         self.max_subs = self.env.max_session_subs
         self.hashX_subs = {}
         self.sv_seen = False
         self.mempool_statuses = {}
-        self.set_protocol_handlers(self.PROTOCOL_MIN)
+        self.set_request_handlers(self.PROTOCOL_MIN)
         self.db_height = self.chain_state.db_height
 
     @classmethod
@@ -606,6 +623,9 @@ class ElectrumX(SessionBase):
             'hash_function': 'sha256',
         }
 
+    async def server_features_async(self):
+        return self.server_features(self.env)
+
     @classmethod
     def server_version_args(cls):
         '''The arguments to a server.version RPC call to a peer.'''
@@ -647,7 +667,7 @@ class ElectrumX(SessionBase):
                 method = 'blockchain.scripthash.subscribe'
             else:
                 method = 'blockchain.address.subscribe'
-            self.send_notification(method, (alias, status))
+            await self.send_notification(method, (alias, status))
 
         if changed:
             es = '' if len(changed) == 1 else 'es'
@@ -669,7 +689,8 @@ class ElectrumX(SessionBase):
             self.notified_height = height
             if self.subscribe_headers:
                 args = (self.subscribe_headers_result(height), )
-                self.send_notification('blockchain.headers.subscribe', args)
+                await self.send_notification('blockchain.headers.subscribe',
+                                             args)
 
         touched = touched.intersection(self.hashX_subs)
         if touched or (height_changed and self.mempool_statuses):
@@ -707,15 +728,15 @@ class ElectrumX(SessionBase):
         self.notified_height = self.db_height()
         return self.subscribe_headers_result(self.notified_height)
 
-    def headers_subscribe(self):
+    async def headers_subscribe(self):
         '''Subscribe to get raw headers of new blocks.'''
         return self._headers_subscribe(True)
 
-    def headers_subscribe_True(self, raw=True):
+    async def headers_subscribe_True(self, raw=True):
         '''Subscribe to get headers of new blocks.'''
         return self._headers_subscribe(raw)
 
-    def headers_subscribe_False(self, raw=False):
+    async def headers_subscribe_False(self, raw=False):
         '''Subscribe to get headers of new blocks.'''
         return self._headers_subscribe(raw)
 
@@ -723,7 +744,7 @@ class ElectrumX(SessionBase):
         '''Add a peer (but only if the peer resolves to the source).'''
         return await self.peer_mgr.on_add_peer(features, self.peer_address())
 
-    def peers_subscribe(self):
+    async def peers_subscribe(self):
         '''Return the server peers as a list of (ip, host, details) tuples.'''
         return self.peer_mgr.on_peers_subscribe(self.is_tor())
 
@@ -875,7 +896,7 @@ class ElectrumX(SessionBase):
             'root': hash_to_hex_str(root),
         }
 
-    def block_header(self, height, cp_height=0):
+    async def block_header(self, height, cp_height=0):
         '''Return a raw block header as a hexadecimal string, or as a
         dictionary with a merkle proof.'''
         height = non_negative_integer(height)
@@ -887,13 +908,13 @@ class ElectrumX(SessionBase):
             result.update(self._merkle_proof(cp_height, height))
         return result
 
-    def block_header_13(self, height):
+    async def block_header_13(self, height):
         '''Return a raw block header as a hexadecimal string.
 
         height: the header's height'''
         return self.block_header(height)
 
-    def block_headers(self, start_height, count, cp_height=0):
+    async def block_headers(self, start_height, count, cp_height=0):
         '''Return count concatenated block headers as hex for the main chain;
         starting at start_height.
 
@@ -913,10 +934,10 @@ class ElectrumX(SessionBase):
             result.update(self._merkle_proof(cp_height, last_height))
         return result
 
-    def block_headers_12(self, start_height, count):
-        return self.block_headers(start_height, count)
+    async def block_headers_12(self, start_height, count):
+        return await self.block_headers(start_height, count)
 
-    def block_get_chunk(self, index):
+    async def block_get_chunk(self, index):
         '''Return a chunk of block headers as a hexadecimal string.
 
         index: the chunk index'''
@@ -926,7 +947,7 @@ class ElectrumX(SessionBase):
         headers, count = self.chain_state.read_headers(start_height, size)
         return headers.hex()
 
-    def block_get_header(self, height):
+    async def block_get_header(self, height):
         '''The deserialized header at a given height.
 
         height: the header's height'''
@@ -959,7 +980,7 @@ class ElectrumX(SessionBase):
             banner = banner.replace(*pair)
         return banner
 
-    def donation_address(self):
+    async def donation_address(self):
         '''Return the donation address as a string, empty if there is none.'''
         return self.env.donation_address
 
@@ -996,13 +1017,13 @@ class ElectrumX(SessionBase):
         number = non_negative_integer(number)
         return await self.daemon_request('estimatefee', [number])
 
-    def ping(self):
+    async def ping(self):
         '''Serves as a connection keep-alive mechanism and for the client to
         confirm the server is still responding.
         '''
         return None
 
-    def server_version(self, client_name='', protocol_version=None):
+    async def server_version(self, client_name='', protocol_version=None):
         '''Returns the server version as a string.
 
         client_name: a string identifying the client
@@ -1033,7 +1054,7 @@ class ElectrumX(SessionBase):
                 self.close_after_send = True
             raise RPCError(BAD_REQUEST,
                            f'unsupported protocol version: {protocol_version}')
-        self.set_protocol_handlers(ptuple)
+        self.set_request_handlers(ptuple)
 
         return (electrumx.version, self.protocol_version_string())
 
@@ -1129,7 +1150,7 @@ class ElectrumX(SessionBase):
         else:
             return tx_hash
 
-    def set_protocol_handlers(self, ptuple):
+    def set_request_handlers(self, ptuple):
         self.protocol_tuple = ptuple
 
         handlers = {
@@ -1148,7 +1169,7 @@ class ElectrumX(SessionBase):
             'server.add_peer': self.add_peer,
             'server.banner': self.banner,
             'server.donation_address': self.donation_address,
-            'server.features': partial(self.server_features, self.env),
+            'server.features': self.server_features_async,
             'server.peers.subscribe': self.peers_subscribe,
             'server.version': self.server_version,
         }
@@ -1185,11 +1206,7 @@ class ElectrumX(SessionBase):
                 'blockchain.address.subscribe': self.address_subscribe,
             })
 
-        self.electrumx_handlers = handlers
-
-    def request_handler(self, method):
-        '''Return the async handler for the given request method.'''
-        return self.electrumx_handlers.get(method)
+        self.request_handlers = handlers
 
 
 class LocalRPC(SessionBase):
@@ -1198,15 +1215,11 @@ class LocalRPC(SessionBase):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.client = 'RPC'
-        self.max_response_size = 0
+        self.connection._max_response_size = 0
 
     def protocol_version_string(self):
         return 'RPC'
 
-    def request_handler(self, method):
-        '''Return the async handler for the given request method.'''
-        return self.session_mgr.rpc_handlers.get(method)
-
 
 class DashElectrumX(ElectrumX):
     '''A TCP server that handles incoming Electrum Dash connections.'''
@@ -1215,9 +1228,9 @@ class DashElectrumX(ElectrumX):
         super().__init__(*args, **kwargs)
         self.mns = set()
 
-    def set_protocol_handlers(self, ptuple):
-        super().set_protocol_handlers(ptuple)
-        self.electrumx_handlers.update({
+    def set_request_handlers(self, ptuple):
+        super().set_request_handlers(ptuple)
+        self.request_handlers.update({
             'masternode.announce.broadcast':
             self.masternode_announce_broadcast,
             'masternode.subscribe': self.masternode_subscribe,
@@ -1230,8 +1243,8 @@ class DashElectrumX(ElectrumX):
         for mn in self.mns:
             status = await self.daemon_request('masternode_list',
                                                ['status', mn])
-            self.send_notification('masternode.subscribe',
-                                   [mn, status.get(mn)])
+            await self.send_notification('masternode.subscribe',
+                                         [mn, status.get(mn)])
 
     # Masternode command handlers
     async def masternode_announce_broadcast(self, signmnb):
diff --git a/setup.py b/setup.py
index 9868189..ee25e3c 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@ setuptools.setup(
     # "blake256" package is required to sync Decred network.
     # "xevan_hash" package is required to sync Xuez network.
    # "groestlcoin_hash" package is required to sync Groestlcoin network.
-    install_requires=['aiorpcX == 0.5.9', 'attrs>=15',
+    install_requires=['aiorpcX >= 0.6.0', 'aiorpcX < 0.7.0', 'attrs>=15',
                       'plyvel', 'pylru', 'aiohttp >= 2'],
     packages=setuptools.find_packages(include=('electrumx*',)),
     description='ElectrumX Server',