Home | History | Annotate | Line # | Download | only in checkds
      1 #!/usr/bin/python3
      2 
      3 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      4 #
      5 # SPDX-License-Identifier: MPL-2.0
      6 #
      7 # This Source Code Form is subject to the terms of the Mozilla Public
      8 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      9 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
     10 #
     11 # See the COPYRIGHT file distributed with this work for additional
     12 # information regarding copyright ownership.
     13 
     14 
     15 from typing import NamedTuple, Tuple
     16 
     17 import os
     18 import sys
     19 import time
     20 
     21 import isctest
     22 import pytest
     23 
     24 pytest.importorskip("dns", minversion="2.0.0")
     25 import dns.exception
     26 import dns.message
     27 import dns.name
     28 import dns.rcode
     29 import dns.rdataclass
     30 import dns.rdatatype
     31 
     32 
# Module-level pytest marks; pytest applies everything listed in ``pytestmark``
# to every test collected from this file.
pytestmark = [
    # Skip the whole module on interpreters older than Python 3.7.
    pytest.mark.skipif(
        sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
    ),
    # Glob patterns handed to the ``extra_artifacts`` mark.
    # NOTE(review): presumably these are the files a test run may leave behind
    # in the test directory -- confirm against the mark's implementation.
    pytest.mark.extra_artifacts(
        [
            "*.out",
            "ns*/*.db",
            "ns*/*.db.infile",
            "ns*/*.db.signed",
            "ns*/*.jnl",
            "ns*/*.jbk",
            "ns*/*.keyname",
            "ns*/dsset-*",
            "ns*/K*",
            "ns*/keygen.out*",
            "ns*/settime.out*",
            "ns*/signer.out*",
            "ns*/trusted.conf",
            "ns*/zones",
        ]
    ),
]
     56 
     57 
def has_signed_apex_nsec(zone, response):
    """Tell whether ``response`` carries the signed apex NSEC of ``zone``.

    ``zone`` must be a fully qualified name (with trailing dot): its dots are
    counted to obtain the label count expected in the RRSIG text.  Both the
    apex NSEC and an RRSIG covering it are searched for by substring match on
    the text form of every entry in ``response.answer``.

    An error is logged for each of the two records that is missing.

    Returns True only when both the NSEC and its RRSIG were found.
    """
    ttl = 300
    next_owner = "a."
    type_bitmap = "NS SOA RRSIG NSEC DNSKEY"
    labels = zone.count(".")  # correct only because zone is an FQDN

    wanted_nsec = f"{zone} {ttl} IN NSEC {next_owner}{zone} {type_bitmap}"
    wanted_rrsig = f"{zone} {ttl} IN RRSIG NSEC 13 {labels} 300"

    # Render each answer entry once and search the rendered text.
    rendered = [entry.to_text() for entry in response.answer]
    nsec_found = any(wanted_nsec in text for text in rendered)
    rrsig_found = any(wanted_rrsig in text for text in rendered)

    if not nsec_found:
        isctest.log.error("missing apex NSEC record in response")
    if not rrsig_found:
        isctest.log.error("missing NSEC signature in response")

    return nsec_found and rrsig_found
     81 
     82 
def do_query(server, qname, qtype, tcp=False):
    """Query ``server.ip`` for ``qname``/``qtype`` and return the response.

    The query goes over TCP when ``tcp`` is true and over UDP otherwise.  The
    isctest query helper is told to expect a NOERROR rcode.
    """
    query = isctest.query.create(qname, qtype)
    if tcp:
        send = isctest.query.tcp
    else:
        send = isctest.query.udp
    return send(query, server.ip, expected_rcode=dns.rcode.NOERROR)
     88 
     89 
def verify_zone(zone, transfer):
    """Dump a zone transfer to a file and check it with dnssec-verify.

    The path of the dnssec-verify binary is read from the ``VERIFY``
    environment variable, which must be set.  Every entry of
    ``transfer.answer`` is written in text form to ``<zone>out`` in the
    current directory and the verifier is run on that file with ``zone`` as
    the origin.

    Returns True when the verifier exits with status 0; otherwise an error is
    logged and False is returned.
    """
    verify = os.getenv("VERIFY")
    assert verify is not None

    filename = f"{zone}out"
    with open(filename, "w", encoding="utf-8") as file:
        file.writelines(f"{rr.to_text()}\n" for rr in transfer.answer)

    # dnssec-verify command with default arguments.
    verifier = isctest.run.cmd([verify, "-z", "-o", zone, filename])

    succeeded = verifier.rc == 0
    if not succeeded:
        isctest.log.error(f"dnssec-verify {zone} failed")

    return succeeded
    109 
    110 
def read_statefile(server, zone):
    """Read the state file of the key that the DS of ``zone`` refers to.

    ``server`` is queried over TCP for the DS RRset of ``zone``; the key tag
    of its first DS record selects the file
    ``ns9/K<zone>+013+<keytag>.state``.

    Returns a dict mapping every ``name: value`` field of that file to its
    value (both stripped of surrounding whitespace).  Lines starting with
    ``;`` and lines without a colon are ignored.  An empty dict is returned
    when the file does not exist (yet).

    Raises AssertionError unless the response holds exactly one DS RRset for
    ``zone``.
    """
    owner = dns.name.from_text(zone)
    response = do_query(server, zone, "DS", tcp=True)
    ds_rrsets = [
        rrset
        for rrset in response.answer
        if rrset.match(
            owner,
            dns.rdataclass.IN,
            dns.rdatatype.DS,
            dns.rdatatype.NONE,
        )
    ]

    count = len(ds_rrsets)
    assert (
        count == 1
    ), f"expected a single DS in response for {zone} from {server.ip}, got {count}"

    # fetch key id from the first DS record of the response.
    keyid = next(iter(ds_rrsets[0].items)).key_tag

    filename = f"ns9/K{zone}+013+{keyid:05d}.state"
    isctest.log.debug(f"read state file {filename}")

    state = {}
    try:
        with open(filename, "r", encoding="utf-8") as file:
            for line in file:
                if line.startswith(";"):
                    continue
                key, separator, val = line.partition(":")
                if not separator:
                    # Blank or otherwise colon-less line: nothing to record.
                    # Skipping it keeps callers that poll this file from
                    # crashing with an unrelated ValueError.
                    continue
                state[key.strip()] = val.strip()
    except FileNotFoundError:
        # file may not be written just yet.
        return {}

    return state
    148 
    149 
def zone_check(server, zone):
    """Assert that ``zone`` on ``server`` is signed and DNSSEC valid.

    ``zone`` is given without the trailing dot.  Two assertions are made: the
    apex NSEC is present together with its signature, and a full transfer of
    the zone passes verify_zone().
    """
    apex = f"{zone}."

    # check zone is fully signed.
    nsec_response = do_query(server, apex, "NSEC")
    assert has_signed_apex_nsec(apex, nsec_response)

    # check if zone is DNSSEC valid.
    axfr = do_query(server, apex, "AXFR", tcp=True)
    assert verify_zone(apex, axfr)
    160 
    161 
def keystate_check(server, zone, key):
    """Poll the key state file of ``zone`` until ``key`` is as expected.

    ``zone`` is given without the trailing dot.  ``key`` names a field of the
    state file, for example ``DSPublish``; with a leading ``!`` the field is
    expected to be absent instead of present.

    The state file is read up to ten times, one second apart, and polling
    stops as soon as the expectation holds.

    Raises AssertionError when the expectation still does not hold after the
    last poll.
    """
    fqdn = f"{zone}."
    deny = key.startswith("!")
    search = key[1:] if deny else key

    val = 0  # 0 stands for "field not present in the state file"
    for _ in range(10):
        # Start from scratch on every poll: carrying over a value seen on an
        # earlier poll would make it impossible for a "!" expectation to
        # succeed once the field disappears from the file.
        val = read_statefile(server, fqdn).get(search, 0)

        present = val != 0
        if present != deny:
            break

        time.sleep(1)

    if deny:
        assert val == 0, f"{search} unexpectedly set to {val!r} in key state of {fqdn}"
    else:
        assert val != 0, f"{search} not found in key state of {fqdn}"
    190 
    191 
class CheckDSTest(NamedTuple):
    """Parameters of a single checkds test case."""

    # Name of the zone under test, without the trailing dot.
    zone: str
    # Fragments of checkds log lines that must show up before the key state is
    # inspected; may hold any number of entries, including none.
    logs_to_wait_for: Tuple[str, ...]
    # Field expected in the key state file; a leading "!" means the field is
    # expected to be absent.
    expected_parent_state: str
    196 
    197 
# Test cases covering the different ways of specifying parental agents.  Each
# row is (zone, log line to wait for, expected key state).
parental_agents_tests = [
    CheckDSTest(zone=zone, logs_to_wait_for=(log,), expected_parent_state=state)
    for zone, log, state in (
        # Using a reference to parental-agents.
        (
            "reference.explicit.dspublish.ns2",
            "DS response from 10.53.0.8",
            "DSPublish",
        ),
        # Using a resolver as parental-agent (ns3).
        (
            "resolver.explicit.dspublish.ns2",
            "DS response from 10.53.0.3",
            "DSPublish",
        ),
        # Using a resolver as parental-agent (ns3).
        (
            "resolver.explicit.dsremoved.ns5",
            "empty DS response from 10.53.0.3",
            "DSRemoved",
        ),
    )
]
    218 
# Test cases for the "no-ent" zones, one answered by ns2 and one by ns5.
# Fields are passed in declaration order: zone, logs to wait for, expected
# key state.
no_ent_tests = [
    CheckDSTest("no-ent.ns2", ("DS response from 10.53.0.2",), "DSPublish"),
    CheckDSTest("no-ent.ns5", ("DS response from 10.53.0.5",), "DSRemoved"),
]
    231 
    232 
def dspublished_tests(checkds, addr):
    """Return the test cases for keys waiting for their DS to be published.

    ``checkds`` is the label embedded in every zone name.  ``addr`` is the
    address expected in the "DS response from" log line of the cases that
    list ns2 among their parental agents.
    """
    ds_from_addr = f"DS response from {addr}"
    ds_from_ns4 = "DS response from 10.53.0.4"
    empty_from_ns5 = "empty DS response from 10.53.0.5"
    bad_from_ns6 = "bad DS response from 10.53.0.6"

    # When the misconfigured agent is the only one, the expected log line
    # depends on the checkds label.
    if checkds == "explicit":
        lone_bad_agent = bad_from_ns6
    else:
        lone_bad_agent = "error during parental-agents processing"

    return [
        #
        # 1.1.1: DS is correctly published in parent.
        # parental-agents: ns2
        #
        # The simple case.
        CheckDSTest(
            zone=f"good.{checkds}.dspublish.ns2",
            logs_to_wait_for=(ds_from_addr,),
            expected_parent_state="DSPublish",
        ),
        #
        # 1.1.2: DS is not published in parent.
        # parental-agents: ns5
        #
        CheckDSTest(
            zone=f"not-yet.{checkds}.dspublish.ns5",
            logs_to_wait_for=(empty_from_ns5,),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dspublish.ns6",
            logs_to_wait_for=(lone_bad_agent,),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.1.4: DS is published, but has bogus signature.
        #
        # TBD
        #
        # 1.2.1: DS is correctly published in all parents.
        # parental-agents: ns2, ns4
        #
        CheckDSTest(
            zone=f"good.{checkds}.dspublish.ns2-4",
            logs_to_wait_for=(ds_from_addr, ds_from_ns4),
            expected_parent_state="DSPublish",
        ),
        #
        # 1.2.2: DS is not published in some parents.
        # parental-agents: ns2, ns4, ns5
        #
        CheckDSTest(
            zone=f"incomplete.{checkds}.dspublish.ns2-4-5",
            logs_to_wait_for=(ds_from_addr, ds_from_ns4, empty_from_ns5),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.2.3: One parental agent is badly configured.
        # parental-agents: ns2, ns4, ns6
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dspublish.ns2-4-6",
            logs_to_wait_for=(ds_from_addr, ds_from_ns4, bad_from_ns6),
            expected_parent_state="!DSPublish",
        ),
        #
        # 1.2.4: DS is completely published, bogus signature.
        #
        # TBD
        # TBD: Check with TSIG
        # TBD: Check with TLS
    ]
    315 
    316 
def dswithdrawn_tests(checkds, addr):
    """Return the test cases for keys waiting for their DS to be withdrawn.

    ``checkds`` is the label embedded in every zone name.  ``addr`` is the
    address expected in the "empty DS response from" log line of the cases
    that list ns5 among their parental agents.
    """
    empty_from_addr = f"empty DS response from {addr}"
    empty_from_ns7 = "empty DS response from 10.53.0.7"
    ds_from_ns2 = "DS response from 10.53.0.2"
    bad_from_ns6 = "bad DS response from 10.53.0.6"

    # When the misconfigured agent is the only one, the expected log line
    # depends on the checkds label.
    if checkds == "explicit":
        lone_bad_agent = bad_from_ns6
    else:
        lone_bad_agent = "error during parental-agents processing"

    return [
        #
        # 2.1.1: DS correctly withdrawn from the parent.
        # parental-agents: ns5
        #
        # The simple case.
        CheckDSTest(
            zone=f"good.{checkds}.dsremoved.ns5",
            logs_to_wait_for=(empty_from_addr,),
            expected_parent_state="DSRemoved",
        ),
        #
        # 2.1.2: DS is published in the parent.
        # parental-agents: ns2
        #
        CheckDSTest(
            zone=f"still-there.{checkds}.dsremoved.ns2",
            logs_to_wait_for=(ds_from_ns2,),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.1.3: The parental agent is badly configured.
        # parental-agents: ns6
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dsremoved.ns6",
            logs_to_wait_for=(lone_bad_agent,),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.1.4: DS is withdrawn, but has bogus signature.
        #
        # TBD
        #
        # 2.2.1: DS is correctly withdrawn from all parents.
        # parental-agents: ns5, ns7
        #
        CheckDSTest(
            zone=f"good.{checkds}.dsremoved.ns5-7",
            logs_to_wait_for=(empty_from_addr, empty_from_ns7),
            expected_parent_state="DSRemoved",
        ),
        #
        # 2.2.2: DS is not withdrawn from some parents.
        # parental-agents: ns2, ns5, ns7
        #
        CheckDSTest(
            zone=f"incomplete.{checkds}.dsremoved.ns2-5-7",
            logs_to_wait_for=(ds_from_ns2, empty_from_addr, empty_from_ns7),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.2.3: One parental agent is badly configured.
        # parental-agents: ns5, ns6, ns7
        #
        CheckDSTest(
            zone=f"bad.{checkds}.dsremoved.ns5-6-7",
            logs_to_wait_for=(empty_from_addr, empty_from_ns7, bad_from_ns6),
            expected_parent_state="!DSRemoved",
        ),
        #
        # 2.2.4: DS is removed completely, bogus signature.
        #
        # TBD
    ]
    400 
    401 
# Test cases for the "good.no.*" zones: there is no log line to wait for and
# the listed key state field is expected to stay absent.  Each row is
# (zone, expected key state).
checkds_no_tests = [
    CheckDSTest(zone=zone, logs_to_wait_for=(), expected_parent_state=state)
    for zone, state in (
        ("good.no.dspublish.ns2", "!DSPublish"),
        ("good.no.dspublish.ns2-4", "!DSPublish"),
        ("good.no.dsremoved.ns5", "!DSRemoved"),
        ("good.no.dsremoved.ns5-7", "!DSRemoved"),
    )
]
    424 
    425 
# Every test case of this module, in the order in which the cases are
# parametrized.
checkds_tests = [
    *parental_agents_tests,
    *no_ent_tests,
    *dspublished_tests("explicit", "10.53.0.8"),
    *dspublished_tests("yes", "10.53.0.2"),
    *dswithdrawn_tests("explicit", "10.53.0.10"),
    *dswithdrawn_tests("yes", "10.53.0.5"),
    *checkds_no_tests,
]
    435 
    436 
@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
def test_checkds(ns2, ns9, params):
    """Run one checkds test case.

    The zone is first checked on ns9 for a signed apex NSEC and for DNSSEC
    validity.  Then every expected checkds log line is awaited in the ns9 log
    and, finally, the key state file is compared against the expectation.
    """
    # Verify that the provided zone is signed and that its DNSSEC data is
    # valid.
    zone_check(ns9, params.zone)

    # Wait until all the expected log lines are found in the ns9 log file.
    # The budget of 10 attempts is shared by all the lines; every attempt
    # rekeys the zone and then sleeps for a second.
    attempts_left = 10
    log_prefix = f"zone {params.zone}/IN (signed): checkds: "
    for log_string in params.logs_to_wait_for:
        expected_line = log_prefix + log_string
        while expected_line not in ns9.log:
            ns9.rndc(f"loadkeys {params.zone}")
            attempts_left -= 1
            assert attempts_left, f'Timed out waiting for "{log_string}" to be logged'
            time.sleep(1)

    # Check whether key states on the parent server provided match
    # expectations.
    keystate_check(ns2, params.zone, params.expected_parent_state)
    456