Home | History | Annotate | Line # | Download | only in ans6
      1 """
      2 Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3 
      4 SPDX-License-Identifier: MPL-2.0
      5 
      6 This Source Code Form is subject to the terms of the Mozilla Public
      7 License, v. 2.0.  If a copy of the MPL was not distributed with this
      8 file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9 
     10 See the COPYRIGHT file distributed with this work for additional
     11 information regarding copyright ownership.
     12 """
     13 
     14 from typing import AsyncGenerator
     15 
     16 import dns.name
     17 import dns.rcode
     18 import dns.rdatatype
     19 import dns.rrset
     20 
     21 from isctest.asyncserver import (
     22     ControllableAsyncDnsServer,
     23     DnsResponseSend,
     24     QueryContext,
     25     ResponseAction,
     26     ResponseHandler,
     27     ToggleResponsesCommand,
     28 )
     29 
     30 
     31 class ChaseDsHandler(ResponseHandler):
     32     """
     33     Yield responses triggering DS chasing logic in `named`.  These responses
     34     cannot be served from a static zone file because most of them need to be
     35     generated dynamically so that the owner name of the returned RRset is
     36     copied from the QNAME sent by the client:
     37 
     38       - A/AAAA queries for `ns1.sld.tld.` elicit responses with IP addresses,
     39       - all NS queries below `sld.tld.` elicit a delegation to `ns1.sld.tld.`,
     40       - all other queries elicit a negative response with a common SOA record.
     41     """
     42 
     43     async def get_responses(
     44         self, qctx: QueryContext
     45     ) -> AsyncGenerator[ResponseAction, None]:
     46         ns1_sld_tld = dns.name.from_text("ns1.sld.tld.")
     47         sld_tld = dns.name.from_text("sld.tld.")
     48 
     49         if qctx.qname == ns1_sld_tld and qctx.qtype == dns.rdatatype.A:
     50             response_type = dns.rdatatype.A
     51             response_rdata = "10.53.0.2"
     52             response_section = qctx.response.answer
     53         elif qctx.qname == ns1_sld_tld and qctx.qtype == dns.rdatatype.AAAA:
     54             response_type = dns.rdatatype.AAAA
     55             response_rdata = "fd92:7065:b8e:ffff::2"
     56             response_section = qctx.response.answer
     57         elif qctx.qname.is_subdomain(sld_tld) and qctx.qtype == dns.rdatatype.NS:
     58             response_type = dns.rdatatype.NS
     59             response_rdata = "ns1.sld.tld."
     60             response_section = qctx.response.answer
     61         else:
     62             response_type = dns.rdatatype.SOA
     63             response_rdata = ". . 0 0 0 0 0"
     64             response_section = qctx.response.authority
     65 
     66         qctx.response.use_edns(None)
     67 
     68         response_rrset = dns.rrset.from_text(
     69             qctx.qname, 300, qctx.qclass, response_type, response_rdata
     70         )
     71         response_section.append(response_rrset)
     72 
     73         yield DnsResponseSend(qctx.response)
     74 
     75 
     76 def main() -> None:
     77     server = ControllableAsyncDnsServer(
     78         default_rcode=dns.rcode.NOERROR, default_aa=True
     79     )
     80     server.install_control_command(ToggleResponsesCommand())
     81     server.install_response_handler(ChaseDsHandler())
     82     server.run()
     83 
     84 
     85 if __name__ == "__main__":
     86     main()
     87