From 19387ff91133174ada0cf0a59e8e62bc6b3cd7e4 Mon Sep 17 00:00:00 2001 From: Janus Date: Fri, 31 Aug 2018 18:06:11 +0200 Subject: [PATCH] aiorpcx: simplify open_session --- electrum/interface.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/electrum/interface.py b/electrum/interface.py index ad6ea7c1b..1db430fe7 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -81,7 +81,7 @@ class Interface(PrintError): async def is_server_ca_signed(self, sslc): try: - await self.open_session(sslc, do_sleep=False) + await self.open_session(sslc, exit_early=True) except ssl.SSLError as e: assert e.reason == 'CERTIFICATE_VERIFY_FAILED' return False @@ -90,7 +90,7 @@ class Interface(PrintError): @util.aiosafe async def run(self): if self.protocol != 's': - await self.open_session(None, execute_after_connect=self.mark_ready) + await self.open_session(None, exit_early=False) assert False ca_sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) @@ -125,7 +125,7 @@ class Interface(PrintError): else: sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path) sslc.check_hostname = 0 - await self.open_session(sslc, execute_after_connect=self.mark_ready) + await self.open_session(sslc, exit_early=False) assert False def mark_ready(self): @@ -171,22 +171,21 @@ class Interface(PrintError): except ValueError: return None - async def open_session(self, sslc, do_sleep=True, execute_after_connect=lambda: None): - q = asyncio.Queue() - async with NotificationSession(None, q, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: + async def open_session(self, sslc, exit_early): + header_queue = asyncio.Queue() + async with NotificationSession(None, header_queue, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) - self.print_error(ver, do_sleep, self.host) - connect_hook_executed = False - while do_sleep: - if not connect_hook_executed: - connect_hook_executed = True - res = await session.send_request('blockchain.headers.subscribe') - self.tip_header = blockchain.deserialize_header(bfh(res['hex']), res['height']) - self.tip = res['height'] - execute_after_connect() - self.session = session + if exit_early: + return + self.print_error(ver, self.host) + subscription_res = await session.send_request('blockchain.headers.subscribe') + self.tip_header = blockchain.deserialize_header(bfh(subscription_res['hex']), subscription_res['height']) + self.tip = subscription_res['height'] + self.mark_ready() + self.session = session + while True: try: - new_header = await asyncio.wait_for(q.get(), 300) + new_header = await asyncio.wait_for(header_queue.get(), 300) self.tip_header = new_header self.tip = new_header['block_height'] except concurrent.futures.TimeoutError: