Browse Source

keystore: use abstract base classes, introduce MPKMixin

hard-fail-on-bad-server-string
SomberNight 5 years ago
parent
commit
0ab88b821c
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 186
      electrum/keystore.py
  2. 7
      electrum/plugins/coldcard/coldcard.py
  3. 4
      electrum/wallet.py

186
electrum/keystore.py

@ -29,6 +29,7 @@ import hashlib
import re
from typing import Tuple, TYPE_CHECKING, Union, Sequence, Optional, Dict, List, NamedTuple
from functools import lru_cache
from abc import ABC, abstractmethod
from . import bitcoin, ecc, constants, bip32
from .bitcoin import deserialize_privkey, serialize_privkey
@ -50,7 +51,7 @@ if TYPE_CHECKING:
from .plugins.hw_wallet import HW_PluginBase, HardwareClientBase
class KeyStore(Logger):
class KeyStore(Logger, ABC):
type: str
def __init__(self):
@ -69,9 +70,10 @@ class KeyStore(Logger):
def get_type_text(self) -> str:
return f'{self.type}'
@abstractmethod
def may_have_password(self):
"""Returns whether the keystore can be encrypted with a password."""
raise NotImplementedError()
pass
def get_tx_derivations(self, tx: 'PartialTransaction') -> Dict[str, Union[Sequence[int], str]]:
keypairs = {}
@ -96,21 +98,27 @@ class KeyStore(Logger):
def ready_to_sign(self) -> bool:
return not self.is_watching_only()
@abstractmethod
def dump(self) -> dict:
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def is_deterministic(self) -> bool:
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def sign_message(self, sequence, message, password) -> bytes:
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def decrypt_message(self, sequence, message, password) -> bytes:
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def sign_transaction(self, tx: 'PartialTransaction', password) -> None:
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def get_pubkey_derivation(self, pubkey: bytes,
txinout: Union['PartialTxInput', 'PartialTxOutput'],
*, only_der_suffix=True) \
@ -119,41 +127,7 @@ class KeyStore(Logger):
the pubkey itself (hex) if the pubkey belongs to the keystore but not HD derived,
or None if the pubkey is unrelated.
"""
def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
if len(der_suffix) != 2:
return False
if pubkey.hex() != self.derive_pubkey(*der_suffix):
return False
return True
if hasattr(self, 'get_root_fingerprint'):
if pubkey not in txinout.bip32_paths:
return None
fp_found, path_found = txinout.bip32_paths[pubkey]
der_suffix = None
full_path = None
# try fp against our root
my_root_fingerprint_hex = self.get_root_fingerprint()
my_der_prefix_str = self.get_derivation_prefix()
ks_der_prefix = convert_bip32_path_to_list_of_uint32(my_der_prefix_str) if my_der_prefix_str else None
if (my_root_fingerprint_hex is not None and ks_der_prefix is not None and
fp_found.hex() == my_root_fingerprint_hex):
if path_found[:len(ks_der_prefix)] == ks_der_prefix:
der_suffix = path_found[len(ks_der_prefix):]
if not test_der_suffix_against_pubkey(der_suffix, pubkey):
der_suffix = None
# try fp against our intermediate fingerprint
if (der_suffix is None and hasattr(self, 'get_bip32_node_for_xpub') and
fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
der_suffix = path_found
if not test_der_suffix_against_pubkey(der_suffix, pubkey):
der_suffix = None
if der_suffix is None:
return None
if ks_der_prefix is not None:
full_path = ks_der_prefix + list(der_suffix)
return der_suffix if only_der_suffix else full_path
return None
pass
def find_my_pubkey_in_txinout(
self, txinout: Union['PartialTxInput', 'PartialTxOutput'],
@ -202,14 +176,17 @@ class Software_KeyStore(KeyStore):
if keypairs:
tx.sign(keypairs)
@abstractmethod
def update_password(self, old_password, new_password):
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def check_password(self, password):
raise NotImplementedError() # implemented by subclasses
pass
@abstractmethod
def get_private_key(self, *args, **kwargs) -> Tuple[bytes, bool]:
raise NotImplementedError() # implemented by subclasses
pass
class Imported_KeyStore(Software_KeyStore):
@ -224,9 +201,6 @@ class Imported_KeyStore(Software_KeyStore):
def is_deterministic(self):
return False
def get_master_public_key(self):
return None
def dump(self):
return {
'type': self.type,
@ -308,6 +282,10 @@ class Deterministic_KeyStore(Software_KeyStore):
def is_watching_only(self):
return not self.has_seed()
@abstractmethod
def format_seed(self, seed: str) -> str:
pass
def add_seed(self, seed):
if self.seed:
raise Exception("a seed exists")
@ -325,7 +303,81 @@ class Deterministic_KeyStore(Software_KeyStore):
return ''
class Xpub:
class MasterPublicKeyMixin(ABC):
@abstractmethod
def get_master_public_key(self) -> str:
pass
@abstractmethod
def get_derivation_prefix(self) -> Optional[str]:
"""Returns to bip32 path from some root node to self.xpub
Note that the return value might be None; if it is unknown.
"""
pass
@abstractmethod
def get_root_fingerprint(self) -> Optional[str]:
"""Returns the bip32 fingerprint of the top level node.
This top level node is the node at the beginning of the derivation prefix,
i.e. applying the derivation prefix to it will result self.xpub
Note that the return value might be None; if it is unknown.
"""
pass
@abstractmethod
def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *,
only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]:
"""Returns fingerprint and derivation path corresponding to a derivation suffix.
The fingerprint is either the root fp or the intermediate fp, depending on what is available
and 'only_der_suffix', and the derivation path is adjusted accordingly.
"""
pass
@abstractmethod
def derive_pubkey(self, for_change: int, n: int) -> str:
pass
def get_pubkey_derivation(self, pubkey: bytes,
txinout: Union['PartialTxInput', 'PartialTxOutput'],
*, only_der_suffix=True) \
-> Union[Sequence[int], str, None]:
def test_der_suffix_against_pubkey(der_suffix: Sequence[int], pubkey: bytes) -> bool:
if len(der_suffix) != 2:
return False
if pubkey.hex() != self.derive_pubkey(*der_suffix):
return False
return True
if pubkey not in txinout.bip32_paths:
return None
fp_found, path_found = txinout.bip32_paths[pubkey]
der_suffix = None
full_path = None
# try fp against our root
my_root_fingerprint_hex = self.get_root_fingerprint()
my_der_prefix_str = self.get_derivation_prefix()
ks_der_prefix = convert_bip32_path_to_list_of_uint32(my_der_prefix_str) if my_der_prefix_str else None
if (my_root_fingerprint_hex is not None and ks_der_prefix is not None and
fp_found.hex() == my_root_fingerprint_hex):
if path_found[:len(ks_der_prefix)] == ks_der_prefix:
der_suffix = path_found[len(ks_der_prefix):]
if not test_der_suffix_against_pubkey(der_suffix, pubkey):
der_suffix = None
# try fp against our intermediate fingerprint
if (der_suffix is None and isinstance(self, Xpub) and
fp_found == self.get_bip32_node_for_xpub().calc_fingerprint_of_this_node()):
der_suffix = path_found
if not test_der_suffix_against_pubkey(der_suffix, pubkey):
der_suffix = None
if der_suffix is None:
return None
if ks_der_prefix is not None:
full_path = ks_der_prefix + list(der_suffix)
return der_suffix if only_der_suffix else full_path
class Xpub(MasterPublicKeyMixin):
def __init__(self, *, derivation_prefix: str = None, root_fingerprint: str = None):
self.xpub = None
@ -348,25 +400,13 @@ class Xpub:
return self._xpub_bip32_node
def get_derivation_prefix(self) -> Optional[str]:
"""Returns to bip32 path from some root node to self.xpub
Note that the return value might be None; if it is unknown.
"""
return self._derivation_prefix
def get_root_fingerprint(self) -> Optional[str]:
"""Returns the bip32 fingerprint of the top level node.
This top level node is the node at the beginning of the derivation prefix,
i.e. applying the derivation prefix to it will result self.xpub
Note that the return value might be None; if it is unknown.
"""
return self._root_fingerprint
def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *,
only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]:
"""Returns fingerprint and derivation path corresponding to a derivation suffix.
The fingerprint is either the root fp or the intermediate fp, depending on what is available
and 'only_der_suffix', and the derivation path is adjusted accordingly.
"""
fingerprint_hex = self.get_root_fingerprint()
der_prefix_str = self.get_derivation_prefix()
if not only_der_suffix and fingerprint_hex is not None and der_prefix_str is not None:
@ -437,7 +477,7 @@ class Xpub:
return node.eckey.get_public_key_bytes(compressed=True)
class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
class BIP32_KeyStore(Xpub, Deterministic_KeyStore):
type = 'bip32'
@ -512,7 +552,8 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
cK = ecc.ECPrivkey(k).get_public_key_bytes()
return cK, k
class Old_KeyStore(Deterministic_KeyStore):
class Old_KeyStore(MasterPublicKeyMixin, Deterministic_KeyStore):
type = 'old'
@ -585,7 +626,7 @@ class Old_KeyStore(Deterministic_KeyStore):
def derive_pubkey(self, for_change, n) -> str:
return self.get_pubkey_from_mpk(self.mpk, for_change, n)
def get_private_key_from_stretched_exponent(self, for_change, n, secexp):
def _get_private_key_from_stretched_exponent(self, for_change, n, secexp):
secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER
pk = number_to_string(secexp, ecc.CURVE_ORDER)
return pk
@ -593,12 +634,12 @@ class Old_KeyStore(Deterministic_KeyStore):
def get_private_key(self, sequence, password):
seed = self.get_hex_seed(password)
secexp = self.stretch_key(seed)
self.check_seed(seed, secexp=secexp)
self._check_seed(seed, secexp=secexp)
for_change, n = sequence
pk = self.get_private_key_from_stretched_exponent(for_change, n, secexp)
pk = self._get_private_key_from_stretched_exponent(for_change, n, secexp)
return pk, False
def check_seed(self, seed, *, secexp=None):
def _check_seed(self, seed, *, secexp=None):
if secexp is None:
secexp = self.stretch_key(seed)
master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp)
@ -608,7 +649,7 @@ class Old_KeyStore(Deterministic_KeyStore):
def check_password(self, password):
seed = self.get_hex_seed(password)
self.check_seed(seed)
self._check_seed(seed)
def get_master_public_key(self):
return self.mpk
@ -623,7 +664,6 @@ class Old_KeyStore(Deterministic_KeyStore):
self._root_fingerprint = xfp.hex().lower()
return self._root_fingerprint
# TODO Old_KeyStore and Xpub could share a common baseclass?
def get_fp_and_derivation_to_be_used_in_partial_tx(self, der_suffix: Sequence[int], *,
only_der_suffix: bool = True) -> Tuple[bytes, Sequence[int]]:
fingerprint_hex = self.get_root_fingerprint()
@ -643,7 +683,7 @@ class Old_KeyStore(Deterministic_KeyStore):
self.pw_hash_version = PW_HASH_VERSION_LATEST
class Hardware_KeyStore(KeyStore, Xpub):
class Hardware_KeyStore(Xpub, KeyStore):
hw_type: str
device: str
plugin: 'HW_PluginBase'
@ -694,9 +734,6 @@ class Hardware_KeyStore(KeyStore, Xpub):
called in any thread context.'''
self.logger.info("paired")
def can_export(self):
return False
def is_watching_only(self):
'''The wallet is not watching-only; the user will be prompted for
pin and passphrase as appropriate when needed.'''
@ -732,6 +769,9 @@ class Hardware_KeyStore(KeyStore, Xpub):
self.is_requesting_to_be_rewritten_to_wallet_file = True
KeyStoreWithMPK = Union[KeyStore, MasterPublicKeyMixin] # intersection really...
def bip39_normalize_passphrase(passphrase):
return normalize('NFKD', passphrase or '')

7
electrum/plugins/coldcard/coldcard.py

@ -11,7 +11,7 @@ from electrum import bip32
from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes
from electrum.i18n import _
from electrum.plugin import Device, hook
from electrum.keystore import Hardware_KeyStore
from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK
from electrum.transaction import PartialTransaction
from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet
from electrum.util import bfh, bh2u, versiontuple, UserFacingException
@ -21,9 +21,6 @@ from electrum.logging import get_logger
from ..hw_wallet import HW_PluginBase, HardwareClientBase
from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available
if TYPE_CHECKING:
from electrum.keystore import Xpub
_logger = get_logger(__name__)
@ -571,7 +568,7 @@ class ColdcardPlugin(HW_PluginBase):
xpubs = []
derivs = set()
for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):
for xpub, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()): # type: str, KeyStoreWithMPK
fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(der_suffix=[], only_der_suffix=False)
fp_hex = fp_bytes.hex().upper()
der_prefix_str = bip32.convert_bip32_intpath_to_strpath(der_full)

4
electrum/wallet.py

@ -55,7 +55,7 @@ from .bitcoin import (COIN, is_address, address_to_script,
is_minikey, relayfee, dust_threshold)
from .crypto import sha256d
from . import keystore
from .keystore import load_keystore, Hardware_KeyStore, KeyStore
from .keystore import load_keystore, Hardware_KeyStore, KeyStore, KeyStoreWithMPK
from .util import multisig_type
from .storage import StorageEncryptionVersion, WalletStorage
from . import transaction, bitcoin, coinchooser, paymentrequest, ecc, bip32
@ -454,7 +454,7 @@ class Abstract_Wallet(AddressSynchronizer):
def get_public_keys(self, address):
return [self.get_public_key(address)]
def get_public_keys_with_deriv_info(self, address: str) -> Dict[str, Tuple[KeyStore, Sequence[int]]]:
def get_public_keys_with_deriv_info(self, address: str) -> Dict[str, Tuple[KeyStoreWithMPK, Sequence[int]]]:
"""Returns a map: pubkey_hex -> (keystore, derivation_suffix)"""
return {}

Loading…
Cancel
Save