diff --git a/electrum/__init__.py b/electrum/__init__.py index d8df161f1..48a60c15c 100644 --- a/electrum/__init__.py +++ b/electrum/__init__.py @@ -1,6 +1,6 @@ from .version import ELECTRUM_VERSION from .util import format_satoshis, print_msg, print_error, set_verbosity -from .wallet import Synchronizer, Wallet +from .wallet import Wallet from .storage import WalletStorage from .coinchooser import COIN_CHOOSERS from .network import Network, pick_random_server diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py new file mode 100644 index 000000000..bbc8b0b2e --- /dev/null +++ b/electrum/address_synchronizer.py @@ -0,0 +1,494 @@ +# Electrum - lightweight Bitcoin client +# Copyright (C) 2018 The Electrum Developers +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import threading +import itertools +from collections import defaultdict + +from .util import PrintError, profiler +from .transaction import Transaction +from .synchronizer import Synchronizer +from .verifier import SPV + +TX_HEIGHT_LOCAL = -2 +TX_HEIGHT_UNCONF_PARENT = -1 +TX_HEIGHT_UNCONFIRMED = 0 + +class AddTransactionException(Exception): + pass + + +class UnrelatedTransactionException(AddTransactionException): + def __str__(self): + return _("Transaction is unrelated to this wallet.") + +class AddressSynchronizer(PrintError): + """ + inherited by wallet + """ + + def __init__(self, storage): + self.storage = storage + self.network = None + # verifier (SPV) and synchronizer are started in start_threads + self.synchronizer = None + self.verifier = None + # locks: if you need to take multiple ones, acquire them in the order they are defined here! + self.lock = threading.RLock() + self.transaction_lock = threading.RLock() + # address -> list(txid, height) + self.history = storage.get('addr_history',{}) + # Verified transactions. txid -> (height, timestamp, block_pos). Access with self.lock. + self.verified_tx = storage.get('verified_tx3', {}) + # Transactions pending verification. txid -> tx_height. Access with self.lock. + self.unverified_tx = defaultdict(int) + # true when synchronized + self.up_to_date = False + self.load_transactions() + self.load_local_history() + self.load_unverified_transactions() + self.remove_local_transactions_we_dont_have() + + def load_unverified_transactions(self): + # review transactions that are in the history + for addr, hist in self.history.items(): + for tx_hash, tx_height in hist: + # add it in case it was previously unconfirmed + self.add_unverified_tx(tx_hash, tx_height) + + def start_threads(self, network): + self.network = network + if self.network is not None: + self.verifier = SPV(self.network, self) + self.synchronizer = Synchronizer(self, network) + network.add_jobs([self.verifier, self.synchronizer]) + else: + self.verifier = None + self.synchronizer = None + + def stop_threads(self): + if self.network: + self.network.remove_jobs([self.synchronizer, self.verifier]) + self.synchronizer.release() + self.synchronizer = None + self.verifier = None + # Now no references to the synchronizer or verifier + # remain so they will be GC-ed + self.storage.put('stored_height', self.get_local_height()) + self.save_transactions() + self.save_verified_tx() + self.storage.write() + + def add_address(self, address): + if address not in self.history: + self.history[address] = [] + self.set_up_to_date(False) + if self.synchronizer: + self.synchronizer.add(address) + + def get_conflicting_transactions(self, tx): + """Returns a set of transaction hashes from the wallet history that are + directly conflicting with tx, i.e. they have common outpoints being + spent with tx. If the tx is already in wallet history, that will not be + reported as a conflict. + """ + conflicting_txns = set() + with self.transaction_lock: + for txin in tx.inputs(): + if txin['type'] == 'coinbase': + continue + prevout_hash = txin['prevout_hash'] + prevout_n = txin['prevout_n'] + spending_tx_hash = self.spent_outpoints[prevout_hash].get(prevout_n) + if spending_tx_hash is None: + continue + # this outpoint has already been spent, by spending_tx + assert spending_tx_hash in self.transactions + conflicting_txns |= {spending_tx_hash} + txid = tx.txid() + if txid in conflicting_txns: + # this tx is already in history, so it conflicts with itself + if len(conflicting_txns) > 1: + raise Exception('Found conflicting transactions already in wallet history.') + conflicting_txns -= {txid} + return conflicting_txns + + def add_transaction(self, tx_hash, tx, allow_unrelated=False): + assert tx_hash, tx_hash + assert tx, tx + assert tx.is_complete() + # we need self.transaction_lock but get_tx_height will take self.lock + # so we need to take that too here, to enforce order of locks + with self.lock, self.transaction_lock: + # NOTE: returning if tx in self.transactions might seem like a good idea + # BUT we track is_mine inputs in a txn, and during subsequent calls + # of add_transaction tx, we might learn of more-and-more inputs of + # being is_mine, as we roll the gap_limit forward + is_coinbase = tx.inputs()[0]['type'] == 'coinbase' + tx_height = self.get_tx_height(tx_hash)[0] + if not allow_unrelated: + # note that during sync, if the transactions are not properly sorted, + # it could happen that we think tx is unrelated but actually one of the inputs is is_mine. + # this is the main motivation for allow_unrelated + is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()]) + is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()]) + if not is_mine and not is_for_me: + raise UnrelatedTransactionException() + # Find all conflicting transactions. + # In case of a conflict, + # 1. confirmed > mempool > local + # 2. this new txn has priority over existing ones + # When this method exits, there must NOT be any conflict, so + # either keep this txn and remove all conflicting (along with dependencies) + # or drop this txn + conflicting_txns = self.get_conflicting_transactions(tx) + if conflicting_txns: + existing_mempool_txn = any( + self.get_tx_height(tx_hash2)[0] in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) + for tx_hash2 in conflicting_txns) + existing_confirmed_txn = any( + self.get_tx_height(tx_hash2)[0] > 0 + for tx_hash2 in conflicting_txns) + if existing_confirmed_txn and tx_height <= 0: + # this is a non-confirmed tx that conflicts with confirmed txns; drop. + return False + if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL: + # this is a local tx that conflicts with non-local txns; drop. + return False + # keep this txn and remove all conflicting + to_remove = set() + to_remove |= conflicting_txns + for conflicting_tx_hash in conflicting_txns: + to_remove |= self.get_depending_transactions(conflicting_tx_hash) + for tx_hash2 in to_remove: + self.remove_transaction(tx_hash2) + # add inputs + def add_value_from_prev_output(): + dd = self.txo.get(prevout_hash, {}) + # note: this nested loop takes linear time in num is_mine outputs of prev_tx + for addr, outputs in dd.items(): + # note: instead of [(n, v, is_cb), ...]; we could store: {n -> (v, is_cb)} + for n, v, is_cb in outputs: + if n == prevout_n: + if addr and self.is_mine(addr): + if d.get(addr) is None: + d[addr] = set() + d[addr].add((ser, v)) + return + self.txi[tx_hash] = d = {} + for txi in tx.inputs(): + if txi['type'] == 'coinbase': + continue + prevout_hash = txi['prevout_hash'] + prevout_n = txi['prevout_n'] + ser = prevout_hash + ':%d' % prevout_n + self.spent_outpoints[prevout_hash][prevout_n] = tx_hash + add_value_from_prev_output() + # add outputs + self.txo[tx_hash] = d = {} + for n, txo in enumerate(tx.outputs()): + v = txo[2] + ser = tx_hash + ':%d'%n + addr = self.get_txout_address(txo) + if addr and self.is_mine(addr): + if d.get(addr) is None: + d[addr] = [] + d[addr].append((n, v, is_coinbase)) + # give v to txi that spends me + next_tx = self.spent_outpoints[tx_hash].get(n) + if next_tx is not None: + dd = self.txi.get(next_tx, {}) + if dd.get(addr) is None: + dd[addr] = set() + if (ser, v) not in dd[addr]: + dd[addr].add((ser, v)) + self._add_tx_to_local_history(next_tx) + # add to local history + self._add_tx_to_local_history(tx_hash) + # save + self.transactions[tx_hash] = tx + return True + + def remove_transaction(self, tx_hash): + def remove_from_spent_outpoints(): + # undo spends in spent_outpoints + if tx is not None: # if we have the tx, this branch is faster + for txin in tx.inputs(): + if txin['type'] == 'coinbase': + continue + prevout_hash = txin['prevout_hash'] + prevout_n = txin['prevout_n'] + self.spent_outpoints[prevout_hash].pop(prevout_n, None) + if not self.spent_outpoints[prevout_hash]: + self.spent_outpoints.pop(prevout_hash) + else: # expensive but always works + for prevout_hash, d in list(self.spent_outpoints.items()): + for prevout_n, spending_txid in d.items(): + if spending_txid == tx_hash: + self.spent_outpoints[prevout_hash].pop(prevout_n, None) + if not self.spent_outpoints[prevout_hash]: + self.spent_outpoints.pop(prevout_hash) + # Remove this tx itself; if nothing spends from it. + # It is not so clear what to do if other txns spend from it, but it will be + # removed when those other txns are removed. + if not self.spent_outpoints[tx_hash]: + self.spent_outpoints.pop(tx_hash) + + with self.transaction_lock: + self.print_error("removing tx from history", tx_hash) + tx = self.transactions.pop(tx_hash, None) + remove_from_spent_outpoints() + self._remove_tx_from_local_history(tx_hash) + self.txi.pop(tx_hash, None) + self.txo.pop(tx_hash, None) + + def receive_tx_callback(self, tx_hash, tx, tx_height): + self.add_unverified_tx(tx_hash, tx_height) + self.add_transaction(tx_hash, tx, allow_unrelated=True) + + def receive_history_callback(self, addr, hist, tx_fees): + with self.lock: + old_hist = self.get_address_history(addr) + for tx_hash, height in old_hist: + if (tx_hash, height) not in hist: + # make tx local + self.unverified_tx.pop(tx_hash, None) + self.verified_tx.pop(tx_hash, None) + if self.verifier: + self.verifier.remove_spv_proof_for_tx(tx_hash) + self.history[addr] = hist + + for tx_hash, tx_height in hist: + # add it in case it was previously unconfirmed + self.add_unverified_tx(tx_hash, tx_height) + # if addr is new, we have to recompute txi and txo + tx = self.transactions.get(tx_hash) + if tx is None: + continue + self.add_transaction(tx_hash, tx, allow_unrelated=True) + + # Store fees + self.tx_fees.update(tx_fees) + + @profiler + def load_transactions(self): + # load txi, txo, tx_fees + self.txi = self.storage.get('txi', {}) + for txid, d in list(self.txi.items()): + for addr, lst in d.items(): + self.txi[txid][addr] = set([tuple(x) for x in lst]) + self.txo = self.storage.get('txo', {}) + self.tx_fees = self.storage.get('tx_fees', {}) + tx_list = self.storage.get('transactions', {}) + # load transactions + self.transactions = {} + for tx_hash, raw in tx_list.items(): + tx = Transaction(raw) + self.transactions[tx_hash] = tx + if self.txi.get(tx_hash) is None and self.txo.get(tx_hash) is None: + self.print_error("removing unreferenced tx", tx_hash) + self.transactions.pop(tx_hash) + # load spent_outpoints + _spent_outpoints = self.storage.get('spent_outpoints', {}) + self.spent_outpoints = defaultdict(dict) + for prevout_hash, d in _spent_outpoints.items(): + for prevout_n_str, spending_txid in d.items(): + prevout_n = int(prevout_n_str) + self.spent_outpoints[prevout_hash][prevout_n] = spending_txid + + @profiler + def load_local_history(self): + self._history_local = {} # address -> set(txid) + for txid in itertools.chain(self.txi, self.txo): + self._add_tx_to_local_history(txid) + + def remove_local_transactions_we_dont_have(self): + txid_set = set(self.txi) | set(self.txo) + for txid in txid_set: + tx_height = self.get_tx_height(txid)[0] + if tx_height == TX_HEIGHT_LOCAL and txid not in self.transactions: + self.remove_transaction(txid) + + @profiler + def save_transactions(self, write=False): + with self.transaction_lock: + tx = {} + for k,v in self.transactions.items(): + tx[k] = str(v) + self.storage.put('transactions', tx) + self.storage.put('txi', self.txi) + self.storage.put('txo', self.txo) + self.storage.put('tx_fees', self.tx_fees) + self.storage.put('addr_history', self.history) + self.storage.put('spent_outpoints', self.spent_outpoints) + if write: + self.storage.write() + + def save_verified_tx(self, write=False): + with self.lock: + self.storage.put('verified_tx3', self.verified_tx) + if write: + self.storage.write() + + def clear_history(self): + with self.lock: + with self.transaction_lock: + self.txi = {} + self.txo = {} + self.tx_fees = {} + self.spent_outpoints = defaultdict(dict) + self.history = {} + self.verified_tx = {} + self.transactions = {} + self.save_transactions() + + def get_history(self, domain=None): + # get domain + if domain is None: + domain = self.get_addresses() + domain = set(domain) + # 1. Get the history of each address in the domain, maintain the + # delta of a tx as the sum of its deltas on domain addresses + tx_deltas = defaultdict(int) + for addr in domain: + h = self.get_address_history(addr) + for tx_hash, height in h: + delta = self.get_tx_delta(tx_hash, addr) + if delta is None or tx_deltas[tx_hash] is None: + tx_deltas[tx_hash] = None + else: + tx_deltas[tx_hash] += delta + # 2. create sorted history + history = [] + for tx_hash in tx_deltas: + delta = tx_deltas[tx_hash] + height, conf, timestamp = self.get_tx_height(tx_hash) + history.append((tx_hash, height, conf, timestamp, delta)) + history.sort(key = lambda x: self.get_txpos(x[0])) + history.reverse() + # 3. add balance + c, u, x = self.get_balance(domain) + balance = c + u + x + h2 = [] + for tx_hash, height, conf, timestamp, delta in history: + h2.append((tx_hash, height, conf, timestamp, delta, balance)) + if balance is None or delta is None: + balance = None + else: + balance -= delta + h2.reverse() + # fixme: this may happen if history is incomplete + if balance not in [None, 0]: + self.print_error("Error: history not synchronized") + return [] + + return h2 + + def _add_tx_to_local_history(self, txid): + with self.transaction_lock: + for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])): + cur_hist = self._history_local.get(addr, set()) + cur_hist.add(txid) + self._history_local[addr] = cur_hist + + def _remove_tx_from_local_history(self, txid): + with self.transaction_lock: + for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])): + cur_hist = self._history_local.get(addr, set()) + try: + cur_hist.remove(txid) + except KeyError: + pass + else: + self._history_local[addr] = cur_hist + + def add_unverified_tx(self, tx_hash, tx_height): + if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) \ + and tx_hash in self.verified_tx: + with self.lock: + self.verified_tx.pop(tx_hash) + if self.verifier: + self.verifier.remove_spv_proof_for_tx(tx_hash) + + # tx will be verified only if height > 0 + if tx_hash not in self.verified_tx: + with self.lock: + self.unverified_tx[tx_hash] = tx_height + + def add_verified_tx(self, tx_hash, info): + # Remove from the unverified map and add to the verified map + with self.lock: + self.unverified_tx.pop(tx_hash, None) + self.verified_tx[tx_hash] = info # (tx_height, timestamp, pos) + height, conf, timestamp = self.get_tx_height(tx_hash) + self.network.trigger_callback('verified', tx_hash, height, conf, timestamp) + + def get_unverified_txs(self): + '''Returns a map from tx hash to transaction height''' + with self.lock: + return dict(self.unverified_tx) # copy + + def undo_verifications(self, blockchain, height): + '''Used by the verifier when a reorg has happened''' + txs = set() + with self.lock: + for tx_hash, item in list(self.verified_tx.items()): + tx_height, timestamp, pos = item + if tx_height >= height: + header = blockchain.read_header(tx_height) + # fixme: use block hash, not timestamp + if not header or header.get('timestamp') != timestamp: + self.verified_tx.pop(tx_hash, None) + txs.add(tx_hash) + return txs + + def get_local_height(self): + """ return last known height if we are offline """ + return self.network.get_local_height() if self.network else self.storage.get('stored_height', 0) + + def get_tx_height(self, tx_hash): + """ Given a transaction, returns (height, conf, timestamp) """ + with self.lock: + if tx_hash in self.verified_tx: + height, timestamp, pos = self.verified_tx[tx_hash] + conf = max(self.get_local_height() - height + 1, 0) + return height, conf, timestamp + elif tx_hash in self.unverified_tx: + height = self.unverified_tx[tx_hash] + return height, 0, None + else: + # local transaction + return TX_HEIGHT_LOCAL, 0, None + + def set_up_to_date(self, up_to_date): + with self.lock: + self.up_to_date = up_to_date + if up_to_date: + self.save_transactions(write=True) + # if the verifier is also up to date, persist that too; + # otherwise it will persist its results when it finishes + if self.verifier and self.verifier.is_up_to_date(): + self.save_verified_tx(write=True) + + def is_up_to_date(self): + with self.lock: return self.up_to_date diff --git a/electrum/gui/qt/history_list.py b/electrum/gui/qt/history_list.py index 83ab3fdb0..8f1a5ab31 100644 --- a/electrum/gui/qt/history_list.py +++ b/electrum/gui/qt/history_list.py @@ -26,7 +26,7 @@ import webbrowser import datetime -from electrum.wallet import AddTransactionException, TX_HEIGHT_LOCAL +from electrum.address_synchronizer import TX_HEIGHT_LOCAL from .util import * from electrum.i18n import _ from electrum.util import block_explorer_URL, profiler, print_error diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index ac2c52825..1cdae70dc 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -51,7 +51,8 @@ from electrum.util import (format_time, format_satoshis, format_fee_satoshis, base_units, base_units_list, base_unit_name_to_decimal_point, decimal_point_to_base_unit_name, quantize_feerate) from electrum.transaction import Transaction -from electrum.wallet import Multisig_Wallet, AddTransactionException, CannotBumpFee +from electrum.address_synchronizer import AddTransactionException +from electrum.wallet import Multisig_Wallet, CannotBumpFee from .amountedit import AmountEdit, BTCAmountEdit, MyLineEdit, FeerateEdit from .qrcodewidget import QRCodeWidget, QRDialog diff --git a/electrum/gui/qt/transaction_dialog.py b/electrum/gui/qt/transaction_dialog.py index fe7624dad..66d47f6f7 100644 --- a/electrum/gui/qt/transaction_dialog.py +++ b/electrum/gui/qt/transaction_dialog.py @@ -37,7 +37,6 @@ from electrum.plugin import run_hook from electrum import simple_config from electrum.util import bfh -from electrum.wallet import AddTransactionException from electrum.transaction import SerializationError from .util import * diff --git a/electrum/tests/test_wallet_vertical.py b/electrum/tests/test_wallet_vertical.py index 96d42dcc2..c95d98313 100644 --- a/electrum/tests/test_wallet_vertical.py +++ b/electrum/tests/test_wallet_vertical.py @@ -7,7 +7,8 @@ from typing import Sequence from electrum import storage, bitcoin, keystore, constants from electrum import Transaction from electrum import SimpleConfig -from electrum.wallet import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT, sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet +from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT +from electrum.wallet import sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet from electrum.util import bfh, bh2u from electrum.plugins.trustedcoin import trustedcoin diff --git a/electrum/wallet.py b/electrum/wallet.py index 416deffdb..b207a4d60 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -28,7 +28,7 @@ import os -import threading +import sys import random import time import json @@ -36,12 +36,8 @@ import copy import errno import traceback from functools import partial -from collections import defaultdict from numbers import Number from decimal import Decimal -import itertools - -import sys from .i18n import _ from .util import (NotEnoughFunds, PrintError, UserCancelled, profiler, @@ -57,8 +53,7 @@ from .storage import multisig_type, STO_EV_PLAINTEXT, STO_EV_USER_PW, STO_EV_XPU from . import transaction, bitcoin, coinchooser, paymentrequest, contacts from .transaction import Transaction from .plugin import run_hook -from .synchronizer import Synchronizer -from .verifier import SPV +from .address_synchronizer import AddressSynchronizer from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED from .paymentrequest import InvoiceStore @@ -71,9 +66,6 @@ TX_STATUS = [ _('Local'), ] -TX_HEIGHT_LOCAL = -2 -TX_HEIGHT_UNCONF_PARENT = -1 -TX_HEIGHT_UNCONFIRMED = 0 def relayfee(network): @@ -158,65 +150,37 @@ def sweep(privkeys, network, config, recipient, fee=None, imax=100): return tx -class AddTransactionException(Exception): - pass - -class UnrelatedTransactionException(AddTransactionException): - def __str__(self): - return _("Transaction is unrelated to this wallet.") +class CannotBumpFee(Exception): pass -class CannotBumpFee(Exception): pass -class Abstract_Wallet(PrintError): +class Abstract_Wallet(AddressSynchronizer): """ Wallet classes are created to handle various address generation methods. Completion states (watching-only, single account, no seed, etc) are handled inside classes. """ max_change_outputs = 3 + gap_limit_for_change = 6 def __init__(self, storage): + AddressSynchronizer.__init__(self, storage) self.electrum_version = ELECTRUM_VERSION - self.storage = storage - self.network = None - # verifier (SPV) and synchronizer are started in start_threads - self.synchronizer = None - self.verifier = None - - self.gap_limit_for_change = 6 # constant - - # locks: if you need to take multiple ones, acquire them in the order they are defined here! - self.lock = threading.RLock() - self.transaction_lock = threading.RLock() - # saved fields self.use_change = storage.get('use_change', True) self.multiple_change = storage.get('multiple_change', False) self.labels = storage.get('labels', {}) self.frozen_addresses = set(storage.get('frozen_addresses',[])) - self.history = storage.get('addr_history',{}) # address -> list(txid, height) self.fiat_value = storage.get('fiat_value', {}) self.receive_requests = storage.get('payment_requests', {}) - # Verified transactions. txid -> (height, timestamp, block_pos). Access with self.lock. - self.verified_tx = storage.get('verified_tx3', {}) - # Transactions pending verification. txid -> tx_height. Access with self.lock. - self.unverified_tx = defaultdict(int) - self.load_keystore() self.load_addresses() self.test_addresses_sanity() - self.load_transactions() - self.load_local_history() - self.check_history() - self.load_unverified_transactions() - self.remove_local_transactions_we_dont_have() - # wallet.up_to_date is true when the wallet is synchronized - self.up_to_date = False + self.check_history() # save wallet type the first time if self.storage.get('wallet_type') is None: @@ -228,7 +192,6 @@ class Abstract_Wallet(PrintError): self.coin_price_cache = {} - def diagnostic_name(self): return self.basename() @@ -238,92 +201,16 @@ class Abstract_Wallet(PrintError): def get_master_public_key(self): return None - @profiler - def load_transactions(self): - # load txi, txo, tx_fees - self.txi = self.storage.get('txi', {}) - for txid, d in list(self.txi.items()): - for addr, lst in d.items(): - self.txi[txid][addr] = set([tuple(x) for x in lst]) - self.txo = self.storage.get('txo', {}) - self.tx_fees = self.storage.get('tx_fees', {}) - tx_list = self.storage.get('transactions', {}) - # load transactions - self.transactions = {} - for tx_hash, raw in tx_list.items(): - tx = Transaction(raw) - self.transactions[tx_hash] = tx - if self.txi.get(tx_hash) is None and self.txo.get(tx_hash) is None: - self.print_error("removing unreferenced tx", tx_hash) - self.transactions.pop(tx_hash) - # load spent_outpoints - _spent_outpoints = self.storage.get('spent_outpoints', {}) - self.spent_outpoints = defaultdict(dict) - for prevout_hash, d in _spent_outpoints.items(): - for prevout_n_str, spending_txid in d.items(): - prevout_n = int(prevout_n_str) - self.spent_outpoints[prevout_hash][prevout_n] = spending_txid - - @profiler - def load_local_history(self): - self._history_local = {} # address -> set(txid) - for txid in itertools.chain(self.txi, self.txo): - self._add_tx_to_local_history(txid) - - def remove_local_transactions_we_dont_have(self): - txid_set = set(self.txi) | set(self.txo) - for txid in txid_set: - tx_height = self.get_tx_height(txid)[0] - if tx_height == TX_HEIGHT_LOCAL and txid not in self.transactions: - self.remove_transaction(txid) - - @profiler - def save_transactions(self, write=False): - with self.transaction_lock: - tx = {} - for k,v in self.transactions.items(): - tx[k] = str(v) - self.storage.put('transactions', tx) - self.storage.put('txi', self.txi) - self.storage.put('txo', self.txo) - self.storage.put('tx_fees', self.tx_fees) - self.storage.put('addr_history', self.history) - self.storage.put('spent_outpoints', self.spent_outpoints) - if write: - self.storage.write() - - def save_verified_tx(self, write=False): - with self.lock: - self.storage.put('verified_tx3', self.verified_tx) - if write: - self.storage.write() - - def clear_history(self): - with self.lock: - with self.transaction_lock: - self.txi = {} - self.txo = {} - self.tx_fees = {} - self.spent_outpoints = defaultdict(dict) - self.history = {} - self.verified_tx = {} - self.transactions = {} - self.save_transactions() - @profiler def check_history(self): save = False - hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.history.keys())) hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.history.keys())) - for addr in hist_addrs_not_mine: self.history.pop(addr) save = True - for addr in hist_addrs_mine: hist = self.history[addr] - for tx_hash, tx_height in hist: if self.txi.get(tx_hash) or self.txo.get(tx_hash): continue @@ -358,19 +245,6 @@ class Abstract_Wallet(PrintError): def is_deterministic(self): return self.keystore.is_deterministic() - def set_up_to_date(self, up_to_date): - with self.lock: - self.up_to_date = up_to_date - if up_to_date: - self.save_transactions(write=True) - # if the verifier is also up to date, persist that too; - # otherwise it will persist its results when it finishes - if self.verifier and self.verifier.is_up_to_date(): - self.save_verified_tx(write=True) - - def is_up_to_date(self): - with self.lock: return self.up_to_date - def set_label(self, name, text = None): changed = False old_text = self.labels.get(name) @@ -441,64 +315,6 @@ class Abstract_Wallet(PrintError): def get_public_keys(self, address): return [self.get_public_key(address)] - def add_unverified_tx(self, tx_hash, tx_height): - if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) \ - and tx_hash in self.verified_tx: - with self.lock: - self.verified_tx.pop(tx_hash) - if self.verifier: - self.verifier.remove_spv_proof_for_tx(tx_hash) - - # tx will be verified only if height > 0 - if tx_hash not in self.verified_tx: - with self.lock: - self.unverified_tx[tx_hash] = tx_height - - def add_verified_tx(self, tx_hash, info): - # Remove from the unverified map and add to the verified map - with self.lock: - self.unverified_tx.pop(tx_hash, None) - self.verified_tx[tx_hash] = info # (tx_height, timestamp, pos) - height, conf, timestamp = self.get_tx_height(tx_hash) - self.network.trigger_callback('verified', tx_hash, height, conf, timestamp) - - def get_unverified_txs(self): - '''Returns a map from tx hash to transaction height''' - with self.lock: - return dict(self.unverified_tx) # copy - - def undo_verifications(self, blockchain, height): - '''Used by the verifier when a reorg has happened''' - txs = set() - with self.lock: - for tx_hash, item in list(self.verified_tx.items()): - tx_height, timestamp, pos = item - if tx_height >= height: - header = blockchain.read_header(tx_height) - # fixme: use block hash, not timestamp - if not header or header.get('timestamp') != timestamp: - self.verified_tx.pop(tx_hash, None) - txs.add(tx_hash) - return txs - - def get_local_height(self): - """ return last known height if we are offline """ - return self.network.get_local_height() if self.network else self.storage.get('stored_height', 0) - - def get_tx_height(self, tx_hash): - """ Given a transaction, returns (height, conf, timestamp) """ - with self.lock: - if tx_hash in self.verified_tx: - height, timestamp, pos = self.verified_tx[tx_hash] - conf = max(self.get_local_height() - height + 1, 0) - return height, conf, timestamp - elif tx_hash in self.unverified_tx: - height = self.unverified_tx[tx_hash] - return height, 0, None - else: - # local transaction - return TX_HEIGHT_LOCAL, 0, None - def get_txpos(self, tx_hash): "return position, even if the tx is unverified" with self.lock: @@ -757,24 +573,6 @@ class Abstract_Wallet(PrintError): h.append((tx_hash, tx_height)) return h - def _add_tx_to_local_history(self, txid): - with self.transaction_lock: - for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])): - cur_hist = self._history_local.get(addr, set()) - cur_hist.add(txid) - self._history_local[addr] = cur_hist - - def _remove_tx_from_local_history(self, txid): - with self.transaction_lock: - for addr in itertools.chain(self.txi.get(txid, []), self.txo.get(txid, [])): - cur_hist = self._history_local.get(addr, set()) - try: - cur_hist.remove(txid) - except KeyError: - pass - else: - self._history_local[addr] = cur_hist - def get_txin_address(self, txi): addr = txi.get('address') if addr and addr != "(pubkey)": @@ -798,235 +596,6 @@ class Abstract_Wallet(PrintError): addr = None return addr - def get_conflicting_transactions(self, tx): - """Returns a set of transaction hashes from the wallet history that are - directly conflicting with tx, i.e. they have common outpoints being - spent with tx. If the tx is already in wallet history, that will not be - reported as a conflict. - """ - conflicting_txns = set() - with self.transaction_lock: - for txin in tx.inputs(): - if txin['type'] == 'coinbase': - continue - prevout_hash = txin['prevout_hash'] - prevout_n = txin['prevout_n'] - spending_tx_hash = self.spent_outpoints[prevout_hash].get(prevout_n) - if spending_tx_hash is None: - continue - # this outpoint has already been spent, by spending_tx - assert spending_tx_hash in self.transactions - conflicting_txns |= {spending_tx_hash} - txid = tx.txid() - if txid in conflicting_txns: - # this tx is already in history, so it conflicts with itself - if len(conflicting_txns) > 1: - raise Exception('Found conflicting transactions already in wallet history.') - conflicting_txns -= {txid} - return conflicting_txns - - def add_transaction(self, tx_hash, tx, allow_unrelated=False): - assert tx_hash, tx_hash - assert tx, tx - assert tx.is_complete() - # we need self.transaction_lock but get_tx_height will take self.lock - # so we need to take that too here, to enforce order of locks - with self.lock, self.transaction_lock: - # NOTE: returning if tx in self.transactions might seem like a good idea - # BUT we track is_mine inputs in a txn, and during subsequent calls - # of add_transaction tx, we might learn of more-and-more inputs of - # being is_mine, as we roll the gap_limit forward - is_coinbase = tx.inputs()[0]['type'] == 'coinbase' - tx_height = self.get_tx_height(tx_hash)[0] - if not allow_unrelated: - # note that during sync, if the transactions are not properly sorted, - # it could happen that we think tx is unrelated but actually one of the inputs is is_mine. - # this is the main motivation for allow_unrelated - is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()]) - is_for_me = any([self.is_mine(self.get_txout_address(txo)) for txo in tx.outputs()]) - if not is_mine and not is_for_me: - raise UnrelatedTransactionException() - # Find all conflicting transactions. - # In case of a conflict, - # 1. confirmed > mempool > local - # 2. this new txn has priority over existing ones - # When this method exits, there must NOT be any conflict, so - # either keep this txn and remove all conflicting (along with dependencies) - # or drop this txn - conflicting_txns = self.get_conflicting_transactions(tx) - if conflicting_txns: - existing_mempool_txn = any( - self.get_tx_height(tx_hash2)[0] in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) - for tx_hash2 in conflicting_txns) - existing_confirmed_txn = any( - self.get_tx_height(tx_hash2)[0] > 0 - for tx_hash2 in conflicting_txns) - if existing_confirmed_txn and tx_height <= 0: - # this is a non-confirmed tx that conflicts with confirmed txns; drop. - return False - if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL: - # this is a local tx that conflicts with non-local txns; drop. - return False - # keep this txn and remove all conflicting - to_remove = set() - to_remove |= conflicting_txns - for conflicting_tx_hash in conflicting_txns: - to_remove |= self.get_depending_transactions(conflicting_tx_hash) - for tx_hash2 in to_remove: - self.remove_transaction(tx_hash2) - # add inputs - def add_value_from_prev_output(): - dd = self.txo.get(prevout_hash, {}) - # note: this nested loop takes linear time in num is_mine outputs of prev_tx - for addr, outputs in dd.items(): - # note: instead of [(n, v, is_cb), ...]; we could store: {n -> (v, is_cb)} - for n, v, is_cb in outputs: - if n == prevout_n: - if addr and self.is_mine(addr): - if d.get(addr) is None: - d[addr] = set() - d[addr].add((ser, v)) - return - self.txi[tx_hash] = d = {} - for txi in tx.inputs(): - if txi['type'] == 'coinbase': - continue - prevout_hash = txi['prevout_hash'] - prevout_n = txi['prevout_n'] - ser = prevout_hash + ':%d' % prevout_n - self.spent_outpoints[prevout_hash][prevout_n] = tx_hash - add_value_from_prev_output() - # add outputs - self.txo[tx_hash] = d = {} - for n, txo in enumerate(tx.outputs()): - v = txo[2] - ser = tx_hash + ':%d'%n - addr = self.get_txout_address(txo) - if addr and self.is_mine(addr): - if d.get(addr) is None: - d[addr] = [] - d[addr].append((n, v, is_coinbase)) - # give v to txi that spends me - next_tx = self.spent_outpoints[tx_hash].get(n) - if next_tx is not None: - dd = self.txi.get(next_tx, {}) - if dd.get(addr) is None: - dd[addr] = set() - if (ser, v) not in dd[addr]: - dd[addr].add((ser, v)) - self._add_tx_to_local_history(next_tx) - # add to local history - self._add_tx_to_local_history(tx_hash) - # save - self.transactions[tx_hash] = tx - return True - - def remove_transaction(self, tx_hash): - def remove_from_spent_outpoints(): - # undo spends in spent_outpoints - if tx is not None: # if we have the tx, this branch is faster - for txin in tx.inputs(): - if txin['type'] == 'coinbase': - continue - prevout_hash = txin['prevout_hash'] - prevout_n = txin['prevout_n'] - self.spent_outpoints[prevout_hash].pop(prevout_n, None) - if not self.spent_outpoints[prevout_hash]: - self.spent_outpoints.pop(prevout_hash) - else: # expensive but always works - for prevout_hash, d in list(self.spent_outpoints.items()): - for prevout_n, spending_txid in d.items(): - if spending_txid == tx_hash: - self.spent_outpoints[prevout_hash].pop(prevout_n, None) - if not self.spent_outpoints[prevout_hash]: - self.spent_outpoints.pop(prevout_hash) - # Remove this tx itself; if nothing spends from it. - # It is not so clear what to do if other txns spend from it, but it will be - # removed when those other txns are removed. - if not self.spent_outpoints[tx_hash]: - self.spent_outpoints.pop(tx_hash) - - with self.transaction_lock: - self.print_error("removing tx from history", tx_hash) - tx = self.transactions.pop(tx_hash, None) - remove_from_spent_outpoints() - self._remove_tx_from_local_history(tx_hash) - self.txi.pop(tx_hash, None) - self.txo.pop(tx_hash, None) - - def receive_tx_callback(self, tx_hash, tx, tx_height): - self.add_unverified_tx(tx_hash, tx_height) - self.add_transaction(tx_hash, tx, allow_unrelated=True) - - def receive_history_callback(self, addr, hist, tx_fees): - with self.lock: - old_hist = self.get_address_history(addr) - for tx_hash, height in old_hist: - if (tx_hash, height) not in hist: - # make tx local - self.unverified_tx.pop(tx_hash, None) - self.verified_tx.pop(tx_hash, None) - if self.verifier: - self.verifier.remove_spv_proof_for_tx(tx_hash) - self.history[addr] = hist - - for tx_hash, tx_height in hist: - # add it in case it was previously unconfirmed - self.add_unverified_tx(tx_hash, tx_height) - # if addr is new, we have to recompute txi and txo - tx = self.transactions.get(tx_hash) - if tx is None: - continue - self.add_transaction(tx_hash, tx, allow_unrelated=True) - - # Store fees - self.tx_fees.update(tx_fees) - - def get_history(self, domain=None): - # get domain - if domain is None: - domain = self.get_addresses() - domain = set(domain) - # 1. Get the history of each address in the domain, maintain the - # delta of a tx as the sum of its deltas on domain addresses - tx_deltas = defaultdict(int) - for addr in domain: - h = self.get_address_history(addr) - for tx_hash, height in h: - delta = self.get_tx_delta(tx_hash, addr) - if delta is None or tx_deltas[tx_hash] is None: - tx_deltas[tx_hash] = None - else: - tx_deltas[tx_hash] += delta - - # 2. create sorted history - history = [] - for tx_hash in tx_deltas: - delta = tx_deltas[tx_hash] - height, conf, timestamp = self.get_tx_height(tx_hash) - history.append((tx_hash, height, conf, timestamp, delta)) - history.sort(key = lambda x: self.get_txpos(x[0])) - history.reverse() - - # 3. add balance - c, u, x = self.get_balance(domain) - balance = c + u + x - h2 = [] - for tx_hash, height, conf, timestamp, delta in history: - h2.append((tx_hash, height, conf, timestamp, delta, balance)) - if balance is None or delta is None: - balance = None - else: - balance -= delta - h2.reverse() - - # fixme: this may happen if history is incomplete - if balance not in [None, 0]: - self.print_error("Error: history not synchronized") - return [] - - return h2 - def balance_at_timestamp(self, domain, target_timestamp): h = self.get_history(domain) for tx_hash, height, conf, timestamp, value, balance in h: @@ -1285,36 +854,6 @@ class Abstract_Wallet(PrintError): return True return False - def load_unverified_transactions(self): - # review transactions that are in the history - for addr, hist in self.history.items(): - for tx_hash, tx_height in hist: - # add it in case it was previously unconfirmed - self.add_unverified_tx(tx_hash, tx_height) - - def start_threads(self, network): - self.network = network - if self.network is not None: - self.verifier = SPV(self.network, self) - self.synchronizer = Synchronizer(self, network) - network.add_jobs([self.verifier, self.synchronizer]) - else: - self.verifier = None - self.synchronizer = None - - def stop_threads(self): - if self.network: - self.network.remove_jobs([self.synchronizer, self.verifier]) - self.synchronizer.release() - self.synchronizer = None - self.verifier = None - # Now no references to the synchronizer or verifier - # remain so they will be GC-ed - self.storage.put('stored_height', self.get_local_height()) - self.save_transactions() - self.save_verified_tx() - self.storage.write() - def wait_until_synchronized(self, callback=None): def wait_for_wallet(): self.set_up_to_date(False) @@ -1605,7 +1144,7 @@ class Abstract_Wallet(PrintError): expiration = 0 conf = None if amount: - if self.up_to_date: + if self.is_up_to_date(): paid, conf = self.get_payment_status(address, amount) status = PR_PAID if paid else PR_UNPAID if status == PR_UNPAID and expiration is not None and time.time() > timestamp + expiration: @@ -1700,12 +1239,6 @@ class Abstract_Wallet(PrintError): def can_delete_address(self): return False - def add_address(self, address): - if address not in self.history: - self.history[address] = [] - if self.synchronizer: - self.synchronizer.add(address) - def has_password(self): return self.has_keystore_encryption() or self.has_storage_encryption()