|
@ -340,7 +340,6 @@ class Interface(PrintError): |
|
|
return conn, res['count'] |
|
|
return conn, res['count'] |
|
|
|
|
|
|
|
|
async def open_session(self, sslc, exit_early): |
|
|
async def open_session(self, sslc, exit_early): |
|
|
header_queue = asyncio.Queue() |
|
|
|
|
|
self.session = NotificationSession(self.host, self.port, ssl=sslc, proxy=self.proxy) |
|
|
self.session = NotificationSession(self.host, self.port, ssl=sslc, proxy=self.proxy) |
|
|
async with self.session as session: |
|
|
async with self.session as session: |
|
|
try: |
|
|
try: |
|
@ -350,11 +349,10 @@ class Interface(PrintError): |
|
|
if exit_early: |
|
|
if exit_early: |
|
|
return |
|
|
return |
|
|
self.print_error("connection established. version: {}".format(ver)) |
|
|
self.print_error("connection established. version: {}".format(ver)) |
|
|
await session.subscribe('blockchain.headers.subscribe', [], header_queue) |
|
|
|
|
|
|
|
|
|
|
|
async with self.group as group: |
|
|
async with self.group as group: |
|
|
await group.spawn(self.ping()) |
|
|
await group.spawn(self.ping()) |
|
|
await group.spawn(self.run_fetch_blocks(header_queue)) |
|
|
await group.spawn(self.run_fetch_blocks()) |
|
|
await group.spawn(self.monitor_connection()) |
|
|
await group.spawn(self.monitor_connection()) |
|
|
# NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group! |
|
|
# NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group! |
|
|
|
|
|
|
|
@ -373,7 +371,9 @@ class Interface(PrintError): |
|
|
self.fut.cancel() |
|
|
self.fut.cancel() |
|
|
asyncio.get_event_loop().create_task(self.group.cancel_remaining()) |
|
|
asyncio.get_event_loop().create_task(self.group.cancel_remaining()) |
|
|
|
|
|
|
|
|
async def run_fetch_blocks(self, header_queue): |
|
|
async def run_fetch_blocks(self): |
|
|
|
|
|
header_queue = asyncio.Queue() |
|
|
|
|
|
await self.session.subscribe('blockchain.headers.subscribe', [], header_queue) |
|
|
while True: |
|
|
while True: |
|
|
self.network.notify('updated') |
|
|
self.network.notify('updated') |
|
|
item = await header_queue.get() |
|
|
item = await header_queue.get() |
|
|