|
|
@ -159,7 +159,7 @@ class ECPubkey(object): |
|
|
|
assert_bytes(sig_string) |
|
|
|
if len(sig_string) != 64: |
|
|
|
raise Exception(f'wrong encoding used for signature? len={len(sig_string)} (should be 64)') |
|
|
|
if recid < 0 or recid > 3: |
|
|
|
if not (0 <= recid <= 3): |
|
|
|
raise ValueError('recid is {}, but should be 0 <= recid <= 3'.format(recid)) |
|
|
|
sig65 = create_string_buffer(65) |
|
|
|
ret = _libsecp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact( |
|
|
@ -177,7 +177,7 @@ class ECPubkey(object): |
|
|
|
if len(sig) != 65: |
|
|
|
raise Exception(f'wrong encoding used for signature? len={len(sig)} (should be 65)') |
|
|
|
nV = sig[0] |
|
|
|
if nV < 27 or nV >= 35: |
|
|
|
if not (27 <= nV <= 34): |
|
|
|
raise Exception("Bad encoding") |
|
|
|
if nV >= 31: |
|
|
|
compressed = True |
|
|
@ -290,33 +290,36 @@ class ECPubkey(object): |
|
|
|
raise TypeError('comparison not defined for ECPubkey and {}'.format(type(other))) |
|
|
|
return (self.x() or 0) < (other.x() or 0) |
|
|
|
|
|
|
|
def verify_message_for_address(self, sig65: bytes, message: bytes, algo=lambda x: sha256d(msg_magic(x))) -> None: |
|
|
|
def verify_message_for_address(self, sig65: bytes, message: bytes, algo=lambda x: sha256d(msg_magic(x))) -> bool: |
|
|
|
assert_bytes(message) |
|
|
|
h = algo(message) |
|
|
|
public_key, compressed = self.from_signature65(sig65, h) |
|
|
|
try: |
|
|
|
public_key, compressed = self.from_signature65(sig65, h) |
|
|
|
except Exception: |
|
|
|
return False |
|
|
|
# check public key |
|
|
|
if public_key != self: |
|
|
|
raise Exception("Bad signature") |
|
|
|
return False |
|
|
|
# check message |
|
|
|
self.verify_message_hash(sig65[1:], h) |
|
|
|
return self.verify_message_hash(sig65[1:], h) |
|
|
|
|
|
|
|
# TODO return bool instead of raising |
|
|
|
def verify_message_hash(self, sig_string: bytes, msg_hash: bytes) -> None: |
|
|
|
def verify_message_hash(self, sig_string: bytes, msg_hash: bytes) -> bool: |
|
|
|
assert_bytes(sig_string) |
|
|
|
if len(sig_string) != 64: |
|
|
|
raise Exception(f'wrong encoding used for signature? len={len(sig_string)} (should be 64)') |
|
|
|
return False |
|
|
|
if not (isinstance(msg_hash, bytes) and len(msg_hash) == 32): |
|
|
|
raise Exception("msg_hash must be bytes, and 32 bytes exactly") |
|
|
|
return False |
|
|
|
|
|
|
|
sig = create_string_buffer(64) |
|
|
|
ret = _libsecp256k1.secp256k1_ecdsa_signature_parse_compact(_libsecp256k1.ctx, sig, sig_string) |
|
|
|
if not ret: |
|
|
|
raise Exception("Bad signature") |
|
|
|
return False |
|
|
|
ret = _libsecp256k1.secp256k1_ecdsa_signature_normalize(_libsecp256k1.ctx, sig, sig) |
|
|
|
|
|
|
|
pubkey = self._to_libsecp256k1_pubkey_ptr() |
|
|
|
if 1 != _libsecp256k1.secp256k1_ecdsa_verify(_libsecp256k1.ctx, sig, msg_hash, pubkey): |
|
|
|
raise Exception("Bad signature") |
|
|
|
return False |
|
|
|
return True |
|
|
|
|
|
|
|
def encrypt_message(self, message: bytes, magic: bytes = b'BIE1') -> bytes: |
|
|
|
""" |
|
|
@ -364,33 +367,28 @@ def msg_magic(message: bytes) -> bytes: |
|
|
|
|
|
|
|
|
|
|
|
def verify_signature(pubkey: bytes, sig: bytes, h: bytes) -> bool: |
|
|
|
try: |
|
|
|
ECPubkey(pubkey).verify_message_hash(sig, h) |
|
|
|
except: |
|
|
|
return False |
|
|
|
return True |
|
|
|
return ECPubkey(pubkey).verify_message_hash(sig, h) |
|
|
|
|
|
|
|
|
|
|
|
def verify_message_with_address(address: str, sig65: bytes, message: bytes, *, net=None): |
|
|
|
def verify_message_with_address(address: str, sig65: bytes, message: bytes, *, net=None) -> bool: |
|
|
|
from .bitcoin import pubkey_to_address |
|
|
|
assert_bytes(sig65, message) |
|
|
|
if net is None: net = constants.net |
|
|
|
h = sha256d(msg_magic(message)) |
|
|
|
try: |
|
|
|
h = sha256d(msg_magic(message)) |
|
|
|
public_key, compressed = ECPubkey.from_signature65(sig65, h) |
|
|
|
# check public key using the address |
|
|
|
pubkey_hex = public_key.get_public_key_hex(compressed) |
|
|
|
for txin_type in ['p2pkh','p2wpkh','p2wpkh-p2sh']: |
|
|
|
addr = pubkey_to_address(txin_type, pubkey_hex, net=net) |
|
|
|
if address == addr: |
|
|
|
break |
|
|
|
else: |
|
|
|
raise Exception("Bad signature") |
|
|
|
# check message |
|
|
|
public_key.verify_message_hash(sig65[1:], h) |
|
|
|
return True |
|
|
|
except Exception as e: |
|
|
|
_logger.info(f"Verification error: {repr(e)}") |
|
|
|
return False |
|
|
|
# check public key using the address |
|
|
|
pubkey_hex = public_key.get_public_key_hex(compressed) |
|
|
|
for txin_type in ['p2pkh','p2wpkh','p2wpkh-p2sh']: |
|
|
|
addr = pubkey_to_address(txin_type, pubkey_hex, net=net) |
|
|
|
if address == addr: |
|
|
|
break |
|
|
|
else: |
|
|
|
return False |
|
|
|
# check message |
|
|
|
return public_key.verify_message_hash(sig65[1:], h) |
|
|
|
|
|
|
|
|
|
|
|
def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool: |
|
|
@ -476,7 +474,8 @@ class ECPrivkey(ECPubkey): |
|
|
|
r, s = sign_with_extra_entropy(extra_entropy=extra_entropy) |
|
|
|
|
|
|
|
sig_string = sig_string_from_r_and_s(r, s) |
|
|
|
self.verify_message_hash(sig_string, msg_hash) |
|
|
|
if not self.verify_message_hash(sig_string, msg_hash): |
|
|
|
raise Exception("sanity check failed: signature we just created does not verify!") |
|
|
|
|
|
|
|
sig = sigencode(r, s) |
|
|
|
return sig |
|
|
@ -488,11 +487,9 @@ class ECPrivkey(ECPubkey): |
|
|
|
def bruteforce_recid(sig_string): |
|
|
|
for recid in range(4): |
|
|
|
sig65 = construct_sig65(sig_string, recid, is_compressed) |
|
|
|
try: |
|
|
|
self.verify_message_for_address(sig65, message, algo) |
|
|
|
return sig65, recid |
|
|
|
except Exception as e: |
|
|
|
if not self.verify_message_for_address(sig65, message, algo): |
|
|
|
continue |
|
|
|
return sig65, recid |
|
|
|
else: |
|
|
|
raise Exception("error: cannot sign message. no recid fits..") |
|
|
|
|
|
|
|