Home | History | Annotate | Line # | Download | only in ans2
      1  1.1  christos """
      2  1.1  christos Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3  1.1  christos 
      4  1.1  christos SPDX-License-Identifier: MPL-2.0
      5  1.1  christos 
      6  1.1  christos This Source Code Form is subject to the terms of the Mozilla Public
      7  1.1  christos License, v. 2.0.  If a copy of the MPL was not distributed with this
      8  1.1  christos file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9  1.1  christos 
     10  1.1  christos See the COPYRIGHT file distributed with this work for additional
     11  1.1  christos information regarding copyright ownership.
     12  1.1  christos """
     13  1.1  christos 
     14  1.1  christos from collections.abc import AsyncGenerator
     15  1.1  christos 
     16  1.1  christos import dns.name
     17  1.1  christos import dns.rcode
     18  1.1  christos import dns.rdatatype
     19  1.1  christos import dns.rrset
     20  1.1  christos 
     21  1.1  christos from isctest.asyncserver import (
     22  1.1  christos     AsyncDnsServer,
     23  1.1  christos     DnsResponseSend,
     24  1.1  christos     DomainHandler,
     25  1.1  christos     IgnoreAllQueries,
     26  1.1  christos     QnameHandler,
     27  1.1  christos     QnameQtypeHandler,
     28  1.1  christos     QueryContext,
     29  1.1  christos     ResponseHandler,
     30  1.1  christos     StaticResponseHandler,
     31  1.1  christos )
     32  1.1  christos 
     33  1.1  christos from ..resolver_ans import (
     34  1.1  christos     DelegationHandler,
     35  1.1  christos     Gl6412AHandler,
     36  1.1  christos     Gl6412Handler,
     37  1.1  christos     Gl6412Ns2Handler,
     38  1.1  christos     Gl6412Ns3Handler,
     39  1.1  christos     rrset,
     40  1.1  christos     setup_delegation,
     41  1.1  christos )
     42  1.1  christos 
     43  1.1  christos 
     44  1.1  christos class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
     45  1.1  christos     qnames = [
     46  1.1  christos         "baddname.example.org.",
     47  1.1  christos         "gooddname.example.org.",
     48  1.1  christos     ]
     49  1.1  christos     qtypes = [dns.rdatatype.NS]
     50  1.1  christos     answer = [rrset("example.org.", dns.rdatatype.NS, "a.root-servers.nil.")]
     51  1.1  christos     authoritative = True
     52  1.1  christos 
     53  1.1  christos 
     54  1.1  christos def _cname_rrsets(
     55  1.1  christos     qname: dns.name.Name | str,
     56  1.1  christos ) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
     57  1.1  christos     return (
     58  1.1  christos         rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
     59  1.1  christos         rrset(qname, dns.rdatatype.A, "1.2.3.4"),
     60  1.1  christos     )
     61  1.1  christos 
     62  1.1  christos 
     63  1.1  christos class Cname1Handler(QnameHandler, StaticResponseHandler):
     64  1.1  christos     qnames = ["cname1.example.com."]
     65  1.1  christos     # Data for the "cname + other data / 1" test
     66  1.1  christos     answer = _cname_rrsets(qnames[0])
     67  1.1  christos     authoritative = False
     68  1.1  christos 
     69  1.1  christos 
     70  1.1  christos class Cname2Handler(QnameHandler, StaticResponseHandler):
     71  1.1  christos     qnames = ["cname2.example.com."]
     72  1.1  christos     # Data for the "cname + other data / 2" test: same RRs in opposite order
     73  1.1  christos     answer = tuple(reversed(_cname_rrsets(qnames[0])))
     74  1.1  christos     authoritative = False
     75  1.1  christos 
     76  1.1  christos 
     77  1.1  christos class ExampleOrgHandler(QnameHandler):
     78  1.1  christos     qnames = [
     79  1.1  christos         "www.example.org",
     80  1.1  christos         "badcname.example.org",
     81  1.1  christos         "goodcname.example.org",
     82  1.1  christos         "foo.baddname.example.org",
     83  1.1  christos         "foo.gooddname.example.org",
     84  1.1  christos     ]
     85  1.1  christos 
     86  1.1  christos     async def get_responses(
     87  1.1  christos         self, qctx: QueryContext
     88  1.1  christos     ) -> AsyncGenerator[DnsResponseSend, None]:
     89  1.1  christos         # Data for address/alias filtering.
     90  1.1  christos         if qctx.qtype == dns.rdatatype.A:
     91  1.1  christos             a_rrset = rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1")
     92  1.1  christos             qctx.response.answer.append(a_rrset)
     93  1.1  christos         elif qctx.qtype == dns.rdatatype.AAAA:
     94  1.1  christos             aaaa_rrset = rrset(qctx.qname, dns.rdatatype.AAAA, "2001:db8:beef::1")
     95  1.1  christos             qctx.response.answer.append(aaaa_rrset)
     96  1.1  christos         yield DnsResponseSend(qctx.response, authoritative=True)
     97  1.1  christos 
     98  1.1  christos 
     99  1.1  christos class NoResponseExampleUdpHandler(QnameHandler, IgnoreAllQueries):
    100  1.1  christos     qnames = ["noresponse.exampleudp.net."]
    101  1.1  christos 
    102  1.1  christos 
    103  1.1  christos class RootNsHandler(QnameQtypeHandler):
    104  1.1  christos     qnames = [
    105  1.1  christos         "example.com.",
    106  1.1  christos         "com.",
    107  1.1  christos         "example.org.",
    108  1.1  christos         "org.",
    109  1.1  christos         "net.",
    110  1.1  christos     ]
    111  1.1  christos 
    112  1.1  christos     qtypes = [dns.rdatatype.NS]
    113  1.1  christos 
    114  1.1  christos     async def get_responses(
    115  1.1  christos         self, qctx: QueryContext
    116  1.1  christos     ) -> AsyncGenerator[DnsResponseSend, None]:
    117  1.1  christos         root_ns = rrset(qctx.qname, dns.rdatatype.NS, "a.root-servers.nil.")
    118  1.1  christos         qctx.response.answer.append(root_ns)
    119  1.1  christos         yield DnsResponseSend(qctx.response, authoritative=True)
    120  1.1  christos 
    121  1.1  christos 
    122  1.1  christos class Ns2Delegation(DelegationHandler):
    123  1.1  christos     domains = ["exampleudp.net."]
    124  1.1  christos     server_number = 2
    125  1.1  christos 
    126  1.1  christos 
    127  1.1  christos class Ns3Delegation(DelegationHandler):
    128  1.1  christos     domains = [
    129  1.1  christos         "example.net.",
    130  1.1  christos         "lame.example.org.",
    131  1.1  christos         "sub.example.org.",
    132  1.1  christos     ]
    133  1.1  christos     server_number = 3
    134  1.1  christos 
    135  1.1  christos 
    136  1.1  christos class Ns3GlueInAnswerDelegation(DelegationHandler):
    137  1.1  christos     domains = ["glue-in-answer.example.org."]
    138  1.1  christos     server_number = 3
    139  1.1  christos 
    140  1.1  christos     async def get_responses(
    141  1.1  christos         self, qctx: QueryContext
    142  1.1  christos     ) -> AsyncGenerator[DnsResponseSend, None]:
    143  1.1  christos         async for dns_response in super().get_responses(qctx):
    144  1.1  christos             dns_response.response.answer += dns_response.response.additional
    145  1.1  christos             yield dns_response
    146  1.1  christos 
    147  1.1  christos 
    148  1.1  christos class Ns4Delegation(DelegationHandler):
    149  1.1  christos     domains = ["broken."]
    150  1.1  christos     server_number = 4
    151  1.1  christos 
    152  1.1  christos 
    153  1.1  christos class Ns6Delegation(DelegationHandler):
    154  1.1  christos     domains = [
    155  1.1  christos         "redirect.com.",
    156  1.1  christos         "tld1.",
    157  1.1  christos     ]
    158  1.1  christos     server_number = 6
    159  1.1  christos 
    160  1.1  christos 
    161  1.1  christos class Ns7Delegation(DelegationHandler):
    162  1.1  christos     domains = ["tld2."]
    163  1.1  christos     server_number = 7
    164  1.1  christos 
    165  1.1  christos 
    166  1.1  christos class PartialFormerrHandler(DomainHandler, StaticResponseHandler):
    167  1.1  christos     domains = ["partial-formerr."]
    168  1.1  christos     authoritative = False
    169  1.1  christos     rcode = dns.rcode.FORMERR
    170  1.1  christos 
    171  1.1  christos 
    172  1.1  christos class FallbackHandler(ResponseHandler):
    173  1.1  christos     async def get_responses(
    174  1.1  christos         self, qctx: QueryContext
    175  1.1  christos     ) -> AsyncGenerator[DnsResponseSend, None]:
    176  1.1  christos         setup_delegation(qctx, "below.www.example.com.", 3)
    177  1.1  christos         yield DnsResponseSend(qctx.response, authoritative=False)
    178  1.1  christos 
    179  1.1  christos 
    180  1.1  christos def main() -> None:
    181  1.1  christos     server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
    182  1.1  christos 
    183  1.1  christos     # Install QnameHandlers first
    184  1.1  christos     server.install_response_handlers(
    185  1.1  christos         BadGoodDnameNsHandler(),
    186  1.1  christos         Cname1Handler(),
    187  1.1  christos         Cname2Handler(),
    188  1.1  christos         ExampleOrgHandler(),
    189  1.1  christos         Gl6412AHandler(),
    190  1.1  christos         Gl6412Handler(),
    191  1.1  christos         Gl6412Ns2Handler(),
    192  1.1  christos         Gl6412Ns3Handler(),
    193  1.1  christos         NoResponseExampleUdpHandler(),
    194  1.1  christos         RootNsHandler(),
    195  1.1  christos     )
    196  1.1  christos 
    197  1.1  christos     # Then install DomainHandlers
    198  1.1  christos     server.install_response_handlers(
    199  1.1  christos         Ns2Delegation(),
    200  1.1  christos         Ns3Delegation(),
    201  1.1  christos         Ns3GlueInAnswerDelegation(),
    202  1.1  christos         Ns4Delegation(),
    203  1.1  christos         Ns6Delegation(),
    204  1.1  christos         Ns7Delegation(),
    205  1.1  christos         PartialFormerrHandler(),
    206  1.1  christos     )
    207  1.1  christos 
    208  1.1  christos     # Finally, install the fallback handler
    209  1.1  christos     server.install_response_handler(FallbackHandler())
    210  1.1  christos     server.run()
    211  1.1  christos 
    212  1.1  christos 
    213  1.1  christos if __name__ == "__main__":
    214  1.1  christos     main()
    215