Browse Source

ecc: use libsecp256k1 for sign/verify/mul/add

hard-fail-on-bad-server-string
SomberNight 5 years ago
parent
commit
ad408ea832
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 230
      electrum/ecc.py
  2. 168
      electrum/ecc_fast.py

230
electrum/ecc.py

@ -30,20 +30,25 @@ import copy
from typing import Union, Tuple, Optional from typing import Union, Tuple, Optional
import ecdsa import ecdsa
from ecdsa.ecdsa import curve_secp256k1, generator_secp256k1 from ecdsa.ecdsa import generator_secp256k1
from ecdsa.curves import SECP256k1 from ecdsa.curves import SECP256k1
from ecdsa.ellipticcurve import Point from ecdsa.ellipticcurve import Point
from .util import bfh, bh2u, assert_bytes, to_bytes, InvalidPassword, profiler, randrange from .util import bfh, bh2u, assert_bytes, to_bytes, InvalidPassword, profiler, randrange
from .crypto import (sha256d, aes_encrypt_with_iv, aes_decrypt_with_iv, hmac_oneshot) from .crypto import (sha256d, aes_encrypt_with_iv, aes_decrypt_with_iv, hmac_oneshot)
from .ecc_fast import do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1
from . import msqr
from . import constants from . import constants
from .logging import get_logger from .logging import get_logger
# TODO -->>>
import ctypes
from ctypes import (
byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
CFUNCTYPE, POINTER, cast
)
from .ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
# TODO <<<--
_logger = get_logger(__name__) _logger = get_logger(__name__)
do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1()
CURVE_ORDER = SECP256k1.order CURVE_ORDER = SECP256k1.order
@ -60,7 +65,7 @@ def string_to_number(b: bytes) -> int:
return int.from_bytes(b, byteorder='big', signed=False) return int.from_bytes(b, byteorder='big', signed=False)
def sig_string_from_der_sig(der_sig: bytes, order=CURVE_ORDER) -> bytes: def sig_string_from_der_sig(der_sig: bytes, order=CURVE_ORDER) -> bytes: # TODO use libsecp?
r, s = ecdsa.util.sigdecode_der(der_sig, order) r, s = ecdsa.util.sigdecode_der(der_sig, order)
return ecdsa.util.sigencode_string(r, s, order) return ecdsa.util.sigencode_string(r, s, order)
@ -88,7 +93,7 @@ def sig_string_from_r_and_s(r: int, s: int, order=CURVE_ORDER) -> bytes:
return ecdsa.util.sigencode_string_canonize(r, s, order) return ecdsa.util.sigencode_string_canonize(r, s, order)
def point_to_ser(point, compressed=True) -> Optional[bytes]: def point_to_ser(point, compressed=True) -> Optional[bytes]: # TODO rm?
if isinstance(point, tuple): if isinstance(point, tuple):
assert len(point) == 2, f'unexpected point: {point}' assert len(point) == 2, f'unexpected point: {point}'
x, y = point x, y = point
@ -101,37 +106,22 @@ def point_to_ser(point, compressed=True) -> Optional[bytes]:
return bfh('04'+('%064x' % x)+('%064x' % y)) return bfh('04'+('%064x' % x)+('%064x' % y))
def get_y_coord_from_x(x: int, *, odd: bool) -> int: def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
curve = curve_secp256k1 pubkey_ptr = create_string_buffer(64)
_p = curve.p() ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
_a = curve.a() _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
_b = curve.b() if not ret:
x = x % _p raise InvalidECPointException('public key could not be parsed or is invalid')
y2 = (pow(x, 3, _p) + _a * x + _b) % _p
y = msqr.modular_sqrt(y2, _p) pubkey_serialized = create_string_buffer(65)
if curve.contains_point(x, y): pubkey_size = c_size_t(65)
if odd == bool(y & 1): _libsecp256k1.secp256k1_ec_pubkey_serialize(
return y _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
return _p - y pubkey_serialized = bytes(pubkey_serialized)
raise InvalidECPointException() assert pubkey_serialized[0] == 0x04, pubkey_serialized
x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
def ser_to_point(ser: bytes) -> Tuple[int, int]: return x, y
if ser[0] not in (0x02, 0x03, 0x04):
raise ValueError('Unexpected first byte: {}'.format(ser[0]))
if ser[0] == 0x04:
return string_to_number(ser[1:33]), string_to_number(ser[33:])
x = string_to_number(ser[1:])
odd = ser[0] == 0x03
return x, get_y_coord_from_x(x, odd=odd)
def _ser_to_python_ecdsa_point(ser: bytes) -> ecdsa.ellipticcurve.Point:
x, y = ser_to_point(ser)
try:
return Point(curve_secp256k1, x, y, CURVE_ORDER)
except:
raise InvalidECPointException()
class InvalidECPointException(Exception): class InvalidECPointException(Exception):
@ -182,23 +172,18 @@ class _MySigningKey(ecdsa.SigningKey):
return r, s return r, s
class _PubkeyForPointAtInfinity:
point = ecdsa.ellipticcurve.INFINITY
@functools.total_ordering @functools.total_ordering
class ECPubkey(object): class ECPubkey(object):
def __init__(self, b: Optional[bytes]): def __init__(self, b: Optional[bytes]):
if b is not None: if b is not None:
assert_bytes(b) assert_bytes(b)
point = _ser_to_python_ecdsa_point(b) self._x, self._y = _x_and_y_from_pubkey_bytes(b)
self._pubkey = ecdsa.ecdsa.Public_key(generator_secp256k1, point)
else: else:
self._pubkey = _PubkeyForPointAtInfinity() self._x, self._y = None, None
@classmethod @classmethod
def from_sig_string(cls, sig_string: bytes, recid: int, msg_hash: bytes): def from_sig_string(cls, sig_string: bytes, recid: int, msg_hash: bytes) -> 'ECPubkey':
assert_bytes(sig_string) assert_bytes(sig_string)
if len(sig_string) != 64: if len(sig_string) != 64:
raise Exception('Wrong encoding') raise Exception('Wrong encoding')
@ -209,9 +194,9 @@ class ECPubkey(object):
return ECPubkey.from_point(ecdsa_point) return ECPubkey.from_point(ecdsa_point)
@classmethod @classmethod
def from_signature65(cls, sig: bytes, msg_hash: bytes): def from_signature65(cls, sig: bytes, msg_hash: bytes) -> Tuple['ECPubkey', bool]:
if len(sig) != 65: if len(sig) != 65:
raise Exception("Wrong encoding") raise Exception(f'wrong encoding used for signature? len={len(sig)} (should be 65)')
nV = sig[0] nV = sig[0]
if nV < 27 or nV >= 35: if nV < 27 or nV >= 35:
raise Exception("Bad encoding") raise Exception("Bad encoding")
@ -224,10 +209,17 @@ class ECPubkey(object):
return cls.from_sig_string(sig[1:], recid, msg_hash), compressed return cls.from_sig_string(sig[1:], recid, msg_hash), compressed
@classmethod @classmethod
def from_point(cls, point): def from_point(cls, point) -> 'ECPubkey':
_bytes = point_to_ser(point, compressed=False) # faster than compressed _bytes = point_to_ser(point, compressed=False) # faster than compressed
return ECPubkey(_bytes) return ECPubkey(_bytes)
@classmethod
def from_x_and_y(cls, x: int, y: int) -> 'ECPubkey':
_bytes = (b'\x04'
+ int.to_bytes(x, length=32, byteorder='big', signed=False)
+ int.to_bytes(y, length=32, byteorder='big', signed=False))
return ECPubkey(_bytes)
def get_public_key_bytes(self, compressed=True): def get_public_key_bytes(self, compressed=True):
if self.is_at_infinity(): raise Exception('point is at infinity') if self.is_at_infinity(): raise Exception('point is at infinity')
return point_to_ser(self.point(), compressed) return point_to_ser(self.point(), compressed)
@ -236,16 +228,49 @@ class ECPubkey(object):
return bh2u(self.get_public_key_bytes(compressed)) return bh2u(self.get_public_key_bytes(compressed))
def point(self) -> Tuple[int, int]: def point(self) -> Tuple[int, int]:
return self._pubkey.point.x(), self._pubkey.point.y() return self.x(), self.y()
def x(self) -> int:
return self._x
def y(self) -> int:
return self._y
def _to_libsecp256k1_pubkey_ptr(self):
pubkey = create_string_buffer(64)
public_pair_bytes = self.get_public_key_bytes(compressed=False)
ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
_libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
if not ret:
raise Exception('public key could not be parsed or is invalid')
return pubkey
@classmethod
def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
pubkey_serialized = create_string_buffer(65)
pubkey_size = c_size_t(65)
_libsecp256k1.secp256k1_ec_pubkey_serialize(
_libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
return ECPubkey(bytes(pubkey_serialized))
def __repr__(self): def __repr__(self):
if self.is_at_infinity():
return f"<ECPubkey infinity>"
return f"<ECPubkey {self.get_public_key_hex()}>" return f"<ECPubkey {self.get_public_key_hex()}>"
def __mul__(self, other: int): def __mul__(self, other: int):
if not isinstance(other, int): if not isinstance(other, int):
raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other))) raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))
ecdsa_point = self._pubkey.point * other
return self.from_point(ecdsa_point) other %= CURVE_ORDER
if self.is_at_infinity() or other == 0:
return point_at_infinity()
pubkey = self._to_libsecp256k1_pubkey_ptr()
ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
if not ret:
return point_at_infinity()
return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)
def __rmul__(self, other: int): def __rmul__(self, other: int):
return self * other return self * other
@ -253,38 +278,36 @@ class ECPubkey(object):
def __add__(self, other): def __add__(self, other):
if not isinstance(other, ECPubkey): if not isinstance(other, ECPubkey):
raise TypeError('addition not defined for ECPubkey and {}'.format(type(other))) raise TypeError('addition not defined for ECPubkey and {}'.format(type(other)))
ecdsa_point = self._pubkey.point + other._pubkey.point if self.is_at_infinity(): return other
return self.from_point(ecdsa_point) if other.is_at_infinity(): return self
def __eq__(self, other): pubkey1 = self._to_libsecp256k1_pubkey_ptr()
return self._pubkey.point.x() == other._pubkey.point.x() \ pubkey2 = other._to_libsecp256k1_pubkey_ptr()
and self._pubkey.point.y() == other._pubkey.point.y() pubkey_sum = create_string_buffer(64)
pubkey1 = cast(pubkey1, c_char_p)
pubkey2 = cast(pubkey2, c_char_p)
array_of_pubkey_ptrs = (c_char_p * 2)(pubkey1, pubkey2)
ret = _libsecp256k1.secp256k1_ec_pubkey_combine(_libsecp256k1.ctx, pubkey_sum, array_of_pubkey_ptrs, 2)
if not ret:
return point_at_infinity()
return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey_sum)
def __eq__(self, other) -> bool:
if not isinstance(other, ECPubkey):
return False
return self.point() == other.point()
def __ne__(self, other): def __ne__(self, other):
return not (self == other) return not (self == other)
def __hash__(self): def __hash__(self):
return hash(self._pubkey.point.x()) return hash(self.point())
def __lt__(self, other): def __lt__(self, other):
if not isinstance(other, ECPubkey): if not isinstance(other, ECPubkey):
raise TypeError('comparison not defined for ECPubkey and {}'.format(type(other))) raise TypeError('comparison not defined for ECPubkey and {}'.format(type(other)))
return self._pubkey.point.x() < other._pubkey.point.x() return (self.x() or 0) < (other.x() or 0)
def __deepcopy__(self, memo: dict = None):
# note: This custom deepcopy implementation needed as copy.deepcopy(self._pubkey) raises.
if memo is None: memo = {}
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k == '_pubkey' and not self.is_at_infinity():
point = _ser_to_python_ecdsa_point(self.get_public_key_bytes(compressed=False))
_pubkey_copy = ecdsa.ecdsa.Public_key(generator_secp256k1, point)
setattr(result, k, _pubkey_copy)
else:
setattr(result, k, copy.deepcopy(v, memo))
return result
def verify_message_for_address(self, sig65: bytes, message: bytes, algo=lambda x: sha256d(msg_magic(x))) -> None: def verify_message_for_address(self, sig65: bytes, message: bytes, algo=lambda x: sha256d(msg_magic(x))) -> None:
assert_bytes(message) assert_bytes(message)
@ -296,13 +319,23 @@ class ECPubkey(object):
# check message # check message
self.verify_message_hash(sig65[1:], h) self.verify_message_hash(sig65[1:], h)
# TODO return bool instead of raising
def verify_message_hash(self, sig_string: bytes, msg_hash: bytes) -> None: def verify_message_hash(self, sig_string: bytes, msg_hash: bytes) -> None:
assert_bytes(sig_string) assert_bytes(sig_string)
if len(sig_string) != 64: if len(sig_string) != 64:
raise Exception('Wrong encoding') raise Exception(f'wrong encoding used for signature? len={len(sig_string)} (should be 64)')
ecdsa_point = self._pubkey.point if not (isinstance(msg_hash, bytes) and len(msg_hash) == 32):
verifying_key = _MyVerifyingKey.from_public_point(ecdsa_point, curve=SECP256k1) raise Exception("msg_hash must be bytes, and 32 bytes exactly")
verifying_key.verify_digest(sig_string, msg_hash, sigdecode=get_r_and_s_from_sig_string)
sig = create_string_buffer(64)
ret = _libsecp256k1.secp256k1_ecdsa_signature_parse_compact(_libsecp256k1.ctx, sig, sig_string)
if not ret:
raise Exception("Bad signature")
ret = _libsecp256k1.secp256k1_ecdsa_signature_normalize(_libsecp256k1.ctx, sig, sig)
pubkey = self._to_libsecp256k1_pubkey_ptr()
if 1 != _libsecp256k1.secp256k1_ecdsa_verify(_libsecp256k1.ctx, sig, msg_hash, pubkey):
raise Exception("Bad signature")
def encrypt_message(self, message: bytes, magic: bytes = b'BIE1') -> bytes: def encrypt_message(self, message: bytes, magic: bytes = b'BIE1') -> bytes:
""" """
@ -391,7 +424,7 @@ class ECPrivkey(ECPubkey):
self.secret_scalar = secret self.secret_scalar = secret
point = generator_secp256k1 * secret point = generator_secp256k1 * secret
super().__init__(point_to_ser(point)) super().__init__(point_to_ser(point)) # TODO
@classmethod @classmethod
def from_secret_scalar(cls, secret_scalar: int): def from_secret_scalar(cls, secret_scalar: int):
@ -426,24 +459,40 @@ class ECPrivkey(ECPubkey):
def get_secret_bytes(self) -> bytes: def get_secret_bytes(self) -> bytes:
return int.to_bytes(self.secret_scalar, length=32, byteorder='big', signed=False) return int.to_bytes(self.secret_scalar, length=32, byteorder='big', signed=False)
def sign(self, data: bytes, sigencode=None, sigdecode=None) -> bytes: def sign(self, msg_hash: bytes, sigencode=None, sigdecode=None) -> bytes:
if not (isinstance(msg_hash, bytes) and len(msg_hash) == 32):
raise Exception("msg_hash to be signed must be bytes, and 32 bytes exactly")
if sigencode is None: if sigencode is None:
sigencode = sig_string_from_r_and_s sigencode = sig_string_from_r_and_s
if sigdecode is None: if sigdecode is None:
sigdecode = get_r_and_s_from_sig_string sigdecode = get_r_and_s_from_sig_string
private_key = _MySigningKey.from_secret_exponent(self.secret_scalar, curve=SECP256k1)
def sig_encode_r_s(r, s, order): privkey_bytes = self.secret_scalar.to_bytes(32, byteorder="big")
nonce_function = None
sig = create_string_buffer(64)
def sign_with_extra_entropy(extra_entropy):
ret = _libsecp256k1.secp256k1_ecdsa_sign(
_libsecp256k1.ctx, sig, msg_hash, privkey_bytes,
nonce_function, extra_entropy)
if not ret:
raise Exception('the nonce generation function failed, or the private key was invalid')
compact_signature = create_string_buffer(64)
_libsecp256k1.secp256k1_ecdsa_signature_serialize_compact(_libsecp256k1.ctx, compact_signature, sig)
r = int.from_bytes(compact_signature[:32], byteorder="big")
s = int.from_bytes(compact_signature[32:], byteorder="big")
return r, s return r, s
r, s = private_key.sign_digest_deterministic(data, hashfunc=hashlib.sha256, sigencode=sig_encode_r_s)
r, s = sign_with_extra_entropy(extra_entropy=None)
counter = 0 counter = 0
while r >= 2**255: # grind for low R value https://github.com/bitcoin/bitcoin/pull/13666 while r >= 2**255: # grind for low R value https://github.com/bitcoin/bitcoin/pull/13666
counter += 1 counter += 1
extra_entropy = int.to_bytes(counter, 32, 'little') extra_entropy = counter.to_bytes(32, byteorder="little")
r, s = private_key.sign_digest_deterministic(data, hashfunc=hashlib.sha256, sigencode=sig_encode_r_s, extra_entropy=extra_entropy) r, s = sign_with_extra_entropy(extra_entropy=extra_entropy)
sig = sigencode(r, s, CURVE_ORDER) sig = sigencode(r, s, CURVE_ORDER)
public_key = private_key.get_verifying_key() # public_key = private_key.get_verifying_key() # TODO
if not public_key.verify_digest(sig, data, sigdecode=sigdecode): # if not public_key.verify_digest(sig, data, sigdecode=sigdecode):
raise Exception('Sanity check verifying our own signature failed.') # raise Exception('Sanity check verifying our own signature failed.')
return sig return sig
def sign_transaction(self, hashed_preimage: bytes) -> bytes: def sign_transaction(self, hashed_preimage: bytes) -> bytes:
@ -482,12 +531,9 @@ class ECPrivkey(ECPubkey):
if magic_found != magic: if magic_found != magic:
raise Exception('invalid ciphertext: invalid magic bytes') raise Exception('invalid ciphertext: invalid magic bytes')
try: try:
ecdsa_point = _ser_to_python_ecdsa_point(ephemeral_pubkey_bytes) ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
except InvalidECPointException as e: except InvalidECPointException as e:
raise Exception('invalid ciphertext: invalid ephemeral pubkey') from e raise Exception('invalid ciphertext: invalid ephemeral pubkey') from e
if not ecdsa.ecdsa.point_is_valid(generator_secp256k1, ecdsa_point.x(), ecdsa_point.y()):
raise Exception('invalid ciphertext: invalid ephemeral pubkey')
ephemeral_pubkey = ECPubkey.from_point(ecdsa_point)
ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True) ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
key = hashlib.sha512(ecdh_key).digest() key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:] iv, key_e, key_m = key[0:16], key[16:32], key[32:]

168
electrum/ecc_fast.py

@ -5,14 +5,11 @@ import os
import sys import sys
import traceback import traceback
import ctypes import ctypes
from ctypes.util import find_library
from ctypes import ( from ctypes import (
byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer, byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
CFUNCTYPE, POINTER, cast CFUNCTYPE, POINTER, cast
) )
import ecdsa
from .logging import get_logger from .logging import get_logger
@ -89,8 +86,8 @@ def load_library():
secp256k1.secp256k1_ec_pubkey_combine.restype = c_int secp256k1.secp256k1_ec_pubkey_combine.restype = c_int
secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY) secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY)
r = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32)) ret = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32))
if r: if ret:
return secp256k1 return secp256k1
else: else:
_logger.warning('secp256k1_context_randomize failed') _logger.warning('secp256k1_context_randomize failed')
@ -100,165 +97,8 @@ def load_library():
return None return None
class _patched_functions:
prepared_to_patch = False
monkey_patching_active = False
def _prepare_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1():
if not _libsecp256k1:
return
# save original functions so that we can undo patching (needed for tests)
_patched_functions.orig_sign = staticmethod(ecdsa.ecdsa.Private_key.sign)
_patched_functions.orig_verify = staticmethod(ecdsa.ecdsa.Public_key.verifies)
_patched_functions.orig_mul = staticmethod(ecdsa.ellipticcurve.Point.__mul__)
_patched_functions.orig_add = staticmethod(ecdsa.ellipticcurve.Point.__add__)
curve_secp256k1 = ecdsa.ecdsa.curve_secp256k1
curve_order = ecdsa.curves.SECP256k1.order
point_at_infinity = ecdsa.ellipticcurve.INFINITY
def _get_ptr_to_well_formed_pubkey_string_buffer_from_ecdsa_point(point: ecdsa.ellipticcurve.Point):
assert point.curve() == curve_secp256k1
pubkey = create_string_buffer(64)
public_pair_bytes = b'\4' + point.x().to_bytes(32, byteorder="big") + point.y().to_bytes(32, byteorder="big")
r = _libsecp256k1.secp256k1_ec_pubkey_parse(
_libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
if not r:
raise Exception('public key could not be parsed or is invalid')
return pubkey
def _get_ecdsa_point_from_libsecp256k1_pubkey_object(pubkey) -> ecdsa.ellipticcurve.Point:
pubkey_serialized = create_string_buffer(65)
pubkey_size = c_size_t(65)
_libsecp256k1.secp256k1_ec_pubkey_serialize(
_libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
x = int.from_bytes(pubkey_serialized[1:33], byteorder="big")
y = int.from_bytes(pubkey_serialized[33:], byteorder="big")
return ecdsa.ellipticcurve.Point(curve_secp256k1, x, y, curve_order)
def add(self: ecdsa.ellipticcurve.Point, other: ecdsa.ellipticcurve.Point) -> ecdsa.ellipticcurve.Point:
if self.curve() != curve_secp256k1:
# this operation is not on the secp256k1 curve; use original implementation
return _patched_functions.orig_add(self, other)
if self == point_at_infinity: return other
if other == point_at_infinity: return self
pubkey1 = _get_ptr_to_well_formed_pubkey_string_buffer_from_ecdsa_point(self)
pubkey2 = _get_ptr_to_well_formed_pubkey_string_buffer_from_ecdsa_point(other)
pubkey_sum = create_string_buffer(64)
pubkey1 = cast(pubkey1, c_char_p)
pubkey2 = cast(pubkey2, c_char_p)
array_of_pubkey_ptrs = (c_char_p * 2)(pubkey1, pubkey2)
r = _libsecp256k1.secp256k1_ec_pubkey_combine(_libsecp256k1.ctx, pubkey_sum, array_of_pubkey_ptrs, 2)
if not r:
return point_at_infinity
return _get_ecdsa_point_from_libsecp256k1_pubkey_object(pubkey_sum)
def mul(self: ecdsa.ellipticcurve.Point, other: int) -> ecdsa.ellipticcurve.Point:
if self.curve() != curve_secp256k1:
# this operation is not on the secp256k1 curve; use original implementation
return _patched_functions.orig_mul(self, other)
other %= curve_order
if self == point_at_infinity or other == 0:
return point_at_infinity
pubkey = _get_ptr_to_well_formed_pubkey_string_buffer_from_ecdsa_point(self)
r = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
if not r:
return point_at_infinity
return _get_ecdsa_point_from_libsecp256k1_pubkey_object(pubkey)
def sign(self: ecdsa.ecdsa.Private_key, hash: int, random_k: int) -> ecdsa.ecdsa.Signature:
# note: random_k is ignored
if self.public_key.curve != curve_secp256k1:
# this operation is not on the secp256k1 curve; use original implementation
return _patched_functions.orig_sign(self, hash, random_k)
secret_exponent = self.secret_multiplier
nonce_function = None
sig = create_string_buffer(64)
sig_hash_bytes = hash.to_bytes(32, byteorder="big")
def sign_with_extra_entropy(extra_entropy):
ret = _libsecp256k1.secp256k1_ecdsa_sign(
_libsecp256k1.ctx, sig, sig_hash_bytes, secret_exponent.to_bytes(32, byteorder="big"),
nonce_function, extra_entropy)
if not ret:
raise Exception('the nonce generation function failed, or the private key was invalid')
compact_signature = create_string_buffer(64)
_libsecp256k1.secp256k1_ecdsa_signature_serialize_compact(_libsecp256k1.ctx, compact_signature, sig)
r = int.from_bytes(compact_signature[:32], byteorder="big")
s = int.from_bytes(compact_signature[32:], byteorder="big")
return r, s
r, s = sign_with_extra_entropy(extra_entropy=None)
counter = 0
while r >= 2**255: # grind for low R value https://github.com/bitcoin/bitcoin/pull/13666
counter += 1
extra_entropy = counter.to_bytes(32, byteorder="little")
r, s = sign_with_extra_entropy(extra_entropy=extra_entropy)
return ecdsa.ecdsa.Signature(r, s)
def verify(self: ecdsa.ecdsa.Public_key, hash: int, signature: ecdsa.ecdsa.Signature) -> bool:
if self.curve != curve_secp256k1:
# this operation is not on the secp256k1 curve; use original implementation
return _patched_functions.orig_verify(self, hash, signature)
sig = create_string_buffer(64)
input64 = signature.r.to_bytes(32, byteorder="big") + signature.s.to_bytes(32, byteorder="big")
r = _libsecp256k1.secp256k1_ecdsa_signature_parse_compact(_libsecp256k1.ctx, sig, input64)
if not r:
return False
r = _libsecp256k1.secp256k1_ecdsa_signature_normalize(_libsecp256k1.ctx, sig, sig)
public_pair_bytes = b'\4' + self.point.x().to_bytes(32, byteorder="big") + self.point.y().to_bytes(32, byteorder="big")
pubkey = create_string_buffer(64)
r = _libsecp256k1.secp256k1_ec_pubkey_parse(
_libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
if not r:
return False
return 1 == _libsecp256k1.secp256k1_ecdsa_verify(_libsecp256k1.ctx, sig, hash.to_bytes(32, byteorder="big"), pubkey)
# save new functions so that we can (re-)do patching
_patched_functions.fast_sign = sign
_patched_functions.fast_verify = verify
_patched_functions.fast_mul = mul
_patched_functions.fast_add = add
_patched_functions.prepared_to_patch = True
def do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1():
if not _libsecp256k1:
# FIXME logging 'verbosity' is not yet initialised
_logger.info('libsecp256k1 library not available, falling back to python-ecdsa. '
'This means signing operations will be slower.')
return
if not _patched_functions.prepared_to_patch:
raise Exception("can't patch python-ecdsa without preparations")
ecdsa.ecdsa.Private_key.sign = _patched_functions.fast_sign
ecdsa.ecdsa.Public_key.verifies = _patched_functions.fast_verify
ecdsa.ellipticcurve.Point.__mul__ = _patched_functions.fast_mul
ecdsa.ellipticcurve.Point.__add__ = _patched_functions.fast_add
_patched_functions.monkey_patching_active = True
def undo_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1():
if not _libsecp256k1:
return
if not _patched_functions.prepared_to_patch:
raise Exception("can't patch python-ecdsa without preparations")
ecdsa.ecdsa.Private_key.sign = _patched_functions.orig_sign
ecdsa.ecdsa.Public_key.verifies = _patched_functions.orig_verify
ecdsa.ellipticcurve.Point.__mul__ = _patched_functions.orig_mul
ecdsa.ellipticcurve.Point.__add__ = _patched_functions.orig_add
_patched_functions.monkey_patching_active = False
def is_using_fast_ecc(): def is_using_fast_ecc():
return _patched_functions.monkey_patching_active return True # TODO rm
try: try:
@ -266,5 +106,3 @@ try:
except BaseException as e: except BaseException as e:
_logger.warning(f'failed to load libsecp256k1: {repr(e)}') _logger.warning(f'failed to load libsecp256k1: {repr(e)}')
_libsecp256k1 = None _libsecp256k1 = None
_prepare_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1()

Loading…
Cancel
Save