|
|
@ -20,34 +20,26 @@ |
|
|
|
from datetime import datetime |
|
|
|
import sys |
|
|
|
|
|
|
|
import pyasn1 |
|
|
|
import pyasn1_modules |
|
|
|
import tlslite |
|
|
|
import util |
|
|
|
from util import profiler, print_error |
|
|
|
|
|
|
|
from asn1tinydecoder import asn1_node_root, asn1_get_all, asn1_get_value, \ |
|
|
|
asn1_get_value_of_type, asn1_node_next, asn1_node_first_child, \ |
|
|
|
asn1_read_length, asn1_node_is_child_of, \ |
|
|
|
bytestr_to_int, bitstr_to_bytestr |
|
|
|
|
|
|
|
# workaround https://github.com/trevp/tlslite/issues/15 |
|
|
|
tlslite.utils.cryptomath.pycryptoLoaded = False |
|
|
|
|
|
|
|
|
|
|
|
from pyasn1.codec.der import decoder, encoder |
|
|
|
from pyasn1.type.univ import Any, ObjectIdentifier, OctetString |
|
|
|
from pyasn1.type.char import BMPString, IA5String, UTF8String |
|
|
|
from pyasn1.type.useful import GeneralizedTime |
|
|
|
from pyasn1_modules.rfc2459 import (Certificate, DirectoryString, |
|
|
|
SubjectAltName, GeneralNames, |
|
|
|
GeneralName) |
|
|
|
from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME |
|
|
|
from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME |
|
|
|
from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME |
|
|
|
from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints |
|
|
|
XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5') |
|
|
|
SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7') |
|
|
|
|
|
|
|
# algo OIDs |
|
|
|
ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5') |
|
|
|
ALGO_RSA_SHA256 = ObjectIdentifier('1.2.840.113549.1.1.11') |
|
|
|
ALGO_RSA_SHA384 = ObjectIdentifier('1.2.840.113549.1.1.12') |
|
|
|
ALGO_RSA_SHA512 = ObjectIdentifier('1.2.840.113549.1.1.13') |
|
|
|
ALGO_RSA_SHA1 = '1.2.840.113549.1.1.5' |
|
|
|
ALGO_RSA_SHA256 = '1.2.840.113549.1.1.11' |
|
|
|
ALGO_RSA_SHA384 = '1.2.840.113549.1.1.12' |
|
|
|
ALGO_RSA_SHA512 = '1.2.840.113549.1.1.13' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# prefixes, see http://stackoverflow.com/questions/3713774/c-sharp-how-to-calculate-asn-1-der-encoding-of-a-particular-hash-algorithm |
|
|
|
PREFIX_RSA_SHA256 = bytearray([0x30,0x31,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x01,0x05,0x00,0x04,0x20]) |
|
|
@ -62,6 +54,21 @@ def decode_str(data): |
|
|
|
encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8' |
|
|
|
return bytes(data).decode(encoding) |
|
|
|
|
|
|
|
def decode_OID(s): |
|
|
|
s = map(ord, s) |
|
|
|
r = [] |
|
|
|
r.append(s[0] / 40) |
|
|
|
r.append(s[0] % 40) |
|
|
|
k = 0 |
|
|
|
for i in s[1:]: |
|
|
|
if i<128: |
|
|
|
r.append(i + 128*k) |
|
|
|
k = 0 |
|
|
|
else: |
|
|
|
k = (i - 128) + 128*k |
|
|
|
return '.'.join(map(str,r)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class X509(tlslite.X509): |
|
|
|
"""Child class of tlslite.X509 that uses pyasn1 to parse cert |
|
|
@ -69,182 +76,116 @@ class X509(tlslite.X509): |
|
|
|
should try to do everything in tlslite. |
|
|
|
""" |
|
|
|
|
|
|
|
def slow_parse(self): |
|
|
|
self.cert = decoder.decode(str(self.bytes), asn1Spec=Certificate())[0] |
|
|
|
self.tbs = self.cert.getComponentByName('tbsCertificate') |
|
|
|
self.subject = self.tbs.getComponentByName('subject') |
|
|
|
self.extensions = self.tbs.getComponentByName('extensions') or [] |
|
|
|
def get_children(self, der, i): |
|
|
|
nodes = [] |
|
|
|
ii = asn1_node_first_child(der,i) |
|
|
|
while True: |
|
|
|
nodes.append(ii) |
|
|
|
ii = asn1_node_next(der,ii) |
|
|
|
if ii[0] > i[2]: |
|
|
|
break |
|
|
|
return nodes |
|
|
|
|
|
|
|
def get_dict(self, der, i): |
|
|
|
p = {} |
|
|
|
for ii in self.get_children(der, i): |
|
|
|
for iii in self.get_children(der, ii): |
|
|
|
iiii = asn1_node_first_child(der, iii) |
|
|
|
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) |
|
|
|
iiii = asn1_node_next(der, iiii) |
|
|
|
value = asn1_get_value(der, iiii) |
|
|
|
p[oid] = value |
|
|
|
return p |
|
|
|
|
|
|
|
def parseBinary(self, b): |
|
|
|
|
|
|
|
# call tlslite method first |
|
|
|
tlslite.X509.parseBinary(self, b) |
|
|
|
|
|
|
|
der = str(b) |
|
|
|
root = asn1_node_root(der) |
|
|
|
cert = asn1_node_first_child(der, root) |
|
|
|
# data for signature |
|
|
|
self.data = asn1_get_all(der, cert) |
|
|
|
|
|
|
|
# optional version field |
|
|
|
if asn1_get_value(der, cert)[0] == chr(0xa0): |
|
|
|
version = asn1_node_first_child(der, cert) |
|
|
|
serial_number = asn1_node_next(der, version) |
|
|
|
else: |
|
|
|
serial_number = asn1_node_first_child(der, cert) |
|
|
|
self.serial_number = bytestr_to_int(asn1_get_value_of_type(der, serial_number, 'INTEGER')) |
|
|
|
|
|
|
|
# signature algorithm |
|
|
|
sig_algo = asn1_node_next(der, serial_number) |
|
|
|
ii = asn1_node_first_child(der, sig_algo) |
|
|
|
self.sig_algo = decode_OID(asn1_get_value_of_type(der, ii, 'OBJECT IDENTIFIER')) |
|
|
|
|
|
|
|
# issuer |
|
|
|
issuer = asn1_node_next(der, sig_algo) |
|
|
|
self.issuer = self.get_dict(der, issuer) |
|
|
|
|
|
|
|
# validity |
|
|
|
validity = asn1_node_next(der, issuer) |
|
|
|
ii = asn1_node_first_child(der, validity) |
|
|
|
self.notBefore = asn1_get_value_of_type(der, ii, 'UTCTime') |
|
|
|
ii = asn1_node_next(der,ii) |
|
|
|
self.notAfter = asn1_get_value_of_type(der, ii, 'UTCTime') |
|
|
|
|
|
|
|
# subject |
|
|
|
subject = asn1_node_next(der, validity) |
|
|
|
self.subject = self.get_dict(der, subject) |
|
|
|
subject_pki = asn1_node_next(der, subject) |
|
|
|
|
|
|
|
# optional fields: issuer_uid, subject_uid, extensions |
|
|
|
i = subject_pki |
|
|
|
self.CA = False |
|
|
|
while True: |
|
|
|
i = asn1_node_next(der, i) |
|
|
|
if i[0] > cert[2]: |
|
|
|
break |
|
|
|
for ii in self.get_children(der, i): |
|
|
|
for iii in self.get_children(der, ii): |
|
|
|
iiii = asn1_node_first_child(der, iii) |
|
|
|
oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) |
|
|
|
iiii = asn1_node_next(der, iiii) |
|
|
|
value = asn1_get_value(der, iiii) |
|
|
|
if oid == '2.5.29.19': # basic constraints |
|
|
|
self.CA = value |
|
|
|
else: |
|
|
|
pass |
|
|
|
|
|
|
|
# cert signature |
|
|
|
cert_sig_algo = asn1_node_next(der, cert) |
|
|
|
ii = asn1_node_first_child(der, cert_sig_algo) |
|
|
|
self.cert_sig_algo = decode_OID(asn1_get_value_of_type(der, ii, 'OBJECT IDENTIFIER')) |
|
|
|
cert_sig = asn1_node_next(der, cert_sig_algo) |
|
|
|
self.signature = asn1_get_value(der, cert_sig)[1:] |
|
|
|
|
|
|
|
|
|
|
|
def get_common_name(self): |
|
|
|
return self.extract_names()['CN'] |
|
|
|
return self.subject.get('2.5.4.3') |
|
|
|
|
|
|
|
def get_issuer(self): |
|
|
|
results = {'CN': None, 'OU': None,} |
|
|
|
issuer = self.tbs.getComponentByName('issuer') |
|
|
|
# Extract the CommonName(s) from the cert. |
|
|
|
for rdnss in issuer: |
|
|
|
for rdns in rdnss: |
|
|
|
for name in rdns: |
|
|
|
oid = name.getComponentByName('type') |
|
|
|
value = name.getComponentByName('value') |
|
|
|
|
|
|
|
if oid == COMMON_NAME: |
|
|
|
value = decoder.decode(value, asn1Spec=DirectoryString())[0] |
|
|
|
value = decode_str(value.getComponent()) |
|
|
|
results['CN'] = value |
|
|
|
|
|
|
|
elif oid == OU_NAME: |
|
|
|
value = decoder.decode(value, asn1Spec=DirectoryString())[0] |
|
|
|
value = decode_str(value.getComponent()) |
|
|
|
results['OU'] = value |
|
|
|
return results |
|
|
|
|
|
|
|
def extract_names(self): |
|
|
|
results = {'CN': None, |
|
|
|
'DNS': set(), |
|
|
|
'SRV': set(), |
|
|
|
'URI': set(), |
|
|
|
'XMPPAddr': set(), |
|
|
|
'OU': None,} |
|
|
|
|
|
|
|
# Extract the CommonName(s) from the cert. |
|
|
|
for rdnss in self.subject: |
|
|
|
for rdns in rdnss: |
|
|
|
for name in rdns: |
|
|
|
oid = name.getComponentByName('type') |
|
|
|
value = name.getComponentByName('value') |
|
|
|
|
|
|
|
if oid == COMMON_NAME: |
|
|
|
value = decoder.decode(value, asn1Spec=DirectoryString())[0] |
|
|
|
value = decode_str(value.getComponent()) |
|
|
|
results['CN'] = value |
|
|
|
|
|
|
|
elif oid == OU_NAME: |
|
|
|
value = decoder.decode(value, asn1Spec=DirectoryString())[0] |
|
|
|
value = decode_str(value.getComponent()) |
|
|
|
results['OU'] = value |
|
|
|
|
|
|
|
# Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr) |
|
|
|
for extension in self.extensions: |
|
|
|
oid = extension.getComponentByName('extnID') |
|
|
|
if oid != SUBJECT_ALT_NAME: |
|
|
|
continue |
|
|
|
|
|
|
|
value = decoder.decode(extension.getComponentByName('extnValue'), |
|
|
|
asn1Spec=OctetString())[0] |
|
|
|
sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0] |
|
|
|
for name in sa_names: |
|
|
|
name_type = name.getName() |
|
|
|
if name_type == 'dNSName': |
|
|
|
results['DNS'].add(decode_str(name.getComponent())) |
|
|
|
if name_type == 'uniformResourceIdentifier': |
|
|
|
value = decode_str(name.getComponent()) |
|
|
|
if value.startswith('xmpp:'): |
|
|
|
results['URI'].add(value[5:]) |
|
|
|
elif name_type == 'otherName': |
|
|
|
name = name.getComponent() |
|
|
|
|
|
|
|
oid = name.getComponentByName('type-id') |
|
|
|
value = name.getComponentByName('value') |
|
|
|
|
|
|
|
if oid == XMPP_ADDR: |
|
|
|
value = decoder.decode(value, asn1Spec=UTF8String())[0] |
|
|
|
results['XMPPAddr'].add(decode_str(value)) |
|
|
|
elif oid == SRV_NAME: |
|
|
|
value = decoder.decode(value, asn1Spec=IA5String())[0] |
|
|
|
results['SRV'].add(decode_str(value)) |
|
|
|
return results |
|
|
|
return self.issuer.get('2.5.4.3') |
|
|
|
|
|
|
|
def get_signature(self): |
|
|
|
return self.cert_sig_algo, self.signature, self.data |
|
|
|
|
|
|
|
def check_ca(self): |
|
|
|
for extension in self.extensions: |
|
|
|
oid = extension.getComponentByName('extnID') |
|
|
|
if oid != id_ce_basicConstraints: |
|
|
|
continue |
|
|
|
value = decoder.decode(extension.getComponentByName('extnValue'), |
|
|
|
asn1Spec=OctetString())[0] |
|
|
|
constraints = decoder.decode(value, asn1Spec=BasicConstraints())[0] |
|
|
|
return bool(constraints[0]) |
|
|
|
|
|
|
|
def extract_sig(self): |
|
|
|
signature = self.cert.getComponentByName('signatureValue') |
|
|
|
algorithm = self.cert.getComponentByName('signatureAlgorithm') |
|
|
|
data = encoder.encode(self.tbs) |
|
|
|
s = encoder.encode(signature) |
|
|
|
return algorithm, s, data |
|
|
|
|
|
|
|
|
|
|
|
def extract_pubkey(self): |
|
|
|
pki = self.tbs.getComponentByName('subjectPublicKeyInfo') |
|
|
|
algo = pki.getComponentByName('algorithm') |
|
|
|
algorithm = algo.getComponentByName('algorithm') |
|
|
|
parameters = algo.getComponentByName('parameters') |
|
|
|
subjectPublicKey = pki.getComponentByName('subjectPublicKey') |
|
|
|
return algorithm, parameters, encoder.encode(subjectPublicKey) |
|
|
|
|
|
|
|
|
|
|
|
def extract_dates(self): |
|
|
|
validity = self.tbs.getComponentByName('validity') |
|
|
|
not_before = validity.getComponentByName('notBefore') |
|
|
|
not_before = str(not_before.getComponent()) |
|
|
|
not_after = validity.getComponentByName('notAfter') |
|
|
|
not_after = str(not_after.getComponent()) |
|
|
|
if isinstance(not_before, GeneralizedTime): |
|
|
|
not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ') |
|
|
|
else: |
|
|
|
not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ') |
|
|
|
if isinstance(not_after, GeneralizedTime): |
|
|
|
not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ') |
|
|
|
else: |
|
|
|
not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ') |
|
|
|
return not_before, not_after |
|
|
|
|
|
|
|
def get_ttl(self): |
|
|
|
not_before, not_after = self.extract_dates() |
|
|
|
if not_after is None: |
|
|
|
return None |
|
|
|
return not_after - datetime.utcnow() |
|
|
|
return self.CA |
|
|
|
|
|
|
|
def check_date(self): |
|
|
|
not_before, not_after = self.extract_dates() |
|
|
|
now = datetime.utcnow() |
|
|
|
import time |
|
|
|
now = time.time() |
|
|
|
TIMESTAMP_FMT = '%y%m%d%H%M%SZ' |
|
|
|
not_before = time.mktime(time.strptime(self.notBefore, TIMESTAMP_FMT)) |
|
|
|
not_after = time.mktime(time.strptime(self.notAfter, TIMESTAMP_FMT)) |
|
|
|
if not_before > now: |
|
|
|
raise CertificateError( |
|
|
|
'Certificate has not entered its valid date range.') |
|
|
|
raise CertificateError('Certificate has not entered its valid date range.') |
|
|
|
if not_after <= now: |
|
|
|
raise CertificateError( |
|
|
|
'Certificate has expired.') |
|
|
|
raise CertificateError('Certificate has expired.') |
|
|
|
|
|
|
|
def check_name(self, expected): |
|
|
|
cert_names = self.extract_names() |
|
|
|
if '.' in expected: |
|
|
|
expected_wild = expected[expected.index('.'):] |
|
|
|
else: |
|
|
|
expected_wild = expected |
|
|
|
expected_srv = '_xmpp-client.%s' % expected |
|
|
|
for name in cert_names['XMPPAddr']: |
|
|
|
if name == expected: |
|
|
|
return True |
|
|
|
for name in cert_names['SRV']: |
|
|
|
if name == expected_srv or name == expected: |
|
|
|
return True |
|
|
|
for name in cert_names['DNS']: |
|
|
|
if name == expected: |
|
|
|
return True |
|
|
|
if name.startswith('*'): |
|
|
|
if '.' in name: |
|
|
|
name_wild = name[name.index('.'):] |
|
|
|
else: |
|
|
|
name_wild = name |
|
|
|
if expected_wild == name_wild: |
|
|
|
return True |
|
|
|
for name in cert_names['URI']: |
|
|
|
if name == expected: |
|
|
|
return True |
|
|
|
if cert_names['CN'] == expected: |
|
|
|
return True |
|
|
|
raise CertificateError( |
|
|
|
'Could not match certficate against hostname: %s' % expected) |
|
|
|
|
|
|
|
|
|
|
|
class X509CertChain(tlslite.X509CertChain): |
|
|
@ -253,6 +194,8 @@ class X509CertChain(tlslite.X509CertChain): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@profiler |
|
|
|
def load_certificates(ca_path): |
|
|
|
ca_list = {} |
|
|
|
with open(ca_path, 'r') as f: |
|
|
@ -262,8 +205,11 @@ def load_certificates(ca_path): |
|
|
|
x = X509() |
|
|
|
try: |
|
|
|
x.parseBinary(b) |
|
|
|
x.check_date() |
|
|
|
except Exception as e: |
|
|
|
util.print_error("cannot parse cert:", e) |
|
|
|
util.print_error("cert error:", e) |
|
|
|
continue |
|
|
|
ca_list[x.getFingerprint()] = x |
|
|
|
ca_list[x.get_common_name()] = x |
|
|
|
|
|
|
|
|
|
|
|
return ca_list |
|
|
|