1 # Copyright (C) Internet Systems Consortium, Inc. ("ISC") 2 # 3 # SPDX-License-Identifier: MPL-2.0 4 # 5 # This Source Code Form is subject to the terms of the Mozilla Public 6 # License, v. 2.0. If a copy of the MPL was not distributed with this 7 # file, you can obtain one at https://mozilla.org/MPL/2.0/. 8 # 9 # See the COPYRIGHT file distributed with this work for additional 10 # information regarding copyright ownership. 11 12 import os 13 import subprocess 14 15 import isctest 16 import isctest.mark 17 18 pytestmark = [isctest.mark.with_dnstap] 19 20 21 def line_to_ips_and_queries(line): 22 # dnstap-read output line example 23 # 05-Feb-2026 11:00:57.853 RQ 10.53.0.4:38507 -> 10.53.0.3:22047 TCP 56b sub.example.tld/IN/NS 24 _, _, _, _, _, dst, _, _, query = line.split(" ", 9) 25 ip, _ = dst.split(":", 1) 26 return (ip, query) 27 28 29 def extract_dnstap(ns, expectedlen): 30 ns.rndc("dnstap -roll 1") 31 path = os.path.join(ns.identifier, "dnstap.out.0") 32 dnstapread = isctest.run.cmd( 33 [isctest.vars.ALL["DNSTAPREAD"], path], 34 ) 35 36 lines = dnstapread.out.splitlines() 37 assert expectedlen == len(lines) 38 return list(map(line_to_ips_and_queries, lines)) 39 40 41 # Because DNSTAP doesn't have ordering guarantee, the order doesn't matter here. 42 def expect_ip_and_query(expected_ips_and_queries, ips_and_queries): 43 found_count = 0 44 for expected_ip, expected_query in expected_ips_and_queries: 45 found = False 46 for ip, query in ips_and_queries: 47 if ip == expected_ip and query == expected_query: 48 found = True 49 found_count += 1 50 break 51 assert found 52 assert found_count == len(expected_ips_and_queries) 53 54 55 def test_selfpointedglue(ns4): 56 msg = isctest.query.create("a.sub.example.tld.", "A") 57 res = isctest.query.tcp(msg, ns4.ip) 58 isctest.check.servfail(res) 59 60 ips_and_queries = extract_dnstap(ns4, 10) 61 62 # Thanks to the de-duplication, only the first 6 NS IPs are 63 # queried (once sub.example.tld. NS is found) instead of 60 64 # (60 per NS, with 10 NS). 65 expect_ip_and_query( 66 [ 67 ("10.53.0.1", "./IN/NS"), 68 ("10.53.0.1", "tld/IN/NS"), 69 ("10.53.0.2", "example.tld/IN/NS"), 70 ("10.53.0.3", "sub.example.tld/IN/NS"), 71 ("10.53.0.3", "a.sub.example.tld/IN/A"), 72 ("10.53.0.5", "a.sub.example.tld/IN/A"), 73 ("10.53.0.6", "a.sub.example.tld/IN/A"), 74 ("10.53.0.7", "a.sub.example.tld/IN/A"), 75 ("10.53.0.8", "a.sub.example.tld/IN/A"), 76 ("10.53.0.9", "a.sub.example.tld/IN/A"), 77 ], 78 ips_and_queries, 79 ) 80 81 82 def test_selfpointedglue_adblimit(ns4, templates): 83 templates.render("ns4/named.args", {"adblimit": "-T adbaddrslimit=2"}) 84 with ns4.watch_log_from_here() as watcher: 85 # Server needs a full stop/restart to read the new command line options. 86 ns4.stop() 87 ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]]) 88 watcher.wait_for_line("running") 89 90 msg = isctest.query.create("a.sub.example.tld.", "A") 91 res = isctest.query.tcp(msg, ns4.ip) 92 isctest.check.servfail(res) 93 94 ips_and_queries = extract_dnstap(ns4, 6) 95 96 expect_ip_and_query( 97 [ 98 ("10.53.0.1", "./IN/NS"), 99 ("10.53.0.1", "tld/IN/NS"), 100 ("10.53.0.2", "example.tld/IN/NS"), 101 ("10.53.0.3", "sub.example.tld/IN/NS"), 102 # Because of the ADB limit, only 2 IP are used instead 103 # of the 10 provided ones. 104 ("10.53.0.3", "a.sub.example.tld/IN/A"), 105 ("10.53.0.5", "a.sub.example.tld/IN/A"), 106 ], 107 ips_and_queries, 108 ) 109 110 111 def test_selfpointedglue_adblimitlower(ns4, templates): 112 templates.render("ns4/named.args", {"adblimit": "-T adbaddrslimit=0"}) 113 with ns4.watch_log_from_here() as watcher: 114 # Server needs a full stop/restart to read the new command line options. 115 ns4.stop() 116 failed_to_start = False 117 try: 118 ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]]) 119 except subprocess.CalledProcessError as _: 120 failed_to_start = True 121 assert failed_to_start is True 122 watcher.wait_for_line("adbaddrslimit must be at least 1") 123