Browse Source

aiorpcx: simplify open_session

3.3.3.1
Janus 6 years ago
committed by SomberNight
parent
commit
19387ff911
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 33
      electrum/interface.py

33
electrum/interface.py

@ -81,7 +81,7 @@ class Interface(PrintError):
async def is_server_ca_signed(self, sslc): async def is_server_ca_signed(self, sslc):
try: try:
await self.open_session(sslc, do_sleep=False) await self.open_session(sslc, exit_early=True)
except ssl.SSLError as e: except ssl.SSLError as e:
assert e.reason == 'CERTIFICATE_VERIFY_FAILED' assert e.reason == 'CERTIFICATE_VERIFY_FAILED'
return False return False
@ -90,7 +90,7 @@ class Interface(PrintError):
@util.aiosafe @util.aiosafe
async def run(self): async def run(self):
if self.protocol != 's': if self.protocol != 's':
await self.open_session(None, execute_after_connect=self.mark_ready) await self.open_session(None, exit_early=False)
assert False assert False
ca_sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) ca_sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
@ -125,7 +125,7 @@ class Interface(PrintError):
else: else:
sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path) sslc = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.cert_path)
sslc.check_hostname = 0 sslc.check_hostname = 0
await self.open_session(sslc, execute_after_connect=self.mark_ready) await self.open_session(sslc, exit_early=False)
assert False assert False
def mark_ready(self): def mark_ready(self):
@ -171,22 +171,21 @@ class Interface(PrintError):
except ValueError: except ValueError:
return None return None
async def open_session(self, sslc, do_sleep=True, execute_after_connect=lambda: None): async def open_session(self, sslc, exit_early):
q = asyncio.Queue() header_queue = asyncio.Queue()
async with NotificationSession(None, q, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: async with NotificationSession(None, header_queue, self.host, self.port, ssl=sslc, proxy=self.proxy) as session:
ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
self.print_error(ver, do_sleep, self.host) if exit_early:
connect_hook_executed = False return
while do_sleep: self.print_error(ver, self.host)
if not connect_hook_executed: subscription_res = await session.send_request('blockchain.headers.subscribe')
connect_hook_executed = True self.tip_header = blockchain.deserialize_header(bfh(subscription_res['hex']), subscription_res['height'])
res = await session.send_request('blockchain.headers.subscribe') self.tip = subscription_res['height']
self.tip_header = blockchain.deserialize_header(bfh(res['hex']), res['height']) self.mark_ready()
self.tip = res['height'] self.session = session
execute_after_connect() while True:
self.session = session
try: try:
new_header = await asyncio.wait_for(q.get(), 300) new_header = await asyncio.wait_for(header_queue.get(), 300)
self.tip_header = new_header self.tip_header = new_header
self.tip = new_header['block_height'] self.tip = new_header['block_height']
except concurrent.futures.TimeoutError: except concurrent.futures.TimeoutError:

Loading…
Cancel
Save