Browse Source

Merge pull request #3265 from SomberNight/interface_tls_1_2

interface ssl: besides TLS 1.1, also allow later versions
3.0.x
ThomasV 7 years ago
committed by GitHub
parent
commit
568afd7a17
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      lib/interface.py

26
lib/interface.py

@ -124,6 +124,18 @@ class TcpConnection(threading.Thread, util.PrintError):
else:
self.print_error("failed to connect", str(e))
@staticmethod
def get_ssl_context(cert_reqs, ca_certs):
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs)
context.check_hostname = False
context.verify_mode = cert_reqs
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
return context
def get_socket(self):
if self.use_ssl:
cert_path = os.path.join(self.config_path, 'certs', self.host)
@ -134,7 +146,8 @@ class TcpConnection(threading.Thread, util.PrintError):
return
# try with CA first
try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1_1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True)
context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path)
s = context.wrap_socket(s, do_handshake_on_connect=True)
except ssl.SSLError as e:
print_error(e)
s = None
@ -150,7 +163,8 @@ class TcpConnection(threading.Thread, util.PrintError):
if s is None:
return
try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1_1, cert_reqs=ssl.CERT_NONE, ca_certs=None)
context = self.get_ssl_context(cert_reqs=ssl.CERT_NONE, ca_certs=None)
s = context.wrap_socket(s)
except ssl.SSLError as e:
self.print_error("SSL error retrieving SSL certificate:", e)
return
@ -174,11 +188,9 @@ class TcpConnection(threading.Thread, util.PrintError):
if self.use_ssl:
try:
s = ssl.wrap_socket(s,
ssl_version=ssl.PROTOCOL_TLSv1_1,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=(temporary_path if is_new else cert_path),
do_handshake_on_connect=True)
context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED,
ca_certs=(temporary_path if is_new else cert_path))
s = context.wrap_socket(s, do_handshake_on_connect=True)
except socket.timeout:
self.print_error('timeout')
return

Loading…
Cancel
Save