From 74e6fe416f187f9891e14388f1d7a50314f4f0ee Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 7 Aug 2018 10:46:12 +0900 Subject: [PATCH 1/9] Move close() up to the base class Fixes #557 --- docs/changelog.rst | 3 +++ electrumx/server/session.py | 14 +++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index fae97d5..fd9f3a4 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -11,6 +11,8 @@ Version 1.8.1 (in development) ============================== +* fix `#557`_ + Version 1.8 (06 Aug 2018) ========================== @@ -199,3 +201,4 @@ bitcoincash:qzxpdlt8ehu9ehftw6rqsy2jgfq4nsltxvhrdmdfpn .. _#523: https://github.com/kyuupichan/electrumx/issues/523 .. _#534: https://github.com/kyuupichan/electrumx/issues/534 .. _#538: https://github.com/kyuupichan/electrumx/issues/538 +.. _#557: https://github.com/kyuupichan/electrumx/issues/557 diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 23a5187..4676ef1 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -584,6 +584,13 @@ class SessionBase(ServerSession): def sub_count(self): return 0 + # FIXME: make this the aiorpcx API for version 0.7 + async def close(self, force_after=30): + '''Close the connection and return when closed.''' + async with ignore_after(force_after): + await super().close() + self.abort() + async def handle_request(self, request): '''Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. @@ -645,13 +652,6 @@ class ElectrumX(SessionBase): def protocol_version_string(self): return util.version_string(self.protocol_tuple) - # FIXME: make this the aiorpcx API for version 0.7 - async def close(self, force_after=30): - '''Close the connection and return when closed.''' - async with ignore_after(force_after): - await super().close() - self.abort() - async def daemon_request(self, method, *args): '''Catch a DaemonError and convert it to an RPCError.''' try: From a3afab83d648696c60974aacaba93533f38da74e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 7 Aug 2018 21:02:35 +0900 Subject: [PATCH 2/9] Better handle bad input to query RPC call Based on #559. Also: - remove unused import - restore timeout to 15s - handle invalid input by catching at a higher level and converting to RPCError --- electrumx/server/chain_state.py | 12 +++--------- electrumx/server/session.py | 7 +++++-- electrumx_rpc | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/electrumx/server/chain_state.py b/electrumx/server/chain_state.py index 38b4e7b..7bf2374 100644 --- a/electrumx/server/chain_state.py +++ b/electrumx/server/chain_state.py @@ -6,8 +6,6 @@ # and warranty status of this software. -import asyncio - from aiorpcx import run_in_thread from electrumx.lib.hash import hash_to_hex_str @@ -95,13 +93,9 @@ class ChainState(object): except ValueError: pass - try: - hashX = coin.address_to_hashX(arg) - lines.append(f'Address: {arg}') - return hashX - except Base58Error: - print(f'Ingoring unknown arg: {arg}') - return None + hashX = coin.address_to_hashX(arg) + lines.append(f'Address: {arg}') + return hashX for arg in args: hashX = arg_to_hashX(arg) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 4676ef1..f058b18 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -28,7 +28,7 @@ import electrumx import electrumx.lib.text as text import electrumx.lib.util as util from electrumx.lib.hash import (sha256, hash_to_hex_str, hex_str_to_hash, - HASHX_LEN) + HASHX_LEN, Base58Error) from electrumx.lib.peer import Peer from electrumx.server.daemon import DaemonError from electrumx.server.peers import PeerManager @@ -384,7 +384,10 @@ class SessionManager(object): async def rpc_query(self, items, limit): '''Return a list of data about server peers.''' - return await self.chain_state.query(items, limit) + try: + return await self.chain_state.query(items, limit) + except Base58Error as e: + raise RPCError(BAD_REQUEST, e.args[0]) from None async def rpc_sessions(self): '''Return statistics about connected sessions.''' diff --git a/electrumx_rpc b/electrumx_rpc index a520f94..c8bc1d0 100755 --- a/electrumx_rpc +++ b/electrumx_rpc @@ -113,7 +113,7 @@ def main(): # aiorpcX makes this so easy... async def send_request(): try: - async with timeout_after(1): + async with timeout_after(15): async with ClientSession('localhost', port) as session: result = await session.send_request(method, args) if method in ('query', ): From 962b4679d7fa6e2199872cbee479af17cd92c91f Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:08:33 +0900 Subject: [PATCH 3/9] Require aiorpcX 0.7.0 --- electrumx/server/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index da07089..6424d0e 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -80,8 +80,8 @@ class Controller(ServerBase): '''Start the RPC server and wait for the mempool to synchronize. Then start serving external clients. ''' - if not (0, 6, 2) <= aiorpcx_version < (0, 7): - raise RuntimeError('aiorpcX version 0.6.x with x>=2 required') + if not (0, 7) <= aiorpcx_version < (0, 8): + raise RuntimeError('aiorpcX version 0.7.x required') env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() From 6697614427494fe36f3ac2dcffa022aca3d44362 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:04:49 +0900 Subject: [PATCH 4/9] Use close from aiorpcX 0.7.0 --- electrumx/server/session.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index f058b18..4bc0105 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -252,7 +252,7 @@ class SessionManager(object): # Give the sockets some time to close gracefully async with TaskGroup() as group: for session in stale_sessions: - await group.spawn(session.close(force_after=30)) + await group.spawn(session.close()) # Consolidate small groups bw_limit = self.env.bandwidth_limit @@ -441,7 +441,7 @@ class SessionManager(object): await self._close_servers(list(self.servers.keys())) async with TaskGroup() as group: for session in list(self.sessions): - await group.spawn(session.close(force_after=0.1)) + await group.spawn(session.close(force_after=1)) def session_count(self): '''The number of connections that we've sent something to.''' @@ -587,13 +587,6 @@ class SessionBase(ServerSession): def sub_count(self): return 0 - # FIXME: make this the aiorpcx API for version 0.7 - async def close(self, force_after=30): - '''Close the connection and return when closed.''' - async with ignore_after(force_after): - await super().close() - self.abort() - async def handle_request(self, request): '''Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. From 2d66a4f5aad9720e67963f17614451d2bf77b9ea Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:07:36 +0900 Subject: [PATCH 5/9] aiorpcX 0.7.0 has better string text for TaskTimeout --- electrumx/server/peers.py | 8 +++----- electrumx_rpc | 2 -- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 737e943..64e5fd7 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -231,16 +231,14 @@ class PeerManager(object): is_good = True break except BadPeerError as e: - self.logger.error(f'{peer_text} marking bad: ({e!r})') + self.logger.error(f'{peer_text} marking bad: ({e})') peer.mark_bad() break except RPCError as e: self.logger.error(f'{peer_text} RPC error: {e.message} ' f'({e.code})') - except TaskTimeout as e: - self.logger.error(f'{peer_text} timed out after {e.args[0]}s') - except (OSError, SOCKSError, ConnectionError) as e: - self.logger.info(f'{peer_text} {e!r}') + except (OSError, SOCKSError, ConnectionError, TaskTimeout) as e: + self.logger.info(f'{peer_text} {e}') if is_good: now = time.time() diff --git a/electrumx_rpc b/electrumx_rpc index c8bc1d0..cd7a1ac 100755 --- a/electrumx_rpc +++ b/electrumx_rpc @@ -128,8 +128,6 @@ def main(): except OSError: print('cannot connect - is ElectrumX catching up, not running, or ' f'is {port} the wrong RPC port?') - except TaskTimeout as e: - print(f'request timed out after {e.args[0]}s') except Exception as e: print(f'error making request: {e!r}') From 9c5d59e997fd04595ee2fb4d74fe97217eb41b9b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:12:44 +0900 Subject: [PATCH 6/9] Use new Session interface --- electrumx/server/session.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 4bc0105..83aa867 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -20,8 +20,8 @@ from collections import defaultdict from functools import partial from aiorpcx import ( - ServerSession, JSONRPCAutoDetect, TaskGroup, handler_invocation, - RPCError, Request, ignore_after + ServerSession, JSONRPCAutoDetect, JSONRPCConnection, + TaskGroup, handler_invocation, RPCError, Request, ignore_after ) import electrumx @@ -506,7 +506,8 @@ class SessionBase(ServerSession): session_counter = itertools.count() def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind): - super().__init__(protocol=JSONRPCAutoDetect) + connection = JSONRPCConnection(JSONRPCAutoDetect) + super().__init__(connection=connection) self.logger = util.class_logger(__name__, self.__class__.__name__) self.session_mgr = session_mgr self.chain_state = chain_state @@ -610,7 +611,7 @@ class ElectrumX(SessionBase): self.subscribe_headers = False self.subscribe_headers_raw = False self.notified_height = None - self.connection._max_response_size = self.env.max_send + self.connection.max_response_size = self.env.max_send self.max_subs = self.env.max_session_subs self.hashX_subs = {} self.sv_seen = False From 9b4276c68d73a3b549caa587c39e801f72f426f7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:28:46 +0900 Subject: [PATCH 7/9] Have peers.py use its own task group --- electrumx/server/peers.py | 25 +++++++++++++++---------- electrumx/server/session.py | 4 ++-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 64e5fd7..3bb208b 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -16,8 +16,8 @@ from collections import defaultdict, Counter from aiorpcx import (ClientSession, SOCKSProxy, Notification, handler_invocation, - SOCKSError, RPCError, TaskTimeout, - TaskGroup, run_in_thread, ignore_after, timeout_after) + SOCKSError, RPCError, TaskTimeout, TaskGroup, Event, + sleep, run_in_thread, ignore_after, timeout_after) from electrumx.lib.peer import Peer from electrumx.lib.util import class_logger, protocol_tuple @@ -74,7 +74,7 @@ class PeerManager(object): self.peers = set() self.permit_onion_peer_time = time.time() self.proxy = None - self.task_group = None + self.group = TaskGroup() def _my_clearnet_peer(self): '''Returns the clearnet peer representing this server, if any.''' @@ -150,7 +150,7 @@ class PeerManager(object): self.logger.info(f'detected {proxy}') return self.logger.info('no proxy detected, will try later') - await asyncio.sleep(900) + await sleep(900) async def _note_peers(self, peers, limit=2, check_ports=False, source=None): @@ -178,9 +178,9 @@ class PeerManager(object): use_peers = new_peers for peer in use_peers: self.logger.info(f'accepted new peer {peer} from {source}') - peer.retry_event = asyncio.Event() + peer.retry_event = Event() self.peers.add(peer) - await self.task_group.spawn(self._monitor_peer(peer)) + await self.group.spawn(self._monitor_peer(peer)) async def _monitor_peer(self, peer): # Stop monitoring if we were dropped (a duplicate peer) @@ -372,7 +372,7 @@ class PeerManager(object): # # External interface # - async def discover_peers(self, task_group): + async def discover_peers(self): '''Perform peer maintenance. This includes 1) Forgetting unreachable peers. @@ -385,9 +385,14 @@ class PeerManager(object): self.logger.info(f'beginning peer discovery. Force use of ' f'proxy: {self.env.force_proxy}') - self.task_group = task_group - await task_group.spawn(self._detect_proxy()) - await task_group.spawn(self._import_peers()) + forever = Event() + async with self.group as group: + await group.spawn(forever.wait()) + await group.spawn(self._detect_proxy()) + await group.spawn(self._import_peers()) + # Consume tasks as they complete + async for task in group: + task.result() def info(self): '''The number of peers.''' diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 83aa867..9f63027 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -430,8 +430,8 @@ class SessionManager(object): await self._start_external_servers() # Peer discovery should start after the external servers # because we connect to ourself - async with TaskGroup(wait=object) as group: - await group.spawn(self.peer_mgr.discover_peers(group)) + async with TaskGroup() as group: + await group.spawn(self.peer_mgr.discover_peers()) await group.spawn(self._clear_stale_sessions()) await group.spawn(self._log_sessions()) await group.spawn(self._restart_if_paused()) From 931b227618b42d05bc56a0ec5f2fdbbd01638325 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:31:32 +0900 Subject: [PATCH 8/9] Verify a peer concurrently --- electrumx/server/peers.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/electrumx/server/peers.py b/electrumx/server/peers.py index 3bb208b..209353a 100644 --- a/electrumx/server/peers.py +++ b/electrumx/server/peers.py @@ -290,10 +290,15 @@ class PeerManager(object): peer.features['server_version'] = server_version ptuple = protocol_tuple(protocol_version) - # FIXME: make these concurrent with first exception preserved - await self._send_headers_subscribe(session, peer, ptuple) - await self._send_server_features(session, peer) - await self._send_peers_subscribe(session, peer) + # Do the rest concurrently + async with TaskGroup() as group: + await group.spawn(self._send_headers_subscribe(session, peer, + ptuple)) + await group.spawn(self._send_server_features(session, peer)) + await group.spawn(self._send_peers_subscribe(session, peer)) + # If any task errors out; bail + async for task in group: + task.result() async def _send_headers_subscribe(self, session, peer, ptuple): message = 'blockchain.headers.subscribe' From 12c49bbe753caeb1047475aa51dade39c2204eff Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 8 Aug 2018 12:48:22 +0900 Subject: [PATCH 9/9] Suppress accept_connection2 asyncio log spew --- electrumx/lib/server_base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/electrumx/lib/server_base.py b/electrumx/lib/server_base.py index 6d6f66a..602ae90 100644 --- a/electrumx/lib/server_base.py +++ b/electrumx/lib/server_base.py @@ -29,6 +29,7 @@ class ServerBase(object): ''' SUPPRESS_MESSAGE_REGEX = re.compile('SSH handshake') + SUPPRESS_TASK_REGEX = re.compile('accept_connection2') PYTHON_MIN_VERSION = (3, 6) def __init__(self, env): @@ -68,6 +69,8 @@ class ServerBase(object): message = context.get('message') if message and self.SUPPRESS_MESSAGE_REGEX.match(message): return + if self.SUPPRESS_TASK_REGEX.match(repr(context.get('task'))): + return loop.default_exception_handler(context) async def _main(self, loop):