diff --git a/docs/changelog.rst b/docs/changelog.rst index fae97d5..fd9f3a4 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -11,6 +11,8 @@ Version 1.8.1 (in development) ============================== +* fix `#557`_ + Version 1.8 (06 Aug 2018) ========================== @@ -199,3 +201,4 @@ bitcoincash:qzxpdlt8ehu9ehftw6rqsy2jgfq4nsltxvhrdmdfpn .. _#523: https://github.com/kyuupichan/electrumx/issues/523 .. _#534: https://github.com/kyuupichan/electrumx/issues/534 .. _#538: https://github.com/kyuupichan/electrumx/issues/538 +.. _#557: https://github.com/kyuupichan/electrumx/issues/557 diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index 6d6f66a..602ae90 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -29,6 +29,7 @@ class ServerBase(object): ''' SUPPRESS_MESSAGE_REGEX = re.compile('SSH handshake') + SUPPRESS_TASK_REGEX = re.compile('accept_connection2') PYTHON_MIN_VERSION = (3, 6) def __init__(self, env): @@ -68,6 +69,8 @@ class ServerBase(object): message = context.get('message') if message and self.SUPPRESS_MESSAGE_REGEX.match(message): return + if self.SUPPRESS_TASK_REGEX.match(repr(context.get('task'))): + return loop.default_exception_handler(context) async def _main(self, loop): diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 06c2d8c..18ee7d7 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -6,8 +6,6 @@ # and warranty status of this software. -import asyncio - from electrumx.lib.hash import hash_to_hex_str @@ -72,13 +70,9 @@ class ChainState(object): except ValueError: pass - try: - hashX = coin.address_to_hashX(arg) - lines.append(f'Address: {arg}') - return hashX - except Base58Error: - print(f'Ingoring unknown arg: {arg}') - return None + hashX = coin.address_to_hashX(arg) + lines.append(f'Address: {arg}') + return hashX for arg in args: hashX = arg_to_hashX(arg) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index da07089..6424d0e 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -80,8 +80,8 @@ class Controller(ServerBase): '''Start the RPC server and wait for the mempool to synchronize. Then start serving external clients. ''' - if not (0, 6, 2) <= aiorpcx_version < (0, 7): - raise RuntimeError('aiorpcX version 0.6.x with x>=2 required') + if not (0, 7) <= aiorpcx_version < (0, 8): + raise RuntimeError('aiorpcX version 0.7.x required') env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 0c7ba1c..cd17cc6 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -16,8 +16,8 @@ from collections import defaultdict, Counter from aiorpcx import (ClientSession, SOCKSProxy, Notification, handler_invocation, - SOCKSError, RPCError, TaskTimeout, - TaskGroup, ignore_after, timeout_after) + SOCKSError, RPCError, TaskTimeout, TaskGroup, Event, + sleep, run_in_thread, ignore_after, timeout_after) from electrumx.lib.peer import Peer from electrumx.lib.util import class_logger, protocol_tuple @@ -74,7 +74,7 @@ class PeerManager(object): self.peers = set() self.permit_onion_peer_time = time.time() self.proxy = None - self.task_group = None + self.group = TaskGroup() def _my_clearnet_peer(self): '''Returns the clearnet peer representing this server, if any.''' @@ -150,7 +150,7 @@ class PeerManager(object): self.logger.info(f'detected {proxy}') return self.logger.info('no proxy detected, will try later') - await asyncio.sleep(900) + await sleep(900) async def _note_peers(self, peers, limit=2, check_ports=False, source=None): @@ -178,9 +178,9 @@ class PeerManager(object): use_peers = new_peers for peer in use_peers: self.logger.info(f'accepted new peer {peer} from {source}') - peer.retry_event = asyncio.Event() + peer.retry_event = Event() self.peers.add(peer) - await self.task_group.spawn(self._monitor_peer(peer)) + await self.group.spawn(self._monitor_peer(peer)) async def _monitor_peer(self, peer): # Stop monitoring if we were dropped (a duplicate peer) @@ -231,16 +231,14 @@ class PeerManager(object): is_good = True break except BadPeerError as e: - self.logger.error(f'{peer_text} marking bad: ({e!r})') + self.logger.error(f'{peer_text} marking bad: ({e})') peer.mark_bad() break except RPCError as e: self.logger.error(f'{peer_text} RPC error: {e.message} ' f'({e.code})') - except TaskTimeout as e: - self.logger.error(f'{peer_text} timed out after {e.args[0]}s') - except (OSError, SOCKSError, ConnectionError) as e: - self.logger.info(f'{peer_text} {e!r}') + except (OSError, SOCKSError, ConnectionError, TaskTimeout) as e: + self.logger.info(f'{peer_text} {e}') if is_good: now = time.time() @@ -292,10 +290,15 @@ class PeerManager(object): peer.features['server_version'] = server_version ptuple = protocol_tuple(protocol_version) - # FIXME: make these concurrent with first exception preserved - await self._send_headers_subscribe(session, peer, ptuple) - await self._send_server_features(session, peer) - await self._send_peers_subscribe(session, peer) + # Do the rest concurrently + async with TaskGroup() as group: + await group.spawn(self._send_headers_subscribe(session, peer, + ptuple)) + await group.spawn(self._send_server_features(session, peer)) + await group.spawn(self._send_peers_subscribe(session, peer)) + # If any task errors out; bail + async for task in group: + task.result() async def _send_headers_subscribe(self, session, peer, ptuple): message = 'blockchain.headers.subscribe' @@ -374,7 +377,7 @@ class PeerManager(object): # # External interface # - async def discover_peers(self, task_group): + async def discover_peers(self): '''Perform peer maintenance. This includes 1) Forgetting unreachable peers. @@ -387,9 +390,14 @@ class PeerManager(object): self.logger.info(f'beginning peer discovery. Force use of ' f'proxy: {self.env.force_proxy}') - self.task_group = task_group - await task_group.spawn(self._detect_proxy()) - await task_group.spawn(self._import_peers()) + forever = Event() + async with self.group as group: + await group.spawn(forever.wait()) + await group.spawn(self._detect_proxy()) + await group.spawn(self._import_peers()) + # Consume tasks as they complete + async for task in group: + task.result() def info(self): '''The number of peers.''' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index afa61f5..cbd9a18 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -20,15 +20,15 @@ from collections import defaultdict from functools import partial from aiorpcx import ( - ServerSession, JSONRPCAutoDetect, TaskGroup, handler_invocation, - RPCError, Request, ignore_after + ServerSession, JSONRPCAutoDetect, JSONRPCConnection, + TaskGroup, handler_invocation, RPCError, Request, ignore_after ) import electrumx import electrumx.lib.text as text import electrumx.lib.util as util from electrumx.lib.hash import (sha256, hash_to_hex_str, hex_str_to_hash, - HASHX_LEN) + HASHX_LEN, Base58Error) from electrumx.lib.peer import Peer from electrumx.server.daemon import DaemonError from electrumx.server.peers import PeerManager @@ -259,7 +259,7 @@ class SessionManager(object): # Give the sockets some time to close gracefully async with TaskGroup() as group: for session in stale_sessions: - await group.spawn(session.close(force_after=30)) + await group.spawn(session.close()) # Consolidate small groups bw_limit = self.env.bandwidth_limit @@ -391,7 +391,10 @@ class SessionManager(object): async def rpc_query(self, items, limit): '''Return a list of data about server peers.''' - return await self.chain_state.query(items, limit) + try: + return await self.chain_state.query(items, limit) + except Base58Error as e: + raise RPCError(BAD_REQUEST, e.args[0]) from None async def rpc_sessions(self): '''Return statistics about connected sessions.''' @@ -434,8 +437,8 @@ class SessionManager(object): await self._start_external_servers() # Peer discovery should start after the external servers # because we connect to ourself - async with TaskGroup(wait=object) as group: - await group.spawn(self.peer_mgr.discover_peers(group)) + async with TaskGroup() as group: + await group.spawn(self.peer_mgr.discover_peers()) await group.spawn(self._clear_stale_sessions()) await group.spawn(self._log_sessions()) await group.spawn(self._restart_if_paused()) @@ -445,7 +448,7 @@ class SessionManager(object): await self._close_servers(list(self.servers.keys())) async with TaskGroup() as group: for session in list(self.sessions): - await group.spawn(session.close(force_after=0.1)) + await group.spawn(session.close(force_after=1)) def session_count(self): '''The number of connections that we've sent something to.''' @@ -516,7 +519,8 @@ class SessionBase(ServerSession): session_counter = itertools.count() def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind): - super().__init__(protocol=JSONRPCAutoDetect) + connection = JSONRPCConnection(JSONRPCAutoDetect) + super().__init__(connection=connection) self.logger = util.class_logger(__name__, self.__class__.__name__) self.session_mgr = session_mgr self.chain_state = chain_state @@ -620,7 +624,7 @@ class ElectrumX(SessionBase): self.subscribe_headers = False self.subscribe_headers_raw = False self.notified_height = None - self.connection._max_response_size = self.env.max_send + self.connection.max_response_size = self.env.max_send self.max_subs = self.env.max_session_subs self.hashX_subs = {} self.sv_seen = False @@ -658,13 +662,6 @@ class ElectrumX(SessionBase): def protocol_version_string(self): return util.version_string(self.protocol_tuple) - # FIXME: make this the aiorpcx API for version 0.7 - async def close(self, force_after=30): - '''Close the connection and return when closed.''' - async with ignore_after(force_after): - await super().close() - self.abort() - async def daemon_request(self, method, *args): '''Catch a DaemonError and convert it to an RPCError.''' try: diff --git a/electrumx_rpc b/electrumx_rpc index a520f94..cd7a1ac 100755 --- a/electrumx_rpc +++ b/electrumx_rpc @@ -113,7 +113,7 @@ def main(): # aiorpcX makes this so easy... async def send_request(): try: - async with timeout_after(1): + async with timeout_after(15): async with ClientSession('localhost', port) as session: result = await session.send_request(method, args) if method in ('query', ): @@ -128,8 +128,6 @@ def main(): except OSError: print('cannot connect - is ElectrumX catching up, not running, or ' f'is {port} the wrong RPC port?') - except TaskTimeout as e: - print(f'request timed out after {e.args[0]}s') except Exception as e: print(f'error making request: {e!r}')