Home | History | Annotate | Line # | Download | only in resolver
      1 """
      2 Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      3 
      4 SPDX-License-Identifier: MPL-2.0
      5 
      6 This Source Code Form is subject to the terms of the Mozilla Public
      7 License, v. 2.0.  If a copy of the MPL was not distributed with this
      8 file, you can obtain one at https://mozilla.org/MPL/2.0/.
      9 
     10 See the COPYRIGHT file distributed with this work for additional
     11 information regarding copyright ownership.
     12 """
     13 
     14 from collections.abc import AsyncGenerator
     15 from typing import NamedTuple
     16 
     17 import abc
     18 
     19 import dns.name
     20 import dns.rdataclass
     21 import dns.rdatatype
     22 import dns.rrset
     23 
     24 from isctest.asyncserver import (
     25     DnsResponseSend,
     26     DomainHandler,
     27     QnameHandler,
     28     QueryContext,
     29 )
     30 
     31 
     32 def rrset(
     33     qname: dns.name.Name | str,
     34     rtype: dns.rdatatype.RdataType,
     35     rdata: str,
     36     ttl: int = 300,
     37 ) -> dns.rrset.RRset:
     38     return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
     39 
     40 
     41 def rrset_from_list(
     42     qname: dns.name.Name | str,
     43     rtype: dns.rdatatype.RdataType,
     44     rdata_list: list[str],
     45     ttl: int = 300,
     46 ) -> dns.rrset.RRset:
     47     return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
     48 
     49 
     50 def soa_rrset(qname: dns.name.Name | str) -> dns.rrset.RRset:
     51     return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0")
     52 
     53 
     54 class DelegationRRsets(NamedTuple):
     55     ns_rrset: dns.rrset.RRset
     56     a_rrset: dns.rrset.RRset
     57 
     58 
     59 def delegation_rrsets(
     60     owner: dns.name.Name | str,
     61     server_number: int,
     62     ns_name: dns.name.Name | str | None = None,
     63 ) -> DelegationRRsets:
     64     if ns_name is None:
     65         ns_name = f"ns.{owner}"
     66     ns_rrset = rrset(owner, dns.rdatatype.NS, f"{ns_name}")
     67     a_rrset = rrset(ns_name, dns.rdatatype.A, f"10.53.0.{server_number}")
     68     return DelegationRRsets(ns_rrset, a_rrset)
     69 
     70 
     71 def setup_delegation(
     72     qctx: QueryContext, owner: dns.name.Name | str, server_number: int
     73 ) -> None:
     74     delegation = delegation_rrsets(owner, server_number)
     75     qctx.response.authority.append(delegation.ns_rrset)
     76     qctx.response.additional.append(delegation.a_rrset)
     77 
     78 
     79 class DelegationHandler(DomainHandler):
     80     @property
     81     @abc.abstractmethod
     82     def server_number(self) -> int:
     83         raise NotImplementedError
     84 
     85     async def get_responses(
     86         self, qctx: QueryContext
     87     ) -> AsyncGenerator[DnsResponseSend, None]:
     88         setup_delegation(qctx, self.matched_domain, self.server_number)
     89         yield DnsResponseSend(qctx.response, authoritative=False)
     90 
     91 
     92 class Gl6412AHandler(QnameHandler):
     93     qnames = ["a.gl6412.", "a.a.gl6412."]
     94 
     95     async def get_responses(
     96         self, qctx: QueryContext
     97     ) -> AsyncGenerator[DnsResponseSend, None]:
     98         qctx.response.authority.append(soa_rrset(qctx.qname))
     99         yield DnsResponseSend(qctx.response)
    100 
    101 
    102 class Gl6412Handler(QnameHandler):
    103     qnames = ["gl6412."]
    104 
    105     async def get_responses(
    106         self, qctx: QueryContext
    107     ) -> AsyncGenerator[DnsResponseSend, None]:
    108         if qctx.qtype == dns.rdatatype.SOA:
    109             qctx.response.answer.append(soa_rrset(qctx.qname))
    110         elif qctx.qtype == dns.rdatatype.NS:
    111             ns2_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns2.{qctx.qname}")
    112             ns3_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns3.{qctx.qname}")
    113             qctx.response.answer.append(ns2_rrset)
    114             qctx.response.answer.append(ns3_rrset)
    115         else:
    116             qctx.response.authority.append(soa_rrset(qctx.qname))
    117         yield DnsResponseSend(qctx.response)
    118 
    119 
    120 class Gl6412Ns2Handler(QnameHandler):
    121     qnames = ["ns2.gl6412."]
    122 
    123     async def get_responses(
    124         self, qctx: QueryContext
    125     ) -> AsyncGenerator[DnsResponseSend, None]:
    126         if qctx.qtype == dns.rdatatype.A:
    127             a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.2")
    128             qctx.response.answer.append(a_rrset)
    129         else:
    130             qctx.response.authority.append(soa_rrset(qctx.qname))
    131         yield DnsResponseSend(qctx.response)
    132 
    133 
    134 class Gl6412Ns3Handler(QnameHandler):
    135     qnames = ["ns3.gl6412."]
    136 
    137     async def get_responses(
    138         self, qctx: QueryContext
    139     ) -> AsyncGenerator[DnsResponseSend, None]:
    140         if qctx.qtype == dns.rdatatype.A:
    141             a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3")
    142             qctx.response.answer.append(a_rrset)
    143         else:
    144             qctx.response.authority.append(soa_rrset(qctx.qname))
    145         yield DnsResponseSend(qctx.response)
    146