Browse Source

bip32: refactor whole module. clean-up.

sqlite_db
SomberNight 6 years ago
parent
commit
85a7aa291e
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 412
      electrum/bip32.py
  2. 10
      electrum/commands.py
  3. 6
      electrum/constants.py
  4. 9
      electrum/ecc.py
  5. 32
      electrum/keystore.py
  6. 2
      electrum/mnemonic.py
  7. 13
      electrum/plugins/coldcard/coldcard.py
  8. 14
      electrum/plugins/cosigner_pool/qt.py
  9. 6
      electrum/plugins/digitalbitbox/digitalbitbox.py
  10. 10
      electrum/plugins/keepkey/clientbase.py
  11. 14
      electrum/plugins/keepkey/keepkey.py
  12. 11
      electrum/plugins/ledger/ledger.py
  13. 10
      electrum/plugins/safe_t/clientbase.py
  14. 14
      electrum/plugins/safe_t/safe_t.py
  15. 10
      electrum/plugins/trezor/clientbase.py
  16. 14
      electrum/plugins/trezor/trezor.py
  17. 41
      electrum/plugins/trustedcoin/trustedcoin.py
  18. 17
      electrum/tests/test_bitcoin.py

412
electrum/bip32.py

@ -3,7 +3,7 @@
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
import hashlib
from typing import List
from typing import List, Tuple, NamedTuple, Union, Iterable
from .util import bfh, bh2u, BitcoinException, print_error
from . import constants
@ -13,257 +13,299 @@ from .bitcoin import rev_hex, int_to_hex, EncodeBase58Check, DecodeBase58Check
BIP32_PRIME = 0x80000000
UINT32_MAX = (1 << 32) - 1
def protect_against_invalid_ecpoint(func):
def func_wrapper(*args):
n = args[-1]
child_index = args[-1]
while True:
is_prime = n & BIP32_PRIME
is_prime = child_index & BIP32_PRIME
try:
return func(*args[:-1], n=n)
return func(*args[:-1], child_index=child_index)
except ecc.InvalidECPointException:
print_error('bip32 protect_against_invalid_ecpoint: skipping index')
n += 1
is_prime2 = n & BIP32_PRIME
child_index += 1
is_prime2 = child_index & BIP32_PRIME
if is_prime != is_prime2: raise OverflowError()
return func_wrapper
# Child private key derivation function (from master private key)
# k = master private key (32 bytes)
# c = master chain code (extra entropy for key derivation) (32 bytes)
# n = the index of the key we want to derive. (only 32 bits will be used)
# If n is hardened (i.e. the 32nd bit is set), the resulting private key's
# corresponding public key can NOT be determined without the master private key.
# However, if n is not hardened, the resulting private key's corresponding
# public key can be determined without the master private key.
@protect_against_invalid_ecpoint
def CKD_priv(k, c, n):
if n < 0: raise ValueError('the bip32 index needs to be non-negative')
is_prime = n & BIP32_PRIME
return _CKD_priv(k, c, bfh(rev_hex(int_to_hex(n,4))), is_prime)
def CKD_priv(parent_privkey: bytes, parent_chaincode: bytes, child_index: int) -> Tuple[bytes, bytes]:
"""Child private key derivation function (from master private key)
If n is hardened (i.e. the 32nd bit is set), the resulting private key's
corresponding public key can NOT be determined without the master private key.
However, if n is not hardened, the resulting private key's corresponding
public key can be determined without the master private key.
"""
if child_index < 0: raise ValueError('the bip32 index needs to be non-negative')
is_hardened_child = bool(child_index & BIP32_PRIME)
return _CKD_priv(parent_privkey=parent_privkey,
parent_chaincode=parent_chaincode,
child_index=bfh(rev_hex(int_to_hex(child_index, 4))),
is_hardened_child=is_hardened_child)
def _CKD_priv(k, c, s, is_prime):
def _CKD_priv(parent_privkey: bytes, parent_chaincode: bytes,
child_index: bytes, is_hardened_child: bool) -> Tuple[bytes, bytes]:
try:
keypair = ecc.ECPrivkey(k)
keypair = ecc.ECPrivkey(parent_privkey)
except ecc.InvalidECPointException as e:
raise BitcoinException('Impossible xprv (not within curve order)') from e
cK = keypair.get_public_key_bytes(compressed=True)
data = bytes([0]) + k + s if is_prime else cK + s
I = hmac_oneshot(c, data, hashlib.sha512)
parent_pubkey = keypair.get_public_key_bytes(compressed=True)
if is_hardened_child:
data = bytes([0]) + parent_privkey + child_index
else:
data = parent_pubkey + child_index
I = hmac_oneshot(parent_chaincode, data, hashlib.sha512)
I_left = ecc.string_to_number(I[0:32])
k_n = (I_left + ecc.string_to_number(k)) % ecc.CURVE_ORDER
if I_left >= ecc.CURVE_ORDER or k_n == 0:
child_privkey = (I_left + ecc.string_to_number(parent_privkey)) % ecc.CURVE_ORDER
if I_left >= ecc.CURVE_ORDER or child_privkey == 0:
raise ecc.InvalidECPointException()
k_n = ecc.number_to_string(k_n, ecc.CURVE_ORDER)
c_n = I[32:]
return k_n, c_n
# Child public key derivation function (from public key only)
# K = master public key
# c = master chain code
# n = index of key we want to derive
# This function allows us to find the nth public key, as long as n is
# not hardened. If n is hardened, we need the master private key to find it.
child_privkey = ecc.number_to_string(child_privkey, ecc.CURVE_ORDER)
child_chaincode = I[32:]
return child_privkey, child_chaincode
@protect_against_invalid_ecpoint
def CKD_pub(cK, c, n):
if n < 0: raise ValueError('the bip32 index needs to be non-negative')
if n & BIP32_PRIME: raise Exception()
return _CKD_pub(cK, c, bfh(rev_hex(int_to_hex(n,4))))
# helper function, callable with arbitrary string.
# note: 's' does not need to fit into 32 bits here! (c.f. trustedcoin billing)
def _CKD_pub(cK, c, s):
I = hmac_oneshot(c, cK + s, hashlib.sha512)
pubkey = ecc.ECPrivkey(I[0:32]) + ecc.ECPubkey(cK)
def CKD_pub(parent_pubkey: bytes, parent_chaincode: bytes, child_index: int) -> Tuple[bytes, bytes]:
"""Child public key derivation function (from public key only)
This function allows us to find the nth public key, as long as n is
not hardened. If n is hardened, we need the master private key to find it.
"""
if child_index < 0: raise ValueError('the bip32 index needs to be non-negative')
if child_index & BIP32_PRIME: raise Exception('not possible to derive hardened child from parent pubkey')
return _CKD_pub(parent_pubkey=parent_pubkey,
parent_chaincode=parent_chaincode,
child_index=bfh(rev_hex(int_to_hex(child_index, 4))))
# helper function, callable with arbitrary 'child_index' byte-string.
# i.e.: 'child_index' does not need to fit into 32 bits here! (c.f. trustedcoin billing)
def _CKD_pub(parent_pubkey: bytes, parent_chaincode: bytes, child_index: bytes) -> Tuple[bytes, bytes]:
I = hmac_oneshot(parent_chaincode, parent_pubkey + child_index, hashlib.sha512)
pubkey = ecc.ECPrivkey(I[0:32]) + ecc.ECPubkey(parent_pubkey)
if pubkey.is_at_infinity():
raise ecc.InvalidECPointException()
cK_n = pubkey.get_public_key_bytes(compressed=True)
c_n = I[32:]
return cK_n, c_n
child_pubkey = pubkey.get_public_key_bytes(compressed=True)
child_chaincode = I[32:]
return child_pubkey, child_chaincode
def xprv_header(xtype, *, net=None):
def xprv_header(xtype: str, *, net=None) -> bytes:
if net is None:
net = constants.net
return bfh("%08x" % net.XPRV_HEADERS[xtype])
return net.XPRV_HEADERS[xtype].to_bytes(length=4, byteorder="big")
def xpub_header(xtype, *, net=None):
def xpub_header(xtype: str, *, net=None) -> bytes:
if net is None:
net = constants.net
return bfh("%08x" % net.XPUB_HEADERS[xtype])
def serialize_xprv(xtype, c, k, depth=0, fingerprint=b'\x00'*4,
child_number=b'\x00'*4, *, net=None):
if not ecc.is_secret_within_curve_range(k):
raise BitcoinException('Impossible xprv (not within curve order)')
xprv = xprv_header(xtype, net=net) \
+ bytes([depth]) + fingerprint + child_number + c + bytes([0]) + k
return EncodeBase58Check(xprv)
def serialize_xpub(xtype, c, cK, depth=0, fingerprint=b'\x00'*4,
child_number=b'\x00'*4, *, net=None):
xpub = xpub_header(xtype, net=net) \
+ bytes([depth]) + fingerprint + child_number + c + cK
return EncodeBase58Check(xpub)
return net.XPUB_HEADERS[xtype].to_bytes(length=4, byteorder="big")
class InvalidMasterKeyVersionBytes(BitcoinException): pass
def deserialize_xkey(xkey, prv, *, net=None):
if net is None:
net = constants.net
xkey = DecodeBase58Check(xkey)
if len(xkey) != 78:
raise BitcoinException('Invalid length for extended key: {}'
.format(len(xkey)))
depth = xkey[4]
fingerprint = xkey[5:9]
child_number = xkey[9:13]
c = xkey[13:13+32]
header = int.from_bytes(xkey[0:4], byteorder='big')
headers = net.XPRV_HEADERS if prv else net.XPUB_HEADERS
if header not in headers.values():
raise InvalidMasterKeyVersionBytes('Invalid extended key format: {}'
.format(hex(header)))
xtype = list(headers.keys())[list(headers.values()).index(header)]
n = 33 if prv else 32
K_or_k = xkey[13+n:]
if prv and not ecc.is_secret_within_curve_range(K_or_k):
raise BitcoinException('Impossible xprv (not within curve order)')
return xtype, depth, fingerprint, child_number, c, K_or_k
def deserialize_xpub(xkey, *, net=None):
return deserialize_xkey(xkey, False, net=net)
def deserialize_xprv(xkey, *, net=None):
return deserialize_xkey(xkey, True, net=net)
class BIP32Node(NamedTuple):
xtype: str
eckey: Union[ecc.ECPubkey, ecc.ECPrivkey]
chaincode: bytes
depth: int = 0
fingerprint: bytes = b'\x00'*4
child_number: bytes = b'\x00'*4
@classmethod
def from_xkey(cls, xkey: str, *, net=None) -> 'BIP32Node':
if net is None:
net = constants.net
xkey = DecodeBase58Check(xkey)
if len(xkey) != 78:
raise BitcoinException('Invalid length for extended key: {}'
.format(len(xkey)))
depth = xkey[4]
fingerprint = xkey[5:9]
child_number = xkey[9:13]
chaincode = xkey[13:13 + 32]
header = int.from_bytes(xkey[0:4], byteorder='big')
if header in net.XPRV_HEADERS_INV:
headers_inv = net.XPRV_HEADERS_INV
is_private = True
elif header in net.XPUB_HEADERS_INV:
headers_inv = net.XPUB_HEADERS_INV
is_private = False
else:
raise InvalidMasterKeyVersionBytes(f'Invalid extended key format: {hex(header)}')
xtype = headers_inv[header]
if is_private:
eckey = ecc.ECPrivkey(xkey[13 + 33:])
else:
eckey = ecc.ECPubkey(xkey[13 + 32:])
return BIP32Node(xtype=xtype,
eckey=eckey,
chaincode=chaincode,
depth=depth,
fingerprint=fingerprint,
child_number=child_number)
@classmethod
def from_rootseed(cls, seed: bytes, *, xtype: str) -> 'BIP32Node':
I = hmac_oneshot(b"Bitcoin seed", seed, hashlib.sha512)
master_k = I[0:32]
master_c = I[32:]
return BIP32Node(xtype=xtype,
eckey=ecc.ECPrivkey(master_k),
chaincode=master_c)
def to_xprv(self, *, net=None) -> str:
if not self.is_private():
raise Exception("cannot serialize as xprv; private key missing")
payload = (xprv_header(self.xtype, net=net) +
bytes([self.depth]) +
self.fingerprint +
self.child_number +
self.chaincode +
bytes([0]) +
self.eckey.get_secret_bytes())
assert len(payload) == 78, f"unexpected xprv payload len {len(payload)}"
return EncodeBase58Check(payload)
def to_xpub(self, *, net=None) -> str:
payload = (xpub_header(self.xtype, net=net) +
bytes([self.depth]) +
self.fingerprint +
self.child_number +
self.chaincode +
self.eckey.get_public_key_bytes(compressed=True))
assert len(payload) == 78, f"unexpected xpub payload len {len(payload)}"
return EncodeBase58Check(payload)
def to_xkey(self, *, net=None) -> str:
if self.is_private():
return self.to_xprv(net=net)
else:
return self.to_xpub(net=net)
def convert_to_public(self) -> 'BIP32Node':
if not self.is_private():
return self
pubkey = ecc.ECPubkey(self.eckey.get_public_key_bytes())
return self._replace(eckey=pubkey)
def is_private(self) -> bool:
return isinstance(self.eckey, ecc.ECPrivkey)
def subkey_at_private_derivation(self, path: Union[str, Iterable[int]]) -> 'BIP32Node':
if isinstance(path, str):
path = convert_bip32_path_to_list_of_uint32(path)
if not self.is_private():
raise Exception("cannot do bip32 private derivation; private key missing")
if not path:
return self
depth = self.depth
chaincode = self.chaincode
privkey = self.eckey.get_secret_bytes()
for child_index in path:
parent_privkey = privkey
privkey, chaincode = CKD_priv(privkey, chaincode, child_index)
depth += 1
parent_pubkey = ecc.ECPrivkey(parent_privkey).get_public_key_bytes(compressed=True)
fingerprint = hash_160(parent_pubkey)[0:4]
child_number = child_index.to_bytes(length=4, byteorder="big")
return BIP32Node(xtype=self.xtype,
eckey=ecc.ECPrivkey(privkey),
chaincode=chaincode,
depth=depth,
fingerprint=fingerprint,
child_number=child_number)
def subkey_at_public_derivation(self, path: Union[str, Iterable[int]]) -> 'BIP32Node':
if isinstance(path, str):
path = convert_bip32_path_to_list_of_uint32(path)
if not path:
return self.convert_to_public()
depth = self.depth
chaincode = self.chaincode
pubkey = self.eckey.get_public_key_bytes(compressed=True)
for child_index in path:
parent_pubkey = pubkey
pubkey, chaincode = CKD_pub(pubkey, chaincode, child_index)
depth += 1
fingerprint = hash_160(parent_pubkey)[0:4]
child_number = child_index.to_bytes(length=4, byteorder="big")
return BIP32Node(xtype=self.xtype,
eckey=ecc.ECPubkey(pubkey),
chaincode=chaincode,
depth=depth,
fingerprint=fingerprint,
child_number=child_number)
def xpub_type(x):
return deserialize_xpub(x)[0]
return BIP32Node.from_xkey(x).xtype
def is_xpub(text):
try:
deserialize_xpub(text)
return True
node = BIP32Node.from_xkey(text)
return not node.is_private()
except:
return False
def is_xprv(text):
try:
deserialize_xprv(text)
return True
node = BIP32Node.from_xkey(text)
return node.is_private()
except:
return False
def xpub_from_xprv(xprv):
xtype, depth, fingerprint, child_number, c, k = deserialize_xprv(xprv)
cK = ecc.ECPrivkey(k).get_public_key_bytes(compressed=True)
return serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
def bip32_root(seed, xtype):
I = hmac_oneshot(b"Bitcoin seed", seed, hashlib.sha512)
master_k = I[0:32]
master_c = I[32:]
# create xprv first, as that will check if master_k is within curve order
xprv = serialize_xprv(xtype, master_c, master_k)
cK = ecc.ECPrivkey(master_k).get_public_key_bytes(compressed=True)
xpub = serialize_xpub(xtype, master_c, cK)
return xprv, xpub
def xpub_from_pubkey(xtype, cK):
if cK[0] not in (0x02, 0x03):
raise ValueError('Unexpected first byte: {}'.format(cK[0]))
return serialize_xpub(xtype, b'\x00'*32, cK)
return BIP32Node.from_xkey(xprv).to_xpub()
def bip32_derivation(s: str) -> int:
if not s.startswith('m/'):
raise ValueError('invalid bip32 derivation path: {}'.format(s))
s = s[2:]
for n in s.split('/'):
if n == '': continue
i = int(n[:-1]) + BIP32_PRIME if n[-1] == "'" else int(n)
yield i
def convert_bip32_path_to_list_of_uint32(n: str) -> List[int]:
"""Convert bip32 path to list of uint32 integers with prime flags
m/0/-1/1' -> [0, 0x80000001, 0x80000001]
based on code in trezorlib
"""
if not n:
return []
if n.endswith("/"):
n = n[:-1]
n = n.split('/')
# cut leading "m" if present, but do not require it
if n[0] == "m":
n = n[1:]
path = []
for x in n.split('/')[1:]:
if x == '': continue
for x in n:
if x == '':
# gracefully allow repeating "/" chars in path.
# makes concatenating paths easier
continue
prime = 0
if x.endswith("'"):
x = x.replace('\'', '')
if x.endswith("'") or x.endswith("h"):
x = x[:-1]
prime = BIP32_PRIME
if x.startswith('-'):
prime = BIP32_PRIME
path.append(abs(int(x)) | prime)
child_index = abs(int(x)) | prime
if child_index > UINT32_MAX:
raise ValueError(f"bip32 path child index too large: {child_index} > {UINT32_MAX}")
path.append(child_index)
return path
def is_bip32_derivation(x: str) -> bool:
def is_bip32_derivation(s: str) -> bool:
try:
[ i for i in bip32_derivation(x)]
return True
except :
if not s.startswith('m/'):
return False
convert_bip32_path_to_list_of_uint32(s)
except:
return False
def bip32_private_derivation(xprv, branch, sequence):
if not sequence.startswith(branch):
raise ValueError('incompatible branch ({}) and sequence ({})'
.format(branch, sequence))
if branch == sequence:
return xprv, xpub_from_xprv(xprv)
xtype, depth, fingerprint, child_number, c, k = deserialize_xprv(xprv)
sequence = sequence[len(branch):]
for n in sequence.split('/'):
if n == '': continue
i = int(n[:-1]) + BIP32_PRIME if n[-1] == "'" else int(n)
parent_k = k
k, c = CKD_priv(k, c, i)
depth += 1
parent_cK = ecc.ECPrivkey(parent_k).get_public_key_bytes(compressed=True)
fingerprint = hash_160(parent_cK)[0:4]
child_number = bfh("%08X"%i)
cK = ecc.ECPrivkey(k).get_public_key_bytes(compressed=True)
xpub = serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
xprv = serialize_xprv(xtype, c, k, depth, fingerprint, child_number)
return xprv, xpub
def bip32_public_derivation(xpub, branch, sequence):
xtype, depth, fingerprint, child_number, c, cK = deserialize_xpub(xpub)
if not sequence.startswith(branch):
raise ValueError('incompatible branch ({}) and sequence ({})'
.format(branch, sequence))
sequence = sequence[len(branch):]
for n in sequence.split('/'):
if n == '': continue
i = int(n)
parent_cK = cK
cK, c = CKD_pub(cK, c, i)
depth += 1
fingerprint = hash_160(parent_cK)[0:4]
child_number = bfh("%08X"%i)
return serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
def bip32_private_key(sequence, k, chain):
for i in sequence:
k, chain = CKD_priv(k, chain, i)
return k
else:
return True

10
electrum/commands.py

@ -39,6 +39,7 @@ from .util import bfh, bh2u, format_satoshis, json_decode, print_error, json_enc
from . import bitcoin
from .bitcoin import is_address, hash_160, COIN, TYPE_ADDRESS
from . import bip32
from .bip32 import BIP32Node
from .i18n import _
from .transaction import Transaction, multisig_script, TxOutput
from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED
@ -439,12 +440,11 @@ class Commands:
@command('')
def convert_xkey(self, xkey, xtype):
"""Convert xtype of a master key. e.g. xpub -> ypub"""
is_xprv = bip32.is_xprv(xkey)
if not bip32.is_xpub(xkey) and not is_xprv:
try:
node = BIP32Node.from_xkey(xkey)
except:
raise Exception('xkey should be a master public/private key')
_, depth, fingerprint, child_number, c, cK = bip32.deserialize_xkey(xkey, is_xprv)
serialize = bip32.serialize_xprv if is_xprv else bip32.serialize_xpub
return serialize(xtype, c, cK, depth, fingerprint, child_number)
return node._replace(xtype=xtype).to_xkey()
@command('wp')
def getseed(self, password=None):

6
electrum/constants.py

@ -26,6 +26,8 @@
import os
import json
from .util import inv_dict
def read_json(filename, default):
path = os.path.join(os.path.dirname(__file__), filename)
@ -63,6 +65,7 @@ class BitcoinMainnet(AbstractNet):
'p2wpkh': 0x04b2430c, # zprv
'p2wsh': 0x02aa7a99, # Zprv
}
XPRV_HEADERS_INV = inv_dict(XPRV_HEADERS)
XPUB_HEADERS = {
'standard': 0x0488b21e, # xpub
'p2wpkh-p2sh': 0x049d7cb2, # ypub
@ -70,6 +73,7 @@ class BitcoinMainnet(AbstractNet):
'p2wpkh': 0x04b24746, # zpub
'p2wsh': 0x02aa7ed3, # Zpub
}
XPUB_HEADERS_INV = inv_dict(XPUB_HEADERS)
BIP44_COIN_TYPE = 0
@ -92,6 +96,7 @@ class BitcoinTestnet(AbstractNet):
'p2wpkh': 0x045f18bc, # vprv
'p2wsh': 0x02575048, # Vprv
}
XPRV_HEADERS_INV = inv_dict(XPRV_HEADERS)
XPUB_HEADERS = {
'standard': 0x043587cf, # tpub
'p2wpkh-p2sh': 0x044a5262, # upub
@ -99,6 +104,7 @@ class BitcoinTestnet(AbstractNet):
'p2wpkh': 0x045f1cf6, # vpub
'p2wsh': 0x02575483, # Vpub
}
XPUB_HEADERS_INV = inv_dict(XPUB_HEADERS)
BIP44_COIN_TYPE = 1

9
electrum/ecc.py

@ -229,6 +229,9 @@ class ECPubkey(object):
def point(self) -> Tuple[int, int]:
return self._pubkey.point.x(), self._pubkey.point.y()
def __repr__(self):
return f"<ECPubkey {self.get_public_key_hex()}>"
def __mul__(self, other: int):
if not isinstance(other, int):
raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))
@ -375,6 +378,12 @@ class ECPrivkey(ECPubkey):
privkey_32bytes = number_to_string(scalar, CURVE_ORDER)
return privkey_32bytes
def __repr__(self):
return f"<ECPrivkey {self.get_public_key_hex()}>"
def get_secret_bytes(self) -> bytes:
return number_to_string(self.secret_scalar, CURVE_ORDER)
def sign(self, data: bytes, sigencode=None, sigdecode=None) -> bytes:
if sigencode is None:
sigencode = sig_string_from_r_and_s

32
electrum/keystore.py

@ -31,10 +31,8 @@ from typing import Tuple
from . import bitcoin, ecc, constants, bip32
from .bitcoin import (deserialize_privkey, serialize_privkey,
public_key_to_p2pkh)
from .bip32 import (bip32_public_derivation, deserialize_xpub, CKD_pub,
bip32_root, deserialize_xprv, bip32_private_derivation,
bip32_private_key, bip32_derivation, BIP32_PRIME,
is_xpub, is_xprv)
from .bip32 import (convert_bip32_path_to_list_of_uint32, BIP32_PRIME,
is_xpub, is_xprv, BIP32Node)
from .ecc import string_to_number, number_to_string
from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion)
@ -133,6 +131,9 @@ class Software_KeyStore(KeyStore):
def check_password(self, password):
raise NotImplementedError() # implemented by subclasses
def get_private_key(self, *args, **kwargs) -> Tuple[bytes, bool]:
raise NotImplementedError() # implemented by subclasses
class Imported_KeyStore(Software_KeyStore):
# keystore for imported private keys
@ -263,7 +264,8 @@ class Xpub:
def derive_pubkey(self, for_change, n):
xpub = self.xpub_change if for_change else self.xpub_receive
if xpub is None:
xpub = bip32_public_derivation(self.xpub, "", "/%d"%for_change)
rootnode = BIP32Node.from_xkey(self.xpub)
xpub = rootnode.subkey_at_public_derivation((for_change,)).to_xpub()
if for_change:
self.xpub_change = xpub
else:
@ -272,10 +274,8 @@ class Xpub:
@classmethod
def get_pubkey_from_xpub(self, xpub, sequence):
_, _, _, _, c, cK = deserialize_xpub(xpub)
for i in sequence:
cK, c = CKD_pub(cK, c, i)
return bh2u(cK)
node = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(sequence)
return node.eckey.get_public_key_hex(compressed=True)
def get_xpubkey(self, c, i):
s = ''.join(map(lambda x: bitcoin.int_to_hex(x,2), (c, i)))
@ -334,7 +334,7 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
def check_password(self, password):
xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
if deserialize_xprv(xprv)[4] != deserialize_xpub(self.xpub)[4]:
if BIP32Node.from_xkey(xprv).chaincode != BIP32Node.from_xkey(self.xpub).chaincode:
raise InvalidPassword()
def update_password(self, old_password, new_password):
@ -360,14 +360,14 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
self.xpub = bip32.xpub_from_xprv(xprv)
def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
xprv, xpub = bip32_root(bip32_seed, xtype)
xprv, xpub = bip32_private_derivation(xprv, "m/", derivation)
self.add_xprv(xprv)
rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
node = rootnode.subkey_at_private_derivation(derivation)
self.add_xprv(node.to_xprv())
def get_private_key(self, sequence, password):
xprv = self.get_master_private_key(password)
_, _, _, _, c, k = deserialize_xprv(xprv)
pk = bip32_private_key(sequence, k, c)
node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(sequence)
pk = node.eckey.get_secret_bytes()
return pk, True
@ -658,7 +658,7 @@ def xtype_from_derivation(derivation: str) -> str:
elif derivation.startswith("m/45'"):
return 'standard'
bip32_indices = list(bip32_derivation(derivation))
bip32_indices = convert_bip32_path_to_list_of_uint32(derivation)
if len(bip32_indices) >= 4:
if bip32_indices[0] == 48 + BIP32_PRIME:
# m / purpose' / coin_type' / account' / script_type' / change / address_index

2
electrum/mnemonic.py

@ -126,7 +126,7 @@ class Mnemonic(object):
print_error("wordlist has %d words"%len(self.wordlist))
@classmethod
def mnemonic_to_seed(self, mnemonic, passphrase):
def mnemonic_to_seed(self, mnemonic, passphrase) -> bytes:
PBKDF2_ROUNDS = 2048
mnemonic = normalize_text(mnemonic)
passphrase = passphrase or ''

13
electrum/plugins/coldcard/coldcard.py

@ -6,7 +6,7 @@ from struct import pack, unpack
import os, sys, time, io
import traceback
from electrum.bip32 import serialize_xpub, deserialize_xpub, InvalidMasterKeyVersionBytes
from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes
from electrum.i18n import _
from electrum.plugin import Device
from electrum.keystore import Hardware_KeyStore, xpubkey_to_pubkey, Xpub
@ -40,12 +40,7 @@ try:
def mitm_verify(self, sig, expect_xpub):
# verify a signature (65 bytes) over the session key, using the master bip32 node
# - customized to use specific EC library of Electrum.
from electrum.ecc import ECPubkey
xtype, depth, parent_fingerprint, child_number, chain_code, K_or_k \
= deserialize_xpub(expect_xpub)
pubkey = ECPubkey(K_or_k)
pubkey = BIP32Node.from_xkey(expect_xpub).eckey
try:
pubkey.verify_message_hash(sig[1:65], self.session_key)
return True
@ -191,12 +186,12 @@ class CKCCClient:
# TODO handle timeout?
# change type of xpub to the requested type
try:
__, depth, fingerprint, child_number, c, cK = deserialize_xpub(xpub)
node = BIP32Node.from_xkey(xpub)
except InvalidMasterKeyVersionBytes:
raise UserFacingException(_('Invalid xpub magic. Make sure your {} device is set to the correct chain.')
.format(self.device)) from None
if xtype != 'standard':
xpub = serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
xpub = node._replace(xtype=xtype).to_xpub()
return xpub
def ping_check(self):

14
electrum/plugins/cosigner_pool/qt.py

@ -29,8 +29,9 @@ from xmlrpc.client import ServerProxy
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtWidgets import QPushButton
from electrum import util, keystore, ecc, bip32, crypto
from electrum import util, keystore, ecc, crypto
from electrum import transaction
from electrum.bip32 import BIP32Node
from electrum.plugin import BasePlugin, hook
from electrum.i18n import _
from electrum.wallet import Multisig_Wallet
@ -131,12 +132,12 @@ class Plugin(BasePlugin):
self.cosigner_list = []
for key, keystore in wallet.keystores.items():
xpub = keystore.get_master_public_key()
K = bip32.deserialize_xpub(xpub)[-1]
_hash = bh2u(crypto.sha256d(K))
pubkey = BIP32Node.from_xkey(xpub).eckey.get_public_key_bytes(compressed=True)
_hash = bh2u(crypto.sha256d(pubkey))
if not keystore.is_watching_only():
self.keys.append((key, _hash, window))
else:
self.cosigner_list.append((window, xpub, K, _hash))
self.cosigner_list.append((window, xpub, pubkey, _hash))
if self.listener:
self.listener.set_keyhashes([t[1] for t in self.keys])
@ -221,9 +222,8 @@ class Plugin(BasePlugin):
if not xprv:
return
try:
k = bip32.deserialize_xprv(xprv)[-1]
EC = ecc.ECPrivkey(k)
message = bh2u(EC.decrypt_message(message))
privkey = BIP32Node.from_xkey(xprv).eckey
message = bh2u(privkey.decrypt_message(message))
except Exception as e:
traceback.print_exc(file=sys.stdout)
window.show_error(_('Error decrypting message') + ':\n' + str(e))

6
electrum/plugins/digitalbitbox/digitalbitbox.py

@ -18,7 +18,7 @@ import time
from electrum.crypto import sha256d, EncodeAES_base64, EncodeAES_bytes, DecodeAES_bytes, hmac_oneshot
from electrum.bitcoin import (TYPE_ADDRESS, push_script, var_int, public_key_to_p2pkh,
is_address)
from electrum.bip32 import serialize_xpub, deserialize_xpub
from electrum.bip32 import BIP32Node
from electrum import ecc
from electrum.ecc import msg_magic
from electrum.wallet import Standard_Wallet
@ -118,8 +118,8 @@ class DigitalBitbox_Client():
# only ever returns the mainnet standard type, but it is agnostic
# to the type when signing.
if xtype != 'standard' or constants.net.TESTNET:
_, depth, fingerprint, child_number, c, cK = deserialize_xpub(xpub, net=constants.BitcoinMainnet)
xpub = serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
node = BIP32Node.from_xkey(xpub, net=constants.BitcoinMainnet)
xpub = node._replace(xtype=xtype).to_xpub()
return xpub
else:
raise Exception('no reply')

10
electrum/plugins/keepkey/clientbase.py

@ -1,10 +1,11 @@
import time
from struct import pack
from electrum import ecc
from electrum.i18n import _
from electrum.util import PrintError, UserCancelled
from electrum.keystore import bip39_normalize_passphrase
from electrum.bip32 import serialize_xpub, convert_bip32_path_to_list_of_uint32
from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32
class GuiMixin(object):
@ -154,7 +155,12 @@ class KeepKeyClientBase(GuiMixin, PrintError):
address_n = self.expand_path(bip32_path)
creating = False
node = self.get_public_node(address_n, creating).node
return serialize_xpub(xtype, node.chain_code, node.public_key, node.depth, self.i4b(node.fingerprint), self.i4b(node.child_num))
return BIP32Node(xtype=xtype,
eckey=ecc.ECPubkey(node.public_key),
chaincode=node.chain_code,
depth=node.depth,
fingerprint=self.i4b(node.fingerprint),
child_number=self.i4b(node.child_num)).to_xpub()
def toggle_passphrase(self):
if self.features.passphrase_protection:

14
electrum/plugins/keepkey/keepkey.py

@ -4,7 +4,7 @@ import sys
from electrum.util import bfh, bh2u, UserCancelled, UserFacingException
from electrum.bitcoin import TYPE_ADDRESS, TYPE_SCRIPT
from electrum.bip32 import deserialize_xpub
from electrum.bip32 import BIP32Node
from electrum import constants
from electrum.i18n import _
from electrum.transaction import deserialize, Transaction
@ -227,13 +227,13 @@ class KeepKeyPlugin(HW_PluginBase):
label, language)
def _make_node_path(self, xpub, address_n):
_, depth, fingerprint, child_num, chain_code, key = deserialize_xpub(xpub)
bip32node = BIP32Node.from_xkey(xpub)
node = self.types.HDNodeType(
depth=depth,
fingerprint=int.from_bytes(fingerprint, 'big'),
child_num=int.from_bytes(child_num, 'big'),
chain_code=chain_code,
public_key=key,
depth=bip32node.depth,
fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
child_num=int.from_bytes(bip32node.child_number, 'big'),
chain_code=bip32node.chaincode,
public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
)
return self.types.HDNodePathType(node=node, address_n=address_n)

11
electrum/plugins/ledger/ledger.py

@ -3,8 +3,9 @@ import hashlib
import sys
import traceback
from electrum import ecc
from electrum.bitcoin import TYPE_ADDRESS, int_to_hex, var_int
from electrum.bip32 import serialize_xpub
from electrum.bip32 import BIP32Node
from electrum.i18n import _
from electrum.keystore import Hardware_KeyStore
from electrum.transaction import Transaction
@ -112,8 +113,12 @@ class Ledger_Client():
depth = len(splitPath)
lastChild = splitPath[len(splitPath) - 1].split('\'')
childnum = int(lastChild[0]) if len(lastChild) == 1 else 0x80000000 | int(lastChild[0])
xpub = serialize_xpub(xtype, nodeData['chainCode'], publicKey, depth, self.i4b(fingerprint), self.i4b(childnum))
return xpub
return BIP32Node(xtype=xtype,
eckey=ecc.ECPubkey(publicKey),
chaincode=nodeData['chainCode'],
depth=depth,
fingerprint=self.i4b(fingerprint),
child_number=self.i4b(childnum)).to_xpub()
def has_detached_pin_support(self, client):
try:

10
electrum/plugins/safe_t/clientbase.py

@ -1,10 +1,11 @@
import time
from struct import pack
from electrum import ecc
from electrum.i18n import _
from electrum.util import PrintError, UserCancelled
from electrum.keystore import bip39_normalize_passphrase
from electrum.bip32 import serialize_xpub, convert_bip32_path_to_list_of_uint32
from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32
class GuiMixin(object):
@ -156,7 +157,12 @@ class SafeTClientBase(GuiMixin, PrintError):
address_n = self.expand_path(bip32_path)
creating = False
node = self.get_public_node(address_n, creating).node
return serialize_xpub(xtype, node.chain_code, node.public_key, node.depth, self.i4b(node.fingerprint), self.i4b(node.child_num))
return BIP32Node(xtype=xtype,
eckey=ecc.ECPubkey(node.public_key),
chaincode=node.chain_code,
depth=node.depth,
fingerprint=self.i4b(node.fingerprint),
child_number=self.i4b(node.child_num)).to_xpub()
def toggle_passphrase(self):
if self.features.passphrase_protection:

14
electrum/plugins/safe_t/safe_t.py

@ -4,7 +4,7 @@ import sys
from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
from electrum.bitcoin import TYPE_ADDRESS, TYPE_SCRIPT
from electrum.bip32 import deserialize_xpub
from electrum.bip32 import BIP32Node
from electrum import constants
from electrum.i18n import _
from electrum.plugin import Device
@ -244,13 +244,13 @@ class SafeTPlugin(HW_PluginBase):
label, language)
def _make_node_path(self, xpub, address_n):
_, depth, fingerprint, child_num, chain_code, key = deserialize_xpub(xpub)
bip32node = BIP32Node.from_xkey(xpub)
node = self.types.HDNodeType(
depth=depth,
fingerprint=int.from_bytes(fingerprint, 'big'),
child_num=int.from_bytes(child_num, 'big'),
chain_code=chain_code,
public_key=key,
depth=bip32node.depth,
fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
child_num=int.from_bytes(bip32node.child_number, 'big'),
chain_code=bip32node.chaincode,
public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
)
return self.types.HDNodePathType(node=node, address_n=address_n)

10
electrum/plugins/trezor/clientbase.py

@ -1,10 +1,11 @@
import time
from struct import pack
from electrum import ecc
from electrum.i18n import _
from electrum.util import PrintError, UserCancelled, UserFacingException
from electrum.keystore import bip39_normalize_passphrase
from electrum.bip32 import serialize_xpub, convert_bip32_path_to_list_of_uint32 as parse_path
from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32 as parse_path
from trezorlib.client import TrezorClient
from trezorlib.exceptions import TrezorFailure, Cancelled, OutdatedFirmwareError
@ -120,7 +121,12 @@ class TrezorClientBase(PrintError):
address_n = parse_path(bip32_path)
with self.run_flow(creating_wallet=creating):
node = trezorlib.btc.get_public_node(self.client, address_n).node
return serialize_xpub(xtype, node.chain_code, node.public_key, node.depth, self.i4b(node.fingerprint), self.i4b(node.child_num))
return BIP32Node(xtype=xtype,
eckey=ecc.ECPubkey(node.public_key),
chaincode=node.chain_code,
depth=node.depth,
fingerprint=self.i4b(node.fingerprint),
child_number=self.i4b(node.child_num)).to_xpub()
def toggle_passphrase(self):
if self.features.passphrase_protection:

14
electrum/plugins/trezor/trezor.py

@ -3,7 +3,7 @@ import sys
from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
from electrum.bitcoin import TYPE_ADDRESS, TYPE_SCRIPT
from electrum.bip32 import deserialize_xpub, convert_bip32_path_to_list_of_uint32 as parse_path
from electrum.bip32 import BIP32Node, convert_bip32_path_to_list_of_uint32 as parse_path
from electrum import constants
from electrum.i18n import _
from electrum.plugin import Device
@ -241,13 +241,13 @@ class TrezorPlugin(HW_PluginBase):
raise RuntimeError("Unsupported recovery method")
def _make_node_path(self, xpub, address_n):
_, depth, fingerprint, child_num, chain_code, key = deserialize_xpub(xpub)
bip32node = BIP32Node.from_xkey(xpub)
node = HDNodeType(
depth=depth,
fingerprint=int.from_bytes(fingerprint, 'big'),
child_num=int.from_bytes(child_num, 'big'),
chain_code=chain_code,
public_key=key,
depth=bip32node.depth,
fingerprint=int.from_bytes(bip32node.fingerprint, 'big'),
child_num=int.from_bytes(bip32node.child_number, 'big'),
chain_code=bip32node.chaincode,
public_key=bip32node.eckey.get_public_key_bytes(compressed=True),
)
return HDNodePathType(node=node, address_n=address_n)

41
electrum/plugins/trustedcoin/trustedcoin.py

@ -37,8 +37,7 @@ from aiohttp import ClientResponse
from electrum import ecc, constants, keystore, version, bip32, bitcoin
from electrum.bitcoin import TYPE_ADDRESS
from electrum.bip32 import (deserialize_xpub, deserialize_xprv, bip32_private_key, CKD_pub,
serialize_xpub, bip32_root, bip32_private_derivation, xpub_type)
from electrum.bip32 import CKD_pub, BIP32Node, xpub_type
from electrum.crypto import sha256
from electrum.transaction import TxOutput
from electrum.mnemonic import Mnemonic, seed_type, is_any_2fa_seed_type
@ -59,9 +58,8 @@ def get_signing_xpub(xtype):
raise NotImplementedError('xtype: {}'.format(xtype))
if xtype == 'standard':
return xpub
_, depth, fingerprint, child_number, c, cK = bip32.deserialize_xpub(xpub)
xpub = bip32.serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
return xpub
node = BIP32Node.from_xkey(xpub)
return node._replace(xtype=xtype).to_xpub()
def get_billing_xpub():
if constants.net.TESTNET:
@ -388,20 +386,26 @@ def get_user_id(storage):
short_id = hashlib.sha256(long_id).hexdigest()
return long_id, short_id
def make_xpub(xpub, s):
version, _, _, _, c, cK = deserialize_xpub(xpub)
cK2, c2 = bip32._CKD_pub(cK, c, s)
return serialize_xpub(version, c2, cK2)
def make_xpub(xpub, s) -> str:
rootnode = BIP32Node.from_xkey(xpub)
child_pubkey, child_chaincode = bip32._CKD_pub(parent_pubkey=rootnode.eckey.get_public_key_bytes(compressed=True),
parent_chaincode=rootnode.chaincode,
child_index=s)
child_node = BIP32Node(xtype=rootnode.xtype,
eckey=ecc.ECPubkey(child_pubkey),
chaincode=child_chaincode)
return child_node.to_xpub()
def make_billing_address(wallet, num, addr_type):
long_id, short_id = wallet.get_user_id()
xpub = make_xpub(get_billing_xpub(), long_id)
version, _, _, _, c, cK = deserialize_xpub(xpub)
cK, c = CKD_pub(cK, c, num)
usernode = BIP32Node.from_xkey(xpub)
child_node = usernode.subkey_at_public_derivation([num])
pubkey = child_node.eckey.get_public_key_bytes(compressed=True)
if addr_type == 'legacy':
return bitcoin.public_key_to_p2pkh(cK)
return bitcoin.public_key_to_p2pkh(pubkey)
elif addr_type == 'segwit':
return bitcoin.public_key_to_p2wpkh(cK)
return bitcoin.public_key_to_p2wpkh(pubkey)
else:
raise ValueError(f'unexpected billing type: {addr_type}')
@ -538,9 +542,9 @@ class TrustedCoinPlugin(BasePlugin):
assert is_any_2fa_seed_type(t)
xtype = 'standard' if t == '2fa' else 'p2wsh'
bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase)
xprv, xpub = bip32_root(bip32_seed, xtype)
xprv, xpub = bip32_private_derivation(xprv, "m/", derivation)
return xprv, xpub
rootnode = BIP32Node.from_rootseed(bip32_seed, xtype=xtype)
child_node = rootnode.subkey_at_private_derivation(derivation)
return child_node.to_xprv(), child_node.to_xpub()
@classmethod
def xkeys_from_seed(self, seed, passphrase):
@ -721,9 +725,8 @@ class TrustedCoinPlugin(BasePlugin):
challenge = r.get('challenge')
message = 'TRUSTEDCOIN CHALLENGE: ' + challenge
def f(xprv):
_, _, _, _, c, k = deserialize_xprv(xprv)
pk = bip32_private_key([0, 0], k, c)
key = ecc.ECPrivkey(pk)
rootnode = BIP32Node.from_xkey(xprv)
key = rootnode.subkey_at_private_derivation((0, 0)).eckey
sig = key.sign_message(message, True)
return base64.b64encode(sig).decode()

17
electrum/tests/test_bitcoin.py

@ -9,7 +9,7 @@ from electrum.bitcoin import (public_key_to_p2pkh, address_from_private_key,
is_compressed_privkey, EncodeBase58Check,
script_num_to_hex, push_script, add_number_to_script, int_to_hex,
opcodes)
from electrum.bip32 import (bip32_root, bip32_public_derivation, bip32_private_derivation,
from electrum.bip32 import (BIP32Node,
xpub_from_xprv, xpub_type, is_xprv, is_bip32_derivation,
is_xpub, convert_bip32_path_to_list_of_uint32)
from electrum.crypto import sha256d, SUPPORTED_PW_HASH_VERSIONS
@ -405,19 +405,18 @@ class Test_xprv_xpub(SequentialTestCase):
'xtype': 'p2wpkh'},
)
def _do_test_bip32(self, seed, sequence):
xprv, xpub = bip32_root(bfh(seed), 'standard')
def _do_test_bip32(self, seed: str, sequence):
node = BIP32Node.from_rootseed(bfh(seed), xtype='standard')
xprv, xpub = node.to_xprv(), node.to_xpub()
self.assertEqual("m/", sequence[0:2])
path = 'm'
sequence = sequence[2:]
for n in sequence.split('/'):
child_path = path + '/' + n
if n[-1] != "'":
xpub2 = bip32_public_derivation(xpub, path, child_path)
xprv, xpub = bip32_private_derivation(xprv, path, child_path)
xpub2 = BIP32Node.from_xkey(xpub).subkey_at_public_derivation(n).to_xpub()
node = BIP32Node.from_xkey(xprv).subkey_at_private_derivation(n)
xprv, xpub = node.to_xprv(), node.to_xpub()
if n[-1] != "'":
self.assertEqual(xpub, xpub2)
path = child_path
return xpub, xprv
@ -474,7 +473,7 @@ class Test_xprv_xpub(SequentialTestCase):
def test_convert_bip32_path_to_list_of_uint32(self):
self.assertEqual([0, 0x80000001, 0x80000001], convert_bip32_path_to_list_of_uint32("m/0/-1/1'"))
self.assertEqual([], convert_bip32_path_to_list_of_uint32("m/"))
self.assertEqual([2147483692, 2147488889, 221], convert_bip32_path_to_list_of_uint32("m/44'/5241'/221"))
self.assertEqual([2147483692, 2147488889, 221], convert_bip32_path_to_list_of_uint32("m/44'/5241h/221"))
def test_xtype_from_derivation(self):
self.assertEqual('standard', xtype_from_derivation("m/44'"))

Loading…
Cancel
Save