Home | History | Annotate | Line # | Download | only in selfpointedglue
      1 # Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      2 #
      3 # SPDX-License-Identifier: MPL-2.0
      4 #
      5 # This Source Code Form is subject to the terms of the Mozilla Public
      6 # License, v. 2.0.  If a copy of the MPL was not distributed with this
      7 # file, you can obtain one at https://mozilla.org/MPL/2.0/.
      8 #
      9 # See the COPYRIGHT file distributed with this work for additional
     10 # information regarding copyright ownership.
     11 
     12 import os
     13 import subprocess
     14 
     15 import isctest
     16 import isctest.mark
     17 
# Module-level pytest marks: every test in this file carries the
# isctest "with_dnstap" mark (presumably this gates the tests on a build
# with dnstap support -- confirm against isctest.mark).
pytestmark = [isctest.mark.with_dnstap]
     19 
     20 
def line_to_ips_and_queries(line):
    """Extract the destination IP and the query from one dnstap-read line.

    Example input line (nine space-separated fields):

        05-Feb-2026 11:00:57.853 RQ 10.53.0.4:38507 -> 10.53.0.3:22047 TCP 56b sub.example.tld/IN/NS

    Args:
        line: a single line of dnstap-read text output.

    Returns:
        A ``(ip, query)`` tuple, e.g. ``("10.53.0.3", "sub.example.tld/IN/NS")``.

    Raises:
        ValueError: if the line does not have exactly nine fields or the
            destination field has no ``:`` separating address and port.
    """
    _, _, _, _, _, dst, _, _, query = line.split(" ", 9)
    # The port is whatever follows the *last* colon; splitting on the first
    # one would truncate IPv6 destinations, which contain colons themselves.
    ip, _ = dst.rsplit(":", 1)
    return (ip, query)
     27 
     28 
def extract_dnstap(ns, expectedlen):
    """Roll the server's dnstap output and return the parsed entries.

    Sends ``dnstap -roll 1`` to ``ns`` through rndc, runs the DNSTAPREAD
    tool on ``dnstap.out.0`` inside the server's directory
    (``ns.identifier``), and asserts that the tool printed exactly
    ``expectedlen`` lines.

    Returns:
        A list with one ``(destination ip, query)`` tuple per output line.
    """
    ns.rndc("dnstap -roll 1")

    rolled_file = os.path.join(ns.identifier, "dnstap.out.0")
    reader_cmd = [isctest.vars.ALL["DNSTAPREAD"], rolled_file]
    reader = isctest.run.cmd(reader_cmd)

    records = reader.out.splitlines()
    assert expectedlen == len(records)
    return [line_to_ips_and_queries(record) for record in records]
     39 
     40 
     41 # Because DNSTAP doesn't have ordering guarantee, the order doesn't matter here.
def expect_ip_and_query(expected_ips_and_queries, ips_and_queries):
    """Assert that every expected (ip, query) pair occurs in the observed pairs.

    The position of a pair inside ``ips_and_queries`` is irrelevant; only
    its presence is checked.

    Raises:
        AssertionError: if an expected pair has no equal observed pair.
    """
    matched = 0
    for want_ip, want_query in expected_ips_and_queries:
        # any() stops at the first observed pair equal to the wanted one.
        seen = any(
            got_ip == want_ip and got_query == want_query
            for got_ip, got_query in ips_and_queries
        )
        assert seen
        matched += 1
    assert matched == len(expected_ips_and_queries)
     53 
     54 
def test_selfpointedglue(ns4):
    """Query a.sub.example.tld/A via ns4 over TCP and check the dnstap log.

    The answer must be SERVFAIL, and the dnstap output must hold exactly
    ten entries containing the (destination, query) pairs listed below.
    """
    query = isctest.query.create("a.sub.example.tld.", "A")
    response = isctest.query.tcp(query, ns4.ip)
    isctest.check.servfail(response)

    observed = extract_dnstap(ns4, 10)

    # Thanks to the de-duplication, only the first 6 NS IPs are queried
    # (once the sub.example.tld. NS set is found) instead of 60
    # (the original note gives "60 per NS, with 10 NS").
    expected = [
        ("10.53.0.1", "./IN/NS"),
        ("10.53.0.1", "tld/IN/NS"),
        ("10.53.0.2", "example.tld/IN/NS"),
        ("10.53.0.3", "sub.example.tld/IN/NS"),
        ("10.53.0.3", "a.sub.example.tld/IN/A"),
        ("10.53.0.5", "a.sub.example.tld/IN/A"),
        ("10.53.0.6", "a.sub.example.tld/IN/A"),
        ("10.53.0.7", "a.sub.example.tld/IN/A"),
        ("10.53.0.8", "a.sub.example.tld/IN/A"),
        ("10.53.0.9", "a.sub.example.tld/IN/A"),
    ]
    expect_ip_and_query(expected, observed)
     80 
     81 
def test_selfpointedglue_adblimit(ns4, templates):
    """Repeat the self-pointed-glue query with ``-T adbaddrslimit=2``.

    ns4's named.args is re-rendered with the limit, the server is
    restarted, and the dnstap output must then hold exactly six entries
    containing the (destination, query) pairs listed below.
    """
    templates.render("ns4/named.args", {"adblimit": "-T adbaddrslimit=2"})
    with ns4.watch_log_from_here() as log_watcher:
        # Command line options are only read at startup, so a plain
        # reconfiguration is not enough: stop and start the server.
        ns4.stop()
        ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
        log_watcher.wait_for_line("running")

    query = isctest.query.create("a.sub.example.tld.", "A")
    response = isctest.query.tcp(query, ns4.ip)
    isctest.check.servfail(response)

    observed = extract_dnstap(ns4, 6)

    expected = [
        ("10.53.0.1", "./IN/NS"),
        ("10.53.0.1", "tld/IN/NS"),
        ("10.53.0.2", "example.tld/IN/NS"),
        ("10.53.0.3", "sub.example.tld/IN/NS"),
        # With the ADB limit in place only 2 of the 10 provided IPs
        # are used.
        ("10.53.0.3", "a.sub.example.tld/IN/A"),
        ("10.53.0.5", "a.sub.example.tld/IN/A"),
    ]
    expect_ip_and_query(expected, observed)
    109 
    110 
def test_selfpointedglue_adblimitlower(ns4, templates):
    """Check that ns4 refuses to start with ``-T adbaddrslimit=0``.

    Starting the server must raise ``subprocess.CalledProcessError`` and
    the log must contain "adbaddrslimit must be at least 1".
    """
    templates.render("ns4/named.args", {"adblimit": "-T adbaddrslimit=0"})
    with ns4.watch_log_from_here() as log_watcher:
        # Command line options are only read at startup, so the server
        # has to be stopped and started again.
        ns4.stop()
        try:
            ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
        except subprocess.CalledProcessError:
            start_failed = True
        else:
            start_failed = False
        assert start_failed
        log_watcher.wait_for_line("adbaddrslimit must be at least 1")
    123