#!/usr/bin/env python2 # -*- mode: python -*- # # Electrum - lightweight Bitcoin client # Copyright (C) 2016 The Electrum developers # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation files # (the "Software"), to deal in the Software without restriction, # including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, # subject to the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from unicodedata import normalize from . import bitcoin, ecc from .bitcoin import * from .ecc import string_to_number, number_to_string from .crypto import pw_decode, pw_encode from . import constants from .util import (PrintError, InvalidPassword, hfu, WalletFileException, BitcoinException) from .mnemonic import Mnemonic, load_wordlist from .plugins import run_hook class KeyStore(PrintError): def has_seed(self): return False def is_watching_only(self): return False def can_import(self): return False def may_have_password(self): """Returns whether the keystore can be encrypted with a password.""" raise NotImplementedError() def get_tx_derivations(self, tx): keypairs = {} for txin in tx.inputs(): num_sig = txin.get('num_sig') if num_sig is None: continue x_signatures = txin['signatures'] signatures = [sig for sig in x_signatures if sig] if len(signatures) == num_sig: # input is complete continue for k, x_pubkey in enumerate(txin['x_pubkeys']): if x_signatures[k] is not None: # this pubkey already signed continue derivation = self.get_pubkey_derivation(x_pubkey) if not derivation: continue keypairs[x_pubkey] = derivation return keypairs def can_sign(self, tx): if self.is_watching_only(): return False return bool(self.get_tx_derivations(tx)) def ready_to_sign(self): return not self.is_watching_only() class Software_KeyStore(KeyStore): def __init__(self): KeyStore.__init__(self) def may_have_password(self): return not self.is_watching_only() def sign_message(self, sequence, message, password): privkey, compressed = self.get_private_key(sequence, password) key = ecc.ECPrivkey(privkey) return key.sign_message(message, compressed) def decrypt_message(self, sequence, message, password): privkey, compressed = self.get_private_key(sequence, password) ec = ecc.ECPrivkey(privkey) decrypted = ec.decrypt_message(message) return decrypted def sign_transaction(self, tx, password): if self.is_watching_only(): return # Raise if password is not correct. self.check_password(password) # Add private keys keypairs = self.get_tx_derivations(tx) for k, v in keypairs.items(): keypairs[k] = self.get_private_key(v, password) # Sign if keypairs: tx.sign(keypairs) class Imported_KeyStore(Software_KeyStore): # keystore for imported private keys def __init__(self, d): Software_KeyStore.__init__(self) self.keypairs = d.get('keypairs', {}) def is_deterministic(self): return False def get_master_public_key(self): return None def dump(self): return { 'type': 'imported', 'keypairs': self.keypairs, } def can_import(self): return True def check_password(self, password): pubkey = list(self.keypairs.keys())[0] self.get_private_key(pubkey, password) def import_privkey(self, sec, password): txin_type, privkey, compressed = deserialize_privkey(sec) pubkey = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed) # re-serialize the key so the internal storage format is consistent serialized_privkey = serialize_privkey( privkey, compressed, txin_type, internal_use=True) # NOTE: if the same pubkey is reused for multiple addresses (script types), # there will only be one pubkey-privkey pair for it in self.keypairs, # and the privkey will encode a txin_type but that txin_type cannot be trusted. # Removing keys complicates this further. self.keypairs[pubkey] = pw_encode(serialized_privkey, password) return txin_type, pubkey def delete_imported_key(self, key): self.keypairs.pop(key) def get_private_key(self, pubkey, password): sec = pw_decode(self.keypairs[pubkey], password) txin_type, privkey, compressed = deserialize_privkey(sec) # this checks the password if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed): raise InvalidPassword() return privkey, compressed def get_pubkey_derivation(self, x_pubkey): if x_pubkey[0:2] in ['02', '03', '04']: if x_pubkey in self.keypairs.keys(): return x_pubkey elif x_pubkey[0:2] == 'fd': addr = bitcoin.script_to_address(x_pubkey[2:]) if addr in self.addresses: return self.addresses[addr].get('pubkey') def update_password(self, old_password, new_password): self.check_password(old_password) if new_password == '': new_password = None for k, v in self.keypairs.items(): b = pw_decode(v, old_password) c = pw_encode(b, new_password) self.keypairs[k] = c class Deterministic_KeyStore(Software_KeyStore): def __init__(self, d): Software_KeyStore.__init__(self) self.seed = d.get('seed', '') self.passphrase = d.get('passphrase', '') def is_deterministic(self): return True def dump(self): d = {} if self.seed: d['seed'] = self.seed if self.passphrase: d['passphrase'] = self.passphrase return d def has_seed(self): return bool(self.seed) def is_watching_only(self): return not self.has_seed() def add_seed(self, seed): if self.seed: raise Exception("a seed exists") self.seed = self.format_seed(seed) def get_seed(self, password): return pw_decode(self.seed, password) def get_passphrase(self, password): return pw_decode(self.passphrase, password) if self.passphrase else '' class Xpub: def __init__(self): self.xpub = None self.xpub_receive = None self.xpub_change = None def get_master_public_key(self): return self.xpub def derive_pubkey(self, for_change, n): xpub = self.xpub_change if for_change else self.xpub_receive if xpub is None: xpub = bip32_public_derivation(self.xpub, "", "/%d"%for_change) if for_change: self.xpub_change = xpub else: self.xpub_receive = xpub return self.get_pubkey_from_xpub(xpub, (n,)) @classmethod def get_pubkey_from_xpub(self, xpub, sequence): _, _, _, _, c, cK = deserialize_xpub(xpub) for i in sequence: cK, c = CKD_pub(cK, c, i) return bh2u(cK) def get_xpubkey(self, c, i): s = ''.join(map(lambda x: bitcoin.int_to_hex(x,2), (c, i))) return 'ff' + bh2u(bitcoin.DecodeBase58Check(self.xpub)) + s @classmethod def parse_xpubkey(self, pubkey): assert pubkey[0:2] == 'ff' pk = bfh(pubkey) pk = pk[1:] xkey = bitcoin.EncodeBase58Check(pk[0:78]) dd = pk[78:] s = [] while dd: n = int(bitcoin.rev_hex(bh2u(dd[0:2])), 16) dd = dd[2:] s.append(n) assert len(s) == 2 return xkey, s def get_pubkey_derivation(self, x_pubkey): if x_pubkey[0:2] != 'ff': return xpub, derivation = self.parse_xpubkey(x_pubkey) if self.xpub != xpub: return return derivation class BIP32_KeyStore(Deterministic_KeyStore, Xpub): def __init__(self, d): Xpub.__init__(self) Deterministic_KeyStore.__init__(self, d) self.xpub = d.get('xpub') self.xprv = d.get('xprv') def format_seed(self, seed): return ' '.join(seed.split()) def dump(self): d = Deterministic_KeyStore.dump(self) d['type'] = 'bip32' d['xpub'] = self.xpub d['xprv'] = self.xprv return d def get_master_private_key(self, password): return pw_decode(self.xprv, password) def check_password(self, password): xprv = pw_decode(self.xprv, password) if deserialize_xprv(xprv)[4] != deserialize_xpub(self.xpub)[4]: raise InvalidPassword() def update_password(self, old_password, new_password): self.check_password(old_password) if new_password == '': new_password = None if self.has_seed(): decoded = self.get_seed(old_password) self.seed = pw_encode(decoded, new_password) if self.passphrase: decoded = self.get_passphrase(old_password) self.passphrase = pw_encode(decoded, new_password) if self.xprv is not None: b = pw_decode(self.xprv, old_password) self.xprv = pw_encode(b, new_password) def is_watching_only(self): return self.xprv is None def add_xprv(self, xprv): self.xprv = xprv self.xpub = bitcoin.xpub_from_xprv(xprv) def add_xprv_from_seed(self, bip32_seed, xtype, derivation): xprv, xpub = bip32_root(bip32_seed, xtype) xprv, xpub = bip32_private_derivation(xprv, "m/", derivation) self.add_xprv(xprv) def get_private_key(self, sequence, password): xprv = self.get_master_private_key(password) _, _, _, _, c, k = deserialize_xprv(xprv) pk = bip32_private_key(sequence, k, c) return pk, True class Old_KeyStore(Deterministic_KeyStore): def __init__(self, d): Deterministic_KeyStore.__init__(self, d) self.mpk = d.get('mpk') def get_hex_seed(self, password): return pw_decode(self.seed, password).encode('utf8') def dump(self): d = Deterministic_KeyStore.dump(self) d['mpk'] = self.mpk d['type'] = 'old' return d def add_seed(self, seedphrase): Deterministic_KeyStore.add_seed(self, seedphrase) s = self.get_hex_seed(None) self.mpk = self.mpk_from_seed(s) def add_master_public_key(self, mpk): self.mpk = mpk def format_seed(self, seed): from . import old_mnemonic, mnemonic seed = mnemonic.normalize_text(seed) # see if seed was entered as hex if seed: try: bfh(seed) return str(seed) except Exception: pass words = seed.split() seed = old_mnemonic.mn_decode(words) if not seed: raise Exception("Invalid seed") return seed def get_seed(self, password): from . import old_mnemonic s = self.get_hex_seed(password) return ' '.join(old_mnemonic.mn_encode(s)) @classmethod def mpk_from_seed(klass, seed): secexp = klass.stretch_key(seed) privkey = ecc.ECPrivkey.from_secret_scalar(secexp) return privkey.get_public_key_hex(compressed=False)[2:] @classmethod def stretch_key(self, seed): x = seed for i in range(100000): x = hashlib.sha256(x + seed).digest() return string_to_number(x) @classmethod def get_sequence(self, mpk, for_change, n): return string_to_number(Hash(("%d:%d:"%(n, for_change)).encode('ascii') + bfh(mpk))) @classmethod def get_pubkey_from_mpk(self, mpk, for_change, n): z = self.get_sequence(mpk, for_change, n) master_public_key = ecc.ECPubkey(bfh('04'+mpk)) public_key = master_public_key + z*ecc.generator() return public_key.get_public_key_hex(compressed=False) def derive_pubkey(self, for_change, n): return self.get_pubkey_from_mpk(self.mpk, for_change, n) def get_private_key_from_stretched_exponent(self, for_change, n, secexp): secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % ecc.CURVE_ORDER pk = number_to_string(secexp, ecc.CURVE_ORDER) return pk def get_private_key(self, sequence, password): seed = self.get_hex_seed(password) self.check_seed(seed) for_change, n = sequence secexp = self.stretch_key(seed) pk = self.get_private_key_from_stretched_exponent(for_change, n, secexp) return pk, False def check_seed(self, seed): secexp = self.stretch_key(seed) master_private_key = ecc.ECPrivkey.from_secret_scalar(secexp) master_public_key = master_private_key.get_public_key_bytes(compressed=False)[1:] if master_public_key != bfh(self.mpk): print_error('invalid password (mpk)', self.mpk, bh2u(master_public_key)) raise InvalidPassword() def check_password(self, password): seed = self.get_hex_seed(password) self.check_seed(seed) def get_master_public_key(self): return self.mpk def get_xpubkey(self, for_change, n): s = ''.join(map(lambda x: bitcoin.int_to_hex(x,2), (for_change, n))) return 'fe' + self.mpk + s @classmethod def parse_xpubkey(self, x_pubkey): assert x_pubkey[0:2] == 'fe' pk = x_pubkey[2:] mpk = pk[0:128] dd = pk[128:] s = [] while dd: n = int(bitcoin.rev_hex(dd[0:4]), 16) dd = dd[4:] s.append(n) assert len(s) == 2 return mpk, s def get_pubkey_derivation(self, x_pubkey): if x_pubkey[0:2] != 'fe': return mpk, derivation = self.parse_xpubkey(x_pubkey) if self.mpk != mpk: return return derivation def update_password(self, old_password, new_password): self.check_password(old_password) if new_password == '': new_password = None if self.has_seed(): decoded = pw_decode(self.seed, old_password) self.seed = pw_encode(decoded, new_password) class Hardware_KeyStore(KeyStore, Xpub): # Derived classes must set: # - device # - DEVICE_IDS # - wallet_type #restore_wallet_class = BIP32_RD_Wallet max_change_outputs = 1 def __init__(self, d): Xpub.__init__(self) KeyStore.__init__(self) # Errors and other user interaction is done through the wallet's # handler. The handler is per-window and preserved across # device reconnects self.xpub = d.get('xpub') self.label = d.get('label') self.derivation = d.get('derivation') self.handler = None run_hook('init_keystore', self) def set_label(self, label): self.label = label def may_have_password(self): return False def is_deterministic(self): return True def dump(self): return { 'type': 'hardware', 'hw_type': self.hw_type, 'xpub': self.xpub, 'derivation':self.derivation, 'label':self.label, } def unpaired(self): '''A device paired with the wallet was disconnected. This can be called in any thread context.''' self.print_error("unpaired") def paired(self): '''A device paired with the wallet was (re-)connected. This can be called in any thread context.''' self.print_error("paired") def can_export(self): return False def is_watching_only(self): '''The wallet is not watching-only; the user will be prompted for pin and passphrase as appropriate when needed.''' assert not self.has_seed() return False def get_password_for_storage_encryption(self): from .storage import get_derivation_used_for_hw_device_encryption client = self.plugin.get_client(self) derivation = get_derivation_used_for_hw_device_encryption() xpub = client.get_xpub(derivation, "standard") password = self.get_pubkey_from_xpub(xpub, ()) return password def has_usable_connection_with_device(self): if not hasattr(self, 'plugin'): return False client = self.plugin.get_client(self, force_pair=False) if client is None: return False return client.has_usable_connection_with_device() def ready_to_sign(self): return super().ready_to_sign() and self.has_usable_connection_with_device() def bip39_normalize_passphrase(passphrase): return normalize('NFKD', passphrase or '') def bip39_to_seed(mnemonic, passphrase): import pbkdf2, hashlib, hmac PBKDF2_ROUNDS = 2048 mnemonic = normalize('NFKD', ' '.join(mnemonic.split())) passphrase = bip39_normalize_passphrase(passphrase) return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase, iterations = PBKDF2_ROUNDS, macmodule = hmac, digestmodule = hashlib.sha512).read(64) # returns tuple (is_checksum_valid, is_wordlist_valid) def bip39_is_checksum_valid(mnemonic): words = [ normalize('NFKD', word) for word in mnemonic.split() ] words_len = len(words) wordlist = load_wordlist("english.txt") n = len(wordlist) checksum_length = 11*words_len//33 entropy_length = 32*checksum_length i = 0 words.reverse() while words: w = words.pop() try: k = wordlist.index(w) except ValueError: return False, False i = i*n + k if words_len not in [12, 15, 18, 21, 24]: return False, True entropy = i >> checksum_length checksum = i % 2**checksum_length h = '{:x}'.format(entropy) while len(h) < entropy_length/4: h = '0'+h b = bytearray.fromhex(h) hashed = int(hfu(hashlib.sha256(b).digest()), 16) calculated_checksum = hashed >> (256 - checksum_length) return checksum == calculated_checksum, True def from_bip39_seed(seed, passphrase, derivation, xtype=None): k = BIP32_KeyStore({}) bip32_seed = bip39_to_seed(seed, passphrase) if xtype is None: xtype = xtype_from_derivation(derivation) k.add_xprv_from_seed(bip32_seed, xtype, derivation) return k def xtype_from_derivation(derivation): """Returns the script type to be used for this derivation.""" if derivation.startswith("m/84'"): return 'p2wpkh' elif derivation.startswith("m/49'"): return 'p2wpkh-p2sh' else: return 'standard' # extended pubkeys def is_xpubkey(x_pubkey): return x_pubkey[0:2] == 'ff' def parse_xpubkey(x_pubkey): assert x_pubkey[0:2] == 'ff' return BIP32_KeyStore.parse_xpubkey(x_pubkey) def xpubkey_to_address(x_pubkey): if x_pubkey[0:2] == 'fd': address = bitcoin.script_to_address(x_pubkey[2:]) return x_pubkey, address if x_pubkey[0:2] in ['02', '03', '04']: pubkey = x_pubkey elif x_pubkey[0:2] == 'ff': xpub, s = BIP32_KeyStore.parse_xpubkey(x_pubkey) pubkey = BIP32_KeyStore.get_pubkey_from_xpub(xpub, s) elif x_pubkey[0:2] == 'fe': mpk, s = Old_KeyStore.parse_xpubkey(x_pubkey) pubkey = Old_KeyStore.get_pubkey_from_mpk(mpk, s[0], s[1]) else: raise BitcoinException("Cannot parse pubkey. prefix: {}" .format(x_pubkey[0:2])) if pubkey: address = public_key_to_p2pkh(bfh(pubkey)) return pubkey, address def xpubkey_to_pubkey(x_pubkey): pubkey, address = xpubkey_to_address(x_pubkey) return pubkey hw_keystores = {} def register_keystore(hw_type, constructor): hw_keystores[hw_type] = constructor def hardware_keystore(d): hw_type = d['hw_type'] if hw_type in hw_keystores: constructor = hw_keystores[hw_type] return constructor(d) raise WalletFileException('unknown hardware type: {}'.format(hw_type)) def load_keystore(storage, name): d = storage.get(name, {}) t = d.get('type') if not t: raise WalletFileException( 'Wallet format requires update.\n' 'Cannot find keystore for name {}'.format(name)) if t == 'old': k = Old_KeyStore(d) elif t == 'imported': k = Imported_KeyStore(d) elif t == 'bip32': k = BIP32_KeyStore(d) elif t == 'hardware': k = hardware_keystore(d) else: raise WalletFileException( 'Unknown type {} for keystore named {}'.format(t, name)) return k def is_old_mpk(mpk: str) -> bool: try: int(mpk, 16) except: return False if len(mpk) != 128: return False try: ecc.ECPubkey(bfh('04' + mpk)) except: return False return True def is_address_list(text): parts = text.split() return bool(parts) and all(bitcoin.is_address(x) for x in parts) def get_private_keys(text): parts = text.split('\n') parts = map(lambda x: ''.join(x.split()), parts) parts = list(filter(bool, parts)) if bool(parts) and all(bitcoin.is_private_key(x) for x in parts): return parts def is_private_key_list(text): return bool(get_private_keys(text)) is_mpk = lambda x: is_old_mpk(x) or is_xpub(x) is_private = lambda x: is_seed(x) or is_xprv(x) or is_private_key_list(x) is_master_key = lambda x: is_old_mpk(x) or is_xprv(x) or is_xpub(x) is_private_key = lambda x: is_xprv(x) or is_private_key_list(x) is_bip32_key = lambda x: is_xprv(x) or is_xpub(x) def bip44_derivation(account_id, bip43_purpose=44): coin = constants.net.BIP44_COIN_TYPE return "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id)) def from_seed(seed, passphrase, is_p2sh): t = seed_type(seed) if t == 'old': keystore = Old_KeyStore({}) keystore.add_seed(seed) elif t in ['standard', 'segwit']: keystore = BIP32_KeyStore({}) keystore.add_seed(seed) keystore.passphrase = passphrase bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase) if t == 'standard': der = "m/" xtype = 'standard' else: der = "m/1'/" if is_p2sh else "m/0'/" xtype = 'p2wsh' if is_p2sh else 'p2wpkh' keystore.add_xprv_from_seed(bip32_seed, xtype, der) else: raise BitcoinException('Unexpected seed type {}'.format(t)) return keystore def from_private_key_list(text): keystore = Imported_KeyStore({}) for x in get_private_keys(text): keystore.import_key(x, None) return keystore def from_old_mpk(mpk): keystore = Old_KeyStore({}) keystore.add_master_public_key(mpk) return keystore def from_xpub(xpub): k = BIP32_KeyStore({}) k.xpub = xpub return k def from_xprv(xprv): xpub = bitcoin.xpub_from_xprv(xprv) k = BIP32_KeyStore({}) k.xprv = xprv k.xpub = xpub return k def from_master_key(text): if is_xprv(text): k = from_xprv(text) elif is_old_mpk(text): k = from_old_mpk(text) elif is_xpub(text): k = from_xpub(text) else: raise BitcoinException('Invalid master key') return k