1 """ 2 Copyright (C) Internet Systems Consortium, Inc. ("ISC") 3 4 SPDX-License-Identifier: MPL-2.0 5 6 This Source Code Form is subject to the terms of the Mozilla Public 7 License, v. 2.0. If a copy of the MPL was not distributed with this 8 file, you can obtain one at https://mozilla.org/MPL/2.0/. 9 10 See the COPYRIGHT file distributed with this work for additional 11 information regarding copyright ownership. 12 """ 13 14 from collections.abc import AsyncGenerator 15 from typing import NamedTuple 16 17 import abc 18 19 import dns.name 20 import dns.rdataclass 21 import dns.rdatatype 22 import dns.rrset 23 24 from isctest.asyncserver import ( 25 DnsResponseSend, 26 DomainHandler, 27 QnameHandler, 28 QueryContext, 29 ) 30 31 32 def rrset( 33 qname: dns.name.Name | str, 34 rtype: dns.rdatatype.RdataType, 35 rdata: str, 36 ttl: int = 300, 37 ) -> dns.rrset.RRset: 38 return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata) 39 40 41 def rrset_from_list( 42 qname: dns.name.Name | str, 43 rtype: dns.rdatatype.RdataType, 44 rdata_list: list[str], 45 ttl: int = 300, 46 ) -> dns.rrset.RRset: 47 return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list) 48 49 50 def soa_rrset(qname: dns.name.Name | str) -> dns.rrset.RRset: 51 return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0") 52 53 54 class DelegationRRsets(NamedTuple): 55 ns_rrset: dns.rrset.RRset 56 a_rrset: dns.rrset.RRset 57 58 59 def delegation_rrsets( 60 owner: dns.name.Name | str, 61 server_number: int, 62 ns_name: dns.name.Name | str | None = None, 63 ) -> DelegationRRsets: 64 if ns_name is None: 65 ns_name = f"ns.{owner}" 66 ns_rrset = rrset(owner, dns.rdatatype.NS, f"{ns_name}") 67 a_rrset = rrset(ns_name, dns.rdatatype.A, f"10.53.0.{server_number}") 68 return DelegationRRsets(ns_rrset, a_rrset) 69 70 71 def setup_delegation( 72 qctx: QueryContext, owner: dns.name.Name | str, server_number: int 73 ) -> None: 74 delegation = delegation_rrsets(owner, server_number) 75 qctx.response.authority.append(delegation.ns_rrset) 76 qctx.response.additional.append(delegation.a_rrset) 77 78 79 class DelegationHandler(DomainHandler): 80 @property 81 @abc.abstractmethod 82 def server_number(self) -> int: 83 raise NotImplementedError 84 85 async def get_responses( 86 self, qctx: QueryContext 87 ) -> AsyncGenerator[DnsResponseSend, None]: 88 setup_delegation(qctx, self.matched_domain, self.server_number) 89 yield DnsResponseSend(qctx.response, authoritative=False) 90 91 92 class Gl6412AHandler(QnameHandler): 93 qnames = ["a.gl6412.", "a.a.gl6412."] 94 95 async def get_responses( 96 self, qctx: QueryContext 97 ) -> AsyncGenerator[DnsResponseSend, None]: 98 qctx.response.authority.append(soa_rrset(qctx.qname)) 99 yield DnsResponseSend(qctx.response) 100 101 102 class Gl6412Handler(QnameHandler): 103 qnames = ["gl6412."] 104 105 async def get_responses( 106 self, qctx: QueryContext 107 ) -> AsyncGenerator[DnsResponseSend, None]: 108 if qctx.qtype == dns.rdatatype.SOA: 109 qctx.response.answer.append(soa_rrset(qctx.qname)) 110 elif qctx.qtype == dns.rdatatype.NS: 111 ns2_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns2.{qctx.qname}") 112 ns3_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns3.{qctx.qname}") 113 qctx.response.answer.append(ns2_rrset) 114 qctx.response.answer.append(ns3_rrset) 115 else: 116 qctx.response.authority.append(soa_rrset(qctx.qname)) 117 yield DnsResponseSend(qctx.response) 118 119 120 class Gl6412Ns2Handler(QnameHandler): 121 qnames = ["ns2.gl6412."] 122 123 async def get_responses( 124 self, qctx: QueryContext 125 ) -> AsyncGenerator[DnsResponseSend, None]: 126 if qctx.qtype == dns.rdatatype.A: 127 a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.2") 128 qctx.response.answer.append(a_rrset) 129 else: 130 qctx.response.authority.append(soa_rrset(qctx.qname)) 131 yield DnsResponseSend(qctx.response) 132 133 134 class Gl6412Ns3Handler(QnameHandler): 135 qnames = ["ns3.gl6412."] 136 137 async def get_responses( 138 self, qctx: QueryContext 139 ) -> AsyncGenerator[DnsResponseSend, None]: 140 if qctx.qtype == dns.rdatatype.A: 141 a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3") 142 qctx.response.answer.append(a_rrset) 143 else: 144 qctx.response.authority.append(soa_rrset(qctx.qname)) 145 yield DnsResponseSend(qctx.response) 146