Browse Source

add multiplexing capability to NotificationSession, simplify interface

3.3.3.1
ThomasV 6 years ago
parent
commit
3b6af914e1
  1. 77
      electrum/interface.py
  2. 4
      electrum/synchronizer.py

77
electrum/interface.py

@ -47,22 +47,21 @@ from . import constants
class NotificationSession(ClientSession): class NotificationSession(ClientSession):
def __init__(self, scripthash, header, *args, **kwargs): def __init__(self, *args, **kwargs):
super(NotificationSession, self).__init__(*args, **kwargs) super(NotificationSession, self).__init__(*args, **kwargs)
# queues: self.subscriptions = {}
self.scripthash = scripthash self.cache = {}
self.header = header
async def handle_request(self, request): async def handle_request(self, request):
# note: if server sends malformed request and we raise, the superclass # note: if server sends malformed request and we raise, the superclass
# will catch the exception, count errors, and at some point disconnect # will catch the exception, count errors, and at some point disconnect
if isinstance(request, Notification): if isinstance(request, Notification):
if request.method == 'blockchain.scripthash.subscribe' and self.scripthash is not None: params, result = request.args[:-1], request.args[-1]
scripthash, status = request.args key = request.method + repr(params)
await self.scripthash.put((scripthash, status)) if key in self.subscriptions:
elif request.method == 'blockchain.headers.subscribe' and self.header is not None: self.cache[key] = result
deser = deserialize_header(bfh(request.args[0]['hex']), request.args[0]['height']) for queue in self.subscriptions[key]:
await self.header.put(deser) await queue.put(request.args)
else: else:
assert False, request.method assert False, request.method
@ -73,6 +72,17 @@ class NotificationSession(ClientSession):
super().send_request(*args, **kwargs), super().send_request(*args, **kwargs),
timeout) timeout)
async def subscribe(self, method, params, queue):
key = method + repr(params)
if key in self.subscriptions:
self.subscriptions[key].append(queue)
result = self.cache[key]
else:
result = await self.send_request(method, params)
self.subscriptions[key] = [queue]
self.cache[key] = result
await queue.put(params + [result])
# FIXME this is often raised inside a TaskGroup, but then it's not silent :( # FIXME this is often raised inside a TaskGroup, but then it's not silent :(
class GracefulDisconnect(AIOSafeSilentException): pass class GracefulDisconnect(AIOSafeSilentException): pass
@ -122,7 +132,6 @@ class Interface(PrintError):
self.tip_header = None self.tip_header = None
self.tip = 0 self.tip = 0
self.tip_lock = asyncio.Lock()
# TODO combine? # TODO combine?
self.fut = asyncio.get_event_loop().create_task(self.run()) self.fut = asyncio.get_event_loop().create_task(self.run())
@ -280,7 +289,7 @@ class Interface(PrintError):
async def open_session(self, sslc, exit_early): async def open_session(self, sslc, exit_early):
header_queue = asyncio.Queue() header_queue = asyncio.Queue()
self.session = NotificationSession(None, header_queue, self.host, self.port, ssl=sslc, proxy=self.proxy) self.session = NotificationSession(self.host, self.port, ssl=sslc, proxy=self.proxy)
async with self.session as session: async with self.session as session:
try: try:
ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
@ -289,14 +298,10 @@ class Interface(PrintError):
if exit_early: if exit_early:
return return
self.print_error(ver, self.host) self.print_error(ver, self.host)
subscription_res = await session.send_request('blockchain.headers.subscribe') await session.subscribe('blockchain.headers.subscribe', [], header_queue)
self.tip_header = blockchain.deserialize_header(bfh(subscription_res['hex']), subscription_res['height'])
self.tip = subscription_res['height']
self.mark_ready()
copy_header_queue = asyncio.Queue()
async with self.group as group: async with self.group as group:
await group.spawn(self.run_fetch_blocks(subscription_res, copy_header_queue)) await group.spawn(self.ping())
await group.spawn(self.subscribe_to_headers(header_queue, copy_header_queue)) await group.spawn(self.run_fetch_blocks(header_queue))
await group.spawn(self.monitor_connection()) await group.spawn(self.monitor_connection())
# NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group! # NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group!
@ -306,33 +311,29 @@ class Interface(PrintError):
if not self.session or self.session.is_closing(): if not self.session or self.session.is_closing():
raise GracefulDisconnect('server closed session') raise GracefulDisconnect('server closed session')
async def subscribe_to_headers(self, header_queue, copy_header_queue): async def ping(self):
while True: while True:
try: await asyncio.sleep(300)
new_header = await asyncio.wait_for(header_queue.get(), 300) await self.session.send_request('server.ping', timeout=10)
async with self.tip_lock:
self.tip_header = new_header
self.tip = new_header['block_height']
await copy_header_queue.put(new_header)
except concurrent.futures.TimeoutError:
await self.session.send_request('server.ping', timeout=10)
def close(self): def close(self):
self.fut.cancel() self.fut.cancel()
asyncio.get_event_loop().create_task(self.group.cancel_remaining()) asyncio.get_event_loop().create_task(self.group.cancel_remaining())
async def run_fetch_blocks(self, sub_reply, replies): async def run_fetch_blocks(self, header_queue):
if self.tip < constants.net.max_checkpoint():
raise GracefulDisconnect('server tip below max checkpoint')
async with self.network.bhi_lock:
height = self.blockchain.height()+1
await replies.put(blockchain.deserialize_header(bfh(sub_reply['hex']), sub_reply['height']))
while True: while True:
self.network.notify('updated') self.network.notify('updated')
item = await replies.get() item = await header_queue.get()
async with self.network.bhi_lock, self.tip_lock: item = item[0]
height = item['height']
item = blockchain.deserialize_header(bfh(item['hex']), item['height'])
self.tip_header = item
self.tip = height
if self.tip < constants.net.max_checkpoint():
raise GracefulDisconnect('server tip below max checkpoint')
if not self.ready.done():
self.mark_ready()
async with self.network.bhi_lock:
if self.blockchain.height() < item['block_height']-1: if self.blockchain.height() < item['block_height']-1:
_, height = await self.sync_until(height, None) _, height = await self.sync_until(height, None)
if self.blockchain.height() >= height and self.blockchain.check_header(item): if self.blockchain.height() >= height and self.blockchain.check_header(item):

4
electrum/synchronizer.py

@ -141,9 +141,7 @@ class Synchronizer(PrintError):
async def subscribe_to_address(self, addr): async def subscribe_to_address(self, addr):
h = address_to_scripthash(addr) h = address_to_scripthash(addr)
self.scripthash_to_address[h] = addr self.scripthash_to_address[h] = addr
self.session.scripthash = self.status_queue await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
status = await self.session.send_request('blockchain.scripthash.subscribe', [h])
await self.status_queue.put((h, status))
self.requested_addrs.remove(addr) self.requested_addrs.remove(addr)
async def send_subscriptions(self, interface): async def send_subscriptions(self, interface):

Loading…
Cancel
Save