From 094ce2e4b391c187ba38b3706fbe8b0340790605 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Wed, 8 Jul 2015 19:20:54 +0200 Subject: [PATCH] add DNSSEC chain validation --- lib/contacts.py | 62 +++++++----------------- lib/dnssec.py | 123 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 46 deletions(-) create mode 100644 lib/dnssec.py diff --git a/lib/contacts.py b/lib/contacts.py index ac72ba4d2..da5105d17 100644 --- a/lib/contacts.py +++ b/lib/contacts.py @@ -1,12 +1,13 @@ import sys import re import dns -import traceback import bitcoin +import dnssec from util import StoreDict, print_error from i18n import _ + class Contacts(StoreDict): def __init__(self, config): @@ -18,7 +19,6 @@ class Contacts(StoreDict): 'address': k, 'type': 'address' } - if k in self.keys(): _type, addr = self[k] if _type == 'address': @@ -26,62 +26,32 @@ class Contacts(StoreDict): 'address': addr, 'type': 'contact' } - out = self.resolve_openalias(k) if out: - address, name = out - validated = False + address, name, validated = out return { 'address': address, 'name': name, 'type': 'openalias', 'validated': validated } - raise Exception("Invalid Bitcoin address or alias", k) def resolve_openalias(self, url): - '''Resolve OpenAlias address using url.''' - print_error('[OA] Attempting to resolve OpenAlias data for ' + url) - - url = url.replace('@', '.') # support email-style addresses, per the OA standard + # support email-style addresses, per the OA standard + url = url.replace('@', '.') + records, validated = dnssec.query(url, dns.rdatatype.TXT) prefix = 'btc' - retries = 3 - err = None - for i in range(0, retries): - try: - resolver = dns.resolver.Resolver() - resolver.timeout = 2.0 - resolver.lifetime = 4.0 - records = resolver.query(url, dns.rdatatype.TXT) - for record in records: - string = record.strings[0] - if string.startswith('oa1:' + prefix): - address = self.find_regex(string, r'recipient_address=([A-Za-z0-9]+)') - name = self.find_regex(string, r'recipient_name=([^;]+)') - if not name: - name = address - if not address: - continue - return (address, name) - err = _('No OpenAlias record found.') - break - except dns.resolver.NXDOMAIN: - err = _('No such domain.') - continue - except dns.resolver.Timeout: - err = _('Timed out while resolving.') - continue - except DNSException: - err = _('Unhandled exception.') - continue - except Exception, e: - err = _('Unexpected error: ' + str(e)) - continue - break - if err: - print_error(err) - return 0 + for record in records: + string = record.strings[0] + if string.startswith('oa1:' + prefix): + address = self.find_regex(string, r'recipient_address=([A-Za-z0-9]+)') + name = self.find_regex(string, r'recipient_name=([^;]+)') + if not name: + name = address + if not address: + continue + return address, name, validated def find_regex(self, haystack, needle): regex = re.compile(needle) diff --git a/lib/dnssec.py b/lib/dnssec.py new file mode 100644 index 000000000..be47a1831 --- /dev/null +++ b/lib/dnssec.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# +# Electrum - lightweight Bitcoin client +# Copyright (C) 2015 Thomas Voegtlin +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + + +# Check DNSSEC trust chain. +# Todo: verify expiration dates +# +# Based on +# http://backreference.org/2010/11/17/dnssec-verification-with-dig/ +# https://github.com/rthalley/dnspython/blob/master/tests/test_dnssec.py + + +import traceback +import sys + +import dns.name +import dns.query +import dns.dnssec +import dns.message +import dns.resolver +import dns.rdatatype +import dns.rdtypes.ANY.NS +import dns.rdtypes.ANY.CNAME +import dns.rdtypes.ANY.DLV +import dns.rdtypes.ANY.DNSKEY +import dns.rdtypes.ANY.DS +import dns.rdtypes.ANY.NSEC +import dns.rdtypes.ANY.NSEC3 +import dns.rdtypes.ANY.NSEC3PARAM +import dns.rdtypes.ANY.RRSIG +import dns.rdtypes.ANY.SOA +import dns.rdtypes.ANY.TXT +import dns.rdtypes.IN.A +import dns.rdtypes.IN.AAAA +from dns.exception import DNSException + + +from util import print_error + + +# hard-coded root KSK +root_KSK = dns.rrset.from_text('.', 15202, 'IN', 'DNSKEY', '257 3 8 AwEAAagAIKlVZrpC6Ia7gEzahOR+9W29euxhJhVVLOyQbSEW0O8gcCjF FVQUTf6v58fLjwBd0YI0EzrAcQqBGCzh/RStIoO8g0NfnfL2MTJRkxoX bfDaUeVPQuYEhg37NZWAJQ9VnMVDxP/VHL496M/QZxkjf5/Efucp2gaD X6RS6CXpoY68LsvPVjR0ZSwzz1apAzvN9dlzEheX7ICJBBtuA6G3LQpz W5hOA2hzCTMjJPJ8LbqF6dsV6DoBQzgul0sGIcGOYl7OyQdXfZ57relS Qageu+ipAdTTJ25AsRTAoub8ONGcLmqrAmRLKBP1dfwhYB4N7knNnulq QxA+Uk1ihz0=') + + + +def check_query(ns, sub, _type, keys): + q = dns.message.make_query(sub, _type, want_dnssec=True) + response = dns.query.tcp(q, ns, timeout=5) + assert response.rcode() == 0, 'No answer' + answer = response.answer + assert len(answer) == 2, 'No DNSSEC record found' + if answer[0].rdtype == dns.rdatatype.RRSIG: + rrsig, rrset = answer + else: + rrset, rrsig = answer + if keys is None: + keys = {dns.name.from_text(sub):rrset} + dns.dnssec.validate(rrset, rrsig, keys) + return rrset + + +def get_and_validate(ns, url, _type): + # get trusted root keys + root_rrset = check_query(ns, '', dns.rdatatype.DNSKEY, {dns.name.root: root_KSK}) + keys = {dns.name.root: root_rrset} + # top-down verification + parts = url.split('.') + for i in range(len(parts), 0, -1): + sub = '.'.join(parts[i-1:]) + name = dns.name.from_text(sub) + # get DNSKEY (self-signed) + rrset = check_query(ns, sub, dns.rdatatype.DNSKEY, None) + # get DS (signed by parent) + ds_rrset = check_query(ns, sub, dns.rdatatype.DS, keys) + # verify that a signed DS validates DNSKEY + for ds in ds_rrset: + for dnskey in rrset: + good_ds = dns.dnssec.make_ds(name, dnskey, 'SHA256') + if ds == good_ds: + break + else: + continue + break + else: + print ds_rrset + raise BaseException("DS does not match DNSKEY") + # set key for next iteration + keys = {name: rrset} + # get TXT record (signed by zone) + rrset = check_query(ns, url, _type, keys) + return rrset + + +def query(url, rtype): + resolver = dns.resolver.get_default_resolver() + # 8.8.8.8 is Google's public DNS server + resolver.nameservers = ['8.8.8.8'] + ns = resolver.nameservers[0] + try: + out = get_and_validate(ns, url, rtype) + validated = True + except BaseException as e: + #traceback.print_exc(file=sys.stderr) + print_error("DNSSEC error:", str(e)) + out = resolver.query(url, rtype) + validated = False + return out, validated