Home | History | Annotate | Line # | Download | only in ans4
      1 """
      2 Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3 
      4 SPDX-License-Identifier: MPL-2.0
      5 
      6 This Source Code Form is subject to the terms of the Mozilla Public
      7 License, v. 2.0.  If a copy of the MPL was not distributed with this
      8 file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9 
     10 See the COPYRIGHT file distributed with this work for additional
     11 information regarding copyright ownership.
     12 """
     13 
     14 from typing import AsyncGenerator
     15 
     16 import dns.rcode
     17 import dns.rdatatype
     18 import dns.rrset
     19 
     20 from isctest.asyncserver import (
     21     AsyncDnsServer,
     22     DnsResponseSend,
     23     DomainHandler,
     24     IgnoreAllQueries,
     25     QnameHandler,
     26     QueryContext,
     27     ResponseHandler,
     28 )
     29 
     30 
def setup_delegation(qctx: QueryContext, owner: str) -> None:
    """Add a referral for *owner* to the response held by *qctx*.

    An NS RRset ``owner -> ns.<owner>`` is appended to the AUTHORITY
    section and a matching A RRset for ``ns.<owner>`` (10.53.0.3) is
    appended to the ADDITIONAL section.  Both use a TTL of 300 and the
    class of the incoming query.
    """
    nameserver = f"ns.{owner}"
    rdclass = qctx.qclass

    glue = dns.rrset.from_text(nameserver, 300, rdclass, dns.rdatatype.A, "10.53.0.3")
    delegation = dns.rrset.from_text(
        owner, 300, rdclass, dns.rdatatype.NS, nameserver
    )

    response = qctx.response
    response.authority.append(delegation)
    response.additional.append(glue)
     39 
     40 
class BadGoodCnameHandler(QnameHandler):
    """Reply to the example.net CNAME-filtering names with a referral.

    Both listed names get the "example.net." delegation produced by
    setup_delegation(); the response is sent with authoritative=False.
    """

    qnames = ["badcname.example.net.", "goodcname.example.net."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for CNAME/DNAME filtering.  A one-level delegation is
        # required here; without it, aliases pointing into a subdomain
        # would be accepted automatically.
        zone = "example.net."
        setup_delegation(qctx, zone)
        referral = DnsResponseSend(qctx.response, authoritative=False)
        yield referral
     54 
     55 
class Cname1Handler(QnameHandler):
    """Serve data for the "cname + other data / 1" test.

    The answer section receives a CNAME RRset followed by an A RRset,
    both owned by the query name.
    """

    qnames = ["cname1.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        owner, rdclass = qctx.qname, qctx.qclass
        # Order matters for this test: CNAME first, then the A record.
        records = [
            dns.rrset.from_text(
                owner, 300, rdclass, dns.rdatatype.CNAME, "cname1.example.com."
            ),
            dns.rrset.from_text(owner, 300, rdclass, dns.rdatatype.A, "1.2.3.4"),
        ]
        for rrset in records:
            qctx.response.answer.append(rrset)
        yield DnsResponseSend(qctx.response, authoritative=False)
     72 
     73 
class Cname2Handler(QnameHandler):
    """Serve data for the "cname + other data / 2" test.

    Same kind of records as the "/ 1" variant, but in the opposite
    order: the A RRset is placed before the CNAME RRset.
    """

    qnames = ["cname2.example.com."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        owner, rdclass = qctx.qname, qctx.qclass
        # Order matters for this test: A record first, then the CNAME.
        records = [
            dns.rrset.from_text(owner, 300, rdclass, dns.rdatatype.A, "1.2.3.4"),
            dns.rrset.from_text(
                owner, 300, rdclass, dns.rdatatype.CNAME, "cname2.example.com."
            ),
        ]
        for rrset in records:
            qctx.response.answer.append(rrset)
        yield DnsResponseSend(qctx.response, authoritative=False)
     90 
     91 
class ExampleHandler(QnameHandler):
    """Serve fixed address records for the address/alias filtering names.

    A queries are answered with 192.0.2.1 and AAAA queries with
    2001:db8:beef::1; any other query type gets a response with no
    answer records added.  The response is sent with authoritative=True.
    """

    qnames = [
        "www.example.com.",
        "www.example.net.",
        "badcname.example.org.",
        "goodcname.example.org.",
        "foo.badcname.example.org.",
        "foo.goodcname.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for address/alias filtering, keyed by query type.
        address_by_type = {
            dns.rdatatype.A: "192.0.2.1",
            dns.rdatatype.AAAA: "2001:db8:beef::1",
        }
        address = address_by_type.get(qctx.qtype)
        if address is not None:
            qctx.response.answer.append(
                dns.rrset.from_text(
                    qctx.qname, 300, qctx.qclass, qctx.qtype, address
                )
            )
        yield DnsResponseSend(qctx.response, authoritative=True)
    117 
    118 
class FooInfoHandler(QnameHandler, IgnoreAllQueries):
    """Handler bound to "foo.info." that mixes in IgnoreAllQueries.

    NOTE(review): judging by the mixin's name, queries for this name are
    presumably left unanswered -- confirm against isctest.asyncserver.
    """

    qnames = [
        "foo.info.",
    ]
    121 
    122 
class NoDataHandler(DomainHandler):
    """Send the prepared response unchanged for "nodata.example.net.".

    No records are added and the rcode is not touched; the response is
    sent with authoritative=True.
    """

    domains = [
        "nodata.example.net.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        empty_reply = DnsResponseSend(qctx.response, authoritative=True)
        yield empty_reply
    130 
    131 
class NxdomainHandler(DomainHandler):
    """Answer with rcode NXDOMAIN for "nxdomain.example.net.".

    The response is sent with authoritative=True.
    """

    domains = [
        "nxdomain.example.net.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        response = qctx.response
        response.set_rcode(dns.rcode.NXDOMAIN)
        yield DnsResponseSend(response, authoritative=True)
    140 
    141 
class SubHandler(DomainHandler):
    """Reply with a referral for "sub.example.org.".

    The delegation is built by setup_delegation() and the response is
    sent with authoritative=False.
    """

    domains = [
        "sub.example.org.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        # Data for CNAME/DNAME filtering.  Whatever the filter setting
        # is, the final answers are expected to be accepted.
        zone = "sub.example.org."
        setup_delegation(qctx, zone)
        referral = DnsResponseSend(qctx.response, authoritative=False)
        yield referral
    152 
    153 
class FallbackHandler(ResponseHandler):
    """Reply with a referral for "below.www.example.com.".

    This handler declares no qnames or domains of its own and is
    installed last in main().  The response is sent with
    authoritative=False.
    """

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        zone = "below.www.example.com."
        setup_delegation(qctx, zone)
        referral = DnsResponseSend(qctx.response, authoritative=False)
        yield referral
    160 
    161 
def main() -> None:
    """Create the DNS server, install every handler, and run it.

    The server is created with a default rcode of NOERROR.  Handlers are
    installed in the order listed below, with FallbackHandler last.
    """
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)

    handler_classes = (
        BadGoodCnameHandler,
        Cname1Handler,
        Cname2Handler,
        ExampleHandler,
        FooInfoHandler,
        NoDataHandler,
        NxdomainHandler,
        SubHandler,
        FallbackHandler,
    )
    handlers = [handler_cls() for handler_cls in handler_classes]

    server.install_response_handlers(handlers)
    server.run()
    178 
    179 
# Start the server only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()
    182