|
|
@ -26,49 +26,36 @@ |
|
|
|
import base64 |
|
|
|
import hashlib |
|
|
|
import functools |
|
|
|
import copy |
|
|
|
from typing import Union, Tuple, Optional |
|
|
|
from ctypes import ( |
|
|
|
byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer, |
|
|
|
CFUNCTYPE, POINTER, cast |
|
|
|
) |
|
|
|
|
|
|
|
from .util import bfh, bh2u, assert_bytes, to_bytes, InvalidPassword, profiler, randrange |
|
|
|
from .crypto import (sha256d, aes_encrypt_with_iv, aes_decrypt_with_iv, hmac_oneshot) |
|
|
|
from . import constants |
|
|
|
from .logging import get_logger |
|
|
|
|
|
|
|
# TODO -->>> |
|
|
|
import ctypes |
|
|
|
from ctypes import ( |
|
|
|
byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer, |
|
|
|
CFUNCTYPE, POINTER, cast |
|
|
|
) |
|
|
|
from .ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED |
|
|
|
# TODO <<<-- |
|
|
|
|
|
|
|
_logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
def generator(): |
|
|
|
return GENERATOR |
|
|
|
|
|
|
|
|
|
|
|
def point_at_infinity(): |
|
|
|
return ECPubkey(None) |
|
|
|
|
|
|
|
|
|
|
|
def string_to_number(b: bytes) -> int: |
|
|
|
return int.from_bytes(b, byteorder='big', signed=False) |
|
|
|
|
|
|
|
|
|
|
|
def sig_string_from_der_sig(der_sig: bytes, order=None) -> bytes: |
|
|
|
def sig_string_from_der_sig(der_sig: bytes) -> bytes: |
|
|
|
r, s = get_r_and_s_from_der_sig(der_sig) |
|
|
|
return sig_string_from_r_and_s(r, s) |
|
|
|
|
|
|
|
|
|
|
|
def der_sig_from_sig_string(sig_string: bytes, order=None) -> bytes: |
|
|
|
def der_sig_from_sig_string(sig_string: bytes) -> bytes: |
|
|
|
r, s = get_r_and_s_from_sig_string(sig_string) |
|
|
|
return der_sig_from_r_and_s(r, s) |
|
|
|
|
|
|
|
|
|
|
|
def der_sig_from_r_and_s(r: int, s: int, order=None) -> bytes: |
|
|
|
def der_sig_from_r_and_s(r: int, s: int) -> bytes: |
|
|
|
sig_string = (int.to_bytes(r, length=32, byteorder="big") + |
|
|
|
int.to_bytes(s, length=32, byteorder="big")) |
|
|
|
sig = create_string_buffer(64) |
|
|
@ -85,7 +72,7 @@ def der_sig_from_r_and_s(r: int, s: int, order=None) -> bytes: |
|
|
|
return bytes(der_sig)[:der_sig_size] |
|
|
|
|
|
|
|
|
|
|
|
def get_r_and_s_from_der_sig(der_sig: bytes, order=None) -> Tuple[int, int]: |
|
|
|
def get_r_and_s_from_der_sig(der_sig: bytes) -> Tuple[int, int]: |
|
|
|
assert isinstance(der_sig, bytes) |
|
|
|
sig = create_string_buffer(64) |
|
|
|
ret = _libsecp256k1.secp256k1_ecdsa_signature_parse_der(_libsecp256k1.ctx, sig, der_sig, len(der_sig)) |
|
|
@ -99,7 +86,7 @@ def get_r_and_s_from_der_sig(der_sig: bytes, order=None) -> Tuple[int, int]: |
|
|
|
return r, s |
|
|
|
|
|
|
|
|
|
|
|
def get_r_and_s_from_sig_string(sig_string: bytes, order=None) -> Tuple[int, int]: |
|
|
|
def get_r_and_s_from_sig_string(sig_string: bytes) -> Tuple[int, int]: |
|
|
|
if not (isinstance(sig_string, bytes) and len(sig_string) == 64): |
|
|
|
raise Exception("sig_string must be bytes, and 64 bytes exactly") |
|
|
|
sig = create_string_buffer(64) |
|
|
@ -114,7 +101,7 @@ def get_r_and_s_from_sig_string(sig_string: bytes, order=None) -> Tuple[int, int |
|
|
|
return r, s |
|
|
|
|
|
|
|
|
|
|
|
def sig_string_from_r_and_s(r: int, s: int, order=None) -> bytes: |
|
|
|
def sig_string_from_r_and_s(r: int, s: int) -> bytes: |
|
|
|
sig_string = (int.to_bytes(r, length=32, byteorder="big") + |
|
|
|
int.to_bytes(s, length=32, byteorder="big")) |
|
|
|
sig = create_string_buffer(64) |
|
|
@ -250,12 +237,12 @@ class ECPubkey(object): |
|
|
|
|
|
|
|
other %= CURVE_ORDER |
|
|
|
if self.is_at_infinity() or other == 0: |
|
|
|
return point_at_infinity() |
|
|
|
return POINT_AT_INFINITY |
|
|
|
pubkey = self._to_libsecp256k1_pubkey_ptr() |
|
|
|
|
|
|
|
ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big")) |
|
|
|
if not ret: |
|
|
|
return point_at_infinity() |
|
|
|
return POINT_AT_INFINITY |
|
|
|
return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey) |
|
|
|
|
|
|
|
def __rmul__(self, other: int): |
|
|
@ -276,7 +263,7 @@ class ECPubkey(object): |
|
|
|
array_of_pubkey_ptrs = (c_char_p * 2)(pubkey1, pubkey2) |
|
|
|
ret = _libsecp256k1.secp256k1_ec_pubkey_combine(_libsecp256k1.ctx, pubkey_sum, array_of_pubkey_ptrs, 2) |
|
|
|
if not ret: |
|
|
|
return point_at_infinity() |
|
|
|
return POINT_AT_INFINITY |
|
|
|
return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey_sum) |
|
|
|
|
|
|
|
def __eq__(self, other) -> bool: |
|
|
@ -345,7 +332,7 @@ class ECPubkey(object): |
|
|
|
return CURVE_ORDER |
|
|
|
|
|
|
|
def is_at_infinity(self): |
|
|
|
return self == point_at_infinity() |
|
|
|
return self == POINT_AT_INFINITY |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def is_pubkey_bytes(cls, b: bytes): |
|
|
@ -359,6 +346,7 @@ class ECPubkey(object): |
|
|
|
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798' |
|
|
|
'483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8')) |
|
|
|
CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141 |
|
|
|
POINT_AT_INFINITY = ECPubkey(None) |
|
|
|
|
|
|
|
|
|
|
|
def msg_magic(message: bytes) -> bytes: |
|
|
@ -414,7 +402,7 @@ class ECPrivkey(ECPubkey): |
|
|
|
raise InvalidECPointException('Invalid secret scalar (not within curve order)') |
|
|
|
self.secret_scalar = secret |
|
|
|
|
|
|
|
pubkey = generator() * secret |
|
|
|
pubkey = GENERATOR * secret |
|
|
|
super().__init__(pubkey.get_public_key_bytes(compressed=False)) |
|
|
|
|
|
|
|
@classmethod |
|
|
@ -450,7 +438,7 @@ class ECPrivkey(ECPubkey): |
|
|
|
def get_secret_bytes(self) -> bytes: |
|
|
|
return int.to_bytes(self.secret_scalar, length=32, byteorder='big', signed=False) |
|
|
|
|
|
|
|
def sign(self, msg_hash: bytes, sigencode=None, sigdecode=None) -> bytes: |
|
|
|
def sign(self, msg_hash: bytes, sigencode=None) -> bytes: |
|
|
|
if not (isinstance(msg_hash, bytes) and len(msg_hash) == 32): |
|
|
|
raise Exception("msg_hash to be signed must be bytes, and 32 bytes exactly") |
|
|
|
if sigencode is None: |
|
|
@ -481,13 +469,11 @@ class ECPrivkey(ECPubkey): |
|
|
|
sig_string = sig_string_from_r_and_s(r, s) |
|
|
|
self.verify_message_hash(sig_string, msg_hash) |
|
|
|
|
|
|
|
sig = sigencode(r, s, CURVE_ORDER) |
|
|
|
sig = sigencode(r, s) |
|
|
|
return sig |
|
|
|
|
|
|
|
def sign_transaction(self, hashed_preimage: bytes) -> bytes: |
|
|
|
return self.sign(hashed_preimage, |
|
|
|
sigencode=der_sig_from_r_and_s, |
|
|
|
sigdecode=get_r_and_s_from_der_sig) |
|
|
|
return self.sign(hashed_preimage, sigencode=der_sig_from_r_and_s) |
|
|
|
|
|
|
|
def sign_message(self, message: bytes, is_compressed: bool, algo=lambda x: sha256d(msg_magic(x))) -> bytes: |
|
|
|
def bruteforce_recid(sig_string): |
|
|
@ -503,9 +489,7 @@ class ECPrivkey(ECPubkey): |
|
|
|
|
|
|
|
message = to_bytes(message, 'utf8') |
|
|
|
msg_hash = algo(message) |
|
|
|
sig_string = self.sign(msg_hash, |
|
|
|
sigencode=sig_string_from_r_and_s, |
|
|
|
sigdecode=get_r_and_s_from_sig_string) |
|
|
|
sig_string = self.sign(msg_hash, sigencode=sig_string_from_r_and_s) |
|
|
|
sig65, recid = bruteforce_recid(sig_string) |
|
|
|
return sig65 |
|
|
|
|
|
|
|