Browse Source

temporary_path for unverified certificates

283
ThomasV 11 years ago
parent
commit
36b61fccfd
  1. 46
      lib/interface.py

46
lib/interface.py

@ -31,6 +31,21 @@ DEFAULT_TIMEOUT = 5
proxy_modes = ['socks4', 'socks5', 'http'] proxy_modes = ['socks4', 'socks5', 'http']
def is_expired(cert):
from OpenSSL import crypto as c
_cert = c.load_certificate(c.FILETYPE_PEM, cert)
notAfter = _cert.get_notAfter()
notBefore = _cert.get_notBefore()
now = time.time()
if now > time.mktime( time.strptime(notAfter[:-1] + "GMT", "%Y%m%d%H%M%S%Z") ):
print "deprecated cert", self.host, notAfter
return True
if now < time.mktime( time.strptime(notBefore[:-1] + "GMT", "%Y%m%d%H%M%S%Z") ):
print "notbefore", self.host, notBefore
return True
return False
class Interface(threading.Thread): class Interface(threading.Thread):
@ -266,7 +281,7 @@ class Interface(threading.Thread):
try: try:
s.connect((self.host, self.port)) s.connect((self.host, self.port))
except: except:
print_error("failed to connect", self.host, self.port) # print_error("failed to connect", self.host, self.port)
return return
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv3, cert_reqs=ssl.CERT_NONE, ca_certs=None) s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv3, cert_reqs=ssl.CERT_NONE, ca_certs=None)
@ -274,20 +289,8 @@ class Interface(threading.Thread):
s.close() s.close()
cert = ssl.DER_cert_to_PEM_cert(dercert) cert = ssl.DER_cert_to_PEM_cert(dercert)
from OpenSSL import crypto as c temporary_path = cert_path + '.temp'
_cert = c.load_certificate(c.FILETYPE_PEM, cert) with open(temporary_path,"w") as f:
notAfter = _cert.get_notAfter()
notBefore = _cert.get_notBefore()
now = time.time()
if now > time.mktime( time.strptime(notAfter[:-1] + "GMT", "%Y%m%d%H%M%S%Z") ):
print "deprecated cert", self.host, notAfter
return
if now < time.mktime( time.strptime(notBefore[:-1] + "GMT", "%Y%m%d%H%M%S%Z") ):
print "notbefore", self.host, notBefore
return
with open(cert_path,"w") as f:
print_error("saving certificate for",self.host)
f.write(cert) f.write(cert)
else: else:
is_new = False is_new = False
@ -308,19 +311,24 @@ class Interface(threading.Thread):
s = ssl.wrap_socket(s, s = ssl.wrap_socket(s,
ssl_version=ssl.PROTOCOL_SSLv3, ssl_version=ssl.PROTOCOL_SSLv3,
cert_reqs=ssl.CERT_REQUIRED, cert_reqs=ssl.CERT_REQUIRED,
ca_certs=cert_path, ca_certs= (temporary_path if is_new else cert_path),
do_handshake_on_connect=True) do_handshake_on_connect=True)
except ssl.SSLError, e: except ssl.SSLError, e:
print_error("SSL error:", self.host, e) print_error("SSL error:", self.host, e)
# delete the certificate so we will download a new one
if is_new: if is_new:
os.unlink(cert_path) os.unlink(temporary_path)
return return
except: except:
traceback.print_exc(file=sys.stdout)
print_error("wrap_socket failed", self.host) print_error("wrap_socket failed", self.host)
traceback.print_exc(file=sys.stdout)
if is_new:
os.unlink(temporary_path)
return return
if is_new:
print_error("saving certificate for", self.host)
os.rename(temporary_path, cert_path)
# hostname verification (disabled) # hostname verification (disabled)
#from backports.ssl_match_hostname import match_hostname, CertificateError #from backports.ssl_match_hostname import match_hostname, CertificateError
#try: #try:

Loading…
Cancel
Save