Home | History | Annotate | Line # | Download | only in checkds
tests_checkds.py revision 1.1.1.2.4.1
      1 #!/usr/bin/python3
      2 
      3 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      4 #
      5 # SPDX-License-Identifier: MPL-2.0
      6 #
      7 # This Source Code Form is subject to the terms of the Mozilla Public
      8 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      9 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
     10 #
     11 # See the COPYRIGHT file distributed with this work for additional
     12 # information regarding copyright ownership.
     13 
     14 
     15 from typing import NamedTuple, Tuple
     16 
     17 import os
     18 import sys
     19 import time
     20 
     21 import isctest
     22 import pytest
     23 
     24 pytest.importorskip("dns", minversion="2.0.0")
     25 import dns.exception
     26 import dns.message
     27 import dns.name
     28 import dns.rcode
     29 import dns.rdataclass
     30 import dns.rdatatype
     31 
     32 
# Module-level pytest marks; pytest applies every entry to each test in
# this file.
pytestmark = [
    # Skip the whole module when the interpreter is older than Python 3.7.
    pytest.mark.skipif(
        sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
    ),
    # Glob patterns handed to the project's "extra_artifacts" mark.
    # NOTE(review): presumably these are files the test run is expected to
    # leave behind in the test directory -- confirm against the isctest
    # plugin that consumes this mark.
    pytest.mark.extra_artifacts(
        [
            "*.out",
            "ns*/*.db",
            "ns*/*.db.infile",
            "ns*/*.db.signed",
            "ns*/*.jnl",
            "ns*/*.jbk",
            "ns*/*.keyname",
            "ns*/dsset-*",
            "ns*/K*",
            "ns*/keygen.out*",
            "ns*/settime.out*",
            "ns*/signer.out*",
            "ns*/trusted.conf",
            "ns*/zones",
        ]
    ),
]
     56 
     57 
def has_signed_apex_nsec(zone, response):
    """Tell whether *response* holds the signed apex NSEC record of *zone*.

    The answer section is searched textually for two things:

    * the apex NSEC record (TTL 300, next name ``a.<zone>``, type bitmap
      ``NS SOA RRSIG NSEC DNSKEY``), and
    * an RRSIG covering NSEC made with algorithm 13 whose label count
      equals the number of labels in *zone*.

    *zone* must be a fully qualified name (trailing dot included) so that
    counting its dots yields the label count.  *response* is any object
    whose ``answer`` attribute is an iterable of items offering
    ``to_text()``.

    An error is logged for every missing element.  Returns ``True`` only
    when both the NSEC and its signature were found.
    """
    # zone is specified as FQDN, hence one dot per label.
    labels = zone.count(".")
    wanted_nsec = f"{zone} 300 IN NSEC a.{zone} NS SOA RRSIG NSEC DNSKEY"
    wanted_rrsig = f"{zone} 300 IN RRSIG NSEC 13 {labels} 300"

    # Render each answer rrset once and scan the texts for both patterns.
    rendered = [rrset.to_text() for rrset in response.answer]
    nsec_found = any(wanted_nsec in text for text in rendered)
    rrsig_found = any(wanted_rrsig in text for text in rendered)

    if not nsec_found:
        isctest.log.error("missing apex NSEC record in response")
    if not rrsig_found:
        isctest.log.error("missing NSEC signature in response")

    return nsec_found and rrsig_found
     81 
     82 
def do_query(server, qname, qtype, tcp=False):
    """Send a DNSSEC-enabled query for *qname*/*qtype* to *server*.

    The query is built with EDNS and the DO bit requested and is sent to
    ``server.ip`` over TCP when *tcp* is true, otherwise over UDP.  The
    isctest query helper is called with ``expected_rcode`` set to NOERROR.
    NOTE(review): presumably the helper fails the test on any other
    rcode -- confirm in isctest.query.

    Returns whatever the isctest query helper returns (the response).
    """
    if tcp:
        send = isctest.query.tcp
    else:
        send = isctest.query.udp

    request = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
    return send(request, server.ip, expected_rcode=dns.rcode.NOERROR)
     88 
     89 
def verify_zone(zone, transfer):
    """Dump *transfer* to a file and check it with the dnssec-verify tool.

    The path of the verification tool is taken from the ``VERIFY``
    environment variable, which must be set.  Every rrset in
    ``transfer.answer`` is written in text form to ``<zone>out`` in the
    current directory (for an FQDN *zone* this yields ``<name>.out``), and
    the tool is then run as ``$VERIFY -z -o <zone> <file>``.

    Returns ``True`` when the tool exits with status 0; otherwise an error
    is logged and ``False`` is returned.
    """
    verify = os.getenv("VERIFY")
    assert verify is not None

    filename = f"{zone}out"
    with open(filename, "w", encoding="utf-8") as zonefile:
        zonefile.writelines(f"{rrset.to_text()}\n" for rrset in transfer.answer)

    # dnssec-verify command with default arguments.
    result = isctest.run.cmd([verify, "-z", "-o", zone, filename])

    verified = result.returncode == 0
    if not verified:
        isctest.log.error(f"dnssec-verify {zone} failed")

    return verified
    109 
    110 
def read_statefile(server, zone):
    """Return the contents of the key state file for *zone* as a dict.

    The key tag is discovered by querying *server* for the zone's DS
    rrset over TCP; exactly one DS rrset owned by *zone* is expected in
    the answer (asserted).  The tag of the first DS record in that rrset
    selects the file ``ns9/K<zone>+013+<keyid>.state``, relative to the
    current working directory.

    Each ``name: value`` line of the file becomes one dict entry, with
    both sides stripped of surrounding whitespace.  Lines starting with
    ``;`` are comments and are skipped, as are blank lines and lines
    without a colon (e.g. from a file that is still being written), which
    previously raised ``ValueError``.

    Returns an empty dict when the state file does not exist yet.
    """
    response = do_query(server, zone, "DS", tcp=True)

    # Collect the DS rrset(s) owned by the zone from the answer section.
    owner = dns.name.from_text(zone)
    ds_rrsets = [
        rrset
        for rrset in response.answer
        if rrset.match(
            owner,
            dns.rdataclass.IN,
            dns.rdatatype.DS,
            dns.rdatatype.NONE,
        )
    ]
    count = len(ds_rrsets)

    assert (
        count == 1
    ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"

    # fetch key id from response: the rrset's ``items`` mapping is keyed by
    # rdata, so its first key is the first DS record.
    keyid = next(iter(ds_rrsets[0].items)).key_tag

    filename = f"ns9/K{zone}+013+{keyid:05d}.state"
    isctest.log.debug(f"read state file {filename}")

    state = {}
    try:
        with open(filename, "r", encoding="utf-8") as file:
            for line in file:
                if line.startswith(";"):
                    continue
                key, separator, val = line.strip().partition(":")
                if not separator:
                    # Blank or incomplete line; nothing to record.
                    continue
                state[key.strip()] = val.strip()
    except FileNotFoundError:
        # file may not be written just yet.
        return {}

    return state
    148 
    149 
def zone_check(server, zone):
    """Assert that *zone*, as served by *server*, is signed and verifies.

    *zone* is given without the trailing dot; it is appended here before
    querying.  Two checks are made, in order:

    1. an NSEC query at the apex must return the signed apex NSEC record;
    2. the zone is transferred (AXFR over TCP) and must pass verify_zone().
    """
    apex = f"{zone}."

    # Check that the zone is fully signed.
    nsec_response = do_query(server, apex, "NSEC")
    assert has_signed_apex_nsec(apex, nsec_response)

    # Check that the zone is DNSSEC valid.
    axfr_response = do_query(server, apex, "AXFR", tcp=True)
    assert verify_zone(apex, axfr_response)
    160 
    161 
def keystate_check(server, zone, key):
    """Assert the presence (or absence) of *key* in the zone's state file.

    *key* names an entry of the key state file, e.g. ``DSPublish``.  When
    it is prefixed with ``!`` (e.g. ``!DSPublish``) the expectation is
    inverted: the entry must not have been seen.

    The state file is read through read_statefile() up to ten times, one
    second apart, stopping as soon as the expectation holds.  A value
    found in an earlier attempt is retained if a later read lacks the
    entry.  The final outcome is asserted.
    """
    fqdn = f"{zone}."
    deny = key.startswith("!")
    search = key[1:] if deny else key

    # 0 doubles as the "entry not seen" marker; values read from the file
    # are strings and therefore never compare equal to it.
    val = 0

    for _ in range(10):
        state = read_statefile(server, fqdn)
        if search in state:
            val = state[search]

        found = val != 0
        # Satisfied when found and wanted, or not found and denied.
        if found != deny:
            break

        time.sleep(1)

    if deny:
        assert val == 0
    else:
        assert val != 0
    190 
    191 
def rekey(zone):
    """Run ``rndc loadkeys <zone>`` against 10.53.0.9 and assert success.

    The rndc binary and the control port are taken from the ``RNDC`` and
    ``CONTROLPORT`` environment variables, both of which must be set.  The
    configuration file used is ``../_common/rndc.conf``.  A failing
    command is logged before the assertion fires.
    """
    rndc = os.getenv("RNDC")
    assert rndc is not None

    port = os.getenv("CONTROLPORT")
    assert port is not None

    # rndc loadkeys.
    connection = ["-c", "../_common/rndc.conf", "-p", port, "-s", "10.53.0.9"]
    result = isctest.run.cmd([rndc, *connection, "loadkeys", zone])

    succeeded = result.returncode == 0
    if not succeeded:
        isctest.log.error(f"rndc loadkeys {zone} failed")

    assert succeeded
    215 
    216     assert controller.returncode == 0
    217 
    218 
class CheckDSTest(NamedTuple):
    """Parameters of a single checkds test case."""

    # Name of the zone under test, without the trailing dot.
    zone: str
    # Log message fragments that must all show up before the key state is
    # checked.  Any number of entries is allowed, including none, hence the
    # variable-length tuple type.
    logs_to_wait_for: Tuple[str, ...]
    # Key state file entry expected to be set (e.g. "DSPublish"); a leading
    # "!" means the entry is expected not to be set.
    expected_parent_state: str
    223 
    224 
# Cases exercising explicitly configured parental agents.  Each row gives
# the zone, the single log line to wait for and the expected key state.
parental_agents_tests = [
    CheckDSTest(
        zone=zone_name,
        logs_to_wait_for=(log_line,),
        expected_parent_state=parent_state,
    )
    for zone_name, log_line, parent_state in (
        # Using a reference to parental-agents.
        (
            "reference.explicit.dspublish.ns2",
            "DS response from 10.53.0.8",
            "DSPublish",
        ),
        # Using a resolver as parental-agent (ns3).
        (
            "resolver.explicit.dspublish.ns2",
            "DS response from 10.53.0.3",
            "DSPublish",
        ),
        # Using a resolver as parental-agent (ns3).
        (
            "resolver.explicit.dsremoved.ns5",
            "empty DS response from 10.53.0.3",
            "DSRemoved",
        ),
    )
]
    245 
# Cases for the "no-ent" zones.  Each row gives the zone, the single log
# line to wait for and the expected key state.
no_ent_tests = [
    CheckDSTest(
        zone=zone_name,
        logs_to_wait_for=(log_line,),
        expected_parent_state=parent_state,
    )
    for zone_name, log_line, parent_state in (
        ("no-ent.ns2", "DS response from 10.53.0.2", "DSPublish"),
        ("no-ent.ns5", "DS response from 10.53.0.5", "DSRemoved"),
    )
]
    258 
    259 
def dspublished_tests(checkds, addr):
    """Build the DS publication test cases for one checkds mode.

    *checkds* is the mode label embedded in every zone name
    (``<case>.<checkds>.dspublish.<agents>``); for the single
    misconfigured-agent case it also selects which log line is expected.
    *addr* is the address expected in the "DS response from" log line of
    the first parental agent.

    Returns a list of CheckDSTest tuples.
    """
    published_first = f"DS response from {addr}"
    published_ns4 = "DS response from 10.53.0.4"
    empty_ns5 = "empty DS response from 10.53.0.5"
    bad_ns6 = "bad DS response from 10.53.0.6"

    # With a single broken agent, only the "explicit" mode is expected to
    # log a bad response; otherwise a processing error is expected.
    if checkds == "explicit":
        lone_bad_agent = bad_ns6
    else:
        lone_bad_agent = "error during parental-agents processing"

    def case(label, agents, logs, state):
        return CheckDSTest(
            zone=f"{label}.{checkds}.dspublish.{agents}",
            logs_to_wait_for=logs,
            expected_parent_state=state,
        )

    return [
        # 1.1.1: DS is correctly published in parent (the simple case).
        # parental-agents: ns2
        case("good", "ns2", (published_first,), "DSPublish"),
        # 1.1.2: DS is not published in parent.
        # parental-agents: ns5
        case("not-yet", "ns5", (empty_ns5,), "!DSPublish"),
        # 1.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        case("bad", "ns6", (lone_bad_agent,), "!DSPublish"),
        # 1.1.4: DS is published, but has bogus signature.
        # TBD
        #
        # 1.2.1: DS is correctly published in all parents.
        # parental-agents: ns2, ns4
        case("good", "ns2-4", (published_first, published_ns4), "DSPublish"),
        # 1.2.2: DS is not published in some parents.
        # parental-agents: ns2, ns4, ns5
        case(
            "incomplete",
            "ns2-4-5",
            (published_first, published_ns4, empty_ns5),
            "!DSPublish",
        ),
        # 1.2.3: One parental agent is badly configured.
        # parental-agents: ns2, ns4, ns6
        case(
            "bad",
            "ns2-4-6",
            (published_first, published_ns4, bad_ns6),
            "!DSPublish",
        ),
        # 1.2.4: DS is completely published, bogus signature.
        # TBD
        # TBD: Check with TSIG
        # TBD: Check with TLS
    ]
    342 
    343 
def dswithdrawn_tests(checkds, addr):
    """Build the DS withdrawal test cases for one checkds mode.

    *checkds* is the mode label embedded in every zone name
    (``<case>.<checkds>.dsremoved.<agents>``); for the single
    misconfigured-agent case it also selects which log line is expected.
    *addr* is the address expected in the "empty DS response from" log
    line of the first parental agent.

    Returns a list of CheckDSTest tuples.
    """
    withdrawn_first = f"empty DS response from {addr}"
    withdrawn_ns7 = "empty DS response from 10.53.0.7"
    present_ns2 = "DS response from 10.53.0.2"
    bad_ns6 = "bad DS response from 10.53.0.6"

    # With a single broken agent, only the "explicit" mode is expected to
    # log a bad response; otherwise a processing error is expected.
    if checkds == "explicit":
        lone_bad_agent = bad_ns6
    else:
        lone_bad_agent = "error during parental-agents processing"

    def case(label, agents, logs, state):
        return CheckDSTest(
            zone=f"{label}.{checkds}.dsremoved.{agents}",
            logs_to_wait_for=logs,
            expected_parent_state=state,
        )

    return [
        # 2.1.1: DS correctly withdrawn from the parent (the simple case).
        # parental-agents: ns5
        case("good", "ns5", (withdrawn_first,), "DSRemoved"),
        # 2.1.2: DS is published in the parent.
        # parental-agents: ns2
        case("still-there", "ns2", (present_ns2,), "!DSRemoved"),
        # 2.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        case("bad", "ns6", (lone_bad_agent,), "!DSRemoved"),
        # 2.1.4: DS is withdrawn, but has bogus signature.
        # TBD
        #
        # 2.2.1: DS is correctly withdrawn from all parents.
        # parental-agents: ns5, ns7
        case("good", "ns5-7", (withdrawn_first, withdrawn_ns7), "DSRemoved"),
        # 2.2.2: DS is not withdrawn from some parents.
        # parental-agents: ns2, ns5, ns7
        case(
            "incomplete",
            "ns2-5-7",
            (present_ns2, withdrawn_first, withdrawn_ns7),
            "!DSRemoved",
        ),
        # 2.2.3: One parental agent is badly configured.
        # parental-agents: ns5, ns6, ns7
        case(
            "bad",
            "ns5-6-7",
            (withdrawn_first, withdrawn_ns7, bad_ns6),
            "!DSRemoved",
        ),
        # 2.2.4: DS is removed completely, bogus signature.
        # TBD
    ]
    427 
    428 
# Cases for the "no" zones: there are no log lines to wait for, and the
# listed key state entry is expected NOT to be set.
checkds_no_tests = [
    CheckDSTest(
        zone=zone_name,
        logs_to_wait_for=(),
        expected_parent_state=parent_state,
    )
    for zone_name, parent_state in (
        ("good.no.dspublish.ns2", "!DSPublish"),
        ("good.no.dspublish.ns2-4", "!DSPublish"),
        ("good.no.dsremoved.ns5", "!DSRemoved"),
        ("good.no.dsremoved.ns5-7", "!DSRemoved"),
    )
]
    451 
    452 
# The complete, ordered list of cases fed to test_checkds().
checkds_tests = [
    *parental_agents_tests,
    *no_ent_tests,
    *dspublished_tests("explicit", "10.53.0.8"),
    *dspublished_tests("yes", "10.53.0.2"),
    *dswithdrawn_tests("explicit", "10.53.0.10"),
    *dswithdrawn_tests("yes", "10.53.0.5"),
    *checkds_no_tests,
]
    462 
    463 
@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
def test_checkds(servers, params):
    """Run one checkds scenario, described by *params*, against ns9.

    Steps:

    1. Verify that the zone served by ns9 is signed and DNSSEC valid.
    2. Wait for every expected checkds log line to appear in the ns9 log,
       triggering ``rndc loadkeys`` once per second while waiting.  All
       log lines share a single budget of 10 attempts.
    3. Check that the key state file matches the expected state.
    """
    signer = servers["ns9"]

    # Verify the zone is signed and that its DNSSEC data is valid.
    zone_check(signer, params.zone)

    # Wait up to 10 seconds until all the expected log lines are found in the
    # log file for the provided server.  Rekey every second if necessary.
    time_remaining = 10
    for log_string in params.logs_to_wait_for:
        expected_line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
        while expected_line not in signer.log:
            rekey(params.zone)
            time_remaining -= 1
            assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
            time.sleep(1)

    # Check whether the key state matches expectations.  The server passed
    # here (ns2) is the one queried for the zone's DS record.
    keystate_check(servers["ns2"], params.zone, params.expected_parent_state)
    483