tests_checkds.py revision 1.1.1.2.4.1 1 #!/usr/bin/python3
2
3 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
4 #
5 # SPDX-License-Identifier: MPL-2.0
6 #
7 # This Source Code Form is subject to the terms of the Mozilla Public
8 # License, v. 2.0. If a copy of the MPL was not distributed with this
9 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
10 #
11 # See the COPYRIGHT file distributed with this work for additional
12 # information regarding copyright ownership.
13
14
15 from typing import NamedTuple, Tuple
16
17 import os
18 import sys
19 import time
20
21 import isctest
22 import pytest
23
24 pytest.importorskip("dns", minversion="2.0.0")
25 import dns.exception
26 import dns.message
27 import dns.name
28 import dns.rcode
29 import dns.rdataclass
30 import dns.rdatatype
31
32
33 pytestmark = [
34 pytest.mark.skipif(
35 sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
36 ),
37 pytest.mark.extra_artifacts(
38 [
39 "*.out",
40 "ns*/*.db",
41 "ns*/*.db.infile",
42 "ns*/*.db.signed",
43 "ns*/*.jnl",
44 "ns*/*.jbk",
45 "ns*/*.keyname",
46 "ns*/dsset-*",
47 "ns*/K*",
48 "ns*/keygen.out*",
49 "ns*/settime.out*",
50 "ns*/signer.out*",
51 "ns*/trusted.conf",
52 "ns*/zones",
53 ]
54 ),
55 ]
56
57
58 def has_signed_apex_nsec(zone, response):
59 has_nsec = False
60 has_rrsig = False
61
62 ttl = 300
63 nextname = "a."
64 labelcount = zone.count(".") # zone is specified as FQDN
65 types = "NS SOA RRSIG NSEC DNSKEY"
66 match = f"{zone} {ttl} IN NSEC {nextname}{zone} {types}"
67 sig = f"{zone} {ttl} IN RRSIG NSEC 13 {labelcount} 300"
68
69 for rr in response.answer:
70 if match in rr.to_text():
71 has_nsec = True
72 if sig in rr.to_text():
73 has_rrsig = True
74
75 if not has_nsec:
76 isctest.log.error("missing apex NSEC record in response")
77 if not has_rrsig:
78 isctest.log.error("missing NSEC signature in response")
79
80 return has_nsec and has_rrsig
81
82
83 def do_query(server, qname, qtype, tcp=False):
84 msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
85 query_func = isctest.query.tcp if tcp else isctest.query.udp
86 response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
87 return response
88
89
90 def verify_zone(zone, transfer):
91 verify = os.getenv("VERIFY")
92 assert verify is not None
93
94 filename = f"{zone}out"
95 with open(filename, "w", encoding="utf-8") as file:
96 for rr in transfer.answer:
97 file.write(rr.to_text())
98 file.write("\n")
99
100 # dnssec-verify command with default arguments.
101 verify_cmd = [verify, "-z", "-o", zone, filename]
102
103 verifier = isctest.run.cmd(verify_cmd)
104
105 if verifier.returncode != 0:
106 isctest.log.error(f"dnssec-verify {zone} failed")
107
108 return verifier.returncode == 0
109
110
111 def read_statefile(server, zone):
112 count = 0
113 keyid = 0
114 state = {}
115
116 response = do_query(server, zone, "DS", tcp=True)
117 # fetch key id from response.
118 for rr in response.answer:
119 if rr.match(
120 dns.name.from_text(zone),
121 dns.rdataclass.IN,
122 dns.rdatatype.DS,
123 dns.rdatatype.NONE,
124 ):
125 if count == 0:
126 keyid = list(dict(rr.items).items())[0][0].key_tag
127 count += 1
128
129 assert (
130 count == 1
131 ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"
132
133 filename = f"ns9/K{zone}+013+{keyid:05d}.state"
134 isctest.log.debug(f"read state file {filename}")
135
136 try:
137 with open(filename, "r", encoding="utf-8") as file:
138 for line in file:
139 if line.startswith(";"):
140 continue
141 key, val = line.strip().split(":", 1)
142 state[key.strip()] = val.strip()
143 except FileNotFoundError:
144 # file may not be written just yet.
145 return {}
146
147 return state
148
149
150 def zone_check(server, zone):
151 fqdn = f"{zone}."
152
153 # check zone is fully signed.
154 response = do_query(server, fqdn, "NSEC")
155 assert has_signed_apex_nsec(fqdn, response)
156
157 # check if zone if DNSSEC valid.
158 transfer = do_query(server, fqdn, "AXFR", tcp=True)
159 assert verify_zone(fqdn, transfer)
160
161
162 def keystate_check(server, zone, key):
163 fqdn = f"{zone}."
164 val = 0
165 deny = False
166
167 search = key
168 if key.startswith("!"):
169 deny = True
170 search = key[1:]
171
172 for _ in range(10):
173 state = read_statefile(server, fqdn)
174 try:
175 val = state[search]
176 except KeyError:
177 pass
178
179 if not deny and val != 0:
180 break
181 if deny and val == 0:
182 break
183
184 time.sleep(1)
185
186 if deny:
187 assert val == 0
188 else:
189 assert val != 0
190
191
192 def rekey(zone):
193 rndc = os.getenv("RNDC")
194 assert rndc is not None
195
196 port = os.getenv("CONTROLPORT")
197 assert port is not None
198
199 # rndc loadkeys.
200 rndc_cmd = [
201 rndc,
202 "-c",
203 "../_common/rndc.conf",
204 "-p",
205 port,
206 "-s",
207 "10.53.0.9",
208 "loadkeys",
209 zone,
210 ]
211 controller = isctest.run.cmd(rndc_cmd)
212
213 if controller.returncode != 0:
214 isctest.log.error(f"rndc loadkeys {zone} failed")
215
216 assert controller.returncode == 0
217
218
219 class CheckDSTest(NamedTuple):
220 zone: str
221 logs_to_wait_for: Tuple[str]
222 expected_parent_state: str
223
224
225 parental_agents_tests = [
226 # Using a reference to parental-agents.
227 CheckDSTest(
228 zone="reference.explicit.dspublish.ns2",
229 logs_to_wait_for=("DS response from 10.53.0.8",),
230 expected_parent_state="DSPublish",
231 ),
232 # Using a resolver as parental-agent (ns3).
233 CheckDSTest(
234 zone="resolver.explicit.dspublish.ns2",
235 logs_to_wait_for=("DS response from 10.53.0.3",),
236 expected_parent_state="DSPublish",
237 ),
238 # Using a resolver as parental-agent (ns3).
239 CheckDSTest(
240 zone="resolver.explicit.dsremoved.ns5",
241 logs_to_wait_for=("empty DS response from 10.53.0.3",),
242 expected_parent_state="DSRemoved",
243 ),
244 ]
245
246 no_ent_tests = [
247 CheckDSTest(
248 zone="no-ent.ns2",
249 logs_to_wait_for=("DS response from 10.53.0.2",),
250 expected_parent_state="DSPublish",
251 ),
252 CheckDSTest(
253 zone="no-ent.ns5",
254 logs_to_wait_for=("DS response from 10.53.0.5",),
255 expected_parent_state="DSRemoved",
256 ),
257 ]
258
259
260 def dspublished_tests(checkds, addr):
261 return [
262 #
263 # 1.1.1: DS is correctly published in parent.
264 # parental-agents: ns2
265 #
266 # The simple case.
267 CheckDSTest(
268 zone=f"good.{checkds}.dspublish.ns2",
269 logs_to_wait_for=(f"DS response from {addr}",),
270 expected_parent_state="DSPublish",
271 ),
272 #
273 # 1.1.2: DS is not published in parent.
274 # parental-agents: ns5
275 #
276 CheckDSTest(
277 zone=f"not-yet.{checkds}.dspublish.ns5",
278 logs_to_wait_for=("empty DS response from 10.53.0.5",),
279 expected_parent_state="!DSPublish",
280 ),
281 #
282 # 1.1.3: The parental agent is badly configured.
283 # parental-agents: ns6
284 #
285 CheckDSTest(
286 zone=f"bad.{checkds}.dspublish.ns6",
287 logs_to_wait_for=(
288 (
289 "bad DS response from 10.53.0.6"
290 if checkds == "explicit"
291 else "error during parental-agents processing"
292 ),
293 ),
294 expected_parent_state="!DSPublish",
295 ),
296 #
297 # 1.1.4: DS is published, but has bogus signature.
298 #
299 # TBD
300 #
301 # 1.2.1: DS is correctly published in all parents.
302 # parental-agents: ns2, ns4
303 #
304 CheckDSTest(
305 zone=f"good.{checkds}.dspublish.ns2-4",
306 logs_to_wait_for=(f"DS response from {addr}", "DS response from 10.53.0.4"),
307 expected_parent_state="DSPublish",
308 ),
309 #
310 # 1.2.2: DS is not published in some parents.
311 # parental-agents: ns2, ns4, ns5
312 #
313 CheckDSTest(
314 zone=f"incomplete.{checkds}.dspublish.ns2-4-5",
315 logs_to_wait_for=(
316 f"DS response from {addr}",
317 "DS response from 10.53.0.4",
318 "empty DS response from 10.53.0.5",
319 ),
320 expected_parent_state="!DSPublish",
321 ),
322 #
323 # 1.2.3: One parental agent is badly configured.
324 # parental-agents: ns2, ns4, ns6
325 #
326 CheckDSTest(
327 zone=f"bad.{checkds}.dspublish.ns2-4-6",
328 logs_to_wait_for=(
329 f"DS response from {addr}",
330 "DS response from 10.53.0.4",
331 "bad DS response from 10.53.0.6",
332 ),
333 expected_parent_state="!DSPublish",
334 ),
335 #
336 # 1.2.4: DS is completely published, bogus signature.
337 #
338 # TBD
339 # TBD: Check with TSIG
340 # TBD: Check with TLS
341 ]
342
343
344 def dswithdrawn_tests(checkds, addr):
345 return [
346 #
347 # 2.1.1: DS correctly withdrawn from the parent.
348 # parental-agents: ns5
349 #
350 # The simple case.
351 CheckDSTest(
352 zone=f"good.{checkds}.dsremoved.ns5",
353 logs_to_wait_for=(f"empty DS response from {addr}",),
354 expected_parent_state="DSRemoved",
355 ),
356 #
357 # 2.1.2: DS is published in the parent.
358 # parental-agents: ns2
359 #
360 CheckDSTest(
361 zone=f"still-there.{checkds}.dsremoved.ns2",
362 logs_to_wait_for=("DS response from 10.53.0.2",),
363 expected_parent_state="!DSRemoved",
364 ),
365 #
366 # 2.1.3: The parental agent is badly configured.
367 # parental-agents: ns6
368 #
369 CheckDSTest(
370 zone=f"bad.{checkds}.dsremoved.ns6",
371 logs_to_wait_for=(
372 (
373 "bad DS response from 10.53.0.6"
374 if checkds == "explicit"
375 else "error during parental-agents processing"
376 ),
377 ),
378 expected_parent_state="!DSRemoved",
379 ),
380 #
381 # 2.1.4: DS is withdrawn, but has bogus signature.
382 #
383 # TBD
384 #
385 # 2.2.1: DS is correctly withdrawn from all parents.
386 # parental-agents: ns5, ns7
387 #
388 CheckDSTest(
389 zone=f"good.{checkds}.dsremoved.ns5-7",
390 logs_to_wait_for=(
391 f"empty DS response from {addr}",
392 "empty DS response from 10.53.0.7",
393 ),
394 expected_parent_state="DSRemoved",
395 ),
396 #
397 # 2.2.2: DS is not withdrawn from some parents.
398 # parental-agents: ns2, ns5, ns7
399 #
400 CheckDSTest(
401 zone=f"incomplete.{checkds}.dsremoved.ns2-5-7",
402 logs_to_wait_for=(
403 "DS response from 10.53.0.2",
404 f"empty DS response from {addr}",
405 "empty DS response from 10.53.0.7",
406 ),
407 expected_parent_state="!DSRemoved",
408 ),
409 #
410 # 2.2.3: One parental agent is badly configured.
411 # parental-agents: ns5, ns6, ns7
412 #
413 CheckDSTest(
414 zone=f"bad.{checkds}.dsremoved.ns5-6-7",
415 logs_to_wait_for=(
416 f"empty DS response from {addr}",
417 "empty DS response from 10.53.0.7",
418 "bad DS response from 10.53.0.6",
419 ),
420 expected_parent_state="!DSRemoved",
421 ),
422 #
423 # 2.2.4:: DS is removed completely, bogus signature.
424 #
425 # TBD
426 ]
427
428
429 checkds_no_tests = [
430 CheckDSTest(
431 zone="good.no.dspublish.ns2",
432 logs_to_wait_for=(),
433 expected_parent_state="!DSPublish",
434 ),
435 CheckDSTest(
436 zone="good.no.dspublish.ns2-4",
437 logs_to_wait_for=(),
438 expected_parent_state="!DSPublish",
439 ),
440 CheckDSTest(
441 zone="good.no.dsremoved.ns5",
442 logs_to_wait_for=(),
443 expected_parent_state="!DSRemoved",
444 ),
445 CheckDSTest(
446 zone="good.no.dsremoved.ns5-7",
447 logs_to_wait_for=(),
448 expected_parent_state="!DSRemoved",
449 ),
450 ]
451
452
453 checkds_tests = (
454 parental_agents_tests
455 + no_ent_tests
456 + dspublished_tests("explicit", "10.53.0.8")
457 + dspublished_tests("yes", "10.53.0.2")
458 + dswithdrawn_tests("explicit", "10.53.0.10")
459 + dswithdrawn_tests("yes", "10.53.0.5")
460 + checkds_no_tests
461 )
462
463
464 @pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
465 def test_checkds(servers, params):
466 # Wait until the provided zone is signed and then verify its DNSSEC data.
467 zone_check(servers["ns9"], params.zone)
468
469 # Wait up to 10 seconds until all the expected log lines are found in the
470 # log file for the provided server. Rekey every second if necessary.
471 time_remaining = 10
472 for log_string in params.logs_to_wait_for:
473 line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
474 while line not in servers["ns9"].log:
475 rekey(params.zone)
476 time_remaining -= 1
477 assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
478 time.sleep(1)
479
480 # Check whether key states on the parent server provided match
481 # expectations.
482 keystate_check(servers["ns2"], params.zone, params.expected_parent_state)
483