Browse Source

bitcoin/ecc: some more type annotations

3.3.3.1
SomberNight 6 years ago
parent
commit
36f64d1ad9
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 98
      electrum/bitcoin.py
  2. 16
      electrum/ecc.py
  3. 2
      electrum/mnemonic.py
  4. 2
      electrum/paymentrequest.py
  5. 6
      electrum/tests/test_bitcoin.py

98
electrum/bitcoin.py

@ -24,7 +24,7 @@
# SOFTWARE. # SOFTWARE.
import hashlib import hashlib
from typing import List, Tuple, TYPE_CHECKING from typing import List, Tuple, TYPE_CHECKING, Optional, Union
from .util import bfh, bh2u, BitcoinException, assert_bytes, to_bytes, inv_dict from .util import bfh, bh2u, BitcoinException, assert_bytes, to_bytes, inv_dict
from . import version from . import version
@ -49,7 +49,7 @@ TYPE_PUBKEY = 1
TYPE_SCRIPT = 2 TYPE_SCRIPT = 2
def rev_hex(s): def rev_hex(s: str) -> str:
return bh2u(bfh(s)[::-1]) return bh2u(bfh(s)[::-1])
@ -162,22 +162,25 @@ def dust_threshold(network: 'Network'=None) -> int:
return 182 * 3 * relayfee(network) // 1000 return 182 * 3 * relayfee(network) // 1000
hash_encode = lambda x: bh2u(x[::-1]) def hash_encode(x: bytes) -> str:
hash_decode = lambda x: bfh(x)[::-1] return bh2u(x[::-1])
hmac_sha_512 = lambda x, y: hmac_oneshot(x, y, hashlib.sha512)
def hash_decode(x: str) -> bytes:
return bfh(x)[::-1]
################################## electrum seeds ################################## electrum seeds
def is_new_seed(x, prefix=version.SEED_PREFIX): def is_new_seed(x: str, prefix=version.SEED_PREFIX) -> bool:
from . import mnemonic from . import mnemonic
x = mnemonic.normalize_text(x) x = mnemonic.normalize_text(x)
s = bh2u(hmac_sha_512(b"Seed version", x.encode('utf8'))) s = bh2u(hmac_oneshot(b"Seed version", x.encode('utf8'), hashlib.sha512))
return s.startswith(prefix) return s.startswith(prefix)
def is_old_seed(seed): def is_old_seed(seed: str) -> bool:
from . import old_mnemonic, mnemonic from . import old_mnemonic, mnemonic
seed = mnemonic.normalize_text(seed) seed = mnemonic.normalize_text(seed)
words = seed.split() words = seed.split()
@ -195,7 +198,7 @@ def is_old_seed(seed):
return is_hex or (uses_electrum_words and (len(words) == 12 or len(words) == 24)) return is_hex or (uses_electrum_words and (len(words) == 12 or len(words) == 24))
def seed_type(x): def seed_type(x: str) -> str:
if is_old_seed(x): if is_old_seed(x):
return 'old' return 'old'
elif is_new_seed(x): elif is_new_seed(x):
@ -206,29 +209,31 @@ def seed_type(x):
return '2fa' return '2fa'
return '' return ''
is_seed = lambda x: bool(seed_type(x))
def is_seed(x: str) -> bool:
return bool(seed_type(x))
############ functions from pywallet ##################### ############ functions from pywallet #####################
def hash160_to_b58_address(h160: bytes, addrtype): def hash160_to_b58_address(h160: bytes, addrtype: int) -> str:
s = bytes([addrtype]) s = bytes([addrtype]) + h160
s += h160 s = s + sha256d(s)[0:4]
return base_encode(s+sha256d(s)[0:4], base=58) return base_encode(s, base=58)
def b58_address_to_hash160(addr): def b58_address_to_hash160(addr: str) -> Tuple[int, bytes]:
addr = to_bytes(addr, 'ascii') addr = to_bytes(addr, 'ascii')
_bytes = base_decode(addr, 25, base=58) _bytes = base_decode(addr, 25, base=58)
return _bytes[0], _bytes[1:21] return _bytes[0], _bytes[1:21]
def hash160_to_p2pkh(h160, *, net=None): def hash160_to_p2pkh(h160: bytes, *, net=None) -> str:
if net is None: if net is None:
net = constants.net net = constants.net
return hash160_to_b58_address(h160, net.ADDRTYPE_P2PKH) return hash160_to_b58_address(h160, net.ADDRTYPE_P2PKH)
def hash160_to_p2sh(h160, *, net=None): def hash160_to_p2sh(h160: bytes, *, net=None) -> str:
if net is None: if net is None:
net = constants.net net = constants.net
return hash160_to_b58_address(h160, net.ADDRTYPE_P2SH) return hash160_to_b58_address(h160, net.ADDRTYPE_P2SH)
@ -236,26 +241,26 @@ def hash160_to_p2sh(h160, *, net=None):
def public_key_to_p2pkh(public_key: bytes) -> str: def public_key_to_p2pkh(public_key: bytes) -> str:
return hash160_to_p2pkh(hash_160(public_key)) return hash160_to_p2pkh(hash_160(public_key))
def hash_to_segwit_addr(h, witver, *, net=None): def hash_to_segwit_addr(h: bytes, witver: int, *, net=None) -> str:
if net is None: if net is None:
net = constants.net net = constants.net
return segwit_addr.encode(net.SEGWIT_HRP, witver, h) return segwit_addr.encode(net.SEGWIT_HRP, witver, h)
def public_key_to_p2wpkh(public_key): def public_key_to_p2wpkh(public_key: bytes) -> str:
return hash_to_segwit_addr(hash_160(public_key), witver=0) return hash_to_segwit_addr(hash_160(public_key), witver=0)
def script_to_p2wsh(script): def script_to_p2wsh(script: str) -> str:
return hash_to_segwit_addr(sha256(bfh(script)), witver=0) return hash_to_segwit_addr(sha256(bfh(script)), witver=0)
def p2wpkh_nested_script(pubkey): def p2wpkh_nested_script(pubkey: str) -> str:
pkh = bh2u(hash_160(bfh(pubkey))) pkh = bh2u(hash_160(bfh(pubkey)))
return '00' + push_script(pkh) return '00' + push_script(pkh)
def p2wsh_nested_script(witness_script): def p2wsh_nested_script(witness_script: str) -> str:
wsh = bh2u(sha256(bfh(witness_script))) wsh = bh2u(sha256(bfh(witness_script)))
return '00' + push_script(wsh) return '00' + push_script(wsh)
def pubkey_to_address(txin_type, pubkey): def pubkey_to_address(txin_type: str, pubkey: str) -> str:
if txin_type == 'p2pkh': if txin_type == 'p2pkh':
return public_key_to_p2pkh(bfh(pubkey)) return public_key_to_p2pkh(bfh(pubkey))
elif txin_type == 'p2wpkh': elif txin_type == 'p2wpkh':
@ -266,7 +271,7 @@ def pubkey_to_address(txin_type, pubkey):
else: else:
raise NotImplementedError(txin_type) raise NotImplementedError(txin_type)
def redeem_script_to_address(txin_type, redeem_script): def redeem_script_to_address(txin_type: str, redeem_script: str) -> str:
if txin_type == 'p2sh': if txin_type == 'p2sh':
return hash160_to_p2sh(hash_160(bfh(redeem_script))) return hash160_to_p2sh(hash_160(bfh(redeem_script)))
elif txin_type == 'p2wsh': elif txin_type == 'p2wsh':
@ -278,7 +283,7 @@ def redeem_script_to_address(txin_type, redeem_script):
raise NotImplementedError(txin_type) raise NotImplementedError(txin_type)
def script_to_address(script, *, net=None): def script_to_address(script: str, *, net=None) -> str:
from .transaction import get_address_from_output_script from .transaction import get_address_from_output_script
t, addr = get_address_from_output_script(bfh(script), net=net) t, addr = get_address_from_output_script(bfh(script), net=net)
assert t == TYPE_ADDRESS assert t == TYPE_ADDRESS
@ -310,15 +315,15 @@ def address_to_script(addr: str, *, net=None) -> str:
raise BitcoinException(f'unknown address type: {addrtype}') raise BitcoinException(f'unknown address type: {addrtype}')
return script return script
def address_to_scripthash(addr): def address_to_scripthash(addr: str) -> str:
script = address_to_script(addr) script = address_to_script(addr)
return script_to_scripthash(script) return script_to_scripthash(script)
def script_to_scripthash(script): def script_to_scripthash(script: str) -> str:
h = sha256(bytes.fromhex(script))[0:32] h = sha256(bfh(script))[0:32]
return bh2u(bytes(reversed(h))) return bh2u(bytes(reversed(h)))
def public_key_to_p2pk_script(pubkey): def public_key_to_p2pk_script(pubkey: str) -> str:
script = push_script(pubkey) script = push_script(pubkey)
script += 'ac' # op_checksig script += 'ac' # op_checksig
return script return script
@ -360,7 +365,7 @@ def base_encode(v: bytes, base: int) -> str:
return result.decode('ascii') return result.decode('ascii')
def base_decode(v, length, base): def base_decode(v: Union[bytes, str], length: Optional[int], base: int) -> Optional[bytes]:
""" decode v into a string of len bytes.""" """ decode v into a string of len bytes."""
# assert_bytes(v) # assert_bytes(v)
v = to_bytes(v, 'ascii') v = to_bytes(v, 'ascii')
@ -398,21 +403,20 @@ class InvalidChecksum(Exception):
pass pass
def EncodeBase58Check(vchIn): def EncodeBase58Check(vchIn: bytes) -> str:
hash = sha256d(vchIn) hash = sha256d(vchIn)
return base_encode(vchIn + hash[0:4], base=58) return base_encode(vchIn + hash[0:4], base=58)
def DecodeBase58Check(psz): def DecodeBase58Check(psz: Union[bytes, str]) -> bytes:
vchRet = base_decode(psz, None, base=58) vchRet = base_decode(psz, None, base=58)
key = vchRet[0:-4] payload = vchRet[0:-4]
csum = vchRet[-4:] csum_found = vchRet[-4:]
hash = sha256d(key) csum_calculated = sha256d(payload)[0:4]
cs32 = hash[0:4] if csum_calculated != csum_found:
if cs32 != csum: raise InvalidChecksum(f'calculated {bh2u(csum_calculated)}, found {bh2u(csum_found)}')
raise InvalidChecksum('expected {}, actual {}'.format(bh2u(cs32), bh2u(csum)))
else: else:
return key return payload
# backwards compat # backwards compat
@ -484,16 +488,16 @@ def deserialize_privkey(key: str) -> Tuple[str, bytes, bool]:
return txin_type, secret_bytes, compressed return txin_type, secret_bytes, compressed
def is_compressed(sec): def is_compressed_privkey(sec: str) -> bool:
return deserialize_privkey(sec)[2] return deserialize_privkey(sec)[2]
def address_from_private_key(sec): def address_from_private_key(sec: str) -> str:
txin_type, privkey, compressed = deserialize_privkey(sec) txin_type, privkey, compressed = deserialize_privkey(sec)
public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed) public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
return pubkey_to_address(txin_type, public_key) return pubkey_to_address(txin_type, public_key)
def is_segwit_address(addr, *, net=None): def is_segwit_address(addr: str, *, net=None) -> bool:
if net is None: net = constants.net if net is None: net = constants.net
try: try:
witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr) witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr)
@ -501,7 +505,7 @@ def is_segwit_address(addr, *, net=None):
return False return False
return witprog is not None return witprog is not None
def is_b58_address(addr, *, net=None): def is_b58_address(addr: str, *, net=None) -> bool:
if net is None: net = constants.net if net is None: net = constants.net
try: try:
addrtype, h = b58_address_to_hash160(addr) addrtype, h = b58_address_to_hash160(addr)
@ -511,13 +515,13 @@ def is_b58_address(addr, *, net=None):
return False return False
return addr == hash160_to_b58_address(h, addrtype) return addr == hash160_to_b58_address(h, addrtype)
def is_address(addr, *, net=None): def is_address(addr: str, *, net=None) -> bool:
if net is None: net = constants.net if net is None: net = constants.net
return is_segwit_address(addr, net=net) \ return is_segwit_address(addr, net=net) \
or is_b58_address(addr, net=net) or is_b58_address(addr, net=net)
def is_private_key(key): def is_private_key(key: str) -> bool:
try: try:
k = deserialize_privkey(key) k = deserialize_privkey(key)
return k is not False return k is not False
@ -527,7 +531,7 @@ def is_private_key(key):
########### end pywallet functions ####################### ########### end pywallet functions #######################
def is_minikey(text): def is_minikey(text: str) -> bool:
# Minikeys are typically 22 or 30 characters, but this routine # Minikeys are typically 22 or 30 characters, but this routine
# permits any length of 20 or more provided the minikey is valid. # permits any length of 20 or more provided the minikey is valid.
# A valid minikey must begin with an 'S', be in base58, and when # A valid minikey must begin with an 'S', be in base58, and when
@ -537,5 +541,5 @@ def is_minikey(text):
and all(ord(c) in __b58chars for c in text) and all(ord(c) in __b58chars for c in text)
and sha256(text + '?')[0] == 0x00) and sha256(text + '?')[0] == 0x00)
def minikey_to_private_key(text): def minikey_to_private_key(text: str) -> bytes:
return sha256(text) return sha256(text)

16
electrum/ecc.py

@ -52,31 +52,31 @@ def point_at_infinity():
return ECPubkey(None) return ECPubkey(None)
def sig_string_from_der_sig(der_sig, order=CURVE_ORDER): def sig_string_from_der_sig(der_sig: bytes, order=CURVE_ORDER) -> bytes:
r, s = ecdsa.util.sigdecode_der(der_sig, order) r, s = ecdsa.util.sigdecode_der(der_sig, order)
return ecdsa.util.sigencode_string(r, s, order) return ecdsa.util.sigencode_string(r, s, order)
def der_sig_from_sig_string(sig_string, order=CURVE_ORDER): def der_sig_from_sig_string(sig_string: bytes, order=CURVE_ORDER) -> bytes:
r, s = ecdsa.util.sigdecode_string(sig_string, order) r, s = ecdsa.util.sigdecode_string(sig_string, order)
return ecdsa.util.sigencode_der_canonize(r, s, order) return ecdsa.util.sigencode_der_canonize(r, s, order)
def der_sig_from_r_and_s(r, s, order=CURVE_ORDER): def der_sig_from_r_and_s(r: int, s: int, order=CURVE_ORDER) -> bytes:
return ecdsa.util.sigencode_der_canonize(r, s, order) return ecdsa.util.sigencode_der_canonize(r, s, order)
def get_r_and_s_from_der_sig(der_sig, order=CURVE_ORDER): def get_r_and_s_from_der_sig(der_sig: bytes, order=CURVE_ORDER) -> Tuple[int, int]:
r, s = ecdsa.util.sigdecode_der(der_sig, order) r, s = ecdsa.util.sigdecode_der(der_sig, order)
return r, s return r, s
def get_r_and_s_from_sig_string(sig_string, order=CURVE_ORDER): def get_r_and_s_from_sig_string(sig_string: bytes, order=CURVE_ORDER) -> Tuple[int, int]:
r, s = ecdsa.util.sigdecode_string(sig_string, order) r, s = ecdsa.util.sigdecode_string(sig_string, order)
return r, s return r, s
def sig_string_from_r_and_s(r, s, order=CURVE_ORDER): def sig_string_from_r_and_s(r: int, s: int, order=CURVE_ORDER) -> bytes:
return ecdsa.util.sigencode_string_canonize(r, s, order) return ecdsa.util.sigencode_string_canonize(r, s, order)
@ -410,7 +410,7 @@ class ECPrivkey(ECPubkey):
sig65, recid = bruteforce_recid(sig_string) sig65, recid = bruteforce_recid(sig_string)
return sig65 return sig65
def decrypt_message(self, encrypted, magic=b'BIE1'): def decrypt_message(self, encrypted: Tuple[str, bytes], magic: bytes=b'BIE1') -> bytes:
encrypted = base64.b64decode(encrypted) encrypted = base64.b64decode(encrypted)
if len(encrypted) < 85: if len(encrypted) < 85:
raise Exception('invalid ciphertext: length') raise Exception('invalid ciphertext: length')
@ -435,6 +435,6 @@ class ECPrivkey(ECPubkey):
return aes_decrypt_with_iv(key_e, iv, ciphertext) return aes_decrypt_with_iv(key_e, iv, ciphertext)
def construct_sig65(sig_string, recid, is_compressed): def construct_sig65(sig_string: bytes, recid: int, is_compressed: bool) -> bytes:
comp = 4 if is_compressed else 0 comp = 4 if is_compressed else 0
return bytes([27 + recid + comp]) + sig_string return bytes([27 + recid + comp]) + sig_string

2
electrum/mnemonic.py

@ -74,7 +74,7 @@ def is_CJK(c):
return False return False
def normalize_text(seed): def normalize_text(seed: str) -> str:
# normalize # normalize
seed = unicodedata.normalize('NFKD', seed) seed = unicodedata.normalize('NFKD', seed)
# lower # lower

2
electrum/paymentrequest.py

@ -336,7 +336,7 @@ def sign_request_with_alias(pr, alias, alias_privkey):
pr.pki_data = str(alias) pr.pki_data = str(alias)
message = pr.SerializeToString() message = pr.SerializeToString()
ec_key = ecc.ECPrivkey(alias_privkey) ec_key = ecc.ECPrivkey(alias_privkey)
compressed = bitcoin.is_compressed(alias_privkey) compressed = bitcoin.is_compressed_privkey(alias_privkey)
pr.signature = ec_key.sign_message(message, compressed) pr.signature = ec_key.sign_message(message, compressed)

6
electrum/tests/test_bitcoin.py

@ -6,7 +6,7 @@ from electrum.bitcoin import (public_key_to_p2pkh, address_from_private_key,
var_int, op_push, address_to_script, var_int, op_push, address_to_script,
deserialize_privkey, serialize_privkey, is_segwit_address, deserialize_privkey, serialize_privkey, is_segwit_address,
is_b58_address, address_to_scripthash, is_minikey, is_b58_address, address_to_scripthash, is_minikey,
is_compressed, seed_type, EncodeBase58Check, is_compressed_privkey, seed_type, EncodeBase58Check,
script_num_to_hex, push_script, add_number_to_script, int_to_hex) script_num_to_hex, push_script, add_number_to_script, int_to_hex)
from electrum.bip32 import (bip32_root, bip32_public_derivation, bip32_private_derivation, from electrum.bip32 import (bip32_root, bip32_public_derivation, bip32_private_derivation,
xpub_from_xprv, xpub_type, is_xprv, is_bip32_derivation, xpub_from_xprv, xpub_type, is_xprv, is_bip32_derivation,
@ -710,10 +710,10 @@ class Test_keyImport(SequentialTestCase):
self.assertEqual(minikey, is_minikey(priv)) self.assertEqual(minikey, is_minikey(priv))
@needs_test_with_all_ecc_implementations @needs_test_with_all_ecc_implementations
def test_is_compressed(self): def test_is_compressed_privkey(self):
for priv_details in self.priv_pub_addr: for priv_details in self.priv_pub_addr:
self.assertEqual(priv_details['compressed'], self.assertEqual(priv_details['compressed'],
is_compressed(priv_details['priv'])) is_compressed_privkey(priv_details['priv']))
class Test_seeds(SequentialTestCase): class Test_seeds(SequentialTestCase):

Loading…
Cancel
Save