From 36f64d1ad98487df6ea3d9d29dc320ae14799c03 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Sun, 18 Nov 2018 22:07:27 +0100 Subject: [PATCH] bitcoin/ecc: some more type annotations --- electrum/bitcoin.py | 98 ++++++++++++++++++---------------- electrum/ecc.py | 16 +++--- electrum/mnemonic.py | 2 +- electrum/paymentrequest.py | 2 +- electrum/tests/test_bitcoin.py | 6 +-- 5 files changed, 64 insertions(+), 60 deletions(-) diff --git a/electrum/bitcoin.py b/electrum/bitcoin.py index 6bebf498b..56d071631 100644 --- a/electrum/bitcoin.py +++ b/electrum/bitcoin.py @@ -24,7 +24,7 @@ # SOFTWARE. import hashlib -from typing import List, Tuple, TYPE_CHECKING +from typing import List, Tuple, TYPE_CHECKING, Optional, Union from .util import bfh, bh2u, BitcoinException, assert_bytes, to_bytes, inv_dict from . import version @@ -49,7 +49,7 @@ TYPE_PUBKEY = 1 TYPE_SCRIPT = 2 -def rev_hex(s): +def rev_hex(s: str) -> str: return bh2u(bfh(s)[::-1]) @@ -162,22 +162,25 @@ def dust_threshold(network: 'Network'=None) -> int: return 182 * 3 * relayfee(network) // 1000 -hash_encode = lambda x: bh2u(x[::-1]) -hash_decode = lambda x: bfh(x)[::-1] -hmac_sha_512 = lambda x, y: hmac_oneshot(x, y, hashlib.sha512) +def hash_encode(x: bytes) -> str: + return bh2u(x[::-1]) + + +def hash_decode(x: str) -> bytes: + return bfh(x)[::-1] ################################## electrum seeds -def is_new_seed(x, prefix=version.SEED_PREFIX): +def is_new_seed(x: str, prefix=version.SEED_PREFIX) -> bool: from . import mnemonic x = mnemonic.normalize_text(x) - s = bh2u(hmac_sha_512(b"Seed version", x.encode('utf8'))) + s = bh2u(hmac_oneshot(b"Seed version", x.encode('utf8'), hashlib.sha512)) return s.startswith(prefix) -def is_old_seed(seed): +def is_old_seed(seed: str) -> bool: from . import old_mnemonic, mnemonic seed = mnemonic.normalize_text(seed) words = seed.split() @@ -195,7 +198,7 @@ def is_old_seed(seed): return is_hex or (uses_electrum_words and (len(words) == 12 or len(words) == 24)) -def seed_type(x): +def seed_type(x: str) -> str: if is_old_seed(x): return 'old' elif is_new_seed(x): @@ -206,29 +209,31 @@ def seed_type(x): return '2fa' return '' -is_seed = lambda x: bool(seed_type(x)) + +def is_seed(x: str) -> bool: + return bool(seed_type(x)) ############ functions from pywallet ##################### -def hash160_to_b58_address(h160: bytes, addrtype): - s = bytes([addrtype]) - s += h160 - return base_encode(s+sha256d(s)[0:4], base=58) +def hash160_to_b58_address(h160: bytes, addrtype: int) -> str: + s = bytes([addrtype]) + h160 + s = s + sha256d(s)[0:4] + return base_encode(s, base=58) -def b58_address_to_hash160(addr): +def b58_address_to_hash160(addr: str) -> Tuple[int, bytes]: addr = to_bytes(addr, 'ascii') _bytes = base_decode(addr, 25, base=58) return _bytes[0], _bytes[1:21] -def hash160_to_p2pkh(h160, *, net=None): +def hash160_to_p2pkh(h160: bytes, *, net=None) -> str: if net is None: net = constants.net return hash160_to_b58_address(h160, net.ADDRTYPE_P2PKH) -def hash160_to_p2sh(h160, *, net=None): +def hash160_to_p2sh(h160: bytes, *, net=None) -> str: if net is None: net = constants.net return hash160_to_b58_address(h160, net.ADDRTYPE_P2SH) @@ -236,26 +241,26 @@ def hash160_to_p2sh(h160, *, net=None): def public_key_to_p2pkh(public_key: bytes) -> str: return hash160_to_p2pkh(hash_160(public_key)) -def hash_to_segwit_addr(h, witver, *, net=None): +def hash_to_segwit_addr(h: bytes, witver: int, *, net=None) -> str: if net is None: net = constants.net return segwit_addr.encode(net.SEGWIT_HRP, witver, h) -def public_key_to_p2wpkh(public_key): +def public_key_to_p2wpkh(public_key: bytes) -> str: return hash_to_segwit_addr(hash_160(public_key), witver=0) -def script_to_p2wsh(script): +def script_to_p2wsh(script: str) -> str: return hash_to_segwit_addr(sha256(bfh(script)), witver=0) -def p2wpkh_nested_script(pubkey): +def p2wpkh_nested_script(pubkey: str) -> str: pkh = bh2u(hash_160(bfh(pubkey))) return '00' + push_script(pkh) -def p2wsh_nested_script(witness_script): +def p2wsh_nested_script(witness_script: str) -> str: wsh = bh2u(sha256(bfh(witness_script))) return '00' + push_script(wsh) -def pubkey_to_address(txin_type, pubkey): +def pubkey_to_address(txin_type: str, pubkey: str) -> str: if txin_type == 'p2pkh': return public_key_to_p2pkh(bfh(pubkey)) elif txin_type == 'p2wpkh': @@ -266,7 +271,7 @@ def pubkey_to_address(txin_type, pubkey): else: raise NotImplementedError(txin_type) -def redeem_script_to_address(txin_type, redeem_script): +def redeem_script_to_address(txin_type: str, redeem_script: str) -> str: if txin_type == 'p2sh': return hash160_to_p2sh(hash_160(bfh(redeem_script))) elif txin_type == 'p2wsh': @@ -278,7 +283,7 @@ def redeem_script_to_address(txin_type, redeem_script): raise NotImplementedError(txin_type) -def script_to_address(script, *, net=None): +def script_to_address(script: str, *, net=None) -> str: from .transaction import get_address_from_output_script t, addr = get_address_from_output_script(bfh(script), net=net) assert t == TYPE_ADDRESS @@ -310,15 +315,15 @@ def address_to_script(addr: str, *, net=None) -> str: raise BitcoinException(f'unknown address type: {addrtype}') return script -def address_to_scripthash(addr): +def address_to_scripthash(addr: str) -> str: script = address_to_script(addr) return script_to_scripthash(script) -def script_to_scripthash(script): - h = sha256(bytes.fromhex(script))[0:32] +def script_to_scripthash(script: str) -> str: + h = sha256(bfh(script))[0:32] return bh2u(bytes(reversed(h))) -def public_key_to_p2pk_script(pubkey): +def public_key_to_p2pk_script(pubkey: str) -> str: script = push_script(pubkey) script += 'ac' # op_checksig return script @@ -360,7 +365,7 @@ def base_encode(v: bytes, base: int) -> str: return result.decode('ascii') -def base_decode(v, length, base): +def base_decode(v: Union[bytes, str], length: Optional[int], base: int) -> Optional[bytes]: """ decode v into a string of len bytes.""" # assert_bytes(v) v = to_bytes(v, 'ascii') @@ -398,21 +403,20 @@ class InvalidChecksum(Exception): pass -def EncodeBase58Check(vchIn): +def EncodeBase58Check(vchIn: bytes) -> str: hash = sha256d(vchIn) return base_encode(vchIn + hash[0:4], base=58) -def DecodeBase58Check(psz): +def DecodeBase58Check(psz: Union[bytes, str]) -> bytes: vchRet = base_decode(psz, None, base=58) - key = vchRet[0:-4] - csum = vchRet[-4:] - hash = sha256d(key) - cs32 = hash[0:4] - if cs32 != csum: - raise InvalidChecksum('expected {}, actual {}'.format(bh2u(cs32), bh2u(csum))) + payload = vchRet[0:-4] + csum_found = vchRet[-4:] + csum_calculated = sha256d(payload)[0:4] + if csum_calculated != csum_found: + raise InvalidChecksum(f'calculated {bh2u(csum_calculated)}, found {bh2u(csum_found)}') else: - return key + return payload # backwards compat @@ -484,16 +488,16 @@ def deserialize_privkey(key: str) -> Tuple[str, bytes, bool]: return txin_type, secret_bytes, compressed -def is_compressed(sec): +def is_compressed_privkey(sec: str) -> bool: return deserialize_privkey(sec)[2] -def address_from_private_key(sec): +def address_from_private_key(sec: str) -> str: txin_type, privkey, compressed = deserialize_privkey(sec) public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed) return pubkey_to_address(txin_type, public_key) -def is_segwit_address(addr, *, net=None): +def is_segwit_address(addr: str, *, net=None) -> bool: if net is None: net = constants.net try: witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr) @@ -501,7 +505,7 @@ def is_segwit_address(addr, *, net=None): return False return witprog is not None -def is_b58_address(addr, *, net=None): +def is_b58_address(addr: str, *, net=None) -> bool: if net is None: net = constants.net try: addrtype, h = b58_address_to_hash160(addr) @@ -511,13 +515,13 @@ def is_b58_address(addr, *, net=None): return False return addr == hash160_to_b58_address(h, addrtype) -def is_address(addr, *, net=None): +def is_address(addr: str, *, net=None) -> bool: if net is None: net = constants.net return is_segwit_address(addr, net=net) \ or is_b58_address(addr, net=net) -def is_private_key(key): +def is_private_key(key: str) -> bool: try: k = deserialize_privkey(key) return k is not False @@ -527,7 +531,7 @@ def is_private_key(key): ########### end pywallet functions ####################### -def is_minikey(text): +def is_minikey(text: str) -> bool: # Minikeys are typically 22 or 30 characters, but this routine # permits any length of 20 or more provided the minikey is valid. # A valid minikey must begin with an 'S', be in base58, and when @@ -537,5 +541,5 @@ def is_minikey(text): and all(ord(c) in __b58chars for c in text) and sha256(text + '?')[0] == 0x00) -def minikey_to_private_key(text): +def minikey_to_private_key(text: str) -> bytes: return sha256(text) diff --git a/electrum/ecc.py b/electrum/ecc.py index 7c2cab92d..feb56ce7c 100644 --- a/electrum/ecc.py +++ b/electrum/ecc.py @@ -52,31 +52,31 @@ def point_at_infinity(): return ECPubkey(None) -def sig_string_from_der_sig(der_sig, order=CURVE_ORDER): +def sig_string_from_der_sig(der_sig: bytes, order=CURVE_ORDER) -> bytes: r, s = ecdsa.util.sigdecode_der(der_sig, order) return ecdsa.util.sigencode_string(r, s, order) -def der_sig_from_sig_string(sig_string, order=CURVE_ORDER): +def der_sig_from_sig_string(sig_string: bytes, order=CURVE_ORDER) -> bytes: r, s = ecdsa.util.sigdecode_string(sig_string, order) return ecdsa.util.sigencode_der_canonize(r, s, order) -def der_sig_from_r_and_s(r, s, order=CURVE_ORDER): +def der_sig_from_r_and_s(r: int, s: int, order=CURVE_ORDER) -> bytes: return ecdsa.util.sigencode_der_canonize(r, s, order) -def get_r_and_s_from_der_sig(der_sig, order=CURVE_ORDER): +def get_r_and_s_from_der_sig(der_sig: bytes, order=CURVE_ORDER) -> Tuple[int, int]: r, s = ecdsa.util.sigdecode_der(der_sig, order) return r, s -def get_r_and_s_from_sig_string(sig_string, order=CURVE_ORDER): +def get_r_and_s_from_sig_string(sig_string: bytes, order=CURVE_ORDER) -> Tuple[int, int]: r, s = ecdsa.util.sigdecode_string(sig_string, order) return r, s -def sig_string_from_r_and_s(r, s, order=CURVE_ORDER): +def sig_string_from_r_and_s(r: int, s: int, order=CURVE_ORDER) -> bytes: return ecdsa.util.sigencode_string_canonize(r, s, order) @@ -410,7 +410,7 @@ class ECPrivkey(ECPubkey): sig65, recid = bruteforce_recid(sig_string) return sig65 - def decrypt_message(self, encrypted, magic=b'BIE1'): + def decrypt_message(self, encrypted: Tuple[str, bytes], magic: bytes=b'BIE1') -> bytes: encrypted = base64.b64decode(encrypted) if len(encrypted) < 85: raise Exception('invalid ciphertext: length') @@ -435,6 +435,6 @@ class ECPrivkey(ECPubkey): return aes_decrypt_with_iv(key_e, iv, ciphertext) -def construct_sig65(sig_string, recid, is_compressed): +def construct_sig65(sig_string: bytes, recid: int, is_compressed: bool) -> bytes: comp = 4 if is_compressed else 0 return bytes([27 + recid + comp]) + sig_string diff --git a/electrum/mnemonic.py b/electrum/mnemonic.py index 6de217509..97889c0aa 100644 --- a/electrum/mnemonic.py +++ b/electrum/mnemonic.py @@ -74,7 +74,7 @@ def is_CJK(c): return False -def normalize_text(seed): +def normalize_text(seed: str) -> str: # normalize seed = unicodedata.normalize('NFKD', seed) # lower diff --git a/electrum/paymentrequest.py b/electrum/paymentrequest.py index 071e7bb88..8b0d4bee5 100644 --- a/electrum/paymentrequest.py +++ b/electrum/paymentrequest.py @@ -336,7 +336,7 @@ def sign_request_with_alias(pr, alias, alias_privkey): pr.pki_data = str(alias) message = pr.SerializeToString() ec_key = ecc.ECPrivkey(alias_privkey) - compressed = bitcoin.is_compressed(alias_privkey) + compressed = bitcoin.is_compressed_privkey(alias_privkey) pr.signature = ec_key.sign_message(message, compressed) diff --git a/electrum/tests/test_bitcoin.py b/electrum/tests/test_bitcoin.py index e11cee4ec..cdf1257f1 100644 --- a/electrum/tests/test_bitcoin.py +++ b/electrum/tests/test_bitcoin.py @@ -6,7 +6,7 @@ from electrum.bitcoin import (public_key_to_p2pkh, address_from_private_key, var_int, op_push, address_to_script, deserialize_privkey, serialize_privkey, is_segwit_address, is_b58_address, address_to_scripthash, is_minikey, - is_compressed, seed_type, EncodeBase58Check, + is_compressed_privkey, seed_type, EncodeBase58Check, script_num_to_hex, push_script, add_number_to_script, int_to_hex) from electrum.bip32 import (bip32_root, bip32_public_derivation, bip32_private_derivation, xpub_from_xprv, xpub_type, is_xprv, is_bip32_derivation, @@ -710,10 +710,10 @@ class Test_keyImport(SequentialTestCase): self.assertEqual(minikey, is_minikey(priv)) @needs_test_with_all_ecc_implementations - def test_is_compressed(self): + def test_is_compressed_privkey(self): for priv_details in self.priv_pub_addr: self.assertEqual(priv_details['compressed'], - is_compressed(priv_details['priv'])) + is_compressed_privkey(priv_details['priv'])) class Test_seeds(SequentialTestCase):