|
@ -24,7 +24,7 @@ |
|
|
# SOFTWARE. |
|
|
# SOFTWARE. |
|
|
import asyncio |
|
|
import asyncio |
|
|
import hashlib |
|
|
import hashlib |
|
|
from typing import Dict, List, TYPE_CHECKING |
|
|
from typing import Dict, List, TYPE_CHECKING, Tuple |
|
|
from collections import defaultdict |
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
from aiorpcx import TaskGroup, run_in_thread |
|
|
from aiorpcx import TaskGroup, run_in_thread |
|
@ -59,12 +59,14 @@ class SynchronizerBase(NetworkJobOnDefaultServer): |
|
|
def __init__(self, network: 'Network'): |
|
|
def __init__(self, network: 'Network'): |
|
|
self.asyncio_loop = network.asyncio_loop |
|
|
self.asyncio_loop = network.asyncio_loop |
|
|
NetworkJobOnDefaultServer.__init__(self, network) |
|
|
NetworkJobOnDefaultServer.__init__(self, network) |
|
|
|
|
|
self._reset_request_counters() |
|
|
|
|
|
|
|
|
def _reset(self): |
|
|
def _reset(self): |
|
|
super()._reset() |
|
|
super()._reset() |
|
|
self.requested_addrs = set() |
|
|
self.requested_addrs = set() |
|
|
self.scripthash_to_address = {} |
|
|
self.scripthash_to_address = {} |
|
|
self._processed_some_notifications = False # so that we don't miss them |
|
|
self._processed_some_notifications = False # so that we don't miss them |
|
|
|
|
|
self._reset_request_counters() |
|
|
# Queues |
|
|
# Queues |
|
|
self.add_queue = asyncio.Queue() |
|
|
self.add_queue = asyncio.Queue() |
|
|
self.status_queue = asyncio.Queue() |
|
|
self.status_queue = asyncio.Queue() |
|
@ -79,6 +81,10 @@ class SynchronizerBase(NetworkJobOnDefaultServer): |
|
|
# we are being cancelled now |
|
|
# we are being cancelled now |
|
|
self.session.unsubscribe(self.status_queue) |
|
|
self.session.unsubscribe(self.status_queue) |
|
|
|
|
|
|
|
|
|
|
|
def _reset_request_counters(self): |
|
|
|
|
|
self._requests_sent = 0 |
|
|
|
|
|
self._requests_answered = 0 |
|
|
|
|
|
|
|
|
def add(self, addr): |
|
|
def add(self, addr): |
|
|
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) |
|
|
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop) |
|
|
|
|
|
|
|
@ -96,7 +102,9 @@ class SynchronizerBase(NetworkJobOnDefaultServer): |
|
|
async def subscribe_to_address(addr): |
|
|
async def subscribe_to_address(addr): |
|
|
h = address_to_scripthash(addr) |
|
|
h = address_to_scripthash(addr) |
|
|
self.scripthash_to_address[h] = addr |
|
|
self.scripthash_to_address[h] = addr |
|
|
|
|
|
self._requests_sent += 1 |
|
|
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) |
|
|
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) |
|
|
|
|
|
self._requests_answered += 1 |
|
|
self.requested_addrs.remove(addr) |
|
|
self.requested_addrs.remove(addr) |
|
|
|
|
|
|
|
|
while True: |
|
|
while True: |
|
@ -110,6 +118,9 @@ class SynchronizerBase(NetworkJobOnDefaultServer): |
|
|
await self.group.spawn(self._on_address_status, addr, status) |
|
|
await self.group.spawn(self._on_address_status, addr, status) |
|
|
self._processed_some_notifications = True |
|
|
self._processed_some_notifications = True |
|
|
|
|
|
|
|
|
|
|
|
def num_requests_sent_and_answered(self) -> Tuple[int, int]: |
|
|
|
|
|
return self._requests_sent, self._requests_answered |
|
|
|
|
|
|
|
|
async def main(self): |
|
|
async def main(self): |
|
|
raise NotImplementedError() # implemented by subclasses |
|
|
raise NotImplementedError() # implemented by subclasses |
|
|
|
|
|
|
|
@ -148,7 +159,9 @@ class Synchronizer(SynchronizerBase): |
|
|
# request address history |
|
|
# request address history |
|
|
self.requested_histories[addr] = status |
|
|
self.requested_histories[addr] = status |
|
|
h = address_to_scripthash(addr) |
|
|
h = address_to_scripthash(addr) |
|
|
|
|
|
self._requests_sent += 1 |
|
|
result = await self.network.get_history_for_scripthash(h) |
|
|
result = await self.network.get_history_for_scripthash(h) |
|
|
|
|
|
self._requests_answered += 1 |
|
|
self.logger.info(f"receiving history {addr} {len(result)}") |
|
|
self.logger.info(f"receiving history {addr} {len(result)}") |
|
|
hashes = set(map(lambda item: item['tx_hash'], result)) |
|
|
hashes = set(map(lambda item: item['tx_hash'], result)) |
|
|
hist = list(map(lambda item: (item['tx_hash'], item['height']), result)) |
|
|
hist = list(map(lambda item: (item['tx_hash'], item['height']), result)) |
|
@ -187,6 +200,7 @@ class Synchronizer(SynchronizerBase): |
|
|
await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx)) |
|
|
await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx)) |
|
|
|
|
|
|
|
|
async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False): |
|
|
async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False): |
|
|
|
|
|
self._requests_sent += 1 |
|
|
try: |
|
|
try: |
|
|
result = await self.network.get_transaction(tx_hash) |
|
|
result = await self.network.get_transaction(tx_hash) |
|
|
except UntrustedServerReturnedError as e: |
|
|
except UntrustedServerReturnedError as e: |
|
@ -196,6 +210,8 @@ class Synchronizer(SynchronizerBase): |
|
|
return |
|
|
return |
|
|
else: |
|
|
else: |
|
|
raise |
|
|
raise |
|
|
|
|
|
finally: |
|
|
|
|
|
self._requests_answered += 1 |
|
|
tx = Transaction(result) |
|
|
tx = Transaction(result) |
|
|
try: |
|
|
try: |
|
|
tx.deserialize() # see if raises |
|
|
tx.deserialize() # see if raises |
|
@ -234,6 +250,8 @@ class Synchronizer(SynchronizerBase): |
|
|
if (up_to_date != self.wallet.is_up_to_date() |
|
|
if (up_to_date != self.wallet.is_up_to_date() |
|
|
or up_to_date and self._processed_some_notifications): |
|
|
or up_to_date and self._processed_some_notifications): |
|
|
self._processed_some_notifications = False |
|
|
self._processed_some_notifications = False |
|
|
|
|
|
if up_to_date: |
|
|
|
|
|
self._reset_request_counters() |
|
|
self.wallet.set_up_to_date(up_to_date) |
|
|
self.wallet.set_up_to_date(up_to_date) |
|
|
self.wallet.network.trigger_callback('wallet_updated', self.wallet) |
|
|
self.wallet.network.trigger_callback('wallet_updated', self.wallet) |
|
|
|
|
|
|
|
|