Home | History | Annotate | Line # | Download | only in ans3
      1 """
      2 Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3 
      4 SPDX-License-Identifier: MPL-2.0
      5 
      6 This Source Code Form is subject to the terms of the Mozilla Public
      7 License, v. 2.0.  If a copy of the MPL was not distributed with this
      8 file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9 
     10 See the COPYRIGHT file distributed with this work for additional
     11 information regarding copyright ownership.
     12 """
     13 
     14 from collections.abc import AsyncGenerator
     15 
     16 import dns.name
     17 import dns.rcode
     18 import dns.rdatatype
     19 
     20 from isctest.asyncserver import (
     21     AsyncDnsServer,
     22     DnsResponseSend,
     23     DomainHandler,
     24     IgnoreAllQueries,
     25     QnameHandler,
     26     QnameQtypeHandler,
     27     QueryContext,
     28     ResponseHandler,
     29     StaticResponseHandler,
     30 )
     31 
     32 from ..resolver_ans import (
     33     DelegationHandler,
     34     Gl6412AHandler,
     35     Gl6412Handler,
     36     Gl6412Ns2Handler,
     37     Gl6412Ns3Handler,
     38     rrset,
     39     rrset_from_list,
     40     soa_rrset,
     41 )
     42 
     43 
class ApexNSHandler(QnameQtypeHandler, StaticResponseHandler):
    """
    Static NS response for the example.net. apex.

    Declares an NS RRset for example.net. pointing at ns.example.net. in the
    answer section, plus an A record (10.53.0.3) for that name server in the
    additional section.

    The class derives from QnameQtypeHandler, like BadGoodDnameNsHandler,
    because it declares both `qnames` and `qtypes`; with plain QnameHandler
    the `qtypes` restriction would presumably not be applied and queries of
    every type for example.net. would receive this NS answer.
    """

    qnames = ["example.net."]
    qtypes = [dns.rdatatype.NS]
    answer = [rrset(qnames[0], dns.rdatatype.NS, f"ns.{qnames[0]}")]
    additional = [rrset(f"ns.{qnames[0]}", dns.rdatatype.A, "10.53.0.3")]
     49 
     50 
class BadCnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for badcname.example.net.

    The answer section holds a CNAME owned by badcname.example.net. whose
    target is badcname.example.org.
    """

    qnames = ["badcname.example.net."]
    answer = [
        rrset(
            "badcname.example.net.",
            dns.rdatatype.CNAME,
            "badcname.example.org.",
        ),
    ]
     54 
     55 
class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
    """
    Static response data for NS queries at the two DNAME owner names.

    Declares baddname.example.net. and gooddname.example.net. as QNAMEs and
    NS as the only QTYPE.  No answer is declared; the authority section
    carries whatever soa_rrset("example.net.") builds.
    """

    qnames = [
        "baddname.example.net.",
        "gooddname.example.net.",
    ]
    qtypes = [
        dns.rdatatype.NS,
    ]
    authority = [
        soa_rrset("example.net."),
    ]
     60 
     61 
class CnameSubHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for cname.sub.example.org.

    The answer section holds a CNAME owned by cname.sub.example.org. whose
    target is ok.sub.example.org.
    """

    qnames = ["cname.sub.example.org."]
    answer = [
        rrset(
            "cname.sub.example.org.",
            dns.rdatatype.CNAME,
            "ok.sub.example.org.",
        ),
    ]
     65 
     66 
class FooBadDnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for foo.baddname.example.net.

    The answer section holds a DNAME record whose owner is the parent name
    baddname.example.net. (not the QNAME itself) and whose target is
    baddname.example.org.
    """

    qnames = ["foo.baddname.example.net."]
    answer = [
        rrset(
            "baddname.example.net.",
            dns.rdatatype.DNAME,
            "baddname.example.org.",
        ),
    ]
     72 
     73 
class FooBarSubTld1Handler(QnameHandler, StaticResponseHandler):
    """
    Static response data for foo.bar.sub.tld1.

    The answer section holds a TXT record owned by foo.bar.sub.tld1. with
    the text "baz".
    """

    qnames = ["foo.bar.sub.tld1."]
    answer = [
        rrset("foo.bar.sub.tld1.", dns.rdatatype.TXT, "baz"),
    ]
     77 
     78 
class FooGlueInAnswerHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for foo.glue-in-answer.example.org.

    The answer section holds an A record owned by
    foo.glue-in-answer.example.org. with address 192.0.2.1.
    """

    qnames = ["foo.glue-in-answer.example.org."]
    answer = [
        rrset("foo.glue-in-answer.example.org.", dns.rdatatype.A, "192.0.2.1"),
    ]
     82 
     83 
class FooGoodDnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for foo.gooddname.example.net.

    The answer section holds a DNAME record whose owner is the parent name
    gooddname.example.net. (not the QNAME itself) and whose target is
    gooddname.example.org.
    """

    qnames = ["foo.gooddname.example.net."]
    answer = [
        rrset(
            "gooddname.example.net.",
            dns.rdatatype.DNAME,
            "gooddname.example.org.",
        ),
    ]
     89 
     90 
class GoodCnameHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for goodcname.example.net.

    The answer section holds a CNAME owned by goodcname.example.net. whose
    target is goodcname.example.org.
    """

    qnames = ["goodcname.example.net."]
    answer = [
        rrset(
            "goodcname.example.net.",
            dns.rdatatype.CNAME,
            "goodcname.example.org.",
        ),
    ]
     94 
     95 
class LameExampleOrgDelegation(DelegationHandler):
    """
    Delegation settings for lame.example.org., using server number 3.

    The response itself is produced by DelegationHandler, imported from
    ..resolver_ans and not visible in this file; this class only supplies
    its `domains` and `server_number` settings.
    """

    server_number = 3
    domains = ["lame.example.org."]
     99 
    100 
class LargeReferralHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for large-referral.example.net.

    The authority section carries the NS RRset that rrset_from_list builds
    for large-referral.example.net. from 999 target names,
    ns1.fake.redirect.com. through ns999.fake.redirect.com.  No answer or
    additional data is declared.
    """

    # NOTE(review): `qtypes` is declared here, but the class derives from
    # QnameHandler rather than QnameQtypeHandler -- confirm whether the NS
    # restriction is meant to be enforced.  Behaviour is left unchanged.
    qnames = ["large-referral.example.net."]
    qtypes = [dns.rdatatype.NS]
    authority = [
        rrset_from_list(
            "large-referral.example.net.",
            dns.rdatatype.NS,
            [f"ns{index}.fake.redirect.com." for index in range(1, 1000)],
        ),
    ]
    111 
    112 
class LongCnameHandler(ResponseHandler):
    """
    Answer "longcname*" queries with a CNAME to a slightly longer name.

    A query matches when the leftmost label of its QNAME starts with
    "longcname".  The CNAME target is the QNAME with every occurrence of
    "longcname" in that leftmost label replaced by "longcnamex"; the
    remaining labels are kept as they are.
    """

    def match(self, qctx: QueryContext) -> bool:
        """Return True if the leftmost QNAME label starts with b"longcname"."""
        leftmost_label = qctx.qname.labels[0]
        return leftmost_label.startswith(b"longcname")

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the CNAME to the prepared response and send it."""
        labels = qctx.qname.labels
        lengthened_label = labels[0].replace(b"longcname", b"longcnamex")
        target_name = dns.name.Name((lengthened_label, *labels[1:]))
        # rrset() is given the target in its textual form.
        cname_rrset = rrset(qctx.qname, dns.rdatatype.CNAME, str(target_name))
        qctx.response.answer.append(cname_rrset)
        yield DnsResponseSend(qctx.response)
    126 
    127 
class NodataHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for nodata.example.net.

    Only the QNAME is declared: no answer, authority or additional records
    and no rcode override are set here, so the StaticResponseHandler
    defaults apply (presumably an empty response -- confirm in
    isctest.asyncserver).
    """

    qnames = [
        "nodata.example.net.",
    ]
    130 
    131 
class NoresponseHandler(QnameHandler, IgnoreAllQueries):
    """
    Handler settings for noresponse.example.net.

    Combines the QNAME declared below with the IgnoreAllQueries behaviour
    from isctest.asyncserver (presumably: send no reply at all -- confirm
    there).
    """

    qnames = [
        "noresponse.example.net.",
    ]
    134 
    135 
class NsHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for ns.example.net.

    The answer section holds an A record owned by ns.example.net. with
    address 10.53.0.3.
    """

    qnames = ["ns.example.net."]
    answer = [
        rrset("ns.example.net.", dns.rdatatype.A, "10.53.0.3"),
    ]
    139 
    140 
class NxdomainHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for nxdomain.example.net.

    No records are declared; only the response code is overridden, to
    NXDOMAIN.
    """

    rcode = dns.rcode.NXDOMAIN
    qnames = [
        "nxdomain.example.net.",
    ]
    144 
    145 
class OkSubHandler(QnameHandler):
    """
    Answer ok.sub.example.org. and www.ok.sub.example.org. with an A record.

    Whatever the query type, the answer section receives an A record owned
    by the QNAME with address 192.0.2.1.
    """

    qnames = [
        "ok.sub.example.org.",
        "www.ok.sub.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the A record to the prepared response and send it."""
        address_rrset = rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1")
        qctx.response.answer.append(address_rrset)
        yield DnsResponseSend(qctx.response)
    154 
    155 
class PartialFormerrHandler(DomainHandler):
    """
    Answer queries handled for the partial-formerr. domain with an A record.

    Whatever the query type, the answer section receives an A record owned
    by the QNAME with address 10.53.0.3 and a TTL of 1.
    """

    domains = [
        "partial-formerr.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the TTL=1 A record to the prepared response and send it."""
        short_ttl_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3", ttl=1)
        qctx.response.answer.append(short_ttl_rrset)
        yield DnsResponseSend(qctx.response)
    166 
    167 
class WwwDnameSubHandler(QnameHandler, StaticResponseHandler):
    """
    Static response data for www.dname.sub.example.org.

    The answer section holds a DNAME record whose owner is the parent name
    dname.sub.example.org. (not the QNAME itself) and whose target is
    ok.sub.example.org.
    """

    qnames = ["www.dname.sub.example.org."]
    answer = [
        rrset(
            "dname.sub.example.org.",
            dns.rdatatype.DNAME,
            "ok.sub.example.org.",
        ),
    ]
    173 
    174 
class WwwHandler(QnameHandler):
    """
    Answer www.example.net. with address records.

    An A query gets 192.0.2.1 and an AAAA query gets 2001:db8:beef::1, each
    owned by the QNAME.  For any other query type the prepared response is
    sent without adding records.
    """

    qnames = ["www.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Add the address record matching the query type, then send."""
        address_by_type = (
            (dns.rdatatype.A, "192.0.2.1"),
            (dns.rdatatype.AAAA, "2001:db8:beef::1"),
        )
        for rdtype, address in address_by_type:
            if qctx.qtype == rdtype:
                qctx.response.answer.append(rrset(qctx.qname, rdtype, address))
                break
        yield DnsResponseSend(qctx.response)
    188 
    189 
class FallbackHandler(StaticResponseHandler):
    """
    Static response data with no QNAME or QTYPE criteria of its own.

    The answer section always holds the same A record, owned by
    www.example.com. with address 1.2.3.4, regardless of the query name.
    main() installs this handler after all the others.
    """

    answer = [
        rrset("www.example.com.", dns.rdatatype.A, "1.2.3.4"),
    ]
    192 
    193 
def main() -> None:
    """
    Create the DNS server, install every response handler and run it.

    The server is created with default_aa=True and a default rcode of
    NOERROR.  The specific handlers are installed in one call, in the order
    listed; FallbackHandler is installed afterwards in a separate call.
    """
    server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)

    handlers = [
        ApexNSHandler(),
        BadCnameHandler(),
        BadGoodDnameNsHandler(),
        CnameSubHandler(),
        FooBadDnameHandler(),
        FooBarSubTld1Handler(),
        FooGoodDnameHandler(),
        FooGlueInAnswerHandler(),
        Gl6412AHandler(),
        Gl6412Handler(),
        Gl6412Ns2Handler(),
        Gl6412Ns3Handler(),
        GoodCnameHandler(),
        LameExampleOrgDelegation(),
        LargeReferralHandler(),
        LongCnameHandler(),
        NodataHandler(),
        NoresponseHandler(),
        NsHandler(),
        NxdomainHandler(),
        OkSubHandler(),
        PartialFormerrHandler(),
        WwwDnameSubHandler(),
        WwwHandler(),
    ]
    server.install_response_handlers(*handlers)

    # Installed last; presumably handlers are consulted in installation
    # order, so this one only sees queries nothing above matched -- confirm
    # in isctest.asyncserver.
    server.install_response_handler(FallbackHandler())
    server.run()
    225 
    226 
# Start the server only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
    229