From 51e0672da6ed4d8d8b078b0aad8cd515d59aaeef Mon Sep 17 00:00:00 2001 From: SomberNight Date: Tue, 30 Apr 2019 21:24:39 +0200 Subject: [PATCH] update to aiorpcx 0.17 --- contrib/requirements/requirements.txt | 2 +- electrum/interface.py | 71 +++++++++++++++------------ 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt index de28c7825..a68f7a90e 100644 --- a/contrib/requirements/requirements.txt +++ b/contrib/requirements/requirements.txt @@ -5,7 +5,7 @@ protobuf dnspython jsonrpclib-pelix qdarkstyle<2.6 -aiorpcx>=0.9,<0.11 +aiorpcx>=0.17,<0.18 aiohttp>=3.3.0 aiohttp_socks certifi diff --git a/electrum/interface.py b/electrum/interface.py index b642d64b4..aee8ca71a 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -34,6 +34,7 @@ from ipaddress import IPv4Network, IPv6Network, ip_address import aiorpcx from aiorpcx import RPCSession, Notification +from aiorpcx.curio import timeout_after, TaskTimeout import certifi from .util import PrintError, ignore_exceptions, log_exceptions, bfh, SilentTaskGroup @@ -72,10 +73,10 @@ class NotificationSession(RPCSession): super(NotificationSession, self).__init__(*args, **kwargs) self.subscriptions = defaultdict(list) self.cache = {} - self.in_flight_requests_semaphore = asyncio.Semaphore(100) self.default_timeout = NetworkTimeout.Generic.NORMAL self._msg_counter = 0 self.interface = None # type: Optional[Interface] + self.cost_hard_limit = 0 # disable aiorpcx resource limits def _get_and_inc_msg_counter(self): # runs in event loop thread, no need for lock @@ -84,35 +85,40 @@ class NotificationSession(RPCSession): async def handle_request(self, request): self.maybe_log(f"--> {request}") - # note: if server sends malformed request and we raise, the superclass - # will catch the exception, count errors, and at some point disconnect - if isinstance(request, Notification): - params, result = request.args[:-1], request.args[-1] - key = self.get_hashable_key_for_rpc_call(request.method, params) - if key in self.subscriptions: - self.cache[key] = result - for queue in self.subscriptions[key]: - await queue.put(request.args) + try: + if isinstance(request, Notification): + params, result = request.args[:-1], request.args[-1] + key = self.get_hashable_key_for_rpc_call(request.method, params) + if key in self.subscriptions: + self.cache[key] = result + for queue in self.subscriptions[key]: + await queue.put(request.args) + else: + raise Exception(f'unexpected notification') else: - raise Exception('unexpected request: {}'.format(repr(request))) + raise Exception(f'unexpected request. not a notification') + except Exception as e: + self.interface.print_error(f"error handling request {request}. exc: {repr(e)}") + await self.close() async def send_request(self, *args, timeout=None, **kwargs): - # note: the timeout starts after the request touches the wire! - if timeout is None: - timeout = self.default_timeout - # note: the semaphore implementation guarantees no starvation - async with self.in_flight_requests_semaphore: - msg_id = self._get_and_inc_msg_counter() - self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})") - try: - response = await asyncio.wait_for( - super().send_request(*args, **kwargs), - timeout) - except asyncio.TimeoutError as e: - raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e - else: - self.maybe_log(f"--> {response} (id: {msg_id})") - return response + # note: semaphores/timeouts/backpressure etc are handled by + # aiorpcx. the timeout arg here in most cases should not be set + msg_id = self._get_and_inc_msg_counter() + self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})") + try: + response = await asyncio.wait_for( + super().send_request(*args, **kwargs), + timeout) + except (TaskTimeout, asyncio.TimeoutError) as e: + raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e + else: + self.maybe_log(f"--> {response} (id: {msg_id})") + return response + + def set_default_timeout(self, timeout): + self.sent_request_timeout = timeout + self.max_send_delay = timeout async def subscribe(self, method: str, params: List, queue: asyncio.Queue): # note: until the cache is written for the first time, @@ -212,10 +218,11 @@ class Interface(PrintError): auth = None else: auth = aiorpcx.socks.SOCKSUserAuth(username, pw) + addr = "{}:{}".format(proxy['host'], proxy['port']) if proxy['mode'] == "socks4": - self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS4a, auth) + self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS4a, auth) elif proxy['mode'] == "socks5": - self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth) + self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS5, auth) else: raise NotImplementedError # http proxy not available with aiorpcx else: @@ -408,7 +415,7 @@ class Interface(PrintError): ssl=sslc, proxy=self.proxy) as session: self.session = session # type: NotificationSession self.session.interface = self - self.session.default_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic) + self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic)) try: ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION]) except aiorpcx.jsonrpc.RPCError as e: @@ -620,9 +627,9 @@ class Interface(PrintError): def ip_addr(self) -> Optional[str]: session = self.session if not session: return None - peer_addr = session.peer_address() + peer_addr = session.remote_address() if not peer_addr: return None - return peer_addr[0] + return str(peer_addr.host) def bucket_based_on_ipaddress(self) -> str: def do_bucket():