|
|
@ -263,7 +263,7 @@ class Notifier(SynchronizerBase): |
|
|
|
def __init__(self, network): |
|
|
|
SynchronizerBase.__init__(self, network) |
|
|
|
self.watched_addresses = defaultdict(list) # type: Dict[str, List[str]] |
|
|
|
self.start_watching_queue = asyncio.Queue() |
|
|
|
self._start_watching_queue = asyncio.Queue() # type: asyncio.Queue[Tuple[str, str]] |
|
|
|
|
|
|
|
async def main(self): |
|
|
|
# resend existing subscriptions if we were restarted |
|
|
@ -271,11 +271,20 @@ class Notifier(SynchronizerBase): |
|
|
|
await self._add_address(addr) |
|
|
|
# main loop |
|
|
|
while True: |
|
|
|
addr, url = await self.start_watching_queue.get() |
|
|
|
addr, url = await self._start_watching_queue.get() |
|
|
|
self.watched_addresses[addr].append(url) |
|
|
|
await self._add_address(addr) |
|
|
|
|
|
|
|
async def start_watching_addr(self, addr: str, url: str): |
|
|
|
await self._start_watching_queue.put((addr, url)) |
|
|
|
|
|
|
|
async def stop_watching_addr(self, addr: str): |
|
|
|
self.watched_addresses.pop(addr, None) |
|
|
|
# TODO blockchain.scripthash.unsubscribe |
|
|
|
|
|
|
|
async def _on_address_status(self, addr, status): |
|
|
|
if addr not in self.watched_addresses: |
|
|
|
return |
|
|
|
self.logger.info(f'new status for addr {addr}') |
|
|
|
headers = {'content-type': 'application/json'} |
|
|
|
data = {'address': addr, 'status': status} |
|
|
|