Home | History | Annotate | Line # | Download | only in ans3
      1  1.1  christos """
      2  1.1  christos Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3  1.1  christos 
      4  1.1  christos SPDX-License-Identifier: MPL-2.0
      5  1.1  christos 
      6  1.1  christos This Source Code Form is subject to the terms of the Mozilla Public
      7  1.1  christos License, v. 2.0.  If a copy of the MPL was not distributed with this
      8  1.1  christos file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9  1.1  christos 
     10  1.1  christos See the COPYRIGHT file distributed with this work for additional
     11  1.1  christos information regarding copyright ownership.
     12  1.1  christos """
     13  1.1  christos 
     14  1.1  christos from collections.abc import AsyncGenerator
     15  1.1  christos 
     16  1.1  christos import dns.name
     17  1.1  christos import dns.rcode
     18  1.1  christos import dns.rdatatype
     19  1.1  christos 
     20  1.1  christos from isctest.asyncserver import (
     21  1.1  christos     AsyncDnsServer,
     22  1.1  christos     DnsResponseSend,
     23  1.1  christos     DomainHandler,
     24  1.1  christos     IgnoreAllQueries,
     25  1.1  christos     QnameHandler,
     26  1.1  christos     QnameQtypeHandler,
     27  1.1  christos     QueryContext,
     28  1.1  christos     ResponseHandler,
     29  1.1  christos     StaticResponseHandler,
     30  1.1  christos )
     31  1.1  christos 
     32  1.1  christos from ..resolver_ans import (
     33  1.1  christos     DelegationHandler,
     34  1.1  christos     Gl6412AHandler,
     35  1.1  christos     Gl6412Handler,
     36  1.1  christos     Gl6412Ns2Handler,
     37  1.1  christos     Gl6412Ns3Handler,
     38  1.1  christos     rrset,
     39  1.1  christos     rrset_from_list,
     40  1.1  christos     soa_rrset,
     41  1.1  christos )
     42  1.1  christos 
     43  1.1  christos 
class ApexNSHandler(QnameHandler, StaticResponseHandler):
    """Static NS answer for the "example.net." apex.

    The answer section names "ns.example.net." as the nameserver and the
    additional section carries its A record (10.53.0.3).
    """

    qnames = ["example.net."]
    # NOTE(review): `qtypes` is declared here, but the base class is
    # QnameHandler rather than QnameQtypeHandler (which
    # BadGoodDnameNsHandler uses) -- confirm that QnameHandler actually
    # honours `qtypes`, otherwise every QTYPE for this name gets this answer.
    qtypes = [dns.rdatatype.NS]
    answer = [
        rrset("example.net.", dns.rdatatype.NS, "ns.example.net."),
    ]
    additional = [
        rrset("ns.example.net.", dns.rdatatype.A, "10.53.0.3"),
    ]
     49  1.1  christos 
     50  1.1  christos 
class BadCnameHandler(QnameHandler, StaticResponseHandler):
    """Static CNAME from "badcname.example.net." to "badcname.example.org."."""

    qnames = ["badcname.example.net."]
    answer = [
        rrset(
            "badcname.example.net.",
            dns.rdatatype.CNAME,
            "badcname.example.org.",
        ),
    ]
     54  1.1  christos 
     55  1.1  christos 
class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
    """Static response for NS queries at the two DNAME owner names.

    No answer is declared; only the "example.net." SOA is placed in the
    authority section.
    """

    qnames = [
        "baddname.example.net.",
        "gooddname.example.net.",
    ]
    qtypes = [dns.rdatatype.NS]
    authority = [
        soa_rrset("example.net."),
    ]
     60  1.1  christos 
     61  1.1  christos 
class CnameSubHandler(QnameHandler, StaticResponseHandler):
    """Static CNAME from "cname.sub.example.org." to "ok.sub.example.org."."""

    qnames = ["cname.sub.example.org."]
    answer = [
        rrset(
            "cname.sub.example.org.",
            dns.rdatatype.CNAME,
            "ok.sub.example.org.",
        ),
    ]
     65  1.1  christos 
     66  1.1  christos 
class FooBadDnameHandler(QnameHandler, StaticResponseHandler):
    """Static DNAME answer for "foo.baddname.example.net.".

    The DNAME is owned by the parent name "baddname.example.net." and
    targets "baddname.example.org.".
    """

    qnames = ["foo.baddname.example.net."]
    answer = [
        rrset(
            "baddname.example.net.",
            dns.rdatatype.DNAME,
            "baddname.example.org.",
        ),
    ]
     72  1.1  christos 
     73  1.1  christos 
class FooBarSubTld1Handler(QnameHandler, StaticResponseHandler):
    """Static TXT answer ("baz") for "foo.bar.sub.tld1."."""

    qnames = ["foo.bar.sub.tld1."]
    answer = [
        rrset("foo.bar.sub.tld1.", dns.rdatatype.TXT, "baz"),
    ]
     77  1.1  christos 
     78  1.1  christos 
class FooGlueInAnswerHandler(QnameHandler, StaticResponseHandler):
    """Static A answer (192.0.2.1) for "foo.glue-in-answer.example.org."."""

    qnames = ["foo.glue-in-answer.example.org."]
    answer = [
        rrset("foo.glue-in-answer.example.org.", dns.rdatatype.A, "192.0.2.1"),
    ]
     82  1.1  christos 
     83  1.1  christos 
class FooGoodDnameHandler(QnameHandler, StaticResponseHandler):
    """Static DNAME answer for "foo.gooddname.example.net.".

    The DNAME is owned by the parent name "gooddname.example.net." and
    targets "gooddname.example.org.".
    """

    qnames = ["foo.gooddname.example.net."]
    answer = [
        rrset(
            "gooddname.example.net.",
            dns.rdatatype.DNAME,
            "gooddname.example.org.",
        ),
    ]
     89  1.1  christos 
     90  1.1  christos 
class GoodCnameHandler(QnameHandler, StaticResponseHandler):
    """Static CNAME from "goodcname.example.net." to "goodcname.example.org."."""

    qnames = ["goodcname.example.net."]
    answer = [
        rrset(
            "goodcname.example.net.",
            dns.rdatatype.CNAME,
            "goodcname.example.org.",
        ),
    ]
     94  1.1  christos 
     95  1.1  christos 
class LameExampleOrgDelegation(DelegationHandler):
    """Delegation configuration for the "lame.example.org." domain.

    The response itself is built by DelegationHandler (imported from
    ..resolver_ans); this class only supplies its parameters.
    """

    # presumably the number of the test server the delegation points at;
    # verify against DelegationHandler
    server_number = 3
    domains = ["lame.example.org."]
     99  1.1  christos 
    100  1.1  christos 
class LargeReferralHandler(QnameHandler, StaticResponseHandler):
    """Oversized referral for "large-referral.example.net.".

    The authority section holds a single NS RRset with 999 targets,
    "ns1.fake.redirect.com." through "ns999.fake.redirect.com.".
    """

    qnames = ["large-referral.example.net."]
    # NOTE(review): `qtypes` is set, but the base class is QnameHandler
    # rather than QnameQtypeHandler -- confirm the QTYPE restriction is
    # actually applied.
    qtypes = [dns.rdatatype.NS]
    authority = [
        rrset_from_list(
            "large-referral.example.net.",
            dns.rdatatype.NS,
            [f"ns{index}.fake.redirect.com." for index in range(1, 1000)],
        ),
    ]
    111  1.1  christos 
    112  1.1  christos 
class LongCnameHandler(ResponseHandler):
    """Answer any QNAME whose first label starts with "longcname" with a CNAME.

    The CNAME target is the QNAME with every "longcname" occurrence in the
    first label replaced by "longcnamex"; the remaining labels are kept.
    """

    def match(self, qctx: QueryContext) -> bool:
        """Return True when the leftmost QNAME label begins with b"longcname"."""
        leftmost = qctx.qname.labels[0]
        return leftmost.startswith(b"longcname")

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the CNAME to the prepared response and yield it once."""
        leftmost, *parents = qctx.qname.labels
        rewritten = leftmost.replace(b"longcname", b"longcnamex")
        # The target is also matched by match(), so following the chain
        # produces a first label that grows by one "x" per hop.
        target = dns.name.Name((rewritten, *parents))
        cname = rrset(qctx.qname, dns.rdatatype.CNAME, str(target))
        qctx.response.answer.append(cname)
        yield DnsResponseSend(qctx.response)
    126  1.1  christos 
    127  1.1  christos 
class NodataHandler(QnameHandler, StaticResponseHandler):
    """Static response for "nodata.example.net." with no records declared.

    No answer/authority/additional/rcode is set here, so the response
    content is whatever StaticResponseHandler and the server defaults
    produce -- presumably an empty NOERROR reply; confirm against
    StaticResponseHandler.
    """

    qnames = [
        "nodata.example.net.",
    ]
    130  1.1  christos 
    131  1.1  christos 
class NoresponseHandler(QnameHandler, IgnoreAllQueries):
    """Apply IgnoreAllQueries behaviour to "noresponse.example.net.".

    Presumably queries for this name are dropped without a reply; verify
    against IgnoreAllQueries in isctest.asyncserver.
    """

    qnames = [
        "noresponse.example.net.",
    ]
    134  1.1  christos 
    135  1.1  christos 
class NsHandler(QnameHandler, StaticResponseHandler):
    """Static A answer (10.53.0.3) for "ns.example.net."."""

    qnames = ["ns.example.net."]
    answer = [
        rrset("ns.example.net.", dns.rdatatype.A, "10.53.0.3"),
    ]
    139  1.1  christos 
    140  1.1  christos 
class NxdomainHandler(QnameHandler, StaticResponseHandler):
    """Static NXDOMAIN response for "nxdomain.example.net."."""

    rcode = dns.rcode.NXDOMAIN
    qnames = [
        "nxdomain.example.net.",
    ]
    144  1.1  christos 
    145  1.1  christos 
class OkSubHandler(QnameHandler):
    """Answer the two "ok.sub.example.org." names with an A record.

    The A record (192.0.2.1) is owned by whichever of the listed QNAMEs
    was queried; the query type is not inspected here.
    """

    qnames = [
        "ok.sub.example.org.",
        "www.ok.sub.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the A record to the prepared response and yield it once."""
        response = qctx.response
        address_rrset = rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1")
        response.answer.append(address_rrset)
        yield DnsResponseSend(response)
    154  1.1  christos 
    155  1.1  christos 
class PartialFormerrHandler(DomainHandler):
    """Answer names under "partial-formerr." with a short-lived A record.

    The A record (10.53.0.3, TTL 1) is owned by the queried name; the
    query type is not inspected here.
    """

    domains = ["partial-formerr."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the TTL-1 A record to the prepared response and yield it."""
        response = qctx.response
        short_lived_a = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3", ttl=1)
        response.answer.append(short_lived_a)
        yield DnsResponseSend(response)
    166  1.1  christos 
    167  1.1  christos 
class WwwDnameSubHandler(QnameHandler, StaticResponseHandler):
    """Static DNAME answer for "www.dname.sub.example.org.".

    The DNAME is owned by the parent name "dname.sub.example.org." and
    targets "ok.sub.example.org.".
    """

    qnames = ["www.dname.sub.example.org."]
    answer = [
        rrset(
            "dname.sub.example.org.",
            dns.rdatatype.DNAME,
            "ok.sub.example.org.",
        ),
    ]
    173  1.1  christos 
    174  1.1  christos 
class WwwHandler(QnameHandler):
    """Answer "www.example.net." with an address record matching the QTYPE.

    A queries get 192.0.2.1, AAAA queries get 2001:db8:beef::1, and any
    other query type gets the prepared response with nothing appended.
    """

    qnames = ["www.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Append the matching address record, if any, and yield the response."""
        address_records = (
            (dns.rdatatype.A, "192.0.2.1"),
            (dns.rdatatype.AAAA, "2001:db8:beef::1"),
        )
        for rdtype, address in address_records:
            if qctx.qtype == rdtype:
                qctx.response.answer.append(rrset(qctx.qname, rdtype, address))
                break
        yield DnsResponseSend(qctx.response)
    188  1.1  christos 
    189  1.1  christos 
class FallbackHandler(StaticResponseHandler):
    """Static A answer for "www.example.com." (1.2.3.4).

    Declares no QNAME filter of its own; main() installs it after all the
    name-specific handlers.
    """

    answer = [
        rrset("www.example.com.", dns.rdatatype.A, "1.2.3.4"),
    ]
    192  1.1  christos 
    193  1.1  christos 
def main() -> None:
    """Build the mock DNS server, register every handler, and run it.

    The server is created with AA set and NOERROR as the default rcode.
    Name-specific handlers are installed first, in the order listed, and
    FallbackHandler is installed afterwards in a separate call.
    """
    server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)

    # Order is kept exactly as registered originally, in case handler
    # precedence depends on installation order.
    specific_handlers = [
        ApexNSHandler(),
        BadCnameHandler(),
        BadGoodDnameNsHandler(),
        CnameSubHandler(),
        FooBadDnameHandler(),
        FooBarSubTld1Handler(),
        FooGoodDnameHandler(),
        FooGlueInAnswerHandler(),
        Gl6412AHandler(),
        Gl6412Handler(),
        Gl6412Ns2Handler(),
        Gl6412Ns3Handler(),
        GoodCnameHandler(),
        LameExampleOrgDelegation(),
        LargeReferralHandler(),
        LongCnameHandler(),
        NodataHandler(),
        NoresponseHandler(),
        NsHandler(),
        NxdomainHandler(),
        OkSubHandler(),
        PartialFormerrHandler(),
        WwwDnameSubHandler(),
        WwwHandler(),
    ]
    server.install_response_handlers(*specific_handlers)

    fallback = FallbackHandler()
    server.install_response_handler(fallback)
    server.run()


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
    229