1 """ 2 Copyright (C) Internet Systems Consortium, Inc. ("ISC") 3 4 SPDX-License-Identifier: MPL-2.0 5 6 This Source Code Form is subject to the terms of the Mozilla Public 7 License, v. 2.0. If a copy of the MPL was not distributed with this 8 file, you can obtain one at https://mozilla.org/MPL/2.0/. 9 10 See the COPYRIGHT file distributed with this work for additional 11 information regarding copyright ownership. 12 """ 13 14 from typing import AsyncGenerator 15 16 import dns.name 17 import dns.rcode 18 import dns.rdatatype 19 import dns.rrset 20 21 from isctest.asyncserver import ( 22 ControllableAsyncDnsServer, 23 DnsResponseSend, 24 QueryContext, 25 ResponseAction, 26 ResponseHandler, 27 ToggleResponsesCommand, 28 ) 29 30 31 class ChaseDsHandler(ResponseHandler): 32 """ 33 Yield responses triggering DS chasing logic in `named`. These responses 34 cannot be served from a static zone file because most of them need to be 35 generated dynamically so that the owner name of the returned RRset is 36 copied from the QNAME sent by the client: 37 38 - A/AAAA queries for `ns1.sld.tld.` elicit responses with IP addresses, 39 - all NS queries below `sld.tld.` elicit a delegation to `ns1.sld.tld.`, 40 - all other queries elicit a negative response with a common SOA record. 41 """ 42 43 async def get_responses( 44 self, qctx: QueryContext 45 ) -> AsyncGenerator[ResponseAction, None]: 46 ns1_sld_tld = dns.name.from_text("ns1.sld.tld.") 47 sld_tld = dns.name.from_text("sld.tld.") 48 49 if qctx.qname == ns1_sld_tld and qctx.qtype == dns.rdatatype.A: 50 response_type = dns.rdatatype.A 51 response_rdata = "10.53.0.2" 52 response_section = qctx.response.answer 53 elif qctx.qname == ns1_sld_tld and qctx.qtype == dns.rdatatype.AAAA: 54 response_type = dns.rdatatype.AAAA 55 response_rdata = "fd92:7065:b8e:ffff::2" 56 response_section = qctx.response.answer 57 elif qctx.qname.is_subdomain(sld_tld) and qctx.qtype == dns.rdatatype.NS: 58 response_type = dns.rdatatype.NS 59 response_rdata = "ns1.sld.tld." 60 response_section = qctx.response.answer 61 else: 62 response_type = dns.rdatatype.SOA 63 response_rdata = ". . 0 0 0 0 0" 64 response_section = qctx.response.authority 65 66 qctx.response.use_edns(None) 67 68 response_rrset = dns.rrset.from_text( 69 qctx.qname, 300, qctx.qclass, response_type, response_rdata 70 ) 71 response_section.append(response_rrset) 72 73 yield DnsResponseSend(qctx.response) 74 75 76 def main() -> None: 77 server = ControllableAsyncDnsServer( 78 default_rcode=dns.rcode.NOERROR, default_aa=True 79 ) 80 server.install_control_command(ToggleResponsesCommand()) 81 server.install_response_handler(ChaseDsHandler()) 82 server.run() 83 84 85 if __name__ == "__main__": 86 main() 87