Browse Source

replace TLSLite dependency with minimal RSA implementation

283
ThomasV 10 years ago
parent
commit
e8d30129ea
  1. 1
      RELEASE-NOTES
  2. 47
      lib/asn1tinydecoder.py
  3. 16
      lib/dnssec.py
  4. 29
      lib/paymentrequest.py
  5. 164
      lib/pem.py
  6. 517
      lib/rsakey.py
  7. 90
      lib/x509.py
  8. 1
      setup.py

1
RELEASE-NOTES

@ -1,6 +1,7 @@
# Release 2.4.1
* Use ssl.PROTOCOL_TLSv1
* Fix DNSSEC issues with ECDSA signatures
* Replace TLSLite dependency with minimal RSA implementation
# Release 2.4
* Payment to DNS names storing a Bitcoin addresses (OpenAlias) is

47
lib/asn1tinydecoder.py

@ -121,3 +121,50 @@ def asn1_read_length(der,ix):
####################### END ASN1 DECODER ############################
def decode_OID(s):
s = map(ord, s)
r = []
r.append(s[0] / 40)
r.append(s[0] % 40)
k = 0
for i in s[1:]:
if i < 128:
r.append(i + 128*k)
k = 0
else:
k = (i - 128) + 128*k
return '.'.join(map(str, r))
def encode_OID(oid):
x = map(int, oid.split('.'))
s = chr(x[0]*40 + x[1])
for i in x[2:]:
ss = chr(i % 128)
while i > 128:
i = i / 128
ss = chr(128 + i % 128) + ss
s += ss
return s
def asn1_get_children(der, i):
nodes = []
ii = asn1_node_first_child(der,i)
nodes.append(ii)
while ii[2]<i[2]:
ii = asn1_node_next(der,ii)
nodes.append(ii)
return nodes
def asn1_get_sequence(s):
return map(lambda j: asn1_get_value(s, j), asn1_get_children(s, asn1_node_root(s)))
def asn1_get_dict(der, i):
p = {}
for ii in asn1_get_children(der, i):
for iii in asn1_get_children(der, ii):
iiii = asn1_node_first_child(der, iii)
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER'))
iiii = asn1_node_next(der, iiii)
value = asn1_get_value(der, iiii)
p[oid] = value
return p

16
lib/dnssec.py

@ -57,16 +57,16 @@ from dns.exception import DNSException
"""
Pure-Python version of dns.dnssec._validate_rsig
Uses tlslite instead of PyCrypto
"""
import ecdsa
import rsakey
def python_validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
from dns.dnssec import ValidationFailure, ECDSAP256SHA256, ECDSAP384SHA384
from dns.dnssec import _find_candidate_keys, _make_hash, _is_ecdsa, _is_rsa, _to_rdata, _make_algorithm_id
import ecdsa
from tlslite.utils.keyfactory import _createPublicRSAKey
from tlslite.utils.cryptomath import bytesToNumber
if isinstance(origin, (str, unicode)):
origin = dns.name.from_text(origin, dns.name.root)
@ -101,9 +101,9 @@ def python_validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
keyptr = keyptr[2:]
rsa_e = keyptr[0:bytes]
rsa_n = keyptr[bytes:]
n = bytesToNumber(bytearray(rsa_n))
e = bytesToNumber(bytearray(rsa_e))
pubkey = _createPublicRSAKey(n, e)
n = ecdsa.util.string_to_number(rsa_n)
e = ecdsa.util.string_to_number(rsa_e)
pubkey = rsakey.RSAKey(n, e)
sig = rrsig.signature
elif _is_ecdsa(rrsig.algorithm):

29
lib/paymentrequest.py

@ -25,6 +25,7 @@ import threading
import time
import traceback
import urlparse
import json
import requests
try:
@ -34,9 +35,11 @@ except ImportError:
import bitcoin
import util
from util import print_error
import transaction
import x509
from util import print_error
import rsakey
REQUEST_HEADERS = {'Accept': 'application/bitcoin-paymentrequest', 'User-Agent': 'Electrum'}
ACK_HEADERS = {'Content-Type':'application/bitcoin-payment','Accept':'application/bitcoin-paymentack','User-Agent':'Electrum'}
@ -52,7 +55,6 @@ PR_UNKNOWN = 2 # sent but not propagated
PR_PAID = 3 # send and propagated
PR_ERROR = 4 # could not parse
import json
def get_payment_request(url):
@ -163,7 +165,9 @@ class PaymentRequest:
prev_x = x509_chain[i-1]
algo, sig, data = prev_x.get_signature()
sig = bytearray(sig)
pubkey = x.publicKey
pubkey = rsakey.RSAKey(x.modulus, x.exponent)
if algo == x509.ALGO_RSA_SHA1:
verify = pubkey.hashAndVerify(sig, data)
elif algo == x509.ALGO_RSA_SHA256:
@ -183,7 +187,8 @@ class PaymentRequest:
self.error = "Certificate not Signed by Provided CA Certificate Chain"
return False
# verify the BIP70 signature
pubkey0 = x509_chain[0].publicKey
x = x509_chain[0]
pubkey0 = rsakey.RSAKey(x.modulus, x.exponent)
sig = paymntreq.signature
paymntreq.signature = ''
s = paymntreq.SerializeToString()
@ -324,20 +329,22 @@ def sign_request_with_alias(pr, alias, alias_privkey):
pr.signature = ec_key.sign_message(message, compressed, address)
def sign_request_with_x509(pr, key_path, cert_path):
import tlslite
import pem
with open(key_path, 'r') as f:
rsakey = tlslite.utils.python_rsakey.Python_RSAKey.parsePEM(f.read())
params = pem.parse_private_key(f.read())
privkey = rsakey.RSAKey(*params)
with open(cert_path, 'r') as f:
chain = tlslite.X509CertChain()
chain.parsePemList(f.read())
s = f.read()
bList = pem.dePemList(s, "CERTIFICATE")
certificates = pb2.X509Certificates()
certificates.certificate.extend(map(lambda x: str(x.bytes), chain.x509List))
certificates.certificate.extend(map(str, bList))
pr.pki_type = 'x509+sha256'
pr.pki_data = certificates.SerializeToString()
msgBytes = bytearray(pr.SerializeToString())
hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
sig = rsakey.sign(x509.PREFIX_RSA_SHA256 + hashBytes)
sig = privkey.sign(x509.PREFIX_RSA_SHA256 + hashBytes)
pr.signature = bytes(sig)
@ -362,8 +369,6 @@ def make_request(config, req):
class InvoiceStore(object):
def __init__(self, config):

164
lib/pem.py

@ -0,0 +1,164 @@
# This module uses code from TLSLlite
# TLSLite Author: Trevor Perrin)
import binascii
from asn1tinydecoder import *
def a2b_base64(s):
try:
b = bytearray(binascii.a2b_base64(s))
except Exception as e:
raise SyntaxError("base64 error: %s" % e)
return b
def b2a_base64(b):
return binascii.b2a_base64(b)
def dePem(s, name):
"""Decode a PEM string into a bytearray of its payload.
The input must contain an appropriate PEM prefix and postfix
based on the input name string, e.g. for name="CERTIFICATE":
-----BEGIN CERTIFICATE-----
MIIBXDCCAUSgAwIBAgIBADANBgkqhkiG9w0BAQUFADAPMQ0wCwYDVQQDEwRUQUNL
...
KoZIhvcNAQEFBQADAwA5kw==
-----END CERTIFICATE-----
The first such PEM block in the input will be found, and its
payload will be base64 decoded and returned.
"""
prefix = "-----BEGIN %s-----" % name
postfix = "-----END %s-----" % name
start = s.find(prefix)
if start == -1:
raise SyntaxError("Missing PEM prefix")
end = s.find(postfix, start+len(prefix))
if end == -1:
raise SyntaxError("Missing PEM postfix")
s = s[start+len("-----BEGIN %s-----" % name) : end]
retBytes = a2b_base64(s) # May raise SyntaxError
return retBytes
def dePemList(s, name):
"""Decode a sequence of PEM blocks into a list of bytearrays.
The input must contain any number of PEM blocks, each with the appropriate
PEM prefix and postfix based on the input name string, e.g. for
name="TACK BREAK SIG". Arbitrary text can appear between and before and
after the PEM blocks. For example:
" Created by TACK.py 0.9.3 Created at 2012-02-01T00:30:10Z -----BEGIN TACK
BREAK SIG-----
ATKhrz5C6JHJW8BF5fLVrnQss6JnWVyEaC0p89LNhKPswvcC9/s6+vWLd9snYTUv
YMEBdw69PUP8JB4AdqA3K6Ap0Fgd9SSTOECeAKOUAym8zcYaXUwpk0+WuPYa7Zmm
SkbOlK4ywqt+amhWbg9txSGUwFO5tWUHT3QrnRlE/e3PeNFXLx5Bckg= -----END TACK
BREAK SIG----- Created by TACK.py 0.9.3 Created at 2012-02-01T00:30:11Z
-----BEGIN TACK BREAK SIG-----
ATKhrz5C6JHJW8BF5fLVrnQss6JnWVyEaC0p89LNhKPswvcC9/s6+vWLd9snYTUv
YMEBdw69PUP8JB4AdqA3K6BVCWfcjN36lx6JwxmZQncS6sww7DecFO/qjSePCxwM
+kdDqX/9/183nmjx6bf0ewhPXkA0nVXsDYZaydN8rJU1GaMlnjcIYxY= -----END TACK
BREAK SIG----- "
All such PEM blocks will be found, decoded, and return in an ordered list
of bytearrays, which may have zero elements if not PEM blocks are found.
"""
bList = []
prefix = "-----BEGIN %s-----" % name
postfix = "-----END %s-----" % name
while 1:
start = s.find(prefix)
if start == -1:
return bList
end = s.find(postfix, start+len(prefix))
if end == -1:
raise SyntaxError("Missing PEM postfix")
s2 = s[start+len(prefix) : end]
retBytes = a2b_base64(s2) # May raise SyntaxError
bList.append(retBytes)
s = s[end+len(postfix) : ]
def pem(b, name):
"""Encode a payload bytearray into a PEM string.
The input will be base64 encoded, then wrapped in a PEM prefix/postfix
based on the name string, e.g. for name="CERTIFICATE":
-----BEGIN CERTIFICATE-----
MIIBXDCCAUSgAwIBAgIBADANBgkqhkiG9w0BAQUFADAPMQ0wCwYDVQQDEwRUQUNL
...
KoZIhvcNAQEFBQADAwA5kw==
-----END CERTIFICATE-----
"""
s1 = b2a_base64(b)[:-1] # remove terminating \n
s2 = ""
while s1:
s2 += s1[:64] + "\n"
s1 = s1[64:]
s = ("-----BEGIN %s-----\n" % name) + s2 + \
("-----END %s-----\n" % name)
return s
def pemSniff(inStr, name):
searchStr = "-----BEGIN %s-----" % name
return searchStr in inStr
def parse_private_key(s):
"""Parse a string containing a PEM-encoded <privateKey>."""
if pemSniff(s, "PRIVATE KEY"):
bytes = dePem(s, "PRIVATE KEY")
return _parsePKCS8(bytes)
elif pemSniff(s, "RSA PRIVATE KEY"):
bytes = dePem(s, "RSA PRIVATE KEY")
return _parseSSLeay(bytes)
else:
raise SyntaxError("Not a PEM private key file")
def _parsePKCS8(bytes):
s = str(bytes)
root = asn1_node_root(s)
version_node = asn1_node_first_child(s, root)
version = bytestr_to_int(asn1_get_value_of_type(s, version_node, 'INTEGER'))
if version != 0:
raise SyntaxError("Unrecognized PKCS8 version")
rsaOID_node = asn1_node_next(s, version_node)
ii = asn1_node_first_child(s, rsaOID_node)
rsaOID = decode_OID(asn1_get_value_of_type(s, ii, 'OBJECT IDENTIFIER'))
if rsaOID != '1.2.840.113549.1.1.1':
raise SyntaxError("Unrecognized AlgorithmIdentifier")
privkey_node = asn1_node_next(s, rsaOID_node)
value = asn1_get_value_of_type(s, privkey_node, 'OCTET STRING')
return _parseASN1PrivateKey(value)
def _parseSSLeay(bytes):
return _parseASN1PrivateKey(str(bytes))
def bytesToNumber(s):
return int(binascii.hexlify(s), 16)
def _parseASN1PrivateKey(s):
root = asn1_node_root(s)
version_node = asn1_node_first_child(s, root)
version = bytestr_to_int(asn1_get_value_of_type(s, version_node, 'INTEGER'))
if version != 0:
raise SyntaxError("Unrecognized RSAPrivateKey version")
n = asn1_node_next(s, version_node)
e = asn1_node_next(s, n)
d = asn1_node_next(s, e)
p = asn1_node_next(s, d)
q = asn1_node_next(s, p)
dP = asn1_node_next(s, q)
dQ = asn1_node_next(s, dP)
qInv = asn1_node_next(s, dQ)
return map(lambda x: bytesToNumber(asn1_get_value_of_type(s, x, 'INTEGER')), [n, e, d, p, q, dP, dQ, qInv])

517
lib/rsakey.py

@ -0,0 +1,517 @@
# This module uses functions from TLSLite (public domain)
#
# TLSLite Authors:
# Trevor Perrin
# Martin von Loewis - python 3 port
# Yngve Pettersen (ported by Paul Sokolovsky) - TLS 1.2
#
"""Pure-Python RSA implementation."""
from __future__ import print_function
import os
import math
import base64
import binascii
from pem import *
# **************************************************************************
# PRNG Functions
# **************************************************************************
# Check that os.urandom works
import zlib
length = len(zlib.compress(os.urandom(1000)))
assert(length > 900)
def getRandomBytes(howMany):
b = bytearray(os.urandom(howMany))
assert(len(b) == howMany)
return b
prngName = "os.urandom"
# **************************************************************************
# Converter Functions
# **************************************************************************
def bytesToNumber(b):
total = 0
multiplier = 1
for count in range(len(b)-1, -1, -1):
byte = b[count]
total += multiplier * byte
multiplier *= 256
return total
def numberToByteArray(n, howManyBytes=None):
"""Convert an integer into a bytearray, zero-pad to howManyBytes.
The returned bytearray may be smaller than howManyBytes, but will
not be larger. The returned bytearray will contain a big-endian
encoding of the input integer (n).
"""
if howManyBytes == None:
howManyBytes = numBytes(n)
b = bytearray(howManyBytes)
for count in range(howManyBytes-1, -1, -1):
b[count] = int(n % 256)
n >>= 8
return b
def mpiToNumber(mpi): #mpi is an openssl-format bignum string
if (ord(mpi[4]) & 0x80) !=0: #Make sure this is a positive number
raise AssertionError()
b = bytearray(mpi[4:])
return bytesToNumber(b)
def numberToMPI(n):
b = numberToByteArray(n)
ext = 0
#If the high-order bit is going to be set,
#add an extra byte of zeros
if (numBits(n) & 0x7)==0:
ext = 1
length = numBytes(n) + ext
b = bytearray(4+ext) + b
b[0] = (length >> 24) & 0xFF
b[1] = (length >> 16) & 0xFF
b[2] = (length >> 8) & 0xFF
b[3] = length & 0xFF
return bytes(b)
# **************************************************************************
# Misc. Utility Functions
# **************************************************************************
def numBits(n):
if n==0:
return 0
s = "%x" % n
return ((len(s)-1)*4) + \
{'0':0, '1':1, '2':2, '3':2,
'4':3, '5':3, '6':3, '7':3,
'8':4, '9':4, 'a':4, 'b':4,
'c':4, 'd':4, 'e':4, 'f':4,
}[s[0]]
return int(math.floor(math.log(n, 2))+1)
def numBytes(n):
if n==0:
return 0
bits = numBits(n)
return int(math.ceil(bits / 8.0))
# **************************************************************************
# Big Number Math
# **************************************************************************
def getRandomNumber(low, high):
if low >= high:
raise AssertionError()
howManyBits = numBits(high)
howManyBytes = numBytes(high)
lastBits = howManyBits % 8
while 1:
bytes = getRandomBytes(howManyBytes)
if lastBits:
bytes[0] = bytes[0] % (1 << lastBits)
n = bytesToNumber(bytes)
if n >= low and n < high:
return n
def gcd(a,b):
a, b = max(a,b), min(a,b)
while b:
a, b = b, a % b
return a
def lcm(a, b):
return (a * b) // gcd(a, b)
#Returns inverse of a mod b, zero if none
#Uses Extended Euclidean Algorithm
def invMod(a, b):
c, d = a, b
uc, ud = 1, 0
while c != 0:
q = d // c
c, d = d-(q*c), c
uc, ud = ud - (q * uc), uc
if d == 1:
return ud % b
return 0
def powMod(base, power, modulus):
if power < 0:
result = pow(base, power*-1, modulus)
result = invMod(result, modulus)
return result
else:
return pow(base, power, modulus)
#Pre-calculate a sieve of the ~100 primes < 1000:
def makeSieve(n):
sieve = list(range(n))
for count in range(2, int(math.sqrt(n))+1):
if sieve[count] == 0:
continue
x = sieve[count] * 2
while x < len(sieve):
sieve[x] = 0
x += sieve[count]
sieve = [x for x in sieve[2:] if x]
return sieve
sieve = makeSieve(1000)
def isPrime(n, iterations=5, display=False):
#Trial division with sieve
for x in sieve:
if x >= n: return True
if n % x == 0: return False
#Passed trial division, proceed to Rabin-Miller
#Rabin-Miller implemented per Ferguson & Schneier
#Compute s, t for Rabin-Miller
if display: print("*", end=' ')
s, t = n-1, 0
while s % 2 == 0:
s, t = s//2, t+1
#Repeat Rabin-Miller x times
a = 2 #Use 2 as a base for first iteration speedup, per HAC
for count in range(iterations):
v = powMod(a, s, n)
if v==1:
continue
i = 0
while v != n-1:
if i == t-1:
return False
else:
v, i = powMod(v, 2, n), i+1
a = getRandomNumber(2, n)
return True
def getRandomPrime(bits, display=False):
if bits < 10:
raise AssertionError()
#The 1.5 ensures the 2 MSBs are set
#Thus, when used for p,q in RSA, n will have its MSB set
#
#Since 30 is lcm(2,3,5), we'll set our test numbers to
#29 % 30 and keep them there
low = ((2 ** (bits-1)) * 3) // 2
high = 2 ** bits - 30
p = getRandomNumber(low, high)
p += 29 - (p % 30)
while 1:
if display: print(".", end=' ')
p += 30
if p >= high:
p = getRandomNumber(low, high)
p += 29 - (p % 30)
if isPrime(p, display=display):
return p
#Unused at the moment...
def getRandomSafePrime(bits, display=False):
if bits < 10:
raise AssertionError()
#The 1.5 ensures the 2 MSBs are set
#Thus, when used for p,q in RSA, n will have its MSB set
#
#Since 30 is lcm(2,3,5), we'll set our test numbers to
#29 % 30 and keep them there
low = (2 ** (bits-2)) * 3//2
high = (2 ** (bits-1)) - 30
q = getRandomNumber(low, high)
q += 29 - (q % 30)
while 1:
if display: print(".", end=' ')
q += 30
if (q >= high):
q = getRandomNumber(low, high)
q += 29 - (q % 30)
#Ideas from Tom Wu's SRP code
#Do trial division on p and q before Rabin-Miller
if isPrime(q, 0, display=display):
p = (2 * q) + 1
if isPrime(p, display=display):
if isPrime(q, display=display):
return p
class RSAKey(object):
def __init__(self, n=0, e=0, d=0, p=0, q=0, dP=0, dQ=0, qInv=0):
if (n and not e) or (e and not n):
raise AssertionError()
self.n = n
self.e = e
self.d = d
self.p = p
self.q = q
self.dP = dP
self.dQ = dQ
self.qInv = qInv
self.blinder = 0
self.unblinder = 0
def __len__(self):
"""Return the length of this key in bits.
@rtype: int
"""
return numBits(self.n)
def hasPrivateKey(self):
return self.d != 0
def hashAndSign(self, bytes):
"""Hash and sign the passed-in bytes.
This requires the key to have a private component. It performs
a PKCS1-SHA1 signature on the passed-in data.
@type bytes: str or L{bytearray} of unsigned bytes
@param bytes: The value which will be hashed and signed.
@rtype: L{bytearray} of unsigned bytes.
@return: A PKCS1-SHA1 signature on the passed-in data.
"""
hashBytes = SHA1(bytearray(bytes))
prefixedHashBytes = self._addPKCS1SHA1Prefix(hashBytes)
sigBytes = self.sign(prefixedHashBytes)
return sigBytes
def hashAndVerify(self, sigBytes, bytes):
"""Hash and verify the passed-in bytes with the signature.
This verifies a PKCS1-SHA1 signature on the passed-in data.
@type sigBytes: L{bytearray} of unsigned bytes
@param sigBytes: A PKCS1-SHA1 signature.
@type bytes: str or L{bytearray} of unsigned bytes
@param bytes: The value which will be hashed and verified.
@rtype: bool
@return: Whether the signature matches the passed-in data.
"""
hashBytes = SHA1(bytearray(bytes))
# Try it with/without the embedded NULL
prefixedHashBytes1 = self._addPKCS1SHA1Prefix(hashBytes, False)
prefixedHashBytes2 = self._addPKCS1SHA1Prefix(hashBytes, True)
result1 = self.verify(sigBytes, prefixedHashBytes1)
result2 = self.verify(sigBytes, prefixedHashBytes2)
return (result1 or result2)
def sign(self, bytes):
"""Sign the passed-in bytes.
This requires the key to have a private component. It performs
a PKCS1 signature on the passed-in data.
@type bytes: L{bytearray} of unsigned bytes
@param bytes: The value which will be signed.
@rtype: L{bytearray} of unsigned bytes.
@return: A PKCS1 signature on the passed-in data.
"""
if not self.hasPrivateKey():
raise AssertionError()
paddedBytes = self._addPKCS1Padding(bytes, 1)
m = bytesToNumber(paddedBytes)
if m >= self.n:
raise ValueError()
c = self._rawPrivateKeyOp(m)
sigBytes = numberToByteArray(c, numBytes(self.n))
return sigBytes
def verify(self, sigBytes, bytes):
"""Verify the passed-in bytes with the signature.
This verifies a PKCS1 signature on the passed-in data.
@type sigBytes: L{bytearray} of unsigned bytes
@param sigBytes: A PKCS1 signature.
@type bytes: L{bytearray} of unsigned bytes
@param bytes: The value which will be verified.
@rtype: bool
@return: Whether the signature matches the passed-in data.
"""
if len(sigBytes) != numBytes(self.n):
return False
paddedBytes = self._addPKCS1Padding(bytes, 1)
c = bytesToNumber(sigBytes)
if c >= self.n:
return False
m = self._rawPublicKeyOp(c)
checkBytes = numberToByteArray(m, numBytes(self.n))
return checkBytes == paddedBytes
def encrypt(self, bytes):
"""Encrypt the passed-in bytes.
This performs PKCS1 encryption of the passed-in data.
@type bytes: L{bytearray} of unsigned bytes
@param bytes: The value which will be encrypted.
@rtype: L{bytearray} of unsigned bytes.
@return: A PKCS1 encryption of the passed-in data.
"""
paddedBytes = self._addPKCS1Padding(bytes, 2)
m = bytesToNumber(paddedBytes)
if m >= self.n:
raise ValueError()
c = self._rawPublicKeyOp(m)
encBytes = numberToByteArray(c, numBytes(self.n))
return encBytes
def decrypt(self, encBytes):
"""Decrypt the passed-in bytes.
This requires the key to have a private component. It performs
PKCS1 decryption of the passed-in data.
@type encBytes: L{bytearray} of unsigned bytes
@param encBytes: The value which will be decrypted.
@rtype: L{bytearray} of unsigned bytes or None.
@return: A PKCS1 decryption of the passed-in data or None if
the data is not properly formatted.
"""
if not self.hasPrivateKey():
raise AssertionError()
if len(encBytes) != numBytes(self.n):
return None
c = bytesToNumber(encBytes)
if c >= self.n:
return None
m = self._rawPrivateKeyOp(c)
decBytes = numberToByteArray(m, numBytes(self.n))
#Check first two bytes
if decBytes[0] != 0 or decBytes[1] != 2:
return None
#Scan through for zero separator
for x in range(1, len(decBytes)-1):
if decBytes[x]== 0:
break
else:
return None
return decBytes[x+1:] #Return everything after the separator
# **************************************************************************
# Helper Functions for RSA Keys
# **************************************************************************
def _addPKCS1SHA1Prefix(self, bytes, withNULL=True):
# There is a long history of confusion over whether the SHA1
# algorithmIdentifier should be encoded with a NULL parameter or
# with the parameter omitted. While the original intention was
# apparently to omit it, many toolkits went the other way. TLS 1.2
# specifies the NULL should be included, and this behavior is also
# mandated in recent versions of PKCS #1, and is what tlslite has
# always implemented. Anyways, verification code should probably
# accept both. However, nothing uses this code yet, so this is
# all fairly moot.
if not withNULL:
prefixBytes = bytearray(\
[0x30,0x1f,0x30,0x07,0x06,0x05,0x2b,0x0e,0x03,0x02,0x1a,0x04,0x14])
else:
prefixBytes = bytearray(\
[0x30,0x21,0x30,0x09,0x06,0x05,0x2b,0x0e,0x03,0x02,0x1a,0x05,0x00,0x04,0x14])
prefixedBytes = prefixBytes + bytes
return prefixedBytes
def _addPKCS1Padding(self, bytes, blockType):
padLength = (numBytes(self.n) - (len(bytes)+3))
if blockType == 1: #Signature padding
pad = [0xFF] * padLength
elif blockType == 2: #Encryption padding
pad = bytearray(0)
while len(pad) < padLength:
padBytes = getRandomBytes(padLength * 2)
pad = [b for b in padBytes if b != 0]
pad = pad[:padLength]
else:
raise AssertionError()
padding = bytearray([0,blockType] + pad + [0])
paddedBytes = padding + bytes
return paddedBytes
def _rawPrivateKeyOp(self, m):
#Create blinding values, on the first pass:
if not self.blinder:
self.unblinder = getRandomNumber(2, self.n)
self.blinder = powMod(invMod(self.unblinder, self.n), self.e,
self.n)
#Blind the input
m = (m * self.blinder) % self.n
#Perform the RSA operation
c = self._rawPrivateKeyOpHelper(m)
#Unblind the output
c = (c * self.unblinder) % self.n
#Update blinding values
self.blinder = (self.blinder * self.blinder) % self.n
self.unblinder = (self.unblinder * self.unblinder) % self.n
#Return the output
return c
def _rawPrivateKeyOpHelper(self, m):
#Non-CRT version
#c = powMod(m, self.d, self.n)
#CRT version (~3x faster)
s1 = powMod(m, self.dP, self.p)
s2 = powMod(m, self.dQ, self.q)
h = ((s1 - s2) * self.qInv) % self.p
c = s2 + self.q * h
return c
def _rawPublicKeyOp(self, c):
m = powMod(c, self.e, self.n)
return m
def acceptsPassword(self):
return False
def generate(bits):
key = Python_RSAKey()
p = getRandomPrime(bits//2, False)
q = getRandomPrime(bits//2, False)
t = lcm(p-1, q-1)
key.n = p * q
key.e = 65537
key.d = invMod(key.e, t)
key.p = p
key.q = q
key.dP = key.d % (p-1)
key.dQ = key.d % (q-1)
key.qInv = invMod(q, p)
return key
generate = staticmethod(generate)

90
lib/x509.py

@ -20,17 +20,12 @@
from datetime import datetime
import sys
import tlslite
import util
from util import profiler, print_error
from asn1tinydecoder import asn1_node_root, asn1_get_all, asn1_get_value, \
asn1_get_value_of_type, asn1_node_next, asn1_node_first_child, \
asn1_read_length, asn1_node_is_child_of, \
bytestr_to_int, bitstr_to_bytestr
# workaround https://github.com/trevp/tlslite/issues/15
tlslite.utils.cryptomath.pycryptoLoaded = False
from asn1tinydecoder import *
import ecdsa
import hashlib
# algo OIDs
@ -50,61 +45,13 @@ class CertificateError(Exception):
pass
def decode_OID(s):
s = map(ord, s)
r = []
r.append(s[0] / 40)
r.append(s[0] % 40)
k = 0
for i in s[1:]:
if i < 128:
r.append(i + 128*k)
k = 0
else:
k = (i - 128) + 128*k
return '.'.join(map(str, r))
def encode_OID(oid):
x = map(int, oid.split('.'))
s = chr(x[0]*40 + x[1])
for i in x[2:]:
ss = chr(i % 128)
while i > 128:
i = i / 128
ss = chr(128 + i % 128) + ss
s += ss
return s
def asn1_get_children(der, i):
nodes = []
ii = asn1_node_first_child(der,i)
nodes.append(ii)
while ii[2]<i[2]:
ii = asn1_node_next(der,ii)
nodes.append(ii)
return nodes
def asn1_get_sequence(s):
return map(lambda j: asn1_get_value(s, j), asn1_get_children(s, asn1_node_root(s)))
def asn1_get_dict(der, i):
p = {}
for ii in asn1_get_children(der, i):
for iii in asn1_get_children(der, ii):
iiii = asn1_node_first_child(der, iii)
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER'))
iiii = asn1_node_next(der, iiii)
value = asn1_get_value(der, iiii)
p[oid] = value
return p
class X509(tlslite.X509):
class X509(object):
def parseBinary(self, b):
# call tlslite method first
tlslite.X509.parseBinary(self, b)
self.bytes = bytearray(b)
der = str(b)
root = asn1_node_root(der)
@ -139,8 +86,25 @@ class X509(tlslite.X509):
# subject
subject = asn1_node_next(der, validity)
self.subject = asn1_get_dict(der, subject)
subject_pki = asn1_node_next(der, subject)
public_key_algo = asn1_node_first_child(der, subject_pki)
ii = asn1_node_first_child(der, public_key_algo)
self.public_key_algo = decode_OID(asn1_get_value_of_type(der, ii, 'OBJECT IDENTIFIER'))
# pubkey modulus and exponent
subject_public_key = asn1_node_next(der, public_key_algo)
spk = asn1_get_value_of_type(der, subject_public_key, 'BIT STRING')
spk = bitstr_to_bytestr(spk)
r = asn1_node_root(spk)
modulus = asn1_node_first_child(spk, r)
exponent = asn1_node_next(spk, modulus)
rsa_n = asn1_get_value_of_type(spk, modulus, 'INTEGER')
rsa_e = asn1_get_value_of_type(spk, exponent, 'INTEGER')
self.modulus = ecdsa.util.string_to_number(rsa_n)
self.exponent = ecdsa.util.string_to_number(rsa_e)
# extensions
self.CA = False
self.AKI = None
@ -198,10 +162,8 @@ class X509(tlslite.X509):
if not_after <= now:
raise CertificateError('Certificate has expired.')
class X509CertChain(tlslite.X509CertChain):
pass
def getFingerprint(self):
return hashlib.sha1(self.bytes).digest()
@ -209,11 +171,12 @@ class X509CertChain(tlslite.X509CertChain):
@profiler
def load_certificates(ca_path):
import pem
ca_list = {}
ca_keyID = {}
with open(ca_path, 'r') as f:
s = f.read()
bList = tlslite.utils.pem.dePemList(s, "CERTIFICATE")
bList = pem.dePemList(s, "CERTIFICATE")
for b in bList:
x = X509()
try:
@ -238,7 +201,6 @@ def int_to_bytestr(i):
return s
def create_csr(commonName, challenge, k):
import ecdsa, hashlib
from bitcoin import point_to_ser
private_key = ecdsa.SigningKey.from_string(k, curve = ecdsa.SECP256k1)
public_key = private_key.get_verifying_key()

1
setup.py

@ -35,7 +35,6 @@ setup(
'requests',
'qrcode',
'protobuf',
'tlslite',
'dnspython',
],
package_dir={

Loading…
Cancel
Save