Browse Source

add root certificate to chain if missing. fixes #1137

283
ThomasV 10 years ago
parent
commit
4fe32d2ad1
  1. 84
      lib/paymentrequest.py
  2. 21
      lib/x509.py

84
lib/paymentrequest.py

@ -106,23 +106,26 @@ class PaymentRequest:
except:
self.error = "cannot parse payment request"
return
self.details = pb2.PaymentDetails()
self.details.ParseFromString(self.data.serialized_payment_details)
for o in self.details.outputs:
addr = transaction.get_address_from_output_script(o.script)[1]
self.outputs.append(('address', addr, o.amount))
self.memo = self.details.memo
self.payment_url = self.details.payment_url
def verify(self):
if not ca_list:
self.error = "Trusted certificate authorities list not found"
return False
paymntreq = self.data
if not paymntreq.signature:
self.error = "No signature"
return
cert = pb2.X509Certificates()
cert.ParseFromString(paymntreq.pki_data)
cert_num = len(cert.certificate)
x509_chain = []
for i in range(cert_num):
x = x509.X509()
@ -140,15 +143,39 @@ class PaymentRequest:
if not x.check_ca():
self.error = "ERROR: Supplied CA Certificate Error"
return
if not cert_num > 1:
self.error = "ERROR: CA Certificate Chain Not Provided by Payment Processor"
return False
# if the root CA is not supplied, add it to the chain
ca = x509_chain[cert_num-1]
supplied_CA_fingerprint = ca.getFingerprint()
supplied_CA_names = ca.extract_names()
CA_OU = supplied_CA_names['OU']
x = ca_list.get(supplied_CA_fingerprint)
if x:
x.slow_parse()
names = x.extract_names()
assert names['CN'] == supplied_CA_names['CN']
else:
issuer = ca.get_issuer()
for x in ca_list.values():
try:
x.slow_parse()
names = x.extract_names()
except Exception as e:
util.print_error("cannot parse cert:", e)
continue
if names.get('CN') == issuer.get('CN'):
x509_chain.append(x)
break
else:
self.error = "Supplied CA Not Found in Trusted CA Store."
return False
# verify the chain of signatures
cert_num = len(cert.certificate)
for i in range(1, cert_num):
x = x509_chain[i]
prev_x = x509_chain[i-1]
algo, sig, data = prev_x.extract_sig()
sig = bytearray(sig[5:])
pubkey = x.publicKey
@ -166,35 +193,17 @@ class PaymentRequest:
else:
self.error = "Algorithm not supported"
util.print_error(self.error, algo.getComponentByName('algorithm'))
return
return False
if not verify:
self.error = "Certificate not Signed by Provided CA Certificate Chain"
return
ca = x509_chain[cert_num-1]
supplied_CA_fingerprint = ca.getFingerprint()
supplied_CA_names = ca.extract_names()
CA_OU = supplied_CA_names['OU']
x = ca_list.get(supplied_CA_fingerprint)
if x:
x.slow_parse()
names = x.extract_names()
CA_match = True
if names['CN'] != supplied_CA_names['CN']:
print "ERROR: Trusted CA CN Mismatch; however CA has trusted fingerprint"
print "Payment will continue with manual verification."
else:
CA_match = False
return False
# verify the BIP70 signature
pubkey0 = x509_chain[0].publicKey
sig = paymntreq.signature
paymntreq.signature = ''
s = paymntreq.SerializeToString()
sigBytes = bytearray(sig)
msgBytes = bytearray(s)
if paymntreq.pki_type == "x509+sha256":
hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
verify = pubkey0.verify(sigBytes, x509.PREFIX_RSA_SHA256 + hashBytes)
@ -203,28 +212,11 @@ class PaymentRequest:
else:
self.error = "ERROR: Unsupported PKI Type for Message Signature"
return False
if not verify:
self.error = "ERROR: Invalid Signature for Payment Request Data"
return False
### SIG Verified
self.details = pay_det = pb2.PaymentDetails()
self.details.ParseFromString(paymntreq.serialized_payment_details)
for o in pay_det.outputs:
addr = transaction.get_address_from_output_script(o.script)[1]
self.outputs.append( ('address', addr, o.amount) )
self.memo = self.details.memo
if CA_match:
self.status = 'Signed by Trusted CA:\n' + CA_OU
else:
self.status = "Supplied CA Not Found in Trusted CA Store."
self.payment_url = self.details.payment_url
self.status = 'Signed by Trusted CA:\n' + CA_OU
return True
def has_expired(self):

21
lib/x509.py

@ -75,6 +75,27 @@ class X509(tlslite.X509):
self.subject = self.tbs.getComponentByName('subject')
self.extensions = self.tbs.getComponentByName('extensions') or []
def get_issuer(self):
results = {'CN': None, 'OU': None,}
issuer = self.tbs.getComponentByName('issuer')
# Extract the CommonName(s) from the cert.
for rdnss in issuer:
for rdns in rdnss:
for name in rdns:
oid = name.getComponentByName('type')
value = name.getComponentByName('value')
if oid == COMMON_NAME:
value = decoder.decode(value, asn1Spec=DirectoryString())[0]
value = decode_str(value.getComponent())
results['CN'] = value
elif oid == OU_NAME:
value = decoder.decode(value, asn1Spec=DirectoryString())[0]
value = decode_str(value.getComponent())
results['OU'] = value
return results
def extract_names(self):
results = {'CN': None,
'DNS': set(),

Loading…
Cancel
Save