Home | History | Annotate | Line # | Download | only in ans4
      1  1.1  christos """
      2  1.1  christos Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3  1.1  christos 
      4  1.1  christos SPDX-License-Identifier: MPL-2.0
      5  1.1  christos 
      6  1.1  christos This Source Code Form is subject to the terms of the Mozilla Public
      7  1.1  christos License, v. 2.0.  If a copy of the MPL was not distributed with this
      8  1.1  christos file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9  1.1  christos 
     10  1.1  christos See the COPYRIGHT file distributed with this work for additional
     11  1.1  christos information regarding copyright ownership.
     12  1.1  christos """
     13  1.1  christos 
     14  1.1  christos from typing import AsyncGenerator
     15  1.1  christos 
     16  1.1  christos import dns.rcode
     17  1.1  christos import dns.rdatatype
     18  1.1  christos import dns.rrset
     19  1.1  christos 
     20  1.1  christos from isctest.asyncserver import (
     21  1.1  christos     AsyncDnsServer,
     22  1.1  christos     DnsResponseSend,
     23  1.1  christos     DomainHandler,
     24  1.1  christos     IgnoreAllQueries,
     25  1.1  christos     QnameHandler,
     26  1.1  christos     QueryContext,
     27  1.1  christos     ResponseHandler,
     28  1.1  christos )
     29  1.1  christos 
     30  1.1  christos 
def setup_delegation(qctx: QueryContext, owner: str) -> None:
    """Add a delegation for *owner* to the response held by *qctx*.

    An NS RRset owned by *owner* and pointing at ``ns.<owner>`` is appended
    to the authority section, and an A RRset for ``ns.<owner>`` with the
    address 10.53.0.3 is appended to the additional section.  Both RRsets
    use a TTL of 300 and the class of the query.  The response is modified
    in place; nothing is returned.
    """
    ttl = 300
    nameserver = f"ns.{owner}"

    # Build both RRsets before touching the response.
    delegation = dns.rrset.from_text(
        owner, ttl, qctx.qclass, dns.rdatatype.NS, nameserver
    )
    glue = dns.rrset.from_text(
        nameserver, ttl, qctx.qclass, dns.rdatatype.A, "10.53.0.3"
    )

    qctx.response.authority.append(delegation)
    qctx.response.additional.append(glue)
     39  1.1  christos 
     40  1.1  christos 
class BadGoodCnameHandler(QnameHandler):
    """Respond to the example.net alias names with a delegation."""

    qnames = ["badcname.example.net.", "goodcname.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response carrying a delegation for example.net."""
        # Data for CNAME/DNAME filtering.  A one-level delegation is needed
        # here to avoid automatic acceptance for subdomain aliases.
        delegated_zone = "example.net."
        setup_delegation(qctx, delegated_zone)
        referral = DnsResponseSend(qctx.response, authoritative=False)
        yield referral
     54  1.1  christos 
     55  1.1  christos 
class Cname1Handler(QnameHandler):
    """Answer cname1.example.com. with a CNAME followed by an A record."""

    qnames = ["cname1.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response whose answer section is CNAME, then A."""
        # Data for the "cname + other data / 1" test.  Both RRsets share
        # the query name as owner; the CNAME comes first.
        record_specs = (
            (dns.rdatatype.CNAME, "cname1.example.com."),
            (dns.rdatatype.A, "1.2.3.4"),
        )
        records = [
            dns.rrset.from_text(qctx.qname, 300, qctx.qclass, rdtype, rdata)
            for rdtype, rdata in record_specs
        ]
        qctx.response.answer.extend(records)
        yield DnsResponseSend(qctx.response, authoritative=False)
     72  1.1  christos 
     73  1.1  christos 
class Cname2Handler(QnameHandler):
    """Answer cname2.example.com. with an A record followed by a CNAME."""

    qnames = ["cname2.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response whose answer section is A, then CNAME."""
        # Data for the "cname + other data / 2" test: the same kinds of RRs
        # as in test 1, but in the opposite order.
        record_specs = (
            (dns.rdatatype.A, "1.2.3.4"),
            (dns.rdatatype.CNAME, "cname2.example.com."),
        )
        records = [
            dns.rrset.from_text(qctx.qname, 300, qctx.qclass, rdtype, rdata)
            for rdtype, rdata in record_specs
        ]
        qctx.response.answer.extend(records)
        yield DnsResponseSend(qctx.response, authoritative=False)
     90  1.1  christos 
     91  1.1  christos 
class ExampleHandler(QnameHandler):
    """Serve fixed A/AAAA data for the address/alias filtering names."""

    qnames = [
        "www.example.com.",
        "www.example.net.",
        "badcname.example.org.",
        "goodcname.example.org.",
        "foo.badcname.example.org.",
        "foo.goodcname.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response sent with authoritative=True.

        A queries are answered with 192.0.2.1 and AAAA queries with
        2001:db8:beef::1.  For any other query type the answer section is
        left as it was.
        """
        # Data for address/alias filtering, keyed by query type.
        address_by_qtype = {
            dns.rdatatype.A: "192.0.2.1",
            dns.rdatatype.AAAA: "2001:db8:beef::1",
        }
        address = address_by_qtype.get(qctx.qtype)
        if address is not None:
            qctx.response.answer.append(
                dns.rrset.from_text(
                    qctx.qname, 300, qctx.qclass, qctx.qtype, address
                )
            )
        yield DnsResponseSend(qctx.response, authoritative=True)
    117  1.1  christos 
    118  1.1  christos 
class FooInfoHandler(QnameHandler, IgnoreAllQueries):
    """Handler for foo.info. that combines QnameHandler and IgnoreAllQueries.

    NOTE(review): judging by the mixin's name, queries for this name are
    presumably left unanswered -- confirm against isctest.asyncserver.
    """

    qnames = ["foo.info."]
    121  1.1  christos 
    122  1.1  christos 
class NoDataHandler(DomainHandler):
    """Return the prepared response unchanged for nodata.example.net."""

    domains = ["nodata.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield the response from *qctx* as is, with authoritative=True."""
        # Nothing is added to any section of the response here.
        unmodified = DnsResponseSend(qctx.response, authoritative=True)
        yield unmodified
    130  1.1  christos 
    131  1.1  christos 
class NxdomainHandler(DomainHandler):
    """Return NXDOMAIN for nxdomain.example.net."""

    domains = ["nxdomain.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Set the rcode to NXDOMAIN and yield it with authoritative=True."""
        response = qctx.response
        response.set_rcode(dns.rcode.NXDOMAIN)
        yield DnsResponseSend(response, authoritative=True)
    140  1.1  christos 
    141  1.1  christos 
class SubHandler(DomainHandler):
    """Respond for sub.example.org. with a delegation."""

    domains = ["sub.example.org."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response carrying a delegation for sub.example.org."""
        # Data for CNAME/DNAME filtering.  The final answers are expected
        # to be accepted regardless of the filter setting.
        delegated_zone = "sub.example.org."
        setup_delegation(qctx, delegated_zone)
        referral = DnsResponseSend(qctx.response, authoritative=False)
        yield referral
    152  1.1  christos 
    153  1.1  christos 
class FallbackHandler(ResponseHandler):
    """Respond with a delegation for below.www.example.com.

    This class declares no qnames or domains of its own.
    NOTE(review): it presumably handles whatever the more specific handlers
    do not -- confirm how ResponseHandler matches queries.
    """

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one response delegating below.www.example.com."""
        delegated_zone = "below.www.example.com."
        setup_delegation(qctx, delegated_zone)
        referral = DnsResponseSend(qctx.response, authoritative=False)
        yield referral
    160  1.1  christos 
    161  1.1  christos 
def main() -> None:
    """Create the DNS server, install all response handlers, and run it."""
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)

    # Handlers are instantiated and installed in exactly this order;
    # FallbackHandler goes last.
    handler_classes = (
        BadGoodCnameHandler,
        Cname1Handler,
        Cname2Handler,
        ExampleHandler,
        FooInfoHandler,
        NoDataHandler,
        NxdomainHandler,
        SubHandler,
        FallbackHandler,
    )
    server.install_response_handlers(
        [handler_cls() for handler_cls in handler_classes]
    )
    server.run()
    178  1.1  christos 
    179  1.1  christos 
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
    182