Home | History | Annotate | Line # | Download | only in nsec3-answer
      1 #!/usr/bin/python3
      2 
      3 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      4 #
      5 # SPDX-License-Identifier: MPL-2.0
      6 #
      7 # This Source Code Form is subject to the terms of the Mozilla Public
      8 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      9 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
     10 #
     11 # See the COPYRIGHT file distributed with this work for additional
     12 # information regarding copyright ownership.
     13 
     14 from dataclasses import dataclass
     15 import os
     16 from pathlib import Path
     17 from typing import Container, Iterable, Optional, Set, Tuple
     18 
     19 import pytest
     20 
     21 pytest.importorskip("dns", minversion="2.5.0")
     22 import dns.dnssec
     23 import dns.message
     24 import dns.name
     25 import dns.query
     26 import dns.rcode
     27 import dns.rdataclass
     28 import dns.rdatatype
     29 import dns.rdtypes.ANY.RRSIG
     30 import dns.rdtypes.ANY.NSEC3
     31 import dns.rrset
     32 
     33 from isctest.hypothesis.strategies import dns_names, sampled_from
     34 import isctest
     35 import isctest.name
     36 
     37 from hypothesis import assume, given
     38 
# Root name: used as the zone origin, as the suffix for generated query names
# and as the origin when NSEC3 hashes are turned back into owner names.
SUFFIX = dns.name.from_text(".")
# Addresses of the servers under test; the parametrized tests label them
# "ns1" (AUTH) and "ns2" (RESOLVER).
AUTH = "10.53.0.1"
RESOLVER = "10.53.0.2"
# Passed as the 'timeout' argument of every query sent by do_test_query().
TIMEOUT = 5
# Analysis of the zone file; the tests use it to pick query names and to
# predict closest enclosers and wildcard sources of synthesis.
# NOTE: requires the 'srcdir' environment variable at import time.
ZONE = isctest.name.ZoneAnalyzer.read_path(
    Path(os.environ["srcdir"]) / "nsec3-answer/ns1/root.db.in", origin=SUFFIX
)
     46 
     47 
def is_related_to_any(
    test_name: dns.name.Name,
    acceptable_relations: Container[dns.name.NameRelation],
    candidates: Iterable[dns.name.Name],
) -> bool:
    """
    Tell whether 'test_name' stands in an accepted relation to any candidate.

    Each candidate is compared with 'test_name' using fullcompare(); the
    function returns True as soon as one comparison yields a relation
    contained in 'acceptable_relations', and False when none does (including
    when 'candidates' is empty).
    """
    return any(
        test_name.fullcompare(candidate)[0] in acceptable_relations
        for candidate in candidates
    )
     58 
     59 
def do_test_query(
    qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int
) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]:
    """
    Send one EDNS query with the DO bit requested over TCP and pre-check the reply.

    The reply must be a response to the query sent and must carry either
    NOERROR or NXDOMAIN. Returns the reply together with an NSEC3Checker
    built from it.
    """
    request = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
    reply = isctest.query.tcp(request, server, named_port, timeout=TIMEOUT)
    isctest.check.is_response_to(reply, request)

    acceptable_rcodes = (dns.rcode.NOERROR, dns.rcode.NXDOMAIN)
    assert reply.rcode() in acceptable_rcodes

    checker = NSEC3Checker(reply)
    return reply, checker
     68 
     69 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(
    qname=sampled_from(
        sorted(ZONE.reachable - ZONE.get_names_with_type(dns.rdatatype.CNAME))
    )
)
def test_nodata(server: str, qname: dns.name.Name, named_port: int) -> None:
    """
    NODATA: the name exists (and is not a CNAME), no wildcards are involved,
    but the RRset of the queried type does not exist.
    """
    # Only the checker is needed; the raw response is reachable through it.
    checker = do_test_query(qname, dns.rdatatype.HINFO, server, named_port)[1]
    check_nodata(qname, checker)
     82 
     83 
@pytest.mark.parametrize("server", [pytest.param(AUTH, id="ns1")])
@given(
    qname=dns_names(
        suffix=(ZONE.delegations - ZONE.get_names_with_type(dns.rdatatype.DS))
    )
)
def test_nodata_ds(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Auth sends proof of nonexistence with referral without DS RR. Opt-out is not supported."""
    reply, checker = do_test_query(qname, dns.rdatatype.HINFO, server, named_port)

    # The first NS RRset in the AUTHORITY section identifies the delegation point.
    delegation_ns = next(
        (rrset for rrset in reply.authority if rrset.rdtype == dns.rdatatype.NS),
        None,
    )
    assert delegation_ns is not None, "NS RRset missing in delegation answer"

    # There is no DS RRset at the delegation, so the response must contain an
    # NSEC3 RR matching the delegation owner name.
    check_nodata(delegation_ns.name, checker)
    103 
    104 
def check_nodata(name: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify a NODATA answer: RCODE is NOERROR, an NSEC3 RR matches 'name',
    and the response carries no NSEC3 RRs beyond that.
    """
    rcode = nsec3check.response.rcode()
    assert rcode is dns.rcode.NOERROR

    nsec3check.prove_name_exists(name)
    nsec3check.check_extraneous_rrs()
    110 
    111 
def assume_nx_and_no_delegation(qname: dns.name.Name) -> None:
    """
    Reject (via hypothesis 'assume') names which exist in the zone or which
    are at or below a reachable delegation or DNAME.
    """
    assume(qname not in ZONE.all_existing_names)

    # name must not be under a delegation or DNAME:
    # it would not work with resolver ns2
    redirecting_names = ZONE.reachable_delegations.union(ZONE.reachable_dnames)
    at_or_below = (dns.name.NameRelation.EQUAL, dns.name.NameRelation.SUBDOMAIN)
    assume(not is_related_to_any(qname, at_or_below, redirecting_names))
    124 
    125 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=SUFFIX))
def test_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Plain NXDOMAIN: the name does not exist and no wildcard applies to it"""
    assume_nx_and_no_delegation(qname)
    # Skip names which would be answered by wildcard synthesis.
    wildcard = ZONE.source_of_synthesis(qname)
    assume(wildcard not in ZONE.reachable_wildcards)

    checker = do_test_query(qname, dns.rdatatype.A, server, named_port)[1]
    check_nxdomain(qname, checker)
    138 
    139 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=sampled_from(sorted(ZONE.get_names_with_type(dns.rdatatype.CNAME))))
def test_cname_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """CNAME chain ending in NXDOMAIN, with no wildcard applying to the final name"""
    reply, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)
    # The proof is about the name at the end of the chain, not about QNAME.
    target = reply.resolve_chaining().canonical_name
    assume_nx_and_no_delegation(target)

    wildcard = ZONE.source_of_synthesis(target)
    assume(wildcard not in ZONE.reachable_wildcards)

    check_nxdomain(target, checker)
    154 
    155 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.get_names_with_type(dns.rdatatype.DNAME)))
def test_dname_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """DNAME redirection ending in NXDOMAIN, with no wildcard applying to the final name"""
    assume(qname not in ZONE.reachable)

    reply, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)
    # The proof is about the name at the end of the chain, not about QNAME.
    target = reply.resolve_chaining().canonical_name
    assume_nx_and_no_delegation(target)

    wildcard = ZONE.source_of_synthesis(target)
    assume(wildcard not in ZONE.reachable_wildcards)

    check_nxdomain(target, checker)
    172 
    173 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.ents))
def test_ents(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Names below an ENT: either wildcard synthesis or NXDOMAIN is expected"""
    assume_nx_and_no_delegation(qname)

    checker = do_test_query(qname, dns.rdatatype.A, server, named_port)[1]

    wildcard = ZONE.source_of_synthesis(qname)
    # An ENT can have a wildcard under it; pick the check matching the
    # answer the zone content predicts for this qname.
    verify = (
        check_wildcard_synthesis
        if wildcard in ZONE.reachable_wildcards
        else check_nxdomain
    )
    verify(qname, checker)
    190 
    191 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.reachable_wildcard_parents))
def test_wildcard_synthesis(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Non-existent name matched by a reachable wildcard, queried for type A"""
    assume(qname not in ZONE.all_existing_names)

    # Keep only names whose source of synthesis is a reachable wildcard.
    wildcard = ZONE.source_of_synthesis(qname)
    assume(wildcard in ZONE.reachable_wildcards)

    checker = do_test_query(qname, dns.rdatatype.A, server, named_port)[1]
    check_wildcard_synthesis(qname, checker)
    204 
    205 
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.reachable_wildcard_parents))
def test_wildcard_nodata(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Non-existent name matched by a reachable wildcard, queried for type AAAA"""
    assume(qname not in ZONE.all_existing_names)

    # Keep only names whose source of synthesis is a reachable wildcard.
    wildcard = ZONE.source_of_synthesis(qname)
    assume(wildcard in ZONE.reachable_wildcards)

    checker = do_test_query(qname, dns.rdatatype.AAAA, server, named_port)[1]
    check_wildcard_nodata(qname, checker)
    218 
    219 
def check_wildcard_nodata(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify a wildcard NODATA answer: NOERROR, closest encloser proven to
    exist, next-closer name proven not to exist, and an NSEC3 RR matching
    the wildcard owner. No other NSEC3 RRs may be present.
    """
    assert nsec3check.response.rcode() is dns.rcode.NOERROR

    closest_encloser, next_closer = ZONE.closest_encloser(qname)
    nsec3check.prove_name_exists(closest_encloser)
    nsec3check.prove_name_does_not_exist(next_closer)

    # expecting proof that wildcard owner does not have rdatatype requested
    wildcard = ZONE.source_of_synthesis(qname)
    nsec3check.prove_name_exists(wildcard)
    nsec3check.check_extraneous_rrs()
    231 
    232 
def check_nxdomain(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify an NXDOMAIN answer: closest encloser proven to exist, and both the
    next-closer name and the wildcard at the closest encloser proven not to
    exist. No other NSEC3 RRs may be present.
    """
    assert nsec3check.response.rcode() is dns.rcode.NXDOMAIN

    closest_encloser, next_closer = ZONE.closest_encloser(qname)
    nsec3check.prove_name_exists(closest_encloser)
    nsec3check.prove_name_does_not_exist(next_closer)

    wildcard = ZONE.source_of_synthesis(qname)
    nsec3check.prove_name_does_not_exist(wildcard)
    nsec3check.check_extraneous_rrs()
    243 
    244 
def check_wildcard_synthesis(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify a wildcard-synthesized answer: NOERROR, exactly one RRSIG covering
    A at QNAME in the ANSWER section, and an NSEC3 RR covering the
    next-closer name. No other NSEC3 RRs may be present.
    """
    assert nsec3check.response.rcode() is dns.rcode.NOERROR

    signatures = nsec3check.response.get_rrset(
        section="ANSWER",
        name=qname,
        rdclass=dns.rdataclass.IN,
        rdtype=dns.rdatatype.RRSIG,
        covers=dns.rdatatype.A,
    )
    assert signatures is not None
    assert len(signatures) == 1
    signature = signatures[0]
    assert isinstance(signature, dns.rdtypes.ANY.RRSIG.RRSIG)
    # The RRSIG labels field (RFC 4034 section 3.1.3) counts neither the root
    # label nor the leftmost '*' label, so adding one for the root gives the
    # label count of the wildcard's parent.
    parent_label_count = signature.labels + 1
    assert parent_label_count < len(qname)

    # 1. The RRSIG comes from the wildcard '*.something', which proves the
    # node 'something' exists (it has a child, so it exists, though it may be
    # an ENT). Hence closest encloser = 'something'.
    # 2. For the synthesis to be legitimate, neither QNAME nor any node between
    # QNAME and the closest encloser may exist. Because of DNS node existence
    # rules, denying the next-closer name, i.e.
    # <one_label_under>.<closest_encloser>, denies the whole subtree down to
    # QNAME.

    closest_encloser, next_closer = ZONE.closest_encloser(qname)
    _, expected_encloser = qname.split(parent_label_count)
    assert closest_encloser == expected_encloser
    # closest encloser is proven to exist by the RRSIG
    _, expected_next_closer = qname.split(parent_label_count + 1)
    assert next_closer == expected_next_closer
    nsec3check.prove_name_does_not_exist(next_closer)
    nsec3check.check_extraneous_rrs()
    281 
    282 
@dataclass(frozen=True)
class NSEC3Params:
    """Common values from a single DNS response"""

    # Values shared by all NSEC3 RRs of one response. NSEC3Checker copies
    # them from the first NSEC3 RR it sees and feeds algorithm, iterations
    # and salt to dns.dnssec.nsec3_hash() when hashing names.
    algorithm: int
    flags: int
    iterations: int
    salt: Optional[bytes]
    291 
    292 
class NSEC3Checker:
    """
    Sanity-check the NSEC3 RRs of one DNS response and track their use in proofs.

    The constructor collects NSEC3 RRsets from the AUTHORITY section. The
    prove_*() methods look up the RRset backing a claim and record its owner
    name; check_extraneous_rrs() then verifies that every NSEC3 RRset
    present was used by at least one proof.
    """

    def __init__(self, response: dns.message.Message):
        """
        Collect and validate NSEC3 RRsets of 'response'.

        Asserts that:
        - no NSEC3 RRset is in the ANSWER or ADDITIONAL section,
        - NSEC3 owner names in the AUTHORITY section are unique,
        - each NSEC3 RRset holds exactly one RR,
        - no NSEC3 type bitmap lists the NSEC3 type itself,
        - algorithm, flags, iterations and salt agree across all NSEC3 RRs,
        - at least one NSEC3 RR is present.
        """
        # NSEC3 RRs are accepted in the AUTHORITY section only.
        for section_name, section in (
            ("ANSWER", response.answer),
            ("ADDITIONAL", response.additional),
        ):
            for rrset in section:
                assert not rrset.match(
                    dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
                ), f"unexpected NSEC3 RR in {section_name} section:\n{response}"

        attrs_seen = {
            "algorithm": None,
            "flags": None,
            "iterations": None,
            "salt": None,
        }
        first = True
        owners_seen = set()
        self.rrsets = []
        for rrset in response.authority:
            if not rrset.match(
                dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ):
                continue
            assert (
                rrset.name not in owners_seen
            ), f"duplicate NSEC3 owner {rrset.name}:\n{response}"
            owners_seen.add(rrset.name)

            assert len(rrset) == 1
            rr = rrset[0]
            assert isinstance(rr, dns.rdtypes.ANY.NSEC3.NSEC3)

            assert (
                "NSEC3"
                not in dns.rdtypes.ANY.NSEC3.Bitmap(rr.windows).to_text().split()
            ), f"NSEC3 RRset with NSEC3 in type bitmap:\n{response}"

            # NSEC3 parameters MUST be consistent across all NSEC3 RRs:
            # RFC 5155 section 7.2, last paragraph
            for attr_name, value_seen in attrs_seen.items():
                current = getattr(rr, attr_name)
                if first:
                    # The first NSEC3 RR defines the reference values.
                    attrs_seen[attr_name] = current
                else:
                    assert (
                        current == value_seen
                    ), f"inconsistent {attr_name}\n{response}"
            first = False
            self.rrsets.append(rrset)

        # 'algorithm' is still None only if the loop never saw an NSEC3 RR.
        assert attrs_seen["algorithm"] is not None, f"no NSEC3 found\n{response}"
        self.params: NSEC3Params = NSEC3Params(**attrs_seen)
        self.response: dns.message.Message = response
        self.owners_present: Set[dns.name.Name] = owners_seen
        self.owners_used: Set[dns.name.Name] = set()

    @staticmethod
    def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool:
        """
        Test if 'hashed_name' is covered by an NSEC3 record in 'rrset', i.e. the name does not exist.

        'rrset' must hold exactly one NSEC3 RR with flags equal to 0.
        'hashed_name' is covered when it lies strictly between the owner name
        of the RR and its 'next' name, taking wrap-around at the end of the
        chain into account.
        """
        prev_name = rrset.name

        assert len(rrset) == 1
        nsec3 = rrset[0]
        assert isinstance(nsec3, dns.rdtypes.ANY.NSEC3.NSEC3)
        assert nsec3.flags == 0, "opt-out not supported by test logic"
        next_name = nsec3.next_name(SUFFIX)

        # Single name case.
        if prev_name == next_name:
            return prev_name != hashed_name

        # Standard case.
        if prev_name < next_name:
            if prev_name < hashed_name < next_name:
                return True

        # The cover wraps.
        if next_name < prev_name:
            # Case 1: The covered name is at the end of the chain.
            if hashed_name > prev_name:
                return True
            # Case 2: The covered name is at the start of the chain.
            if hashed_name < next_name:
                return True
        return False

    def hash_name(self, name: dns.name.Name) -> dns.name.Name:
        """
        Return the NSEC3 owner name for 'name': its hash, computed with the
        parameters seen in the response, as a label under SUFFIX.
        """
        nhash = dns.dnssec.nsec3_hash(
            name,
            salt=self.params.salt,
            iterations=self.params.iterations,
            algorithm=self.params.algorithm,
        )
        return dns.name.from_text(nhash, SUFFIX)

    def prove_name_does_not_exist(self, name: dns.name.Name) -> dns.rrset.RRset:
        """
        Hash of a given name must fall between an NSEC3 owner and 'next' name

        Returns the first covering NSEC3 RRset and records its owner as used.
        Raises AssertionError if no NSEC3 RRset in the response covers the hash.
        """
        hashed_name = self.hash_name(name)
        for rrset in self.rrsets:
            name_is_covered = self.nsec3_covers(rrset, hashed_name)
            if name_is_covered:
                self.owners_used.add(rrset.name)
                return rrset

        # Explicit raise instead of 'assert False': the latter is removed
        # under 'python -O' and the method would silently return None.
        raise AssertionError(
            f"Expected covering NSEC3 for {name} (hash={hashed_name}) not found:\n{self.response}"
        )

    def prove_name_exists(self, owner: dns.name.Name) -> dns.rrset.RRset:
        """
        Check response has NSEC3 RR matching given owner name, i.e. the name exists.

        Returns the matching NSEC3 RRset and records its owner as used.
        Raises AssertionError if no NSEC3 RRset is owned by the hash of 'owner'.
        """
        nsec3_owner = self.hash_name(owner)
        for rrset in self.rrsets:
            if rrset.match(
                nsec3_owner, dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ):
                self.owners_used.add(rrset.name)
                return rrset

        # Explicit raise instead of 'assert False': the latter is removed
        # under 'python -O' and the method would silently return None.
        raise AssertionError(
            f"Expected matching NSEC3 for {owner} (hash={nsec3_owner}) not found:\n{self.response}"
        )

    def check_extraneous_rrs(self) -> None:
        """Check that all NSEC3 RRs present in the message were actually needed for proofs"""
        assert (
            self.owners_used == self.owners_present
        ), f"extraneous NSEC3 RRs detected\n{self.response}"
    423