#!/usr/bin/python3

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.


from typing import NamedTuple, Tuple

import os
import sys
import time

import isctest
import pytest

pytest.importorskip("dns", minversion="2.0.0")
import dns.exception
import dns.message
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype


pytestmark = [
    pytest.mark.skipif(
        sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
    ),
    pytest.mark.extra_artifacts(
        [
            "*.out",
            "ns*/*.db",
            "ns*/*.db.infile",
            "ns*/*.db.signed",
            "ns*/*.jnl",
            "ns*/*.jbk",
            "ns*/*.keyname",
            "ns*/dsset-*",
            "ns*/K*",
            "ns*/keygen.out*",
            "ns*/settime.out*",
            "ns*/signer.out*",
            "ns*/trusted.conf",
            "ns*/zones",
        ]
    ),
]


def has_signed_apex_nsec(zone, response):
    """Return True if *response* holds the apex NSEC of *zone* and its RRSIG.

    *zone* must be a fully qualified name (with trailing dot); the number
    of dots in it is used as the RRSIG label count.  The answer section is
    searched textually for an NSEC record with TTL 300 pointing at
    ``a.<zone>`` and for an algorithm 13 RRSIG covering NSEC.  An error is
    logged for each of the two records that is missing.
    """
    has_nsec = False
    has_rrsig = False

    ttl = 300
    nextname = "a."
    labelcount = zone.count(".")  # zone is specified as FQDN
    types = "NS SOA RRSIG NSEC DNSKEY"
    match = f"{zone} {ttl} IN NSEC {nextname}{zone} {types}"
    # The field after the label count is the original TTL, which is the
    # same value as the record TTL matched above.
    sig = f"{zone} {ttl} IN RRSIG NSEC 13 {labelcount} {ttl}"

    for rr in response.answer:
        # Render each RRset once and test it against both patterns.
        text = rr.to_text()
        if match in text:
            has_nsec = True
        if sig in text:
            has_rrsig = True

    if not has_nsec:
        isctest.log.error("missing apex NSEC record in response")
    if not has_rrsig:
        isctest.log.error("missing NSEC signature in response")

    return has_nsec and has_rrsig


def do_query(server, qname, qtype, tcp=False):
    """Query *server* for *qname*/*qtype* and return the response.

    The query is sent to ``server.ip`` over TCP when *tcp* is true and
    over UDP otherwise, with NOERROR passed as the expected rcode.
    """
    msg = isctest.query.create(qname, qtype)
    query_func = isctest.query.tcp if tcp else isctest.query.udp
    response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
    return response


def verify_zone(zone, transfer):
    """Write *transfer* to a file and check it with dnssec-verify.

    The path of the verifier is taken from the ``VERIFY`` environment
    variable, which must be set.  The answer section of *transfer* is
    written to ``<zone>out`` in the current directory (*zone* is an FQDN,
    so the file ends in ``.out``).  Returns True when the verifier exits
    with status 0; otherwise an error is logged and False is returned.
    """
    verify = os.getenv("VERIFY")
    assert verify is not None

    filename = f"{zone}out"
    with open(filename, "w", encoding="utf-8") as file:
        for rr in transfer.answer:
            file.write(rr.to_text())
            file.write("\n")

    # dnssec-verify command with default arguments.
    verify_cmd = [verify, "-z", "-o", zone, filename]

    verifier = isctest.run.cmd(verify_cmd)

    if verifier.rc != 0:
        isctest.log.error(f"dnssec-verify {zone} failed")

    return verifier.rc == 0


def read_statefile(server, zone):
    """Return the key state file contents for *zone* as a dict.

    The key tag is learned by querying *server* for the DS RRset of
    *zone* (an FQDN) over TCP; exactly one DS RRset is expected in the
    answer.  The state file is then read from
    ``ns9/K<zone>+013+<keytag>.state``.  Lines starting with ``;`` are
    skipped and the remaining ``key: value`` lines are returned with
    surrounding whitespace stripped.  An empty dict is returned when the
    file does not exist (yet).
    """
    count = 0
    keyid = 0
    state = {}

    response = do_query(server, zone, "DS", tcp=True)
    # fetch key id from response.
    for rr in response.answer:
        if rr.match(
            dns.name.from_text(zone),
            dns.rdataclass.IN,
            dns.rdatatype.DS,
            dns.rdatatype.NONE,
        ):
            if count == 0:
                # The first rdata of the DS RRset carries the key tag.
                keyid = rr[0].key_tag
            count += 1

    assert (
        count == 1
    ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"

    filename = f"ns9/K{zone}+013+{keyid:05d}.state"
    isctest.log.debug(f"read state file {filename}")

    try:
        with open(filename, "r", encoding="utf-8") as file:
            for line in file:
                if line.startswith(";"):
                    continue
                key, sep, val = line.partition(":")
                if not sep:
                    # Blank or incomplete line (the file may be in the
                    # middle of being written); skip it instead of
                    # failing so that the caller can simply retry.
                    continue
                state[key.strip()] = val.strip()
    except FileNotFoundError:
        # file may not be written just yet.
        return {}

    return state


def zone_check(server, zone):
    """Assert that *zone*, as served by *server*, is signed and valid.

    *zone* is given without the trailing dot.  The apex NSEC is queried
    to check that the zone is signed, then the zone is transferred over
    TCP and handed to dnssec-verify.
    """
    fqdn = f"{zone}."

    # check zone is fully signed.
    response = do_query(server, fqdn, "NSEC")
    assert has_signed_apex_nsec(fqdn, response)

    # check whether the zone is DNSSEC valid.
    transfer = do_query(server, fqdn, "AXFR", tcp=True)
    assert verify_zone(fqdn, transfer)


def keystate_check(server, zone, key):
    """Assert that *key* is (or is not) present in the state file of *zone*.

    *key* is the name of a state file entry, for example ``DSPublish``.
    When it is prefixed with ``!`` the entry is expected to be absent.
    The state file is polled up to 10 times, one second apart, until the
    expectation holds.
    """
    fqdn = f"{zone}."
    val = 0
    deny = False

    search = key
    if key.startswith("!"):
        deny = True
        search = key[1:]

    for _ in range(10):
        state = read_statefile(server, fqdn)
        # Keep the previously seen value when the entry is not in this
        # read of the state file.
        val = state.get(search, val)

        if not deny and val != 0:
            break
        if deny and val == 0:
            break

        time.sleep(1)

    if deny:
        assert val == 0
    else:
        assert val != 0


class CheckDSTest(NamedTuple):
    """Parameters of a single checkds test case."""

    # Name of the zone under test, without the trailing dot.
    zone: str
    # "checkds" log messages that must show up in the ns9 log; may be empty.
    logs_to_wait_for: Tuple[str, ...]
    # State file entry that must be set, or must not be set when it is
    # prefixed with "!".
    expected_parent_state: str


parental_agents_tests = [
    # Using a reference to parental-agents.
    CheckDSTest(
        zone="reference.explicit.dspublish.ns2",
        logs_to_wait_for=("DS response from 10.53.0.8",),
        expected_parent_state="DSPublish",
    ),
    # Using a resolver as parental-agent (ns3).
    CheckDSTest(
        zone="resolver.explicit.dspublish.ns2",
        logs_to_wait_for=("DS response from 10.53.0.3",),
        expected_parent_state="DSPublish",
    ),
    # Using a resolver as parental-agent (ns3).
    CheckDSTest(
        zone="resolver.explicit.dsremoved.ns5",
        logs_to_wait_for=("empty DS response from 10.53.0.3",),
        expected_parent_state="DSRemoved",
    ),
]

no_ent_tests = [
    CheckDSTest(
        zone="no-ent.ns2",
        logs_to_wait_for=("DS response from 10.53.0.2",),
        expected_parent_state="DSPublish",
    ),
    CheckDSTest(
        zone="no-ent.ns5",
        logs_to_wait_for=("DS response from 10.53.0.5",),
        expected_parent_state="DSRemoved",
    ),
]


def dspublished_tests(checkds, addr):
    """Return the "DS published" test cases for one checkds mode.

    *checkds* is the mode label embedded in the zone names ("explicit" or
    "yes" in this file) and *addr* is the address expected in the
    "DS response from" log messages.
    """
    return [
        #
        # 1.1.1: DS is correctly published in parent.
        # parental-agents: ns2
        #
        # The simple case.
        CheckDSTest(
            zone=f"good.{checkds}.dspublish.ns2",
            logs_to_wait_for=(f"DS response from {addr}",),
            expected_parent_state="DSPublish",
        ),
        #
        # 1.1.2: DS is not published in parent.
        # parental-agents: ns5
        #
        CheckDSTest(
            zone=f"not-yet.{checkds}.dspublish.ns5",
            logs_to_wait_for=("empty DS response from 10.53.0.5",),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dspublish.ns6",
            logs_to_wait_for=(
                (
                    "bad DS response from 10.53.0.6"
                    if checkds == "explicit"
                    else "error during parental-agents processing"
                ),
            ),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.1.4: DS is published, but has bogus signature.
        #
        # TBD
        #
        # 1.2.1: DS is correctly published in all parents.
        # parental-agents: ns2, ns4
        #
        CheckDSTest(
            zone=f"good.{checkds}.dspublish.ns2-4",
            logs_to_wait_for=(f"DS response from {addr}", "DS response from 10.53.0.4"),
            expected_parent_state="DSPublish",
        ),
        #
        # 1.2.2: DS is not published in some parents.
        # parental-agents: ns2, ns4, ns5
        #
        CheckDSTest(
            zone=f"incomplete.{checkds}.dspublish.ns2-4-5",
            logs_to_wait_for=(
                f"DS response from {addr}",
                "DS response from 10.53.0.4",
                "empty DS response from 10.53.0.5",
            ),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.2.3: One parental agent is badly configured.
        # parental-agents: ns2, ns4, ns6
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dspublish.ns2-4-6",
            logs_to_wait_for=(
                f"DS response from {addr}",
                "DS response from 10.53.0.4",
                "bad DS response from 10.53.0.6",
            ),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.2.4: DS is completely published, bogus signature.
        #
        # TBD
        # TBD: Check with TSIG
        # TBD: Check with TLS
    ]


def dswithdrawn_tests(checkds, addr):
    """Return the "DS withdrawn" test cases for one checkds mode.

    *checkds* is the mode label embedded in the zone names ("explicit" or
    "yes" in this file) and *addr* is the address expected in the
    "empty DS response from" log messages.
    """
    return [
        #
        # 2.1.1: DS correctly withdrawn from the parent.
        # parental-agents: ns5
        #
        # The simple case.
        CheckDSTest(
            zone=f"good.{checkds}.dsremoved.ns5",
            logs_to_wait_for=(f"empty DS response from {addr}",),
            expected_parent_state="DSRemoved",
        ),
        #
        # 2.1.2: DS is published in the parent.
        # parental-agents: ns2
        #
        CheckDSTest(
            zone=f"still-there.{checkds}.dsremoved.ns2",
            logs_to_wait_for=("DS response from 10.53.0.2",),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dsremoved.ns6",
            logs_to_wait_for=(
                (
                    "bad DS response from 10.53.0.6"
                    if checkds == "explicit"
                    else "error during parental-agents processing"
                ),
            ),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.1.4: DS is withdrawn, but has bogus signature.
        #
        # TBD
        #
        # 2.2.1: DS is correctly withdrawn from all parents.
        # parental-agents: ns5, ns7
        #
        CheckDSTest(
            zone=f"good.{checkds}.dsremoved.ns5-7",
            logs_to_wait_for=(
                f"empty DS response from {addr}",
                "empty DS response from 10.53.0.7",
            ),
            expected_parent_state="DSRemoved",
        ),
        #
        # 2.2.2: DS is not withdrawn from some parents.
        # parental-agents: ns2, ns5, ns7
        #
        CheckDSTest(
            zone=f"incomplete.{checkds}.dsremoved.ns2-5-7",
            logs_to_wait_for=(
                "DS response from 10.53.0.2",
                f"empty DS response from {addr}",
                "empty DS response from 10.53.0.7",
            ),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.2.3: One parental agent is badly configured.
        # parental-agents: ns5, ns6, ns7
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dsremoved.ns5-6-7",
            logs_to_wait_for=(
                f"empty DS response from {addr}",
                "empty DS response from 10.53.0.7",
                "bad DS response from 10.53.0.6",
            ),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.2.4: DS is removed completely, bogus signature.
        #
        # TBD
    ]


checkds_no_tests = [
    CheckDSTest(
        zone="good.no.dspublish.ns2",
        logs_to_wait_for=(),
        expected_parent_state="!DSPublish",
    ),
    CheckDSTest(
        zone="good.no.dspublish.ns2-4",
        logs_to_wait_for=(),
        expected_parent_state="!DSPublish",
    ),
    CheckDSTest(
        zone="good.no.dsremoved.ns5",
        logs_to_wait_for=(),
        expected_parent_state="!DSRemoved",
    ),
    CheckDSTest(
        zone="good.no.dsremoved.ns5-7",
        logs_to_wait_for=(),
        expected_parent_state="!DSRemoved",
    ),
]


checkds_tests = (
    parental_agents_tests
    + no_ent_tests
    + dspublished_tests("explicit", "10.53.0.8")
    + dspublished_tests("yes", "10.53.0.2")
    + dswithdrawn_tests("explicit", "10.53.0.10")
    + dswithdrawn_tests("yes", "10.53.0.5")
    + checkds_no_tests
)


@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
def test_checkds(ns2, ns9, params):
    """Run one checkds test case described by *params*.

    The zone is checked on ns9, the expected "checkds" messages are
    awaited in the ns9 log, and finally the key state file is compared
    with the expected state, using ns2 to look up the DS record.
    """
    # Check that the provided zone is signed on ns9 and verify its DNSSEC
    # data.
    zone_check(ns9, params.zone)

    # Wait up to 10 seconds until all the expected log lines are found in the
    # log file for the provided server.  Rekey every second if necessary.
    # The time budget is shared by all the log lines of a test case.
    time_remaining = 10
    for log_string in params.logs_to_wait_for:
        line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
        while line not in ns9.log:
            ns9.rndc(f"loadkeys {params.zone}")
            time_remaining -= 1
            assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
            time.sleep(1)

    # Check whether key states on the parent server provided match
    # expectations.
    keystate_check(ns2, params.zone, params.expected_parent_state)