From e8db8983ece26a6ef681ae8baa991af0fabbd3d4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 8 Jun 2015 16:56:04 +0900 Subject: [PATCH] Make the synchronizer not a thread. The synchronizer's work is done from the network proxy's main loop. A minor problem with the old synchronizer was that it considered itself out of date if the network was out of date. This was too generic: the network can have pending requests unrelated to the synchronizer. This resulted in the synchronizer often unnecessarily flipping the wallet between up-to-date and not-up-to-date, and causing unnecessary calls to wallet.save_transactions(). This was observable when opening the network dialog box: frequently just opening it would cause a wallet status change and transaction flush, simply because the network dialog sends a get_parameters() request. This rework of the synchronizer does not have that issue. --- lib/network_proxy.py | 3 + lib/synchronizer.py | 307 +++++++++++++++++++++---------------------- lib/wallet.py | 7 +- 3 files changed, 159 insertions(+), 158 deletions(-) diff --git a/lib/network_proxy.py b/lib/network_proxy.py index ca2dc2214..6453ead9f 100644 --- a/lib/network_proxy.py +++ b/lib/network_proxy.py @@ -61,10 +61,13 @@ class NetworkProxy(util.DaemonThread): self.blockchain_height = 0 self.server_height = 0 self.interfaces = [] + self.jobs = [] def run(self): while self.is_running(): + for job in self.jobs: + job() try: response = self.pipe.get() except util.timeout: diff --git a/lib/synchronizer.py b/lib/synchronizer.py index 6ac8a9bcb..6555d0d0e 100644 --- a/lib/synchronizer.py +++ b/lib/synchronizer.py @@ -17,172 +17,169 @@ # along with this program. If not, see . -import threading -import time -import Queue +from threading import Lock -import bitcoin -import util +from bitcoin import Hash, hash_encode from transaction import Transaction +from util import print_error, print_msg -class WalletSynchronizer(util.DaemonThread): +class WalletSynchronizer(): + '''The synchronizer keeps the wallet up-to-date with its set of + addresses and their transactions. It subscribes over the network + to wallet addresses, gets the wallet to generate new addresses + when necessary, requests the transaction history of any addresses + we don't have the full history of, and requests binary transaction + data of any transactions the wallet doesn't have. + + External interface: __init__() and add() member functions. + ''' def __init__(self, wallet, network): - util.DaemonThread.__init__(self) self.wallet = wallet self.network = network - self.was_updated = True - self.queue = Queue.Queue() - self.address_queue = Queue.Queue() + self.new_addresses = set() + # Entries are (tx_hash, tx_height) tuples + self.requested_tx = set() + self.requested_histories = {} + self.requested_addrs = set() + self.lock = Lock() + self.initialize() + + def print_error(self, *msg): + print_error("[Synchronizer]", *msg) + + def print_msg(self, *msg): + print_msg("[Synchronizer]", *msg) + + def parse_response(self, response): + if response.get('error'): + self.print_error("response error:", response) + return None, None + return response['params'], response['result'] + + def is_up_to_date(self): + return (not self.requested_tx and not self.requested_histories + and not self.requested_addrs) def add(self, address): - self.address_queue.put(address) + '''This can be called from the proxy or GUI threads.''' + with self.lock: + self.new_addresses.add(address) def subscribe_to_addresses(self, addresses): - messages = [] - for addr in addresses: - messages.append(('blockchain.address.subscribe', [addr])) - self.network.send(messages, self.queue.put) - - def run(self): - while self.is_running(): - if not self.network.is_connected(): - time.sleep(0.1) - continue - self.run_interface() - self.print_error("stopped") - - def run_interface(self): - #print_error("synchronizer: connected to", self.network.get_parameters()) - - requested_tx = [] - missing_tx = [] - requested_histories = {} - - # request any missing transactions + if addresses: + self.requested_addrs |= addresses + msgs = map(lambda addr: ('blockchain.address.subscribe', [addr]), + addresses) + self.network.send(msgs, self.addr_subscription_response) + + def addr_subscription_response(self, response): + params, result = self.parse_response(response) + if not params: + return + addr = params[0] + if addr in self.requested_addrs: # Notifications won't be in + self.requested_addrs.remove(addr) + history = self.wallet.get_address_history(addr) + if self.wallet.get_status(history) != result: + if self.requested_histories.get(addr) is None: + self.network.send([('blockchain.address.get_history', [addr])], + self.addr_history_response) + self.requested_histories[addr] = result + + def addr_history_response(self, response): + params, result = self.parse_response(response) + if not params: + return + addr = params[0] + self.print_error("receiving history", addr, len(result)) + server_status = self.requested_histories.pop(addr) + + # Check that txids are unique + hashes = set(map(lambda item: item['tx_hash'], result)) + if len(hashes) != len(result): + raise Exception("error: server history has non-unique txids: %s" + % addr) + + # Check that the status corresponds to what was announced + hist = map(lambda item: (item['tx_hash'], item['height']), result) + if self.wallet.get_status(hist) != server_status: + raise Exception("error: status mismatch: %s" % addr) + + # Store received history + self.wallet.receive_history_callback(addr, hist) + + # Request transactions we don't have + self.request_missing_txs(hist) + + def tx_response(self, response): + params, result = self.parse_response(response) + if not params: + return + tx_hash, tx_height = params + assert tx_hash == hash_encode(Hash(result.decode('hex'))) + tx = Transaction(result) + try: + tx.deserialize() + except Exception: + self.print_msg("cannot deserialize transaction, skipping", tx_hash) + return + + self.wallet.receive_tx_callback(tx_hash, tx, tx_height) + self.requested_tx.remove((tx_hash, tx_height)) + self.print_error("received tx:", tx_hash, len(tx.raw)) + if not self.requested_tx: + self.network.trigger_callback('updated') + # Updated gets called too many times from other places as + # well; if we used that signal we get the notification + # three times + self.network.trigger_callback("new_transaction") + + def request_missing_txs(self, hist): + # "hist" is a list of [tx_hash, tx_height] lists + missing = set() + for tx_hash, tx_height in hist: + if self.wallet.transactions.get(tx_hash) is None: + missing.add((tx_hash, tx_height)) + missing -= self.requested_tx + if missing: + requests = [('blockchain.transaction.get', tx) for tx in missing] + self.network.send(requests, self.tx_response) + self.requested_tx |= missing + + def initialize(self): + '''Check the initial state of the wallet. Subscribe to all its + addresses, and request any transactions in its address history + we don't have. + ''' for history in self.wallet.history.values(): - if history == ['*']: continue - for tx_hash, tx_height in history: - if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx: - missing_tx.append( (tx_hash, tx_height) ) - - if missing_tx: - self.print_error("missing tx", missing_tx) - - # subscriptions - self.subscribe_to_addresses(self.wallet.addresses(True)) - - while self.is_running(): - - # 1. create new addresses - self.wallet.synchronize() - - # request missing addresses - new_addresses = [] - while True: - try: - addr = self.address_queue.get(block=False) - except Queue.Empty: - break - new_addresses.append(addr) - if new_addresses: - self.subscribe_to_addresses(new_addresses) - - # request missing transactions - for tx_hash, tx_height in missing_tx: - if (tx_hash, tx_height) not in requested_tx: - self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], self.queue.put) - requested_tx.append( (tx_hash, tx_height) ) - missing_tx = [] - - # detect if situation has changed - if self.network.is_up_to_date() and self.queue.empty(): - if not self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(True) - self.was_updated = True - self.wallet.save_transactions() - else: - if self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(False) - self.was_updated = True - - if self.was_updated: - self.network.trigger_callback('updated') - self.was_updated = False - - # 2. get a response - try: - r = self.queue.get(timeout=0.1) - except Queue.Empty: - continue - - # 3. process response - method = r['method'] - params = r['params'] - result = r.get('result') - error = r.get('error') - if error: - self.print_error("error", r) + # Old electrum servers returned ['*'] when all history for + # the address was pruned. This no longer happens but may + # remain in old wallets. + if history == ['*']: continue - - if method == 'blockchain.address.subscribe': - addr = params[0] - if self.wallet.get_status(self.wallet.get_address_history(addr)) != result: - if requested_histories.get(addr) is None: - self.network.send([('blockchain.address.get_history', [addr])], self.queue.put) - requested_histories[addr] = result - - elif method == 'blockchain.address.get_history': - addr = params[0] - self.print_error("receiving history", addr, len(result)) - hist = [] - # check that txids are unique - txids = [] - for item in result: - tx_hash = item['tx_hash'] - if tx_hash not in txids: - txids.append(tx_hash) - hist.append( (tx_hash, item['height']) ) - - if len(hist) != len(result): - raise Exception("error: server sent history with non-unique txid", result) - - # check that the status corresponds to what was announced - rs = requested_histories.pop(addr) - if self.wallet.get_status(hist) != rs: - raise Exception("error: status mismatch: %s"%addr) - - # store received history - self.wallet.receive_history_callback(addr, hist) - - # request transactions that we don't have - for tx_hash, tx_height in hist: - if self.wallet.transactions.get(tx_hash) is None: - if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx: - missing_tx.append( (tx_hash, tx_height) ) - - elif method == 'blockchain.transaction.get': - tx_hash = params[0] - tx_height = params[1] - assert tx_hash == bitcoin.hash_encode(bitcoin.Hash(result.decode('hex'))) - tx = Transaction(result) - try: - tx.deserialize() - except Exception: - self.print_msg("Warning: Cannot deserialize transactions. skipping") - continue - - self.wallet.receive_tx_callback(tx_hash, tx, tx_height) - self.was_updated = True - requested_tx.remove( (tx_hash, tx_height) ) - self.print_error("received tx:", tx_hash, len(tx.raw)) - - else: - self.print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) ) - - if self.was_updated and not requested_tx: - self.network.trigger_callback('updated') - # Updated gets called too many times from other places as well; if we use that signal we get the notification three times - self.network.trigger_callback("new_transaction") - self.was_updated = False + self.request_missing_txs(history) + + if self.requested_tx: + self.print_error("missing tx", self.requested_tx) + self.subscribe_to_addresses(set(self.wallet.addresses(True))) + + def main_loop(self): + '''Called from the network proxy thread main loop.''' + # 1. Create new addresses + self.wallet.synchronize() + + # 2. Subscribe to new addresses + with self.lock: + addresses = self.new_addresses + self.new_addresses = set() + self.subscribe_to_addresses(addresses) + + # 3. Detect if situation has changed + up_to_date = self.is_up_to_date() + if up_to_date != self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(up_to_date) + if up_to_date: + self.wallet.save_transactions() + self.network.trigger_callback('updated') diff --git a/lib/wallet.py b/lib/wallet.py index fc7e66684..783380b3f 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -1105,15 +1105,16 @@ class Abstract_Wallet(object): self.verifier.start() self.set_verifier(self.verifier) self.synchronizer = WalletSynchronizer(self, network) - self.synchronizer.start() + network.jobs.append(self.synchronizer.main_loop) else: self.verifier = None - self.synchronizer =None + self.synchronizer = None def stop_threads(self): if self.network: self.verifier.stop() - self.synchronizer.stop() + self.network.jobs = [] + self.synchronizer = None self.storage.put('stored_height', self.get_local_height(), True) def restore(self, cb):