|
|
@ -71,8 +71,16 @@ class NotificationSession(RPCSession): |
|
|
|
self.cache = {} |
|
|
|
self.in_flight_requests_semaphore = asyncio.Semaphore(100) |
|
|
|
self.default_timeout = NetworkTimeout.Generic.NORMAL |
|
|
|
self._msg_counter = 0 |
|
|
|
self.interface = None # type: Optional[Interface] |
|
|
|
|
|
|
|
def _get_and_inc_msg_counter(self): |
|
|
|
# runs in event loop thread, no need for lock |
|
|
|
self._msg_counter += 1 |
|
|
|
return self._msg_counter |
|
|
|
|
|
|
|
async def handle_request(self, request): |
|
|
|
self.maybe_log(f"--> {request}") |
|
|
|
# note: if server sends malformed request and we raise, the superclass |
|
|
|
# will catch the exception, count errors, and at some point disconnect |
|
|
|
if isinstance(request, Notification): |
|
|
@ -91,12 +99,17 @@ class NotificationSession(RPCSession): |
|
|
|
timeout = self.default_timeout |
|
|
|
# note: the semaphore implementation guarantees no starvation |
|
|
|
async with self.in_flight_requests_semaphore: |
|
|
|
msg_id = self._get_and_inc_msg_counter() |
|
|
|
self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})") |
|
|
|
try: |
|
|
|
return await asyncio.wait_for( |
|
|
|
response = await asyncio.wait_for( |
|
|
|
super().send_request(*args, **kwargs), |
|
|
|
timeout) |
|
|
|
except asyncio.TimeoutError as e: |
|
|
|
raise RequestTimedOut('request timed out: {}'.format(args)) from e |
|
|
|
raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e |
|
|
|
else: |
|
|
|
self.maybe_log(f"--> {response} (id: {msg_id})") |
|
|
|
return response |
|
|
|
|
|
|
|
async def subscribe(self, method: str, params: List, queue: asyncio.Queue): |
|
|
|
# note: until the cache is written for the first time, |
|
|
@ -123,6 +136,11 @@ class NotificationSession(RPCSession): |
|
|
|
"""Hashable index for subscriptions and cache""" |
|
|
|
return str(method) + repr(params) |
|
|
|
|
|
|
|
def maybe_log(self, msg: str) -> None: |
|
|
|
if not self.interface: return |
|
|
|
if self.interface.debug or self.interface.network.debug: |
|
|
|
self.interface.print_error(msg) |
|
|
|
|
|
|
|
|
|
|
|
class GracefulDisconnect(Exception): pass |
|
|
|
|
|
|
@ -173,6 +191,9 @@ class Interface(PrintError): |
|
|
|
self.tip_header = None |
|
|
|
self.tip = 0 |
|
|
|
|
|
|
|
# Dump network messages (only for this interface). Set at runtime from the console. |
|
|
|
self.debug = False |
|
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe( |
|
|
|
self.network.main_taskgroup.spawn(self.run()), self.network.asyncio_loop) |
|
|
|
self.group = SilentTaskGroup() |
|
|
@ -370,6 +391,7 @@ class Interface(PrintError): |
|
|
|
host=self.host, port=self.port, |
|
|
|
ssl=sslc, proxy=self.proxy) as session: |
|
|
|
self.session = session # type: NotificationSession |
|
|
|
self.session.interface = self |
|
|
|
self.session.default_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic) |
|
|
|
try: |
|
|
|
ver = await session.send_request('server.version', [version.ELECTRUM_VERSION, version.PROTOCOL_VERSION]) |
|
|
|