#!/usr/bin/python3

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

"""Property-based checks of NSEC3 proofs returned in DNS responses.

Each test sends a DNSSEC-enabled query to a server and verifies that the
NSEC3 RRs in the AUTHORITY section form exactly the proof expected for the
given kind of answer (NODATA, NXDOMAIN, wildcard synthesis, ...), with no
extraneous NSEC3 RRs.
"""

from dataclasses import dataclass
import os
from pathlib import Path
from typing import Container, Iterable, Optional, Set, Tuple

import pytest

pytest.importorskip("dns", minversion="2.5.0")
import dns.dnssec
import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rdtypes.ANY.RRSIG
import dns.rdtypes.ANY.NSEC3
import dns.rrset

from isctest.hypothesis.strategies import dns_names, sampled_from
import isctest
import isctest.name

from hypothesis import assume, given

# Origin of the tested zone; also used as the origin for hashed owner names.
SUFFIX = dns.name.from_text(".")
# Server addresses; the pytest ids used below are "ns1" and "ns2".
AUTH = "10.53.0.1"
RESOLVER = "10.53.0.2"
# Timeout passed to isctest.query.tcp()
TIMEOUT = 5
# Zone content analysis used to derive test inputs and expected proofs.
ZONE = isctest.name.ZoneAnalyzer.read_path(
    Path(os.environ["srcdir"]) / "nsec3-answer/ns1/root.db.in", origin=SUFFIX
)

# Names of NSEC3 RR attributes which must be identical across one response.
_NSEC3_PARAM_NAMES = ("algorithm", "flags", "iterations", "salt")


def is_related_to_any(
    test_name: dns.name.Name,
    acceptable_relations: Container[dns.name.NameRelation],
    candidates: Iterable[dns.name.Name],
) -> bool:
    """
    Return True if 'test_name' compared to at least one of 'candidates'
    yields a relation listed in 'acceptable_relations'.
    """
    return any(
        test_name.fullcompare(candidate)[0] in acceptable_relations
        for candidate in candidates
    )


def do_test_query(
    qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int
) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]:
    """
    Send a query with the DNSSEC OK bit over TCP and return the response
    together with an NSEC3Checker built from it.

    Only NOERROR and NXDOMAIN responses are accepted.
    """
    query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
    response = isctest.query.tcp(query, server, named_port, timeout=TIMEOUT)
    isctest.check.is_response_to(response, query)
    assert response.rcode() in (dns.rcode.NOERROR, dns.rcode.NXDOMAIN)
    return response, NSEC3Checker(response)


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(
    qname=sampled_from(
        sorted(ZONE.reachable - ZONE.get_names_with_type(dns.rdatatype.CNAME))
    )
)
def test_nodata(server: str, qname: dns.name.Name, named_port: int) -> None:
    """An existing name, no wildcards, but a query type for RRset which does not exist"""
    _, nsec3check = do_test_query(qname, dns.rdatatype.HINFO, server, named_port)
    check_nodata(qname, nsec3check)


@pytest.mark.parametrize("server", [pytest.param(AUTH, id="ns1")])
@given(
    qname=dns_names(
        suffix=(ZONE.delegations - ZONE.get_names_with_type(dns.rdatatype.DS))
    )
)
def test_nodata_ds(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Auth sends proof of nonexistence with referral without DS RR. Opt-out is not supported."""
    response, nsec3check = do_test_query(qname, dns.rdatatype.HINFO, server, named_port)

    # the owner of the NS RRset in AUTHORITY is the delegation point
    nsrr = next(
        (rrset for rrset in response.authority if rrset.rdtype == dns.rdatatype.NS),
        None,
    )
    assert nsrr is not None, "NS RRset missing in delegation answer"

    # DS RR does not exist so we must prove it by having NSEC3 with QNAME
    check_nodata(nsrr.name, nsec3check)


def check_nodata(name: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """Expect NOERROR with exactly one NSEC3 RR: the one matching 'name'"""
    assert nsec3check.response.rcode() is dns.rcode.NOERROR

    nsec3check.prove_name_exists(name)
    nsec3check.check_extraneous_rrs()


def assume_nx_and_no_delegation(qname: dns.name.Name) -> None:
    """
    Reject (via hypothesis 'assume') names which exist in the zone and names
    which are at or below a reachable delegation or DNAME.
    """
    assume(qname not in ZONE.all_existing_names)

    # name must not be under a delegation or DNAME:
    # it would not work with resolver ns2
    assume(
        not is_related_to_any(
            qname,
            (dns.name.NameRelation.EQUAL, dns.name.NameRelation.SUBDOMAIN),
            ZONE.reachable_delegations.union(ZONE.reachable_dnames),
        )
    )


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=SUFFIX))
def test_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """A real NXDOMAIN, no wildcards involved"""
    assume_nx_and_no_delegation(qname)
    wname = ZONE.source_of_synthesis(qname)
    assume(wname not in ZONE.reachable_wildcards)

    _, nsec3check = do_test_query(qname, dns.rdatatype.A, server, named_port)
    check_nxdomain(qname, nsec3check)


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=sampled_from(sorted(ZONE.get_names_with_type(dns.rdatatype.CNAME))))
def test_cname_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """CNAME which terminates by NXDOMAIN, no wildcards involved"""
    response, nsec3check = do_test_query(qname, dns.rdatatype.A, server, named_port)
    # the proof is expected for the end of the CNAME chain, not for QNAME
    chain = response.resolve_chaining()
    assume_nx_and_no_delegation(chain.canonical_name)

    wname = ZONE.source_of_synthesis(chain.canonical_name)
    assume(wname not in ZONE.reachable_wildcards)

    check_nxdomain(chain.canonical_name, nsec3check)


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.get_names_with_type(dns.rdatatype.DNAME)))
def test_dname_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """DNAME which terminates by NXDOMAIN, no wildcards involved"""
    assume(qname not in ZONE.reachable)

    response, nsec3check = do_test_query(qname, dns.rdatatype.A, server, named_port)
    # the proof is expected for the end of the chain, not for QNAME
    chain = response.resolve_chaining()
    assume_nx_and_no_delegation(chain.canonical_name)

    wname = ZONE.source_of_synthesis(chain.canonical_name)
    assume(wname not in ZONE.reachable_wildcards)

    check_nxdomain(chain.canonical_name, nsec3check)


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.ents))
def test_ents(server: str, qname: dns.name.Name, named_port: int) -> None:
    """ENT can have a wildcard under it"""
    assume_nx_and_no_delegation(qname)

    _, nsec3check = do_test_query(qname, dns.rdatatype.A, server, named_port)

    wname = ZONE.source_of_synthesis(qname)
    # does qname match a wildcard under ENT?
    if wname in ZONE.reachable_wildcards:
        check_wildcard_synthesis(qname, nsec3check)
    else:
        check_nxdomain(qname, nsec3check)


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.reachable_wildcard_parents))
def test_wildcard_synthesis(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Non-existing name matching a reachable wildcard, queried for type A"""
    assume(qname not in ZONE.all_existing_names)

    wname = ZONE.source_of_synthesis(qname)
    assume(wname in ZONE.reachable_wildcards)

    _, nsec3check = do_test_query(qname, dns.rdatatype.A, server, named_port)
    check_wildcard_synthesis(qname, nsec3check)


@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.reachable_wildcard_parents))
def test_wildcard_nodata(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Non-existing name matching a reachable wildcard, queried for type AAAA"""
    # NOTE(review): presumably no wildcard in the test zone has an AAAA RRset,
    # so a NODATA proof is expected - confirm against root.db.in
    assume(qname not in ZONE.all_existing_names)

    wname = ZONE.source_of_synthesis(qname)
    assume(wname in ZONE.reachable_wildcards)

    _, nsec3check = do_test_query(qname, dns.rdatatype.AAAA, server, named_port)
    check_wildcard_nodata(qname, nsec3check)


def check_wildcard_nodata(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Expect NOERROR with NSEC3 RRs matching the closest encloser, covering
    the next-closer name, and matching the wildcard owner - and nothing else.
    """
    assert nsec3check.response.rcode() is dns.rcode.NOERROR

    ce, nce = ZONE.closest_encloser(qname)
    nsec3check.prove_name_exists(ce)
    nsec3check.prove_name_does_not_exist(nce)

    wname = ZONE.source_of_synthesis(qname)
    # expecting proof that wildcard owner does not have rdatatype requested
    nsec3check.prove_name_exists(wname)
    nsec3check.check_extraneous_rrs()


def check_nxdomain(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Expect NXDOMAIN with NSEC3 RRs matching the closest encloser, covering
    the next-closer name, and covering the wildcard - and nothing else.
    """
    assert nsec3check.response.rcode() is dns.rcode.NXDOMAIN

    ce, nce = ZONE.closest_encloser(qname)
    nsec3check.prove_name_exists(ce)
    nsec3check.prove_name_does_not_exist(nce)

    wname = ZONE.source_of_synthesis(qname)
    nsec3check.prove_name_does_not_exist(wname)
    nsec3check.check_extraneous_rrs()


def check_wildcard_synthesis(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """Expect wildcard response with a signed A RRset"""
    assert nsec3check.response.rcode() is dns.rcode.NOERROR

    answer_sig = nsec3check.response.get_rrset(
        section="ANSWER",
        name=qname,
        rdclass=dns.rdataclass.IN,
        rdtype=dns.rdatatype.RRSIG,
        covers=dns.rdatatype.A,
    )
    assert answer_sig is not None
    assert len(answer_sig) == 1
    rrsig = answer_sig[0]
    assert isinstance(rrsig, dns.rdtypes.ANY.RRSIG.RRSIG)
    # RRSIG labels field RFC 4034 section 3.1.3 does not count:
    # - root label
    # - leftmost * label
    wildcard_parent_labels = rrsig.labels + 1  # add root but not leftmost *
    assert wildcard_parent_labels < len(qname)

    # 1. We have RRSIG from the wildcard '*.something', which proves the node
    #    'something' exists (by definition - it has a child, so it exists, but
    #    maybe it is an ENT). Thus we expect closest encloser = 'something'
    # 2. If wildcard synthesis is legitimate, QNAME itself and no nodes between
    #    QNAME and the closest encloser can exist. Because of DNS node existence
    #    rules it's sufficient to prove non-existence of next-closer name, i.e.
    #    <one_label_under>.<closest_encloser>, to deny existence of the whole
    #    subtree down to QNAME.

    ce, nce = ZONE.closest_encloser(qname)
    assert ce == qname.split(wildcard_parent_labels)[1]
    # ce is proven to exist by the RRSIG
    assert nce == qname.split(wildcard_parent_labels + 1)[1]
    nsec3check.prove_name_does_not_exist(nce)
    nsec3check.check_extraneous_rrs()


@dataclass(frozen=True)
class NSEC3Params:
    """Common values from a single DNS response"""

    algorithm: int
    flags: int
    iterations: int
    salt: Optional[bytes]


class NSEC3Checker:
    """
    Collects NSEC3 RRsets from the AUTHORITY section of a response and keeps
    track of which of them were used by the prove_*() methods, so that
    check_extraneous_rrs() can detect NSEC3 RRs not needed for any proof.
    """

    def __init__(self, response: dns.message.Message):
        """
        Validate and index NSEC3 RRsets in 'response'.

        Asserts that NSEC3 RRs appear only in the AUTHORITY section, that at
        least one is present, that owner names are unique, that each RRset
        has a single RR without NSEC3 in its type bitmap, and that all RRs
        share the same NSEC3 parameters.
        """
        for section_name, section in (
            ("ANSWER", response.answer),
            ("ADDITIONAL", response.additional),
        ):
            for rrset in section:
                assert not rrset.match(
                    dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
                ), f"unexpected NSEC3 RR in {section_name} section:\n{response}"

        params: Optional[NSEC3Params] = None
        owners_seen: Set[dns.name.Name] = set()
        self.rrsets = []
        for rrset in response.authority:
            if not rrset.match(
                dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ):
                continue
            assert (
                rrset.name not in owners_seen
            ), f"duplicate NSEC3 owner {rrset.name}:\n{response}"
            owners_seen.add(rrset.name)

            assert len(rrset) == 1
            rr = rrset[0]
            assert isinstance(rr, dns.rdtypes.ANY.NSEC3.NSEC3)

            assert (
                "NSEC3"
                not in dns.rdtypes.ANY.NSEC3.Bitmap(rr.windows).to_text().split()
            ), f"NSEC3 RRset with NSEC3 in type bitmap:\n{response}"

            # NSEC3 parameters MUST be consistent across all NSEC3 RRs:
            # RFC 5155 section 7.2, last paragraph
            current = NSEC3Params(
                **{attr_name: getattr(rr, attr_name) for attr_name in _NSEC3_PARAM_NAMES}
            )
            if params is None:
                # the first NSEC3 RR defines values all the others must match
                params = current
            else:
                for attr_name in _NSEC3_PARAM_NAMES:
                    assert getattr(current, attr_name) == getattr(
                        params, attr_name
                    ), f"inconsistent {attr_name}\n{response}"
            self.rrsets.append(rrset)

        assert params is not None, f"no NSEC3 found\n{response}"
        self.params: NSEC3Params = params
        self.response: dns.message.Message = response
        self.owners_present: Set[dns.name.Name] = owners_seen
        self.owners_used: Set[dns.name.Name] = set()

    @staticmethod
    def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool:
        """
        Test if 'hashed_name' is covered by an NSEC3 record in 'rrset', i.e. the name does not exist.
        """
        prev_name = rrset.name

        assert len(rrset) == 1
        nsec3 = rrset[0]
        assert isinstance(nsec3, dns.rdtypes.ANY.NSEC3.NSEC3)
        assert nsec3.flags == 0, "opt-out not supported by test logic"
        next_name = nsec3.next_name(SUFFIX)

        # Single name case.
        if prev_name == next_name:
            return prev_name != hashed_name

        # Standard case.
        if prev_name < next_name:
            return prev_name < hashed_name < next_name

        # The cover wraps (next_name < prev_name): the covered name is either
        # at the end of the chain or at the start of the chain.
        return hashed_name > prev_name or hashed_name < next_name

    def hash_name(self, name: dns.name.Name) -> dns.name.Name:
        """
        Return the NSEC3 owner name for 'name': its hash, computed with the
        parameters seen in the response, as a label directly under SUFFIX.
        """
        nhash = dns.dnssec.nsec3_hash(
            name,
            salt=self.params.salt,
            iterations=self.params.iterations,
            algorithm=self.params.algorithm,
        )
        return dns.name.from_text(nhash, SUFFIX)

    def prove_name_does_not_exist(self, name: dns.name.Name) -> dns.rrset.RRset:
        """
        Hash of a given name must fall between an NSEC3 owner and 'next' name

        Returns the first covering NSEC3 RRset and marks its owner as used.
        Raises AssertionError if no NSEC3 RRset in the response covers 'name'.
        """
        hashed_name = self.hash_name(name)
        for rrset in self.rrsets:
            if self.nsec3_covers(rrset, hashed_name):
                self.owners_used.add(rrset.name)
                return rrset

        # explicit raise instead of 'assert False': it cannot be optimized
        # away, so this method never falls through and returns None
        raise AssertionError(
            f"Expected covering NSEC3 for {name} (hash={hashed_name}) not found:\n{self.response}"
        )

    def prove_name_exists(self, owner: dns.name.Name) -> dns.rrset.RRset:
        """
        Check response has NSEC3 RR matching given owner name, i.e. the name exists.

        Returns the matching NSEC3 RRset and marks its owner as used.
        Raises AssertionError if the response has no such NSEC3 RRset.
        """
        nsec3_owner = self.hash_name(owner)
        for rrset in self.rrsets:
            if rrset.match(
                nsec3_owner, dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ):
                self.owners_used.add(rrset.name)
                return rrset

        # explicit raise instead of 'assert False': it cannot be optimized
        # away, so this method never falls through and returns None
        raise AssertionError(
            f"Expected matching NSEC3 for {owner} (hash={nsec3_owner}) not found:\n{self.response}"
        )

    def check_extraneous_rrs(self) -> None:
        """Check that all NSEC3 RRs present in the message were actually needed for proofs"""
        assert (
            self.owners_used == self.owners_present
        ), f"extraneous NSEC3 RRs detected\n{self.response}"