Browse Source

Merge branch 'synchronizer-unthread' of https://github.com/kyuupichan/electrum into kyuupichan-synchronizer-unthread

283
ThomasV 10 years ago
parent
commit
ccd07c6a80
  1. 3
      lib/network_proxy.py
  2. 277
      lib/synchronizer.py
  3. 5
      lib/wallet.py

3
lib/network_proxy.py

@ -61,10 +61,13 @@ class NetworkProxy(util.DaemonThread):
self.blockchain_height = 0 self.blockchain_height = 0
self.server_height = 0 self.server_height = 0
self.interfaces = [] self.interfaces = []
self.jobs = []
def run(self): def run(self):
while self.is_running(): while self.is_running():
for job in self.jobs:
job()
try: try:
response = self.pipe.get() response = self.pipe.get()
except util.timeout: except util.timeout:

277
lib/synchronizer.py

@ -17,174 +17,169 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading from threading import Lock
import time
import Queue
import bitcoin from bitcoin import Hash, hash_encode
import util
from transaction import Transaction from transaction import Transaction
from util import print_error, print_msg
class WalletSynchronizer(util.DaemonThread): class WalletSynchronizer():
'''The synchronizer keeps the wallet up-to-date with its set of
addresses and their transactions. It subscribes over the network
to wallet addresses, gets the wallet to generate new addresses
when necessary, requests the transaction history of any addresses
we don't have the full history of, and requests binary transaction
data of any transactions the wallet doesn't have.
External interface: __init__() and add() member functions.
'''
def __init__(self, wallet, network): def __init__(self, wallet, network):
util.DaemonThread.__init__(self)
self.wallet = wallet self.wallet = wallet
self.network = network self.network = network
self.was_updated = True self.new_addresses = set()
self.queue = Queue.Queue() # Entries are (tx_hash, tx_height) tuples
self.address_queue = Queue.Queue() self.requested_tx = set()
self.requested_histories = {}
self.requested_addrs = set()
self.lock = Lock()
self.initialize()
def print_error(self, *msg):
print_error("[Synchronizer]", *msg)
def print_msg(self, *msg):
print_msg("[Synchronizer]", *msg)
def parse_response(self, response):
if response.get('error'):
self.print_error("response error:", response)
return None, None
return response['params'], response['result']
def is_up_to_date(self):
return (not self.requested_tx and not self.requested_histories
and not self.requested_addrs)
def add(self, address): def add(self, address):
self.address_queue.put(address) '''This can be called from the proxy or GUI threads.'''
with self.lock:
self.new_addresses.add(address)
def subscribe_to_addresses(self, addresses): def subscribe_to_addresses(self, addresses):
messages = [] if addresses:
for addr in addresses: self.requested_addrs |= addresses
messages.append(('blockchain.address.subscribe', [addr])) msgs = map(lambda addr: ('blockchain.address.subscribe', [addr]),
self.network.send(messages, self.queue.put) addresses)
self.network.send(msgs, self.addr_subscription_response)
def run(self):
while self.is_running(): def addr_subscription_response(self, response):
if not self.network.is_connected(): params, result = self.parse_response(response)
time.sleep(0.1) if not params:
continue return
self.run_interface()
self.print_error("stopped")
def run_interface(self):
#print_error("synchronizer: connected to", self.network.get_parameters())
requested_tx = []
missing_tx = []
requested_histories = {}
# request any missing transactions
for history in self.wallet.history.values():
if history == ['*']: continue
for tx_hash, tx_height in history:
if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx:
missing_tx.append( (tx_hash, tx_height) )
if missing_tx:
self.print_error("missing tx", missing_tx)
# subscriptions
self.subscribe_to_addresses(self.wallet.addresses(True))
while self.is_running():
# 1. create new addresses
self.wallet.synchronize()
# request missing addresses
new_addresses = []
while True:
try:
addr = self.address_queue.get(block=False)
except Queue.Empty:
break
new_addresses.append(addr)
if new_addresses:
self.subscribe_to_addresses(new_addresses)
# request missing transactions
for tx_hash, tx_height in missing_tx:
if (tx_hash, tx_height) not in requested_tx:
self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], self.queue.put)
requested_tx.append( (tx_hash, tx_height) )
missing_tx = []
# detect if situation has changed
if self.network.is_up_to_date() and self.queue.empty():
if not self.wallet.is_up_to_date():
self.wallet.set_up_to_date(True)
self.was_updated = True
self.wallet.save_transactions()
else:
if self.wallet.is_up_to_date():
self.wallet.set_up_to_date(False)
self.was_updated = True
if self.was_updated:
self.network.trigger_callback('updated')
self.was_updated = False
# 2. get a response
try:
r = self.queue.get(timeout=0.1)
except Queue.Empty:
continue
# 3. process response
method = r['method']
params = r['params']
result = r.get('result')
error = r.get('error')
if error:
self.print_error("error", r)
continue
if method == 'blockchain.address.subscribe':
addr = params[0] addr = params[0]
if self.wallet.get_status(self.wallet.get_address_history(addr)) != result: if addr in self.requested_addrs: # Notifications won't be in
if requested_histories.get(addr) is None: self.requested_addrs.remove(addr)
self.network.send([('blockchain.address.get_history', [addr])], self.queue.put) history = self.wallet.get_address_history(addr)
requested_histories[addr] = result if self.wallet.get_status(history) != result:
if self.requested_histories.get(addr) is None:
elif method == 'blockchain.address.get_history': self.network.send([('blockchain.address.get_history', [addr])],
self.addr_history_response)
self.requested_histories[addr] = result
def addr_history_response(self, response):
params, result = self.parse_response(response)
if not params:
return
addr = params[0] addr = params[0]
self.print_error("receiving history", addr, len(result)) self.print_error("receiving history", addr, len(result))
hist = [] server_status = self.requested_histories.pop(addr)
# check that txids are unique
txids = []
for item in result:
tx_hash = item['tx_hash']
if tx_hash not in txids:
txids.append(tx_hash)
hist.append( (tx_hash, item['height']) )
if len(hist) != len(result):
self.print_error("error: server sent history with non-unique txid", result)
continue
# check that the status corresponds to what was announced # Check that txids are unique
rs = requested_histories.pop(addr) hashes = set(map(lambda item: item['tx_hash'], result))
if self.wallet.get_status(hist) != rs: if len(hashes) != len(result):
self.print_error("error: status mismatch: %s" % addr) raise Exception("error: server history has non-unique txids: %s"
continue % addr)
# store received history # Check that the status corresponds to what was announced
hist = map(lambda item: (item['tx_hash'], item['height']), result)
if self.wallet.get_status(hist) != server_status:
raise Exception("error: status mismatch: %s" % addr)
# Store received history
self.wallet.receive_history_callback(addr, hist) self.wallet.receive_history_callback(addr, hist)
# request transactions that we don't have # Request transactions we don't have
for tx_hash, tx_height in hist: self.request_missing_txs(hist)
if self.wallet.transactions.get(tx_hash) is None:
if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx:
missing_tx.append( (tx_hash, tx_height) )
elif method == 'blockchain.transaction.get': def tx_response(self, response):
tx_hash = params[0] params, result = self.parse_response(response)
tx_height = params[1] if not params:
assert tx_hash == bitcoin.hash_encode(bitcoin.Hash(result.decode('hex'))) return
tx_hash, tx_height = params
assert tx_hash == hash_encode(Hash(result.decode('hex')))
tx = Transaction(result) tx = Transaction(result)
try: try:
tx.deserialize() tx.deserialize()
except Exception: except Exception:
self.print_msg("Warning: Cannot deserialize transactions. skipping") self.print_msg("cannot deserialize transaction, skipping", tx_hash)
continue return
self.wallet.receive_tx_callback(tx_hash, tx, tx_height) self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
self.was_updated = True self.requested_tx.remove((tx_hash, tx_height))
requested_tx.remove( (tx_hash, tx_height) )
self.print_error("received tx:", tx_hash, len(tx.raw)) self.print_error("received tx:", tx_hash, len(tx.raw))
if not self.requested_tx:
self.network.trigger_callback('updated')
# Updated gets called too many times from other places as
# well; if we used that signal we get the notification
# three times
self.network.trigger_callback("new_transaction")
def request_missing_txs(self, hist):
# "hist" is a list of [tx_hash, tx_height] lists
missing = set()
for tx_hash, tx_height in hist:
if self.wallet.transactions.get(tx_hash) is None:
missing.add((tx_hash, tx_height))
missing -= self.requested_tx
if missing:
requests = [('blockchain.transaction.get', tx) for tx in missing]
self.network.send(requests, self.tx_response)
self.requested_tx |= missing
def initialize(self):
'''Check the initial state of the wallet. Subscribe to all its
addresses, and request any transactions in its address history
we don't have.
'''
for history in self.wallet.history.values():
# Old electrum servers returned ['*'] when all history for
# the address was pruned. This no longer happens but may
# remain in old wallets.
if history == ['*']:
continue
self.request_missing_txs(history)
if self.requested_tx:
self.print_error("missing tx", self.requested_tx)
self.subscribe_to_addresses(set(self.wallet.addresses(True)))
else: def main_loop(self):
self.print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) ) '''Called from the network proxy thread main loop.'''
# 1. Create new addresses
self.wallet.synchronize()
if self.was_updated and not requested_tx: # 2. Subscribe to new addresses
with self.lock:
addresses = self.new_addresses
self.new_addresses = set()
self.subscribe_to_addresses(addresses)
# 3. Detect if situation has changed
up_to_date = self.is_up_to_date()
if up_to_date != self.wallet.is_up_to_date():
self.wallet.set_up_to_date(up_to_date)
if up_to_date:
self.wallet.save_transactions()
self.network.trigger_callback('updated') self.network.trigger_callback('updated')
# Updated gets called too many times from other places as well; if we use that signal we get the notification three times
self.network.trigger_callback("new_transaction")
self.was_updated = False

5
lib/wallet.py

@ -1107,7 +1107,7 @@ class Abstract_Wallet(object):
self.verifier.start() self.verifier.start()
self.set_verifier(self.verifier) self.set_verifier(self.verifier)
self.synchronizer = WalletSynchronizer(self, network) self.synchronizer = WalletSynchronizer(self, network)
self.synchronizer.start() network.jobs.append(self.synchronizer.main_loop)
else: else:
self.verifier = None self.verifier = None
self.synchronizer = None self.synchronizer = None
@ -1115,7 +1115,8 @@ class Abstract_Wallet(object):
def stop_threads(self): def stop_threads(self):
if self.network: if self.network:
self.verifier.stop() self.verifier.stop()
self.synchronizer.stop() self.network.jobs = []
self.synchronizer = None
self.storage.put('stored_height', self.get_local_height(), True) self.storage.put('stored_height', self.get_local_height(), True)
def restore(self, cb): def restore(self, cb):

Loading…
Cancel
Save