diff --git a/lib/synchronizer.py b/lib/synchronizer.py new file mode 100644 index 000000000..1a866275b --- /dev/null +++ b/lib/synchronizer.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# +# Electrum - lightweight Bitcoin client +# Copyright (C) 2014 Thomas Voegtlin +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import threading +import Queue +import bitcoin +from util import print_error +from transaction import Transaction + + +class WalletSynchronizer(threading.Thread): + + def __init__(self, wallet, network): + threading.Thread.__init__(self) + self.daemon = True + self.wallet = wallet + self.network = network + self.was_updated = True + self.running = False + self.lock = threading.Lock() + self.queue = Queue.Queue() + + def stop(self): + with self.lock: self.running = False + + def is_running(self): + with self.lock: return self.running + + + def subscribe_to_addresses(self, addresses): + messages = [] + for addr in addresses: + messages.append(('blockchain.address.subscribe', [addr])) + self.network.subscribe( messages, lambda i,r: self.queue.put(r)) + + + def run(self): + with self.lock: + self.running = True + + while self.is_running(): + + if not self.network.is_connected(): + self.network.wait_until_connected() + + self.run_interface() + + + def run_interface(self): + + print_error("synchronizer: connected to", self.network.main_server()) + + requested_tx = [] + missing_tx = [] + requested_histories = {} + + # request any missing transactions + for history in self.wallet.history.values(): + if history == ['*']: continue + for tx_hash, tx_height in history: + if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx: + missing_tx.append( (tx_hash, tx_height) ) + + if missing_tx: + print_error("missing tx", missing_tx) + + # subscriptions + self.subscribe_to_addresses(self.wallet.addresses(True)) + + while self.is_running(): + # 1. create new addresses + new_addresses = self.wallet.synchronize() + + # request missing addresses + if new_addresses: + self.subscribe_to_addresses(new_addresses) + + # request missing transactions + for tx_hash, tx_height in missing_tx: + if (tx_hash, tx_height) not in requested_tx: + self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r)) + requested_tx.append( (tx_hash, tx_height) ) + missing_tx = [] + + # detect if situation has changed + if self.network.is_up_to_date() and self.queue.empty(): + if not self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(True) + self.was_updated = True + else: + if self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(False) + self.was_updated = True + + if self.was_updated: + self.network.trigger_callback('updated') + self.was_updated = False + + # 2. get a response + try: + r = self.queue.get(block=True, timeout=1) + except Queue.Empty: + continue + + # see if it changed + #if interface != self.network.interface: + # break + + if not r: + continue + + # 3. handle response + method = r['method'] + params = r['params'] + result = r.get('result') + error = r.get('error') + if error: + print "error", r + continue + + if method == 'blockchain.address.subscribe': + addr = params[0] + if self.wallet.get_status(self.wallet.get_history(addr)) != result: + if requested_histories.get(addr) is None: + self.network.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r)) + requested_histories[addr] = result + + elif method == 'blockchain.address.get_history': + addr = params[0] + print_error("receiving history", addr, result) + if result == ['*']: + assert requested_histories.pop(addr) == '*' + self.wallet.receive_history_callback(addr, result) + else: + hist = [] + # check that txids are unique + txids = [] + for item in result: + tx_hash = item['tx_hash'] + if tx_hash not in txids: + txids.append(tx_hash) + hist.append( (tx_hash, item['height']) ) + + if len(hist) != len(result): + raise Exception("error: server sent history with non-unique txid", result) + + # check that the status corresponds to what was announced + rs = requested_histories.pop(addr) + if self.wallet.get_status(hist) != rs: + raise Exception("error: status mismatch: %s"%addr) + + # store received history + self.wallet.receive_history_callback(addr, hist) + + # request transactions that we don't have + for tx_hash, tx_height in hist: + if self.wallet.transactions.get(tx_hash) is None: + if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx: + missing_tx.append( (tx_hash, tx_height) ) + + elif method == 'blockchain.transaction.get': + tx_hash = params[0] + tx_height = params[1] + assert tx_hash == bitcoin.hash_encode(bitcoin.Hash(result.decode('hex'))) + tx = Transaction(result) + self.wallet.receive_tx_callback(tx_hash, tx, tx_height) + self.was_updated = True + requested_tx.remove( (tx_hash, tx_height) ) + print_error("received tx:", tx_hash, len(tx.raw)) + + else: + print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) ) + + if self.was_updated and not requested_tx: + self.network.trigger_callback('updated') + # Updated gets called too many times from other places as well; if we use that signal we get the notification three times + self.network.trigger_callback("new_transaction") + self.was_updated = False + diff --git a/lib/wallet.py b/lib/wallet.py index d9e3cd254..2e1ebedeb 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -37,6 +37,7 @@ from account import * from transaction import Transaction from plugins import run_hook import bitcoin +from synchronizer import WalletSynchronizer COINBASE_MATURITY = 100 DUST_THRESHOLD = 5430 @@ -156,17 +157,17 @@ class WalletStorage: -class NewWallet: + + +class Abstract_Wallet: def __init__(self, storage): self.storage = storage self.electrum_version = ELECTRUM_VERSION self.gap_limit_for_change = 3 # constant - # saved fields self.seed_version = storage.get('seed_version', NEW_SEED_VERSION) - self.gap_limit = storage.get('gap_limit', 5) self.use_change = storage.get('use_change',True) self.use_encryption = storage.get('use_encryption', False) @@ -279,35 +280,13 @@ class NewWallet: self.synchronizer.subscribe_to_addresses([address]) return address + def delete_imported_key(self, addr): if addr in self.imported_keys: self.imported_keys.pop(addr) self.storage.put('imported_keys', self.imported_keys, True) - def make_seed(self): - import mnemonic, ecdsa - entropy = ecdsa.util.randrange( pow(2,160) ) - nonce = 0 - while True: - ss = "%040x"%(entropy+nonce) - s = hashlib.sha256(ss.decode('hex')).digest().encode('hex') - # we keep only 13 words, that's approximately 139 bits of entropy - words = mnemonic.mn_encode(s)[0:13] - seed = ' '.join(words) - if is_new_seed(seed): - break # this will remove 8 bits of entropy - nonce += 1 - - return seed - - - def prepare_seed(self, seed): - import unicodedata - return NEW_SEED_VERSION, unicodedata.normalize('NFC', unicode(seed.strip())) - - - def add_seed(self, seed, password): if self.seed: raise Exception("a seed exists") @@ -1483,10 +1462,10 @@ class NewWallet: -class Imported_Wallet(NewWallet): +class Imported_Wallet(Abstract_Wallet): def __init__(self, storage): - NewWallet.__init__(self, storage) + Abstract_Wallet.__init__(self, storage) def is_watching_only(self): n = self.imported_keys.values() @@ -1494,6 +1473,34 @@ class Imported_Wallet(NewWallet): + +class NewWallet(Abstract_Wallet): + """class for BIP32 wallet""" + + def __init__(self, storage): + Abstract_Wallet.__init__(self, storage) + + def make_seed(self): + import mnemonic, ecdsa + entropy = ecdsa.util.randrange( pow(2,160) ) + nonce = 0 + while True: + ss = "%040x"%(entropy+nonce) + s = hashlib.sha256(ss.decode('hex')).digest().encode('hex') + # we keep only 13 words, that's approximately 139 bits of entropy + words = mnemonic.mn_encode(s)[0:13] + seed = ' '.join(words) + if is_new_seed(seed): + break # this will remove 8 bits of entropy + nonce += 1 + return seed + + def prepare_seed(self, seed): + import unicodedata + return NEW_SEED_VERSION, unicodedata.normalize('NFC', unicode(seed.strip())) + + + class Wallet_2of2(NewWallet): def __init__(self, storage): @@ -1527,7 +1534,7 @@ class Wallet_2of2(NewWallet): class Wallet_2of3(Wallet_2of2): def __init__(self, storage): - NewWallet.__init__(self, storage) + Wallet_2of2.__init__(self, storage) self.storage.put('wallet_type', '2of3', True) def create_account(self): @@ -1555,179 +1562,10 @@ class Wallet_2of3(Wallet_2of2): return 'create_2of3_3' -class WalletSynchronizer(threading.Thread): - - def __init__(self, wallet, network): - threading.Thread.__init__(self) - self.daemon = True - self.wallet = wallet - self.network = network - self.was_updated = True - self.running = False - self.lock = threading.Lock() - self.queue = Queue.Queue() - - def stop(self): - with self.lock: self.running = False - - def is_running(self): - with self.lock: return self.running - - - def subscribe_to_addresses(self, addresses): - messages = [] - for addr in addresses: - messages.append(('blockchain.address.subscribe', [addr])) - self.network.subscribe( messages, lambda i,r: self.queue.put(r)) - - - def run(self): - with self.lock: - self.running = True - - while self.is_running(): - - if not self.network.is_connected(): - self.network.wait_until_connected() - - self.run_interface() - - - def run_interface(self): - - print_error("synchronizer: connected to", self.network.main_server()) - - requested_tx = [] - missing_tx = [] - requested_histories = {} - - # request any missing transactions - for history in self.wallet.history.values(): - if history == ['*']: continue - for tx_hash, tx_height in history: - if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx: - missing_tx.append( (tx_hash, tx_height) ) - - if missing_tx: - print_error("missing tx", missing_tx) - - # subscriptions - self.subscribe_to_addresses(self.wallet.addresses(True)) - - while self.is_running(): - # 1. create new addresses - new_addresses = self.wallet.synchronize() - - # request missing addresses - if new_addresses: - self.subscribe_to_addresses(new_addresses) - - # request missing transactions - for tx_hash, tx_height in missing_tx: - if (tx_hash, tx_height) not in requested_tx: - self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r)) - requested_tx.append( (tx_hash, tx_height) ) - missing_tx = [] - - # detect if situation has changed - if self.network.is_up_to_date() and self.queue.empty(): - if not self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(True) - self.was_updated = True - else: - if self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(False) - self.was_updated = True - - if self.was_updated: - self.network.trigger_callback('updated') - self.was_updated = False - - # 2. get a response - try: - r = self.queue.get(block=True, timeout=1) - except Queue.Empty: - continue - - # see if it changed - #if interface != self.network.interface: - # break - - if not r: - continue - - # 3. handle response - method = r['method'] - params = r['params'] - result = r.get('result') - error = r.get('error') - if error: - print "error", r - continue - - if method == 'blockchain.address.subscribe': - addr = params[0] - if self.wallet.get_status(self.wallet.get_history(addr)) != result: - if requested_histories.get(addr) is None: - self.network.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r)) - requested_histories[addr] = result - - elif method == 'blockchain.address.get_history': - addr = params[0] - print_error("receiving history", addr, result) - if result == ['*']: - assert requested_histories.pop(addr) == '*' - self.wallet.receive_history_callback(addr, result) - else: - hist = [] - # check that txids are unique - txids = [] - for item in result: - tx_hash = item['tx_hash'] - if tx_hash not in txids: - txids.append(tx_hash) - hist.append( (tx_hash, item['height']) ) - - if len(hist) != len(result): - raise Exception("error: server sent history with non-unique txid", result) - - # check that the status corresponds to what was announced - rs = requested_histories.pop(addr) - if self.wallet.get_status(hist) != rs: - raise Exception("error: status mismatch: %s"%addr) - - # store received history - self.wallet.receive_history_callback(addr, hist) - - # request transactions that we don't have - for tx_hash, tx_height in hist: - if self.wallet.transactions.get(tx_hash) is None: - if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx: - missing_tx.append( (tx_hash, tx_height) ) - - elif method == 'blockchain.transaction.get': - tx_hash = params[0] - tx_height = params[1] - assert tx_hash == hash_encode(Hash(result.decode('hex'))) - tx = Transaction(result) - self.wallet.receive_tx_callback(tx_hash, tx, tx_height) - self.was_updated = True - requested_tx.remove( (tx_hash, tx_height) ) - print_error("received tx:", tx_hash, len(tx.raw)) - - else: - print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) ) - - if self.was_updated and not requested_tx: - self.network.trigger_callback('updated') - # Updated gets called too many times from other places as well; if we use that signal we get the notification three times - self.network.trigger_callback("new_transaction") - self.was_updated = False - -class OldWallet(NewWallet): +class OldWallet(Abstract_Wallet): def can_create_accounts(self): return False diff --git a/setup.py b/setup.py index f4ebf5c99..174cae201 100644 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ setup( 'electrum.pyqrnative', 'electrum.simple_config', 'electrum.socks', + 'electrum.synchronizer', 'electrum.transaction', 'electrum.util', 'electrum.verifier',