Browse Source

use an input queue in synchronizer, so that new addresses can be created from other threads

283
ThomasV 11 years ago
parent
commit
38f5deee86
  1. 19
      lib/synchronizer.py
  2. 31
      lib/wallet.py

19
lib/synchronizer.py

@ -35,6 +35,7 @@ class WalletSynchronizer(threading.Thread):
self.running = False self.running = False
self.lock = threading.Lock() self.lock = threading.Lock()
self.queue = Queue.Queue() self.queue = Queue.Queue()
self.address_queue = Queue.Queue()
def stop(self): def stop(self):
with self.lock: self.running = False with self.lock: self.running = False
@ -42,6 +43,8 @@ class WalletSynchronizer(threading.Thread):
def is_running(self): def is_running(self):
with self.lock: return self.running with self.lock: return self.running
def add(self, address):
self.address_queue.put(address)
def subscribe_to_addresses(self, addresses): def subscribe_to_addresses(self, addresses):
messages = [] messages = []
@ -49,21 +52,15 @@ class WalletSynchronizer(threading.Thread):
messages.append(('blockchain.address.subscribe', [addr])) messages.append(('blockchain.address.subscribe', [addr]))
self.network.subscribe( messages, lambda i,r: self.queue.put(r)) self.network.subscribe( messages, lambda i,r: self.queue.put(r))
def run(self): def run(self):
with self.lock: with self.lock:
self.running = True self.running = True
while self.is_running(): while self.is_running():
if not self.network.is_connected(): if not self.network.is_connected():
self.network.wait_until_connected() self.network.wait_until_connected()
self.run_interface() self.run_interface()
def run_interface(self): def run_interface(self):
print_error("synchronizer: connected to", self.network.main_server()) print_error("synchronizer: connected to", self.network.main_server())
requested_tx = [] requested_tx = []
@ -84,10 +81,18 @@ class WalletSynchronizer(threading.Thread):
self.subscribe_to_addresses(self.wallet.addresses(True)) self.subscribe_to_addresses(self.wallet.addresses(True))
while self.is_running(): while self.is_running():
# 1. create new addresses # 1. create new addresses
new_addresses = self.wallet.synchronize() self.wallet.synchronize()
# request missing addresses # request missing addresses
new_addresses = []
while True:
try:
addr = self.address_queue.get(block=False)
except Queue.Empty:
break
new_addresses.append(addr)
if new_addresses: if new_addresses:
self.subscribe_to_addresses(new_addresses) self.subscribe_to_addresses(new_addresses)

31
lib/wallet.py

@ -1139,25 +1139,23 @@ class Deterministic_Wallet(Abstract_Wallet):
if n > nmax: nmax = n if n > nmax: nmax = n
return nmax + 1 return nmax + 1
def create_new_address(self, account, for_change):
address = account.create_new_address(for_change)
self.history[address] = []
self.synchronizer.add(address)
self.save_accounts()
def synchronize_sequence(self, account, for_change): def synchronize_sequence(self, account, for_change):
limit = self.gap_limit_for_change if for_change else self.gap_limit limit = self.gap_limit_for_change if for_change else self.gap_limit
new_addresses = []
while True: while True:
addresses = account.get_addresses(for_change) addresses = account.get_addresses(for_change)
if len(addresses) < limit: if len(addresses) < limit:
address = account.create_new_address(for_change) self.create_new_address(account, for_change)
self.history[address] = []
new_addresses.append( address )
continue continue
if map( lambda a: self.address_is_old(a), addresses[-limit:] ) == limit*[False]: if map( lambda a: self.address_is_old(a), addresses[-limit:] ) == limit*[False]:
break break
else: else:
address = account.create_new_address(for_change) self.create_new_address(account, for_change)
self.history[address] = []
new_addresses.append( address )
return new_addresses
def check_pending_accounts(self): def check_pending_accounts(self):
for account_id, addr in self.next_addresses.items(): for account_id, addr in self.next_addresses.items():
@ -1169,22 +1167,15 @@ class Deterministic_Wallet(Abstract_Wallet):
self.next_addresses.pop(account_id) self.next_addresses.pop(account_id)
def synchronize_account(self, account): def synchronize_account(self, account):
new = [] self.synchronize_sequence(account, 0)
new += self.synchronize_sequence(account, 0) self.synchronize_sequence(account, 1)
new += self.synchronize_sequence(account, 1)
return new
def synchronize(self): def synchronize(self):
self.check_pending_accounts() self.check_pending_accounts()
new = []
for account in self.accounts.values(): for account in self.accounts.values():
if type(account) in [ImportedAccount, PendingAccount]: if type(account) in [ImportedAccount, PendingAccount]:
continue continue
new += self.synchronize_account(account) self.synchronize_account(account)
if new:
self.save_accounts()
self.storage.put('addr_history', self.history, True)
return new
def restore(self, callback): def restore(self, callback):
from i18n import _ from i18n import _

Loading…
Cancel
Save