Browse Source

synchronizer: make 'add' thread-safe, and some clean-up

3.3.3.1
SomberNight 6 years ago
parent
commit
002b8a99e2
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 30
      electrum/synchronizer.py

30
electrum/synchronizer.py

@ -51,6 +51,7 @@ class Synchronizer(PrintError):
''' '''
def __init__(self, wallet): def __init__(self, wallet):
self.wallet = wallet self.wallet = wallet
self.asyncio_loop = wallet.network.asyncio_loop
self.requested_tx = {} self.requested_tx = {}
self.requested_histories = {} self.requested_histories = {}
self.requested_addrs = set() self.requested_addrs = set()
@ -69,10 +70,13 @@ class Synchronizer(PrintError):
and not self.requested_tx) and not self.requested_tx)
def add(self, addr): def add(self, addr):
asyncio.run_coroutine_threadsafe(self._add(addr), self.asyncio_loop)
async def _add(self, addr):
self.requested_addrs.add(addr) self.requested_addrs.add(addr)
self.add_queue.put_nowait(addr) await self.add_queue.put(addr)
async def on_address_status(self, addr, status): async def _on_address_status(self, addr, status):
history = self.wallet.history.get(addr, []) history = self.wallet.history.get(addr, [])
if history_status(history) == status: if history_status(history) == status:
return return
@ -98,12 +102,12 @@ class Synchronizer(PrintError):
# Store received history # Store received history
self.wallet.receive_history_callback(addr, hist, tx_fees) self.wallet.receive_history_callback(addr, hist, tx_fees)
# Request transactions we don't have # Request transactions we don't have
await self.request_missing_txs(hist) await self._request_missing_txs(hist)
# Remove request; this allows up_to_date to be True # Remove request; this allows up_to_date to be True
self.requested_histories.pop(addr) self.requested_histories.pop(addr)
async def request_missing_txs(self, hist): async def _request_missing_txs(self, hist):
# "hist" is a list of [tx_hash, tx_height] lists # "hist" is a list of [tx_hash, tx_height] lists
transaction_hashes = [] transaction_hashes = []
for tx_hash, tx_height in hist: for tx_hash, tx_height in hist:
@ -114,11 +118,12 @@ class Synchronizer(PrintError):
transaction_hashes.append(tx_hash) transaction_hashes.append(tx_hash)
self.requested_tx[tx_hash] = tx_height self.requested_tx[tx_hash] = tx_height
if not transaction_hashes: return
async with TaskGroup() as group: async with TaskGroup() as group:
for tx_hash in transaction_hashes: for tx_hash in transaction_hashes:
await group.spawn(self.get_transaction, tx_hash) await group.spawn(self._get_transaction, tx_hash)
async def get_transaction(self, tx_hash): async def _get_transaction(self, tx_hash):
result = await self.session.send_request('blockchain.transaction.get', [tx_hash]) result = await self.session.send_request('blockchain.transaction.get', [tx_hash])
tx = Transaction(result) tx = Transaction(result)
try: try:
@ -137,22 +142,22 @@ class Synchronizer(PrintError):
# callbacks # callbacks
self.wallet.network.trigger_callback('new_transaction', self.wallet, tx) self.wallet.network.trigger_callback('new_transaction', self.wallet, tx)
async def subscribe_to_address(self, addr): async def send_subscriptions(self, group: TaskGroup):
async def subscribe_to_address(addr):
h = address_to_scripthash(addr) h = address_to_scripthash(addr)
self.scripthash_to_address[h] = addr self.scripthash_to_address[h] = addr
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
self.requested_addrs.remove(addr) self.requested_addrs.remove(addr)
async def send_subscriptions(self, group: TaskGroup):
while True: while True:
addr = await self.add_queue.get() addr = await self.add_queue.get()
await group.spawn(self.subscribe_to_address, addr) await group.spawn(subscribe_to_address, addr)
async def handle_status(self, group: TaskGroup): async def handle_status(self, group: TaskGroup):
while True: while True:
h, status = await self.status_queue.get() h, status = await self.status_queue.get()
addr = self.scripthash_to_address[h] addr = self.scripthash_to_address[h]
await group.spawn(self.on_address_status, addr, status) await group.spawn(self._on_address_status, addr, status)
self._processed_some_notifications = True self._processed_some_notifications = True
@property @property
@ -164,15 +169,14 @@ class Synchronizer(PrintError):
async def main(self): async def main(self):
self.wallet.set_up_to_date(False) self.wallet.set_up_to_date(False)
# request missing txns, if any # request missing txns, if any
async with TaskGroup() as group:
for history in self.wallet.history.values(): for history in self.wallet.history.values():
# Old electrum servers returned ['*'] when all history for the address # Old electrum servers returned ['*'] when all history for the address
# was pruned. This no longer happens but may remain in old wallets. # was pruned. This no longer happens but may remain in old wallets.
if history == ['*']: continue if history == ['*']: continue
await group.spawn(self.request_missing_txs, history) await self._request_missing_txs(history)
# add addresses to bootstrap # add addresses to bootstrap
for addr in self.wallet.get_addresses(): for addr in self.wallet.get_addresses():
self.add(addr) await self._add(addr)
# main loop # main loop
while True: while True:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)

Loading…
Cancel
Save