Home | History | Annotate | Line # | Download | only in ans2
      1 """
      2 Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3 
      4 SPDX-License-Identifier: MPL-2.0
      5 
      6 This Source Code Form is subject to the terms of the Mozilla Public
      7 License, v. 2.0.  If a copy of the MPL was not distributed with this
      8 file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9 
     10 See the COPYRIGHT file distributed with this work for additional
     11 information regarding copyright ownership.
     12 """
     13 
     14 from collections.abc import AsyncGenerator
     15 
     16 import dns.name
     17 import dns.rcode
     18 import dns.rdatatype
     19 import dns.rrset
     20 
     21 from isctest.asyncserver import (
     22     AsyncDnsServer,
     23     DnsResponseSend,
     24     DomainHandler,
     25     IgnoreAllQueries,
     26     QnameHandler,
     27     QnameQtypeHandler,
     28     QueryContext,
     29     ResponseHandler,
     30     StaticResponseHandler,
     31 )
     32 
     33 from ..resolver_ans import (
     34     DelegationHandler,
     35     Gl6412AHandler,
     36     Gl6412Handler,
     37     Gl6412Ns2Handler,
     38     Gl6412Ns3Handler,
     39     rrset,
     40     setup_delegation,
     41 )
     42 
     43 
     44 class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
     45     qnames = [
     46         "baddname.example.org.",
     47         "gooddname.example.org.",
     48     ]
     49     qtypes = [dns.rdatatype.NS]
     50     answer = [rrset("example.org.", dns.rdatatype.NS, "a.root-servers.nil.")]
     51     authoritative = True
     52 
     53 
     54 def _cname_rrsets(
     55     qname: dns.name.Name | str,
     56 ) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
     57     return (
     58         rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
     59         rrset(qname, dns.rdatatype.A, "1.2.3.4"),
     60     )
     61 
     62 
     63 class Cname1Handler(QnameHandler, StaticResponseHandler):
     64     qnames = ["cname1.example.com."]
     65     # Data for the "cname + other data / 1" test
     66     answer = _cname_rrsets(qnames[0])
     67     authoritative = False
     68 
     69 
     70 class Cname2Handler(QnameHandler, StaticResponseHandler):
     71     qnames = ["cname2.example.com."]
     72     # Data for the "cname + other data / 2" test: same RRs in opposite order
     73     answer = tuple(reversed(_cname_rrsets(qnames[0])))
     74     authoritative = False
     75 
     76 
     77 class ExampleOrgHandler(QnameHandler):
     78     qnames = [
     79         "www.example.org",
     80         "badcname.example.org",
     81         "goodcname.example.org",
     82         "foo.baddname.example.org",
     83         "foo.gooddname.example.org",
     84     ]
     85 
     86     async def get_responses(
     87         self, qctx: QueryContext
     88     ) -> AsyncGenerator[DnsResponseSend, None]:
     89         # Data for address/alias filtering.
     90         if qctx.qtype == dns.rdatatype.A:
     91             a_rrset = rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1")
     92             qctx.response.answer.append(a_rrset)
     93         elif qctx.qtype == dns.rdatatype.AAAA:
     94             aaaa_rrset = rrset(qctx.qname, dns.rdatatype.AAAA, "2001:db8:beef::1")
     95             qctx.response.answer.append(aaaa_rrset)
     96         yield DnsResponseSend(qctx.response, authoritative=True)
     97 
     98 
     99 class NoResponseExampleUdpHandler(QnameHandler, IgnoreAllQueries):
    100     qnames = ["noresponse.exampleudp.net."]
    101 
    102 
    103 class RootNsHandler(QnameQtypeHandler):
    104     qnames = [
    105         "example.com.",
    106         "com.",
    107         "example.org.",
    108         "org.",
    109         "net.",
    110     ]
    111 
    112     qtypes = [dns.rdatatype.NS]
    113 
    114     async def get_responses(
    115         self, qctx: QueryContext
    116     ) -> AsyncGenerator[DnsResponseSend, None]:
    117         root_ns = rrset(qctx.qname, dns.rdatatype.NS, "a.root-servers.nil.")
    118         qctx.response.answer.append(root_ns)
    119         yield DnsResponseSend(qctx.response, authoritative=True)
    120 
    121 
    122 class Ns2Delegation(DelegationHandler):
    123     domains = ["exampleudp.net."]
    124     server_number = 2
    125 
    126 
    127 class Ns3Delegation(DelegationHandler):
    128     domains = [
    129         "example.net.",
    130         "lame.example.org.",
    131         "sub.example.org.",
    132     ]
    133     server_number = 3
    134 
    135 
    136 class Ns3GlueInAnswerDelegation(DelegationHandler):
    137     domains = ["glue-in-answer.example.org."]
    138     server_number = 3
    139 
    140     async def get_responses(
    141         self, qctx: QueryContext
    142     ) -> AsyncGenerator[DnsResponseSend, None]:
    143         async for dns_response in super().get_responses(qctx):
    144             dns_response.response.answer += dns_response.response.additional
    145             yield dns_response
    146 
    147 
    148 class Ns4Delegation(DelegationHandler):
    149     domains = ["broken."]
    150     server_number = 4
    151 
    152 
    153 class Ns6Delegation(DelegationHandler):
    154     domains = [
    155         "redirect.com.",
    156         "tld1.",
    157     ]
    158     server_number = 6
    159 
    160 
    161 class Ns7Delegation(DelegationHandler):
    162     domains = ["tld2."]
    163     server_number = 7
    164 
    165 
    166 class PartialFormerrHandler(DomainHandler, StaticResponseHandler):
    167     domains = ["partial-formerr."]
    168     authoritative = False
    169     rcode = dns.rcode.FORMERR
    170 
    171 
    172 class FallbackHandler(ResponseHandler):
    173     async def get_responses(
    174         self, qctx: QueryContext
    175     ) -> AsyncGenerator[DnsResponseSend, None]:
    176         setup_delegation(qctx, "below.www.example.com.", 3)
    177         yield DnsResponseSend(qctx.response, authoritative=False)
    178 
    179 
    180 def main() -> None:
    181     server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
    182 
    183     # Install QnameHandlers first
    184     server.install_response_handlers(
    185         BadGoodDnameNsHandler(),
    186         Cname1Handler(),
    187         Cname2Handler(),
    188         ExampleOrgHandler(),
    189         Gl6412AHandler(),
    190         Gl6412Handler(),
    191         Gl6412Ns2Handler(),
    192         Gl6412Ns3Handler(),
    193         NoResponseExampleUdpHandler(),
    194         RootNsHandler(),
    195     )
    196 
    197     # Then install DomainHandlers
    198     server.install_response_handlers(
    199         Ns2Delegation(),
    200         Ns3Delegation(),
    201         Ns3GlueInAnswerDelegation(),
    202         Ns4Delegation(),
    203         Ns6Delegation(),
    204         Ns7Delegation(),
    205         PartialFormerrHandler(),
    206     )
    207 
    208     # Finally, install the fallback handler
    209     server.install_response_handler(FallbackHandler())
    210     server.run()
    211 
    212 
    213 if __name__ == "__main__":
    214     main()
    215